LinuxQuestions.org
Welcome to the most active Linux Forum on the web.
Home Forums Tutorials Articles Register
Go Back   LinuxQuestions.org > Forums > Non-*NIX Forums > Programming
User Name
Password
Programming This forum is for all programming questions.
The question does not have to be directly related to Linux and any language is fair game.

Notices


Reply
  Search this Thread
Old 03-16-2005, 07:03 AM   #1
Parahat Melayev
LQ Newbie
 
Registered: Mar 2005
Posts: 5

Rep: Reputation: 0
multi-threaded server, pthreads, sleep


I am trying to writa a multi-client & multi-threaded TCP server.
There is a thread pool. Each thread in the pool will handle
requests of multiple clients.

But here I have a problem. I find a solution but it is not how
it must be... i think. When threads working without sleep(1)
I can't get response from server but when I put sleep(1) in
thread function as you will see in code, everything works fine
and server can make echo nearly for 40000 clients between
1 or 2 seconds. What am I doing wrong here?

thanks,
parahat


------------ server.c -------------
$ cc server.c -o server -lpthread
-----------------------------------
Code:
#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

#define MAX_CLIENT_PER_THREAD 300
#define MAX_THREAD 200
#define PORT 3355
#define MAX_CLIENT_BUFFER 256
/*#define DEBUG*/

int listenfd;

typedef struct {
	pthread_t tid;
	int client_count;
	int clients[MAX_CLIENT_PER_THREAD];
} Thread;

pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;


Thread threads[MAX_THREAD];

void nonblock(int sockfd)
{
	int opts;
	opts = fcntl(sockfd, F_GETFL);
	if(opts < 0)
	{
		perror("fcntl(F_GETFL)\n");
		exit(1);
	}
	opts = (opts | O_NONBLOCK);
	if(fcntl(sockfd, F_SETFL, opts) < 0) 
	{
		perror("fcntl(F_SETFL)\n");
		exit(1);
	}
}

void *thread_init_func(void *arg)
{
	int tid = (int) arg;
	
	int readsocks;
	int i;
	char buffer[MAX_CLIENT_BUFFER];
	char c;
	int n;
#ifdef DEBUG
	printf("thread %d created\n", tid);
	printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
	memset((int *) &threads[tid].clients, 0, sizeof(threads[tid].clients));
	memset((char *) &buffer, 0, sizeof(buffer));	
	while(1)
	{
#ifdef DEBUG
		printf("thread %d running, client count: %d\n", tid, threads[tid].client_count);
		sleep(3);
#endif
		sleep(1); /* <-- it works ??? :-| */

		for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
		{
			if(threads[tid].clients[i] != 0)
			{
				n = recv(threads[tid].clients[i], buffer, MAX_CLIENT_BUFFER, 0);
				if(n == 0)
				{
#ifdef DEBUG
					printf("client %d closed connection 0\n", threads[tid].clients[i]);
#endif
					threads[tid].clients[i] = 0;
					threads[tid].client_count--;
					memset((char *) &buffer, 0, strlen(buffer));
				}
				else if(n < 0)
				{
					if(errno == EAGAIN)
					{
#ifdef DEBUG
						printf("errno: EAGAIN\n");
#endif
					}
					else {
#ifdef DEBUG
						printf("errno: %d\n", errno);
#endif
						threads[tid].clients[i] = 0;
						threads[tid].client_count--;
						memset( (char *) &buffer, 0, strlen(buffer));
#ifdef DEBUG
						printf("client %d closed connection -1\n", threads[tid].clients[i]);
#endif
					}
				}
				else {
#ifdef DEBUG
					printf("%d bytes received from %d - %s\n", n, threads[tid].clients[i], buffer);
#endif
					
					send(threads[tid].clients[i], buffer, strlen(buffer), 0);
					memset((char *) &buffer, 0, strlen(buffer));
				}
			}
		}
	}
}

