1. 程式人生 > 利用epoll和多程序解決高併發問題

利用epoll和多程序解決高併發問題

1、服務端程式碼,開啟8個工作程序

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <sys/epoll.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <pthread.h>
#include <errno.h> 
#include <unistd.h>

/* Capacity of the workers[] pid table; create_workers() clamps its request to this. */
#define WORKER_MAX        1024
/* Maximum number of events fetched by one epoll_wait() call in proc_epoll(). */
#define EVENT_LIST_MAX    128
/* Size argument handed to epoll_create() through create_epoll(). */
#define EVENT_MAX         12
/* Number of worker processes main() asks for and handleterm() signals. */
#define WORK_REAL         8
/* TCP port the server listens on. */
#define SERVER_PORT       8080 

/* Pids of the forked workers, filled in by the parent in create_workers(). */
static int workers[WORKER_MAX];
/* Per-worker epoll descriptor; created in the child branch of main() and used
 * by proc_accept() to register accepted connections. */
static int icEpollFd = -1;
/* Pid of the current worker, set in the child branch of main() and printed by
 * proc_receive(). */
static int cur_pid;

/* Signature of an event handler invoked by proc_epoll() for a ready event. */
typedef int (*PFCALLBACL)(struct epoll_event *);
/*
 * Per-descriptor context stored in epoll_event.data.ptr by add_event_epoll().
 */
typedef struct EPOLL_DATA_S
{
    int iEpoll_Fd;          /* epoll instance the descriptor is registered with */
    int iEvent_Fd;          /* the monitored descriptor itself */
    PFCALLBACL pfCallBack;  /* handler proc_epoll() calls when the descriptor is ready */
}Epoll_Data_S;

/* Mutex used by proc_accept() so only one process accepts at a time; set up
 * by initMutex(). */
static pthread_mutex_t *mutex;

/*
 * Create the accept mutex in anonymous shared memory and initialise it as a
 * process-shared mutex, so that processes forked afterwards all operate on
 * the same lock.
 *
 * The global `mutex` is only assigned once the mutex is fully initialised.
 * Any failure terminates the process: the function returns void, and letting
 * the caller continue would make proc_accept() lock an invalid or
 * uninitialised mutex.
 */
static void initMutex(void)
{
    pthread_mutexattr_t attr;
    void *shared;
    int ret;

    /* MAP_SHARED so the mapping stays common to parent and children after fork(). */
    shared = mmap(NULL, sizeof(pthread_mutex_t), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANON, -1, 0);
    if (MAP_FAILED == shared) {
        perror("mutex mmap failed");
        exit(EXIT_FAILURE);
    }

    ret = pthread_mutexattr_init(&attr);
    if (ret != 0) {
        fprintf(stderr, "mutex attr init failed: %s\n", strerror(ret));
        munmap(shared, sizeof(pthread_mutex_t));
        exit(EXIT_FAILURE);
    }

    /* Mark the mutex as usable across process boundaries. */
    ret = pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
    if (ret != 0) {
        fprintf(stderr, "mutex set shared failed");
        pthread_mutexattr_destroy(&attr);
        munmap(shared, sizeof(pthread_mutex_t));
        exit(EXIT_FAILURE);
    }

    ret = pthread_mutex_init((pthread_mutex_t *)shared, &attr);

    /* The attribute object is no longer needed once the mutex exists. */
    pthread_mutexattr_destroy(&attr);

    if (ret != 0) {
        fprintf(stderr, "mutex init failed: %s\n", strerror(ret));
        munmap(shared, sizeof(pthread_mutex_t));
        exit(EXIT_FAILURE);
    }

    mutex = (pthread_mutex_t *)shared;
}

/*
 * Create a TCP socket, bind it to INADDR_ANY on `port` and start listening.
 *
 * port: port number in host byte order.
 *
 * Returns the listening descriptor on success, or -1 on failure. The -1
 * matches the check performed by main(); previously failures returned 0 and
 * went undetected. On failure after the socket was created, the descriptor
 * is closed before returning.
 */
static int startup(unsigned short port)
{
    struct sockaddr_in servAddr;
    int value = 1;
    int listenFd;

    memset(&servAddr, 0, sizeof(servAddr));
    servAddr.sin_family = AF_INET;
    servAddr.sin_addr.s_addr = htonl(INADDR_ANY);   /* all local interfaces */
    servAddr.sin_port = htons(port);

    if ((listenFd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        printf("create socket error: %s(errno: %d)\n",strerror(errno),errno);
        return -1;
    }

    /* Failing to set SO_REUSEADDR is reported but not fatal: the option only
     * affects how quickly the port can be re-bound after a restart. */
    if (setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value)) == -1) {
        printf("setsockopt error: %s(errno: %d)\n",strerror(errno),errno);
    }

    if (bind(listenFd, (struct sockaddr *)&servAddr, sizeof(servAddr)) == -1) {
        printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno);
        close(listenFd);
        return -1;
    }

    /* Start listening with a backlog of 10 pending connections. */
    if (listen(listenFd, 10) == -1) {
        printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno);
        close(listenFd);
        return -1;
    }

    return listenFd;
}

