1. 程式人生 > epoll+執行緒池實現http檔案下載

epoll+執行緒池實現http檔案下載

server.c

#include<errno.h>
#include<stdio.h>
#include<stdlib.h>
#include<string.h>
#include<time.h>
#include<sys/types.h>
#include<dlfcn.h>
#include<fcntl.h>
#include<unistd.h>
#include<arpa/inet.h>
#include<netinet/in.h>
#include<sys/epoll.h>
#include<sys/socket.h>
#include"threadpool.h"
#define MAX_LEN 1024
#define MAX_SIZE 100

/*
 * Thread-pool job: read one chunk from a client socket and echo it back.
 *
 * arg must point to a heap-allocated int holding the connected socket
 * descriptor. This function takes ownership of both: it frees arg and
 * closes the socket before returning.
 *
 * At most MAX_LEN bytes are read, and exactly the bytes that were received
 * are sent back. If recv() fails or the peer has already closed, nothing
 * is sent.
 *
 * Always returns NULL.
 */
void *worker(void *arg){
	char buf[MAX_LEN];
	int sock = *(int *)arg;
	free(arg);

	ssize_t len = recv(sock,buf,sizeof(buf),0);
	ssize_t sent = 0;
	/* len <= 0 (error or EOF) skips the loop entirely. */
	while( sent < len ){
		ssize_t n = send(sock,buf+sent,(size_t)(len-sent),0);
		if( n < 0 && errno == EINTR ){
			continue;	/* interrupted before any byte was sent: retry */
		}
		if( n <= 0 ){
			break;		/* unrecoverable here: give up on this client */
		}
		sent += n;
	}
	close(sock);
	return NULL;
}

/*
 * Put fd into non-blocking mode while keeping its other file status flags.
 * Returns 0 on success, -1 on failure (errno is set by fcntl).
 */
static int set_nonblocking(int fd){
	int flags = fcntl(fd,F_GETFL,0);
	if( flags < 0 ){
		return -1;
	}
	return fcntl(fd,F_SETFL,flags|O_NONBLOCK);
}

/*
 * Server entry point.
 *
 * Creates a thread pool (16 threads, queue of 100), listens on TCP port 6665
 * on all interfaces and multiplexes sockets with epoll:
 *   - readiness on the listening socket -> accept the client and register it;
 *   - readiness on a client socket      -> hand the descriptor to worker()
 *                                          through the thread pool.
 *
 * Start-up failures terminate the process. Failures that affect only one
 * client are reported, and that client is dropped.
 */
int main(){
	int server_sockfd;
	int client_sockfd;
	struct sockaddr_in server_addr;
	struct sockaddr_in client_addr;
	socklen_t sin_size;	/* accept() requires socklen_t *, not int * */
	int epoll_fd;
	struct epoll_event ev;
	struct epoll_event events[MAX_SIZE];
	int nfds;
	struct threadpool *pool;

	pool = threadpool_init(16,100);
	if( pool == NULL ){
		fprintf(stderr,"threadpool_init failed\n");
		exit(EXIT_FAILURE);
	}

	memset(&server_addr,0,sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
	server_addr.sin_port = htons(6665);
	if( (server_sockfd = socket(PF_INET,SOCK_STREAM,0)) < 0 ){
		perror("socket");
		exit(EXIT_FAILURE);
	}
	int on=1;
	if( setsockopt(server_sockfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)) < 0 ){
		/* Best effort: the server can still run without SO_REUSEADDR. */
		perror("setsockopt");
	}
	if( bind(server_sockfd,(struct sockaddr *)&server_addr,sizeof(server_addr)) < 0 ){
		perror("bind");
		exit(EXIT_FAILURE);
	}
	if( listen(server_sockfd,5) < 0 ){
		perror("listen");
		exit(EXIT_FAILURE);
	}
	if( set_nonblocking(server_sockfd) < 0 ){
		perror("fcntl");
		exit(EXIT_FAILURE);
	}

	if( (epoll_fd = epoll_create(MAX_SIZE)) < 0 ){
		perror("create");
		exit(EXIT_FAILURE);
	}
	ev.events = EPOLLIN;
	ev.data.fd = server_sockfd;
	if( epoll_ctl(epoll_fd,EPOLL_CTL_ADD,server_sockfd,&ev) < 0 ){
		perror("ctl");
		exit(EXIT_FAILURE);
	}

	while(1){
		nfds = epoll_wait(epoll_fd,events,MAX_SIZE,-1);
		if( nfds < 0 ){
			if( errno == EINTR ){
				continue;	/* interrupted by a signal: just wait again */
			}
			perror("wait");
			exit(EXIT_FAILURE);
		}
		int i;
		for( i = 0 ; i < nfds ; ++i ){
			int fd = events[i].data.fd;
			if( fd == server_sockfd ){
				sin_size = sizeof(client_addr);
				client_sockfd = accept(server_sockfd,(struct sockaddr *)&client_addr,&sin_size);
				if( client_sockfd < 0 ){
					/* The listener is non-blocking, so these are transient. */
					if( errno == EAGAIN || errno == EWOULDBLOCK ||
					    errno == EINTR || errno == ECONNABORTED ){
						continue;
					}
					perror("accept");
					exit(EXIT_FAILURE);
				}
				if( set_nonblocking(client_sockfd) < 0 ){
					perror("fcntl");
					close(client_sockfd);
					continue;
				}
				ev.events = EPOLLIN|EPOLLET;
				ev.data.fd = client_sockfd;
				if( epoll_ctl(epoll_fd,EPOLL_CTL_ADD,client_sockfd,&ev) < 0 ){
					perror("CTL");
					close(client_sockfd);
					continue;
				}
				printf("accpet!\n");
			}
			else{
				/*
				 * Stop watching the descriptor before handing it off.
				 * Otherwise a second readiness event could queue another
				 * job for a socket the worker is about to close.
				 */
				if( epoll_ctl(epoll_fd,EPOLL_CTL_DEL,fd,NULL) < 0 ){
					perror("ctl del");
					continue;
				}
				int *ptr = malloc(sizeof *ptr);
				if( ptr == NULL ){
					perror("malloc");
					close(fd);
					continue;
				}
				*ptr = fd;
				/* On success the worker owns ptr and fd; on failure we still do. */
				if( threadpool_add_job(pool,worker,(void *)ptr) != 0 ){
					fprintf(stderr,"threadpool_add_job failed\n");
					free(ptr);
					close(fd);
				}
			}
		}
	}
	/* Not reached: the loop above never terminates normally. */
	threadpool_destroy(pool);
	close(epoll_fd);
	close(server_sockfd);
	return 0;
}