int choose_thread()
{
	int i=MAX_THREAD-1;
	int min = 0;
	while(i > -1)
	{
		if(threads[i].client_count < threads[i-1].client_count)
		{
			min = i;
			break;
		}
		i--;
	}		
	return min;
}

int main()
{
	char c;
	struct sockaddr_in srv, cli;
	int clifd;
	int tid;
	int i;
	int choosen;

	signal(SIGPIPE, SIG_IGN);
	
	if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
	{
		perror("sockfd\n");
		exit(1);
	}
	bzero(&srv, sizeof(srv));
	srv.sin_family = AF_INET;
	srv.sin_addr.s_addr = INADDR_ANY;
	srv.sin_port = htons(PORT);

	if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
	{
		perror("bind\n");
		exit(1);
	}
	
	listen(listenfd, 1024);

	
	/* create threads  */
	for(i = 0; i < MAX_THREAD; i++)
	{
		pthread_create(&threads[i].tid, NULL, &thread_init_func, (void *) i);
		threads[i].client_count = 0;
	}
	

	for( ; ; )
	{
		clifd = accept(listenfd, NULL, NULL);
		nonblock(clifd);

		pthread_mutex_lock(&new_connection_mutex);
		
		choosen = choose_thread();

		for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
		{
			if(threads[choosen].clients[i] == 0)
			{
#ifdef DEBUG
				printf("before threads clifd\n");
#endif
				threads[choosen].clients[i] = clifd;
#ifdef DEBUG
				printf("after threads clifd\n");
#endif
				threads[choosen].client_count++;
				break;
			}
		}

#ifdef DEBUG
		printf("choosen: %d\n", choosen);

		for(i = 0; i < MAX_THREAD; i++)
		{
			printf("threads[%d].client_count:%d\n", i, threads[i].client_count);
		}
#endif

		pthread_mutex_unlock(&new_connection_mutex);
	}

	if(errno)
	{
		printf("errno: %d", errno);
	}
	
	return 0;
}

===========================================================

--------------- test.c --------------------
$ cc test.c -o test
-------------------------------------------
Code:
#include <stdio.h>
#include <netdb.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>


#define MAX_CLIENT 1000 
/*#define DEBUG*/

int PORT = 3355;
char *IP = "192.168.222.22";

enum {
	U_INT_SIZE = sizeof(unsigned int)
};

int list[MAX_CLIENT];
fd_set fdset;
int highfd = MAX_CLIENT;

void nonblock(int sockfd)
{
	int opts;
	opts = fcntl(sockfd, F_GETFL);
	if(opts < 0)
	{
		perror("fcntl(F_GETFL)\n");
		exit(1);
	}
	opts = (opts | O_NONBLOCK);
	if(fcntl(sockfd, F_SETFL, opts) < 0) 
	{
		perror("fcntl(F_SETFL)\n");
		exit(1);
	}
}