/*
 * Fork the worker processes.
 *
 * worker_num: requested number of workers; clamped to WORKER_MAX, the
 *             capacity of workers[].
 *
 * Returns 0 in each child and 1 in the parent once every worker has been
 * forked and its pid recorded in workers[].
 *
 * If fork() fails, the parent sends SIGTERM to the workers it already
 * started and exits. Previously it returned 0, the same value a child gets,
 * so the caller treated the parent as a worker and no process was left to
 * supervise the children.
 */
static int create_workers(unsigned int worker_num)
{
    unsigned int real_num = worker_num;
    unsigned int i;

    if (real_num > WORKER_MAX)
    {
        real_num = WORKER_MAX;
    }

    for (i = 0; i < real_num; i++)
    {
        pid_t pid = fork();

        if (0 == pid)
        {
            /* Child: leave immediately so it does not fork workers itself. */
            return 0;
        }

        if (pid < 0)
        {
            printf("fork error\r\n");

            /* Do not leave a partial, unsupervised set of workers behind. */
            while (i > 0)
            {
                i--;
                kill(workers[i], SIGTERM);
            }
            exit(EXIT_FAILURE);
        }

        workers[i] = pid;
    }

    return 1;
}

/*
 * Create an epoll instance.
 *
 * event_num: value passed straight to epoll_create() as its size argument.
 *
 * Returns the epoll descriptor, or -1 after printing a message on failure.
 */
static int create_epoll(unsigned int event_num)
{
    int fd = epoll_create(event_num);

    if (fd != -1)
    {
        return fd;
    }

    printf("epoll create failed\r\n");
    return -1;
}

/*
 * Register a descriptor with an epoll instance and attach a handler to it.
 *
 * iEpoll_Fd:  epoll instance to register with.
 * iEvent_Fd:  descriptor to monitor.
 * pfCallBack: handler stored in the per-descriptor context; proc_epoll()
 *             invokes it when the descriptor becomes ready.
 *
 * A heap-allocated Epoll_Data_S is stored in the event's data.ptr. On
 * success, ownership of that context passes to the handler side
 * (proc_receive() frees it when it closes the connection). If registration
 * fails, the context is freed here instead of being leaked.
 *
 * The function returns void, so a failure is not reported to the caller and
 * iEvent_Fd is left open; closing it remains the caller's responsibility.
 */
static void add_event_epoll(int iEpoll_Fd, int iEvent_Fd, PFCALLBACL pfCallBack)
{
    struct epoll_event ee;
    Epoll_Data_S *data;

    data = malloc(sizeof(*data));
    if (NULL == data)
    {
        return;
    }

    data->iEpoll_Fd = iEpoll_Fd;
    data->iEvent_Fd = iEvent_Fd;
    data->pfCallBack = pfCallBack;

    memset(&ee, 0, sizeof(ee));
    /* EPOLLOUT is deliberately not requested: none of the handlers act on
     * it, and in level-triggered mode a connected socket is almost always
     * writable, so epoll_wait() would return immediately on every call. */
    ee.events = EPOLLIN | EPOLLHUP;
    ee.data.ptr = data;

    if (epoll_ctl(iEpoll_Fd, EPOLL_CTL_ADD, iEvent_Fd, &ee) == -1)
    {
        printf("epoll_ctl(%d, %d) failed", EPOLL_CTL_ADD, iEvent_Fd);
        /* Nothing references the context if registration failed. */
        free(data);
    }
}

/*
 * Remove a descriptor from an epoll instance.
 *
 * iEpoll_Fd: epoll instance the descriptor was registered with.
 * iEvent_Fd: descriptor to remove.
 *
 * A failure is only reported with a printed message.
 */
static void del_event_epoll(int iEpoll_Fd, int iEvent_Fd)
{
    const int ctl_op = EPOLL_CTL_DEL;
    int rc = epoll_ctl(iEpoll_Fd, ctl_op, iEvent_Fd, NULL);

    if (rc != -1)
    {
        return;
    }

    printf("epoll_ctl(%d, %d) failed", ctl_op, iEvent_Fd);
}

/*
 * Switch a descriptor to non-blocking mode by adding O_NONBLOCK to its
 * file status flags.
 *
 * Returns 0 on success, or -1 after reporting the fcntl() error via perror().
 */