threadpool.h
#ifndef threadpool_H_
#define threadpool_H_

#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<assert.h>


/* One queued unit of work; nodes form a singly linked FIFO list. */
struct job{
	/* Function a pool thread calls to run the job. */
	void* (*callback_func)(void *arg);
	/* Argument passed unchanged to callback_func. */
	void *arg;
	/* Next job in the queue, or NULL for the last one. */
	struct job *next;
};

/*
 * Thread pool state: a bounded FIFO job queue plus its worker threads.
 * The pool functions access the queue fields and flags with mutex held.
 */
struct threadpool{
	/* Number of worker threads (length of pthreads). */
	int thread_num;
	/* Queue capacity; threadpool_add_job waits while the queue is this long. */
	int queue_max_num;
	/* Number of jobs currently in the queue. */
	int queue_cur_num;
	/* Non-zero once destruction has started: new jobs are rejected. */
	int queue_close;
	/* Non-zero once the queue has drained during destruction: workers exit. */
	int pool_close;
	/* Oldest queued job (next to run), or NULL when the queue is empty. */
	struct job *head;
	/* Newest queued job, or NULL when the queue is empty. */
	struct job *rear;
	/* Array of thread_num worker thread ids. */
	pthread_t *pthreads;
	/* Protects the queue and the close flags. */
	pthread_mutex_t mutex;
	/* Signalled by a worker when it removes the last job; destroy waits on it. */
	pthread_cond_t queue_empty;
	/* Broadcast when a job is added to an empty queue; workers wait on it. */
	pthread_cond_t queue_not_empty;
	/* Broadcast when a full queue loses a job; threadpool_add_job waits on it. */
	pthread_cond_t queue_not_full;
};

/*
 * Worker thread entry point. arg is the struct threadpool * to serve.
 * Runs queued jobs until the pool's pool_close flag is set.
 */
void* threadpool_func(void *arg);

/*
 * Allocate a pool with thread_num worker threads and a job queue holding at
 * most queue_max_num entries. Returns the new pool, or NULL on failure.
 */
struct threadpool* threadpool_init(int thread_num,int queue_max_num);

/*
 * Append a job that will call callback_func(arg) on a worker thread.
 * Blocks while the queue is full. pool, callback_func and arg are all
 * asserted to be non-NULL.
 * Returns 0 on success, -1 if the pool is closing or allocation fails.
 */
int threadpool_add_job(struct threadpool *pool,void* (*callback_func)(void *arg),void *arg);

/*
 * Stop accepting jobs, wait for the queue to drain, join every worker and
 * free the pool. Returns 0 on success, -1 if the pool is already closing.
 */