void cli_con()
{	
	struct sockaddr_in srv;
	struct hostent *h_name;
	int clifd;
	int n;

	bzero(&srv, sizeof(srv));
	srv.sin_family = AF_INET;
	srv.sin_port = htons(PORT);
	inet_pton(AF_INET, IP, &srv.sin_addr);	
	
	if( (clifd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
	{
		printf("ERROR clifd\n");
		exit(1);
	}

	if(connect(clifd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
	{
		printf("ERROR connect\n");
		exit(1);
	}

	nonblock(clifd);
	for(n = 0; (n < MAX_CLIENT) && (clifd != -1); n++)
	{
		if(list[n] == 0) {
#ifdef DEBUG
			printf("connected %d\n", clifd);
#endif
			list[n] = clifd;
			clifd = -1;
		}
	}
	
	if(clifd != -1) {
		printf("list FULL");
	}
}

void set_list() {
	int n;
	FD_ZERO(&fdset);

	for(n = 0; n < MAX_CLIENT; n++)
	{
		if(list[n] != 0) {
			FD_SET(list[n], &fdset);
			if(list[n] > highfd)
				highfd = list[n];
		}
	}
}

void recv_data(int num)
{
	int n;
	char buffer[1024];
	n = recv(list[num], buffer, 1023, 0);
	if(n > 0)
	{
#ifdef DEBUG
		printf("%d bytes from %d\n", n, list[num]);
		printf("%s", buffer);
#endif
	}
	else if(n < 0)
	{
		if(errno != EAGAIN)
			printf("client %d disconnected\n", list[num]);
	}
	else if(n == 0)
	{
		printf("client %d disconnected\n", list[num]);
	}
}


void send_data(int num)
{
	int n;
	char buffer[1024];
	sprintf(buffer, "TEST list[%d]=%d \r\n", num, list[num]);
	if((n = send(list[num], buffer, strlen(buffer), 0)) > 0)
	{
#ifdef DEBUG
		printf("%d bytes sent from %d\n", n, list[num]);
#endif
	}
}

void scan_clients()
{
#ifdef DEBUG
	printf("scan_clients\n");
#endif
	int num;
	
	for(num = 0; num < MAX_CLIENT; num++) {
		if(FD_ISSET(list[num], &fdset))
			recv_data(num);
	}

	for(num = 0; num < MAX_CLIENT; num++)
		send_data(num);
}

int main()
{
	int readsocks;
	int i=0;
	struct timeval tv;
	tv.tv_sec = 0;
	tv.tv_usec = 10;
	
	while(i < MAX_CLIENT)
	{
		cli_con();
		i++;	
	}
	i=0;
	while(i < MAX_CLIENT)
	{
		printf("list[%d] = %d\n",i,list[i]);
		i++;
	}
	i = 0;
	while(1)
	{
		set_list();
		readsocks = select(highfd+1, &fdset, NULL, NULL, &tv);
		if(readsocks < 0) {
			perror("select\n");
			exit(1);
		}
		 
		/*printf("else scan_clients\n");*/
		scan_clients();
#ifdef DEBUG
		sleep(4);
#endif
	}
	return 0;
}

Last edited by Parahat Melayev; 03-16-2005 at 11:59 AM.
 
Old 03-16-2005, 03:30 PM   #2
Mara
Moderator
 
Registered: Feb 2002
Location: Grenoble
Distribution: Debian
Posts: 9,696

Rep: Reputation: 232Reputation: 232Reputation: 232
From what I see you're running in active loop. Why not to use select() to be notified when client has actually something to process? Also, if I see it correctly, you use mutex only in the main() function, not in the threads, so it may be that the thread table is written at the same time by main() and thread function.
 
Old 03-16-2005, 05:38 PM   #3
Parahat Melayev
LQ Newbie
 
Registered: Mar 2005
Posts: 5

Original Poster
Rep: Reputation: 0
actually I started with select.
here is the code that using select but it is completely not working
when I connect multiple clients.

Code:
#include <pthread.h>
#include <stdio.h>
#include <sys/timeb.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#include <string.h>
#include <fcntl.h>
#include <signal.h>
#include <errno.h>

#define MAX_CLIENT_PER_THREAD 100
#define MAX_THREAD 100
#define PORT 3355
/*#define DEBUG*/
#define HIGHFD MAX_CLIENT_PER_THREAD * MAX_THREAD
int listenfd;
int which_thread = 0;


typedef struct {
	pthread_t tid;
	int tnumber;
	long client_count;
	int clients[MAX_CLIENT_PER_THREAD];
	fd_set clientset;
} Thread;

pthread_cond_t new_connection_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t new_connection_mutex = PTHREAD_MUTEX_INITIALIZER;


Thread threads[MAX_THREAD];

void nonblock(int sockfd)
{
	int opts;
	opts = fcntl(sockfd, F_GETFL);
	if(opts < 0)
	{
		perror("fcntl(F_GETFL)\n");
		exit(1);
	}
	opts = (opts | O_NONBLOCK);
	if(fcntl(sockfd, F_SETFL, opts) < 0) 
	{
		perror("fcntl(F_SETFL)\n");
		exit(1);
	}
}

void *thread_init_func(void *arg)
{
	int tid = (int) arg;
	int readsocks;
	int i;
	char buffer[1024];
	int n;
	fd_set readset;
	int clifd;

	struct timeval tv;
	tv.tv_sec = 0;
	tv.tv_usec = 1;
#ifdef DEBUG
	printf("thread %d created\n", tid);
	printf("sizeof thread.clients: %d\n", sizeof(threads[tid].clients));
#endif
	memset( (int *) &threads[tid].clients, 0, sizeof(threads[tid].clients));
	
	while(1)
	{
#ifdef DEBUG
		printf("thread %d running, client count: %d\n", tid, threads[tid].client_count);
		sleep(3);
#endif
		sleep(1);	
		readset = threads[tid].clientset;
#ifdef DEBUG
		printf("before select\n");
#endif
		readsocks = select(HIGHFD + 1, &readset, NULL, NULL, &tv);
#ifdef DEBUG
		printf("after select\n");
#endif

		
		for(i = 0; i < threads[tid].client_count; i++)
		{
			if( threads[tid].clients[i] == 0)
				continue;
			if(FD_ISSET(threads[tid].clients[i], &readset)) 
			{
				n = Readline(threads[tid].clients[i], buffer, sizeof(buffer));
				if(n == 0)
				{
#ifdef DEBUG
					printf("client %d closed connection 0\n", threads[tid].clients[i]);
#endif
					FD_CLR(threads[tid].clients[i], &threads[tid].clientset);
					threads[tid].clients[i] = 0;
					threads[tid].client_count--;
				}
				else if(n < 0)
				{
					if(errno == EAGAIN)
					{
#ifdef DEBUG
						printf("errno: EAGAIN\n");
#endif
					}
					else {
#ifdef DEBUG
						printf("errno: %d\n", errno);
#endif
						FD_CLR(threads[tid].clients[i], &threads[tid].clientset);
						threads[tid].clients[i] = 0;
						threads[tid].client_count--;
#ifdef DEBUG
						printf("client %d closed connection -1\n", threads[tid].clients[i]);
#endif
					}
				}
				else {
#ifdef DEBUG
					printf("\n %d bytes received from %d - %s\n", n,
						threads[tid].clients[i], buffer);
#endif
					send(threads[tid].clients[i], buffer, strlen(buffer), 0);
				}
				       
			}
		}
	}
}

int choose_thread()
{
	int i=MAX_THREAD-1;
	int min = 0;
	while(i > -1)
	{
		if(threads[i].client_count < threads[i-1].client_count)
		{
			min = i;
			break;
		}
		i--;
	}		
	return min;
}

int main()
{
	char c;
	struct sockaddr_in srv, cli;
	int clifd;
	int tid;
	int i;
	int choosen;
	
	if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
	{
		perror("sockfd\n");
		exit(1);
	}
	bzero(&srv, sizeof(srv));
	srv.sin_family = AF_INET;
	srv.sin_addr.s_addr = INADDR_ANY;
	srv.sin_port = htons(PORT);

	if( bind(listenfd, (struct sockaddr *) &srv, sizeof(srv)) < 0)
	{
		perror("bind\n");
		exit(1);
	}
	
	listen(listenfd, 1024);

	
	/* create threads  */
	for(i = 0; i < MAX_THREAD; i++)
	{
		pthread_create(&threads[i].tid, NULL, &thread_init_func, (void *) i);
		threads[i].client_count = 0;
	}
	

	for( ; ; )
	{
		clifd = accept(listenfd, NULL, NULL);
		nonblock(clifd);

		pthread_mutex_lock(&new_connection_mutex);
		
		choosen = choose_thread();;

		for(i = 0; i < MAX_CLIENT_PER_THREAD; i++)
		{
			if(threads[choosen].clients[i] == 0)
			{
				threads[choosen].clients[i] = clifd;
				threads[choosen].client_count++;
				FD_SET(threads[choosen].clients[i], &threads[choosen].clientset);
				break;
			}
		}

#ifdef DEBUG
		printf("choosen: %d\n", choosen);

		for(i = 0; i < MAX_THREAD; i++)
		{
			printf("threads[%d].client_count:%d\n", i, threads[i].client_count);
		}
#endif

		pthread_mutex_unlock(&new_connection_mutex);
	}
	
	return 0;
}
 
Old 03-18-2005, 05:11 PM   #4
Mara
Moderator
 
Registered: Feb 2002
Location: Grenoble
Distribution: Debian
Posts: 9,696

Rep: Reputation: 232Reputation: 232Reputation: 232
It doesn't work, because the set is not filled the right way. Use FD_SET macro to socket to set and then (after select() ), FD_ISSET to check if the socket is in the result (the sets you give as parameters change during select() !). If socket is in set after select, something interesting (new data received) happened with it.
 
Old 03-20-2005, 04:01 PM   #5
Parahat Melayev
LQ Newbie
 
Registered: Mar 2005
Posts: 5

Original Poster
Rep: Reputation: 0
It will be same with FD_SET inside thread. I mean initializing readset from threads[i].allset will do same thing. I thing using select inside thread is wrong idea. I don't know if select is thread-safe...
 
Old 01-17-2012, 01:57 AM   #6
ankit,garg
LQ Newbie
 
Registered: Jan 2012
Location: Noida,India
Posts: 20

Rep: Reputation: Disabled
Maximum number of threads.

Hi Prabhat,

First of all thanks for posting your problem as your program helped me to achieve one of work related to the same topic of this post.

When I execute your program I am seeing that server is creating only 303 threads where I am creating 50000 threads with one client on each thread.

I have also checked the thread limit of my system and it is far more than 303,Do you have any idea why is this happening?

Your help will be appreciated.

Thanks
Ankit
 
Old 01-17-2012, 06:20 AM   #7
dwhitney67
Senior Member
 
Registered: Jun 2006
Location: Maryland
Distribution: Kubuntu, Fedora, RHEL
Posts: 1,541

Rep: Reputation: 335Reputation: 335Reputation: 335Reputation: 335
Quote:
Originally Posted by ankit,garg View Post
Hi Prabhat,

First of all thanks for posting your problem as your program helped me to achieve one of work related to the same topic of this post.

When I execute your program I am seeing that server is creating only 303 threads where I am creating 50000 threads with one client on each thread.

I have also checked the thread limit of my system and it is far more than 303,Do you have any idea why is this happening?

Your help will be appreciated.

Thanks
Ankit
You might have more success seeking help if you opened a new thread, rather than re-opening a (nearly) 7-year old one.
 
  


Reply



Posting Rules
You may not post new threads
You may not post replies
You may not post attachments
You may not edit your posts

BB code is On
Smilies are On
[IMG] code is Off
HTML code is Off



Similar Threads
Thread Thread Starter Forum Replies Last Post
pthreads - multi-threaded server elitecodex Programming 1 08-20-2005 03:58 PM
how to do waitpid() using a multi-threaded server? Thinking Linux - Software 1 04-13-2005 09:50 PM
is there any way to sleep threads using pthreads.... gajaykrishnan Programming 2 03-26-2005 02:20 PM
Segementation fault on Multi threaded server on ia-64 Linux Latha Linux - Software 3 09-09-2004 06:40 PM
Multi-Threaded C pragti Programming 1 06-01-2004 10:50 AM

LinuxQuestions.org > Forums > Non-*NIX Forums > Programming

All times are GMT -5. The time now is 03:37 AM.

Main Menu
Advertisement
My LQ
Write for LQ
LinuxQuestions.org is looking for people interested in writing Editorials, Articles, Reviews, and more. If you'd like to contribute content, let us know.
Main Menu
Syndicate
RSS1  Latest Threads
RSS1  LQ News
Twitter: @linuxquestions
Open Source Consulting | Domain Registration