static int make_socket_non_blocking(int fd)
{
    int current = fcntl(fd, F_GETFL, 0);

    if (current == -1)
    {
        perror("fcntl");
        return -1;
    }

    if (fcntl(fd, F_SETFL, current | O_NONBLOCK) == -1)
    {
        perror("fcntl");
        return -1;
    }

    return 0;
}

/* Tear down a client connection: unregister it, close it, free its context. */
static void drop_connection(Epoll_Data_S *data)
{
    del_event_epoll(data->iEpoll_Fd, data->iEvent_Fd);
    close(data->iEvent_Fd);
    free(data);
}

/*
 * Handler for events on an accepted client connection.
 *
 * pstEvent: the ready event; its data.ptr is the Epoll_Data_S allocated by
 *           add_event_epoll() for this connection.
 *
 * On EPOLLIN, reads one chunk and prints it. The connection is dropped when
 * the peer has closed (recv returns 0) or on a real receive error. EINTR is
 * retried, and EAGAIN/EWOULDBLOCK just means "nothing to read right now" on
 * a non-blocking socket, so the connection is kept.
 * On EPOLLHUP or EPOLLERR without EPOLLIN, the connection is dropped.
 *
 * Always returns 0. After a drop, the context in pstEvent->data.ptr has been
 * freed and must not be used again.
 */
static int proc_receive(struct epoll_event *pstEvent)
{
    char buff[4096];
    ssize_t len;
    Epoll_Data_S *data = (Epoll_Data_S *)(pstEvent->data.ptr);

    if (pstEvent->events & EPOLLIN)
    {
        /* Leave one byte free for the terminator added below. */
        do
        {
            len = recv(data->iEvent_Fd, buff, sizeof(buff) - 1, 0);
        } while (len < 0 && errno == EINTR);

        if (len > 0)
        {
            buff[len] = '\0';
            printf("pid %d receive data: %s\r\n", cur_pid, buff);
        }
        else if (len == 0)
        {
            /* Orderly shutdown by the peer; errno is not meaningful here. */
            drop_connection(data);
        }
        else if (errno != EAGAIN && errno != EWOULDBLOCK)
        {
            drop_connection(data);
        }
    }
    else if (pstEvent->events & (EPOLLHUP | EPOLLERR))
    {
        printf("receive EPOLLHUP or EPOLLOUT\r\n");
        drop_connection(data);
    }

    return 0;
}

/* 處理Accept事件 */
static int proc_accept(struct epoll_event *pstEvent)
{
    int newFd;
    Epoll_Data_S *data = (Epoll_Data_S *)(pstEvent->data.ptr);
    int iEpoll_Fd = data->iEpoll_Fd;
    int iEvent_Fd = data->iEvent_Fd;
    
    if (pthread_mutex_trylock(mutex)==0) 
    {
        while(-1 != (newFd = accept(iEvent_Fd, (struct sockaddr *)NULL, NULL)))
        {
	        make_socket_non_blocking(newFd);
	        //printf("accept pid = %d\r\n", getpid());
	        add_event_epoll(icEpollFd, newFd, proc_receive);
        }

        pthread_mutex_unlock(mutex);
    }

    return 0;
}

/*
 * SIGTERM handler installed by the parent: forwards SIGTERM to every
 * recorded worker.
 *
 * sig: signal number (unused).
 *
 * Slots that hold no pid are skipped, because kill() with a pid of 0 signals
 * every process in the caller's process group rather than a single worker.
 */
static void handleterm(int sig)
{
    int i;

    (void)sig;

    for (i = 0; i < WORK_REAL; i++)
    {
        if (workers[i] > 0)
        {
            kill(workers[i], SIGTERM);
        }
    }
}

/*
 * Wait once on an epoll instance and dispatch every ready event to the
 * handler stored in its Epoll_Data_S context.
 *
 * iEpollFd: epoll instance to wait on.
 * timeout:  passed to epoll_wait() unchanged.
 *
 * If epoll_wait() reports no events (or fails), nothing is dispatched.
 */
static void proc_epoll(int iEpollFd, int timeout)
{
    struct epoll_event ready[EVENT_LIST_MAX];
    int ready_cnt = epoll_wait(iEpollFd, ready, EVENT_LIST_MAX, timeout);
    int idx = 0;

    while (idx < ready_cnt)
    {
        Epoll_Data_S *ctx = (Epoll_Data_S *)(ready[idx].data.ptr);

        ctx->pfCallBack(&ready[idx]);
        idx++;
    }
}