int threadpool_destroy(struct threadpool *pool);

#endif
threadpool.c
#include"threadpool.h"
#include<pthread.h>
#include<stdio.h>
#include<stdlib.h>
#include<assert.h>

/*
 * Worker thread body.
 *
 * arg is the struct threadpool * this thread serves. The thread repeatedly
 * takes the oldest job off the queue and runs it outside the lock. It exits
 * through pthread_exit(NULL) as soon as it sees pool_close set.
 */
void* threadpool_func(void* arg){
	struct threadpool *tp = arg;

	for(;;){
		struct job *task;

		pthread_mutex_lock(&tp->mutex);

		/* Sleep until there is work to do or the pool is shutting down. */
		while( tp->queue_cur_num == 0 && !tp->pool_close ){
			pthread_cond_wait(&tp->queue_not_empty,&tp->mutex);
		}
		if( tp->pool_close ){
			pthread_mutex_unlock(&tp->mutex);
			pthread_exit(NULL);
		}

		/* Detach the job at the front of the queue. */
		task = tp->head;
		tp->queue_cur_num -= 1;
		if( tp->queue_cur_num != 0 ){
			tp->head = task->next;
		}
		else{
			/* That was the last job: reset the list and tell anyone
			 * waiting for the queue to drain. */
			tp->head = NULL;
			tp->rear = NULL;
			pthread_cond_signal(&tp->queue_empty);
		}

		/* The queue was full a moment ago: wake blocked producers. */
		if( tp->queue_max_num-1 == tp->queue_cur_num ){
			pthread_cond_broadcast(&tp->queue_not_full);
		}
		pthread_mutex_unlock(&tp->mutex);

		/* Run the job without holding the lock, then release its node. */
		task->callback_func(task->arg);
		free(task);
	}
}

/*
 * Create a thread pool.
 *
 * thread_num:    number of worker threads to start (must be > 0).
 * queue_max_num: maximum number of queued jobs (must be > 0).
 *
 * Returns the new pool, or NULL on failure. On failure everything that was
 * already set up (memory, mutex, condition variables, started threads) is
 * released again, so nothing leaks.
 *
 * pthread_* functions report errors through their return value and do not
 * set errno, so their failures are printed with the returned code instead
 * of perror().
 */
struct threadpool* threadpool_init(int thread_num,int queue_max_num){
	struct threadpool *pool = NULL;
	int rc;
	int created = 0;
	int i;

	if( thread_num <= 0 || queue_max_num <= 0 ){
		fprintf(stderr,"threadpool_init: thread_num and queue_max_num must be positive\n");
		return NULL;
	}

	pool = malloc(sizeof *pool);
	if( pool == NULL ){
		perror("malloc threadpool failed");
		return NULL;
	}
	pool->thread_num = thread_num;
	pool->queue_max_num = queue_max_num;
	pool->queue_cur_num = 0;
	pool->queue_close = 0;
	pool->pool_close = 0;
	pool->head = NULL;
	pool->rear = NULL;
	pool->pthreads = NULL;

	rc = pthread_mutex_init(&(pool->mutex),NULL);
	if( rc != 0 ){
		fprintf(stderr,"init mutex failed: error %d\n",rc);
		goto err_free_pool;
	}
	rc = pthread_cond_init(&(pool->queue_empty),NULL);
	if( rc != 0 ){
		fprintf(stderr,"init queue_empty failed: error %d\n",rc);
		goto err_destroy_mutex;
	}
	rc = pthread_cond_init(&(pool->queue_not_empty),NULL);
	if( rc != 0 ){
		fprintf(stderr,"init queue_not_empty failed: error %d\n",rc);
		goto err_destroy_empty;
	}
	rc = pthread_cond_init(&(pool->queue_not_full),NULL);
	if( rc != 0 ){
		fprintf(stderr,"init queue_not_full failed: error %d\n",rc);
		goto err_destroy_not_empty;
	}

	pool->pthreads = malloc(sizeof *pool->pthreads * (size_t)thread_num);
	if( pool->pthreads == NULL ){
		perror("malloc pthreads failed");
		goto err_destroy_not_full;
	}

