epoll+執行緒池實現http檔案下載
阿新 • • 發佈:2019-02-19
server.c
#include<stdio.h> #include<stdlib.h> #include<string.h> #include<errno.h> #include<fcntl.h> #include<sys/types.h> #include<netinet/in.h> #include<sys/epoll.h> #include<sys/socket.h> #include<arpa/inet.h> #include"threadpool.h" #include<dlfcn.h> #include<time.h> #define MAX_LEN 1024 #define MAX_SIZE 100 void *worker(void *arg){ char buf[MAX_LEN]; int sock = *(int *)arg; free(arg); int len = recv(sock,&buf,sizeof(buf),0); send(sock,&buf,sizeof(buf),0); close(sock); } int main(){ int server_sockfd; int client_sockfd; struct sockaddr_in server_addr; struct sockaddr_in client_addr; int sin_size; int epoll_fd; struct epoll_event ev; struct epoll_event events[MAX_SIZE]; int nfds; struct threadpool *pool; pool = threadpool_init(16,100); memset(&server_addr,0,sizeof(server_addr)); server_addr.sin_family = AF_INET; server_addr.sin_addr.s_addr = INADDR_ANY; server_addr.sin_port = htons(6665); if( (server_sockfd = socket(PF_INET,SOCK_STREAM,0)) < 0 ){ perror("socket"); exit(EXIT_FAILURE); } int on=1; setsockopt(server_sockfd,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(on)); if( bind(server_sockfd,(struct sockaddr *)&server_addr,sizeof(struct sockaddr)) < 0 ){ perror("bind"); exit(EXIT_FAILURE); } listen(server_sockfd,5); int old_option = fcntl(server_sockfd,F_GETFL); int new_option = old_option | O_NONBLOCK; fcntl(server_sockfd,F_SETFL,new_option); if( (epoll_fd = epoll_create(MAX_SIZE)) < 0 ){ perror("create"); exit(EXIT_FAILURE); } ev.events = EPOLLIN; ev.data.fd = server_sockfd; if( epoll_ctl(epoll_fd,EPOLL_CTL_ADD,server_sockfd,&ev) < 0 ){ perror("ctl"); exit(EXIT_FAILURE); } while(1){ nfds = epoll_wait(epoll_fd,events,MAX_SIZE,-1); if( nfds < 0 ){ perror("wait"); exit(EXIT_FAILURE); } int i; for( i = 0 ; i < nfds ; ++i ){ if( events[i].data.fd == server_sockfd ){ sin_size = sizeof(struct sockaddr_in); if( (client_sockfd = accept(server_sockfd,(struct sockaddr *)&client_addr,&sin_size)) < 0 ){ perror("accept"); exit(EXIT_FAILURE); } fcntl(client_sockfd,F_SETFL,fcntl(client_sockfd,F_GETFD,0)|O_NONBLOCK); ev.events = EPOLLIN|EPOLLET; ev.data.fd = client_sockfd; if( epoll_ctl(epoll_fd,EPOLL_CTL_ADD,client_sockfd,&ev) < 0 ){ perror("CTL"); exit(EXIT_FAILURE); } printf("accpet!\n"); } else{ int *ptr = malloc(sizeof(int)); *ptr = events[i].data.fd; threadpool_add_job(pool,worker,(void *)ptr); } } } threadpool_destroy(pool); close(server_sockfd); return 0; }
threadpool.h
threadpool.c#ifndef threadpool_H_ #define threadpool_H_ #include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<assert.h> struct job{ void* (*callback_func)(void *arg); void *arg; struct job *next; }; struct threadpool{ int thread_num; int queue_max_num; int queue_cur_num; int queue_close; int pool_close; struct job *head; struct job *rear; pthread_t *pthreads; pthread_mutex_t mutex; pthread_cond_t queue_empty; pthread_cond_t queue_not_empty; pthread_cond_t queue_not_full; }; void* threadpool_func(void *arg); struct threadpool* threadpool_init(int thread_num,int queue_max_num); int threadpool_add_job(struct threadpool *pool,void* (*callback_func)(void *arg),void *arg); int threadpool_destroy(struct threadpool *pool); #endif
#include"threadpool.h" #include<pthread.h> #include<stdio.h> #include<stdlib.h> #include<assert.h> void* threadpool_func(void* arg){ struct threadpool *pool = (struct threadpool*)arg; struct job *pjob = NULL; while(1){ pthread_mutex_lock(&(pool->mutex)); while( (pool->queue_cur_num == 0) && !pool->pool_close ){ pthread_cond_wait(&(pool->queue_not_empty),&(pool->mutex)); } if( pool->pool_close ){ pthread_mutex_unlock(&(pool->mutex)); pthread_exit(NULL); } pool->queue_cur_num--; pjob = pool->head; if( pool->queue_cur_num == 0 ){ pool->head = pool->rear = NULL; pthread_cond_signal(&(pool->queue_empty)); } else{ pool->head = pjob->next; } if( pool->queue_cur_num == pool->queue_max_num-1 ){ pthread_cond_broadcast(&(pool->queue_not_full)); } pthread_mutex_unlock(&(pool->mutex)); (*(pjob->callback_func))(pjob->arg); free(pjob); pjob = NULL; } } struct threadpool* threadpool_init(int thread_num,int queue_max_num){ struct threadpool *pool = NULL; pool = malloc(sizeof(struct threadpool)); if( pool == NULL ){ perror("malloc threadpool failed"); return NULL; } pool->thread_num = thread_num; pool->queue_max_num = queue_max_num; pool->queue_cur_num = 0; pool->head = NULL; pool->rear = NULL; if( pthread_mutex_init(&(pool->mutex),NULL) ){ perror("init mutex failed"); return NULL; } if( pthread_cond_init(&(pool->queue_empty),NULL) ){ perror("init queue_empty failed"); return NULL; } if( pthread_cond_init(&(pool->queue_not_empty),NULL) ){ perror("init queue_not_empty failed"); return NULL; } if( pthread_cond_init(&(pool->queue_not_full),NULL) ){ perror("init queue_not_full failed"); return NULL; } pool->pthreads = malloc(sizeof(pthread_t)*thread_num); if( pool->pthreads == NULL ){ perror("malloc pthreads failed"); return NULL; } pool->queue_close = 0; pool->pool_close = 0; int i; for( i = 0 ; i < pool->thread_num ; ++i ){ pthread_create(&(pool->pthreads[i]),NULL,threadpool_func,(void *)pool); } return pool; } int threadpool_add_job(struct threadpool* pool,void* (*callback_func)(void *arg),void *arg){ assert(pool!=NULL); assert(callback_func!=NULL); assert(arg!=NULL); pthread_mutex_lock(&(pool->mutex)); while( (pool->queue_cur_num == pool->queue_max_num) && !(pool->queue_close || pool->pool_close) ){ pthread_cond_wait(&(pool->queue_not_full),&(pool->mutex)); } if( pool->queue_close || pool->pool_close ){ pthread_mutex_unlock(&(pool->mutex)); return -1; } struct job *pjob = (struct job*)malloc(sizeof(struct job)); if( pjob == NULL ){ pthread_mutex_unlock(&(pool->mutex)); return -1; } pjob->callback_func = callback_func; pjob->arg = arg; pjob->next = NULL; if( pool->head == NULL ){ pool->head = pool->rear = pjob; pthread_cond_broadcast(&(pool->queue_not_empty)); } else{ pool->rear->next = pjob; pool->rear = pjob; } pool->queue_cur_num++; pthread_mutex_unlock(&(pool->mutex)); return 0; } int threadpool_destroy(struct threadpool *pool){ assert(pool!=NULL); pthread_mutex_lock(&(pool->mutex)); if( pool->queue_close || pool->pool_close ){ pthread_mutex_unlock(&(pool->mutex)); return -1; } pool->queue_close = 1; while( pool->queue_cur_num != 0 ){ pthread_cond_wait(&(pool->queue_empty),&(pool->mutex)); } pool->pool_close = 1; pthread_mutex_unlock(&(pool->mutex)); pthread_cond_broadcast(&(pool->queue_not_empty)); pthread_cond_broadcast(&(pool->queue_not_full)); int i; for( i = 0 ; i < pool->thread_num ; ++i ){ pthread_join(pool->pthreads[i],NULL); } pthread_mutex_destroy(&(pool->mutex)); pthread_cond_destroy(&(pool->queue_empty)); pthread_cond_destroy(&(pool->queue_not_empty)); pthread_cond_destroy(&(pool->queue_not_full)); free(pool->pthreads); struct job *p; while( pool->rear != NULL ){ p = pool->head; pool->head = p->next; free(p); } free(pool); return 0; }