int main()
{
    int iServerFd = -1;
    int bParent;
    int iEpollFd = -1;

    /* 初始化互斥量 */
    initMutex();
    
    /* 初始化,建立監聽埠 */
    iServerFd = startup(SERVER_PORT);
    if (-1 == iServerFd)
    {
        return 0;
    }

    make_socket_non_blocking(iServerFd);

    /* 父程序建立epoll */
    iEpollFd = create_epoll(EVENT_MAX);
    if (-1 == iEpollFd)
    {
        close(iServerFd);
        return 0;
    }

    /* 將監聽埠加到epoll */
    add_event_epoll(iEpollFd, iServerFd, proc_accept);

    /* 建立子程序  */
    bParent = create_workers(WORK_REAL);

    /* 主程序 */
    if (bParent)
    {
        while(1)
        {
            signal(SIGTERM, handleterm);
            pause();
        }
    }
    else
    {
          /* 子程序建立epoll */
	      icEpollFd = create_epoll(EVENT_MAX);    
	      if (-1 == icEpollFd)
	      {
	         close(iServerFd);
	         return 0;
	      }

	      cur_pid = getpid();
 
    	  while (1)
    	  {
    	    /* 處理父epoll訊息 */
	        proc_epoll(iEpollFd, 50);

	        /* 處理子epoll訊息 */
	        proc_epoll(icEpollFd, 50);
        }
    }

    return 0;
}

2、客戶端程式碼:發起高併發連線,連線數暫定為2000,可自行調整
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <netdb.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include<arpa/inet.h>
#include <fcntl.h>
#include <errno.h>

/* NOTE(review): MAXLINE is not referenced anywhere in the visible client code. */
const int MAXLINE = 5;
/* Number sent as the payload of the next connection; sockconn() increments it
 * after each successful connect. */
int count = 1;

/*
 * Add O_NONBLOCK to the file status flags of a descriptor.
 *
 * Returns 0 on success, or -1 after reporting the fcntl() error via perror().
 */
static int make_socket_non_blocking(int fd)
{
    int mode;
    int rc;

    mode = fcntl(fd, F_GETFL, 0);
    if (mode != -1)
    {
        rc = fcntl(fd, F_SETFL, mode | O_NONBLOCK);
        if (rc != -1)
        {
            return 0;
        }
    }

    perror("fcntl");
    return -1;
}

/*
 * Open one TCP connection to 127.0.0.1:8080, send the current value of
 * `count` as decimal text, and close the connection.
 *
 * `count` is incremented once the connection has been established. The
 * socket is closed on every path after it was created, including a failed
 * connect, so repeated calls do not accumulate open descriptors. Failures
 * are reported with a printed message and the function simply returns.
 */
void sockconn()
{
    int sockfd;
    struct sockaddr_in server_addr;
    struct hostent *host;
    char buf[100];
    size_t len;

    host = gethostbyname("127.0.0.1");
    if (host == NULL || host->h_addr_list[0] == NULL) {
        printf("gethostbyname error\r\n");
        return;
    }

    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        perror("socket error\r\n");
        return;
    }

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(8080);
    memcpy(&server_addr.sin_addr, host->h_addr_list[0], sizeof(server_addr.sin_addr));

    if (connect(sockfd, (struct sockaddr *) &server_addr, sizeof(server_addr)) == -1) {
        printf("connect error errno=%d\r\n", errno);
        close(sockfd);
        return;
    }

    snprintf(buf, sizeof(buf), "%d", count);
    count++;

    len = strlen(buf);
    if (write(sockfd, buf, len) != (ssize_t) len) {
        /* Report before close() so errno still belongs to write(). */
        printf("write error errno=%d\r\n", errno);
        close(sockfd);
        return;
    }

    close(sockfd);

    printf("client send %s\r\n", buf);
}

/* Client entry point: performs 2000 connect/send/close rounds in sequence. */
int main(void) {
    int remaining = 2000;

    while (remaining > 0)
    {
        sockconn();
        remaining--;
    }

    return 0;
}

3、測試結果

1秒多時間內,處理了2000個連線

4、需要注意的問題

1)連線太多時,出現「too many open files」錯誤

原因:每個程序可開啟的檔案描述符數量有上限(預設為1024)
localhost:/share/code # ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 7900
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) 865300
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 7900
virtual memory          (kbytes, -v) 2491600
file locks                      (-x) unlimited
localhost:/share/code # 

解決方法:
localhost:/share/code # ulimit -n 100000
localhost:/share/code # ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 7900
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) 865300
open files                      (-n) 100000
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 7900
virtual memory          (kbytes, -v) 2491600
file locks                      (-x) unlimited
localhost:/share/code # 

2)客戶端connect錯誤

原因:處於TIME-WAIT狀態的socket太多,socket不夠用

解決:啟用TIME-WAIT狀態socket的快速回收與重用(注意:tcp_tw_recycle在NAT環境下可能導致連線失敗,且自Linux 4.12起已被移除,新核心上只需設定tcp_tw_reuse)

localhost:/share/code # echo "1" > /proc/sys/net/ipv4/tcp_tw_recycle 
localhost:/share/code # echo "1" > /proc/sys/net/ipv4/tcp_tw_reuse 
localhost:/share/code # 

5、參考文章