	for( i = 0 ; i < thread_num ; ++i ){
		rc = pthread_create(&(pool->pthreads[i]),NULL,threadpool_func,(void *)pool);
		if( rc != 0 ){
			fprintf(stderr,"create thread %d failed: error %d\n",i,rc);
			break;
		}
		++created;
	}
	if( created < thread_num ){
		/* Ask the threads that did start to exit, then wait for them. */
		pthread_mutex_lock(&(pool->mutex));
		pool->pool_close = 1;
		pthread_mutex_unlock(&(pool->mutex));
		pthread_cond_broadcast(&(pool->queue_not_empty));
		for( i = 0 ; i < created ; ++i ){
			pthread_join(pool->pthreads[i],NULL);
		}
		free(pool->pthreads);
		goto err_destroy_not_full;
	}
	return pool;

err_destroy_not_full:
	pthread_cond_destroy(&(pool->queue_not_full));
err_destroy_not_empty:
	pthread_cond_destroy(&(pool->queue_not_empty));
err_destroy_empty:
	pthread_cond_destroy(&(pool->queue_empty));
err_destroy_mutex:
	pthread_mutex_destroy(&(pool->mutex));
err_free_pool:
	free(pool);
	return NULL;
}

/*
 * Queue a job for the pool.
 *
 * pool:          pool to submit to (asserted non-NULL).
 * callback_func: function a worker will call (asserted non-NULL).
 * arg:           argument for callback_func (asserted non-NULL).
 *
 * Blocks while the queue is at capacity, unless the pool is being closed.
 * Returns 0 once the job is queued, or -1 when the pool/queue is closed or
 * the job node cannot be allocated.
 */
int threadpool_add_job(struct threadpool* pool,void* (*callback_func)(void *arg),void *arg){
	struct job *node;
	int closing;

	assert(pool!=NULL);
	assert(callback_func!=NULL);
	assert(arg!=NULL);

	pthread_mutex_lock(&pool->mutex);

	/* Wait for a free slot; give up waiting as soon as the pool closes. */
	for(;;){
		closing = pool->queue_close || pool->pool_close;
		if( closing || pool->queue_cur_num != pool->queue_max_num ){
			break;
		}
		pthread_cond_wait(&pool->queue_not_full,&pool->mutex);
	}
	if( closing ){
		pthread_mutex_unlock(&pool->mutex);
		return -1;
	}

	node = malloc(sizeof *node);
	if( node == NULL ){
		pthread_mutex_unlock(&pool->mutex);
		return -1;
	}
	node->callback_func = callback_func;
	node->arg = arg;
	node->next = NULL;

	if( pool->head != NULL ){
		/* Non-empty queue: link the node behind the current tail. */
		pool->rear->next = node;
		pool->rear = node;
	}
	else{
		/* Empty queue: the node is both ends; wake the idle workers. */
		pool->head = node;
		pool->rear = node;
		pthread_cond_broadcast(&pool->queue_not_empty);
	}
	pool->queue_cur_num += 1;

	pthread_mutex_unlock(&pool->mutex);
	return 0;
}

/*
 * Shut the pool down and free it.
 *
 * Steps:
 *   1. set queue_close so threadpool_add_job rejects new jobs;
 *   2. wait until the workers have emptied the queue;
 *   3. set pool_close and wake every waiter so the workers exit;
 *   4. join all workers, destroy the mutex and condition variables, and
 *      free the remaining memory.
 *
 * Returns 0 on success, or -1 if the pool is already being closed (asserts
 * that pool is non-NULL). The pool pointer is invalid after a 0 return.
 */
int threadpool_destroy(struct threadpool *pool){
	assert(pool!=NULL);
	pthread_mutex_lock(&(pool->mutex));
	if( pool->queue_close || pool->pool_close ){
		pthread_mutex_unlock(&(pool->mutex));
		return -1;
	}
	pool->queue_close = 1;
	while( pool->queue_cur_num != 0 ){
		pthread_cond_wait(&(pool->queue_empty),&(pool->mutex));
	}
	pool->pool_close = 1;
	pthread_mutex_unlock(&(pool->mutex));
	/* Wake idle workers and blocked producers so they notice pool_close. */
	pthread_cond_broadcast(&(pool->queue_not_empty));
	pthread_cond_broadcast(&(pool->queue_not_full));
	int i;
	for( i = 0 ; i < pool->thread_num ; ++i ){
		pthread_join(pool->pthreads[i],NULL);
	}
	pthread_mutex_destroy(&(pool->mutex));
	pthread_cond_destroy(&(pool->queue_empty));
	pthread_cond_destroy(&(pool->queue_not_empty));
	pthread_cond_destroy(&(pool->queue_not_full));
	free(pool->pthreads);
	/*
	 * Free any job nodes still linked. Walk from head until the list ends;
	 * testing rear here would never terminate correctly because rear is
	 * not advanced by the loop.
	 * NOTE(review): each job's arg is not freed here, since its ownership
	 * is not known to the pool.
	 */
	struct job *p;
	while( pool->head != NULL ){
		p = pool->head;
		pool->head = p->next;
		free(p);
	}
	pool->rear = NULL;
	free(pool);
	return 0;
}