1. 程式人生 > >處理大併發之一 對epoll的理解,epoll客戶端服務端程式碼

處理大併發之一 對epoll的理解,epoll客戶端服務端程式碼

處理大併發之一

epoll的理解epoll客戶端服務端程式碼

序言:

該部落格是一系列的部落格,首先從最基礎的epoll說起,然後研究libevent原始碼及使用方法,最後研究nginx和node.js,關於select,poll這裡不做說明,只說明其相對於epoll的不足,其實select和poll我也沒用過,因為我選擇了epoll。

說起epoll,做過大併發的估計都不陌生,之前做了個STB的工具,用的就是epoll處理併發,測試1.5W的併發(非簡單的發訊息,包括多個服務端和客戶端的處理)是通過的,epoll的功能強大,讓我有一定的體會,在nginx和node.js中利用的就是libevent,而libevent使用的就是epoll,如果系統不支援epoll,會選擇最佳的方式(select,poll或kqueue中的一種)。

基礎資料:

epoll名詞解釋:

看下百科名片是怎麼解釋epoll的吧,(http://baike.baidu.com/view/1385104.htm)epoll是Linux核心為處理大批量控制代碼而作了改進的poll,是Linux下多路複用IO介面select/poll的增強版本,它能顯著減少程式在大量併發連線中只有少量活躍的情況下的系統CPU利用率。

關於具體的說明可以參考網上資料,我這裡羅列下epoll相對於其他多路複用機制(select,poll)的優點吧:

epoll優點:

1. 支援一個程序開啟大數目的socket描述符。

2. IO效率不隨FD數目增加而線性下降,傳統的select/poll每次呼叫都會線性掃描全部的集合,導致效率呈現線性下降。

3. 使用mmap加速核心與使用者空間的訊息傳遞。無論是select,poll還是epoll都需要核心把FD訊息通知給使用者空間,如何避免不必要的記憶體拷貝就很重要,在這點上,epoll是通過核心於使用者空間mmap同一塊記憶體實現的。

select和poll的缺點:

1. 每次呼叫時要重複地從使用者態讀入引數。

2. 每次呼叫時要重複地掃描檔案描述符。

3. 每次在呼叫開始時,要把當前程序放入各個檔案描述符的等待佇列。在呼叫結束後,又把程序從各個等待佇列中刪除。

epoll用的的資料結構和函式

資料結構 

typedef union epoll_data { 
                void *ptr; 
                int fd; 
                __uint32_t u32; 
                __uint64_t u64; 
        } epoll_data_t; 

        struct epoll_event { 
                __uint32_t events;      /* epoll events */ 
                epoll_data_t data;      /* user data variable */ 
        }; 

結構體epoll_event 被用於註冊所感興趣的事件和回傳所發生待處理的事件. 
其中epoll_data 聯合體用來儲存觸發事件的某個檔案描述符相關的資料. 
例如一個client連線到伺服器,伺服器通過呼叫accept函式可以得到於這個client對應的socket檔案描述符,可以把這檔案描述符賦給epoll_data的fd欄位以便後面的讀寫操作在這個檔案描述符上進行。epoll_event 結構體的events欄位是表示感興趣的事件和被觸發的事件可能的取值為: 
EPOLLIN :表示對應的檔案描述符可以讀; 
EPOLLOUT:表示對應的檔案描述符可以寫; 
EPOLLPRI:表示對應的檔案描述符有緊急的資料可讀 
EPOLLERR:表示對應的檔案描述符發生錯誤; 
EPOLLHUP:表示對應的檔案描述符被結束通話; 
EPOLLET:表示對應的檔案描述符有事件發生; 

ET和LT模式

LT(level triggered)是預設的工作方式,並且同時支援block和no-block socket.在這種做法中,核心告訴你一個檔案描述符是否就緒了,然後你可以對這個就緒的fd進行IO操作。如果你不作任何操作,核心還是會繼續通知你的,所以,這種模式程式設計出錯誤可能性要小一點。傳統的select/poll都是這種模型的代表。

ET (edge-triggered)是高速工作方式,只支援no-block socket。在這種模式下,當描述符從未就緒變為就緒時,核心通過epoll告訴你。然後它會假設你知道檔案描述符已經就緒,並且不會再為那個檔案描述符傳送更多的就緒通知,直到你做了某些操作導致那個檔案描述符不再為就緒狀態了(比如,你在傳送,接收或者接收請求,或者傳送接收的資料少於一定量時導致了一個EWOULDBLOCK 錯誤)。但是請注意,如果一直不對這個fd作IO操作(從而導致它再次變成未就緒),核心不會發送更多的通知(only once),不過在TCP協議中,ET模式的加速效用仍需要更多的benchmark確認。

ET和LT的區別在於LT事件不會丟棄,而是隻要讀buffer裡面有資料可以讓使用者讀,則不斷的通知你。而ET則只在事件發生之時通知。可以簡單理解為LT是水平觸發,而ET則為邊緣觸發。
ET模式僅當狀態發生變化的時候才獲得通知,這裡所謂的狀態的變化並不包括緩衝區中還有未處理的資料,也就是說,如果要採用ET模式,需要一直read/write直到出錯為止,很多人反映為什麼採用ET模式只接收了一部分資料就再也得不到通知了,大多因為這樣;LT模式是隻要有資料沒有處理就會一直通知下去的.

函式

  1. 1. int epoll_create(int size);  
  2. 2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  
  3. 3. int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);  

詳解:

1. int epoll_create(int size);

建立一個epoll的控制代碼,size用來告訴核心這個監聽的數目一共有多大。需要注意的是,當建立好epoll控制代碼後,它就是會佔用一個fd值,在linux下如果檢視/proc/程序id/fd/,是能夠看到這個fd的,所以在使用完epoll後,必須呼叫close()關閉,否則可能導致fd被耗盡。

2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

epoll的事件註冊函式,它不同與select()是在監聽事件時告訴核心要監聽什麼型別的事件,而是在這裡先註冊要監聽的事件型別。第一個引數是epoll_create()的返回值,第二個引數表示動作,用三個巨集來表示:EPOLL_CTL_ADD:註冊新的fdepfd中;EPOLL_CTL_MOD:修改已經註冊的fd的監聽事件;EPOLL_CTL_DEL:從epfd中刪除一個fd第三個引數是需要監聽的fd,第四個引數是告訴核心需要監聽什麼事

3. int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

等待事件的產生,引數events用來從核心得到事件的集合,maxevents告之核心這個events有多大,這個 maxevents的值不能大於建立epoll_create()時的size,引數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。該函式返回需要處理的事件數目,如返回0表示已超時。

資料總結這麼多,其實都是一些理論,關鍵還在於程式碼,下面附上我使用epoll開發的服務端和客戶端,該程式碼我會上傳到我的資源裡,可以免費下載。連結:

如果程式碼有bug,可以告訴我。

程式demo

首先對服務端和客戶端做下說明:

我想實現的是客戶端和服務端併發的程式,客戶端通過配置併發數,說明有多少個使用者去連線服務端。

客戶端會發送訊息:"Client: i send message Hello Server!”,其中i表示哪一個客戶端;收到訊息:"Recv Server Msg Content:%s\n"。

例如:

傳送:Client: 1 send message "Hello Server!"

接收:Recv Derver Msg Content:Hello, client fd: 6

服務端收到後給客戶端回覆訊息:"Hello, client fd: i",其中i表示服務端接收的fd,使用者區別是哪一個客戶端。接收客戶端訊息:"Terminal Received Msg Content:%s\n"

例如:

傳送:Hello, client fd: 6

接收:Terminal Received Msg Content:Client: 1 send message "Hello Server!"

備註:這裡在接收到訊息後,直接打印出訊息,如果需要對訊息進行處理(如果訊息處理比較佔用時間,不能立即返回,可以將該訊息放入一個佇列中,然後開啟一個執行緒從佇列中取訊息進行處理,這樣的話不會因為訊息處理而阻塞epoll)。libenent好像對這種有2中處理方式,一個就是回撥,要求回撥函式,不佔用太多的時間,基本能立即返回,另一種好像也是一個佇列實現的,這個還需要研究。

服務端程式碼說明:

服務端在繫結監聽後,開啟了一個執行緒,用於負責接收客戶端連線,加入到epoll中,這樣只要accept到客戶端的連線,就將其add EPOLLIN到epoll中,然後進入迴圈呼叫epoll_wait,監聽到讀事件,接收資料,並將事件修改為EPOLLOUT;反之監聽到寫事件,傳送資料,並將事件修改為EPOLLIN。

  1. cepollserver.h  
  2. #ifndef  C_EPOLL_SERVER_H
  3. #define  C_EPOLL_SERVER_H
  4. #include <sys/epoll.h>
  5. #include <sys/socket.h>
  6. #include <netinet/in.h>
  7. #include <fcntl.h>
  8. #include <arpa/inet.h>
  9. #include <stdio.h>
  10. #include <stdlib.h>
  11. #include <iostream>
  12. #include <pthread.h>
  13. #define _MAX_SOCKFD_COUNT 65535
  14. class CEpollServer  
  15. {  
  16.         public:  
  17.                 CEpollServer();  
  18.                 ~CEpollServer();  
  19.                 bool InitServer(constchar* chIp, int iPort);  
  20.                 void Listen();  
  21.                 staticvoid ListenThread( void* lpVoid );  
  22.                 void Run();  
  23.         private:  
  24.                 int        m_iEpollFd;  
  25.                 int        m_isock;  
  26.                 pthread_t       m_ListenThreadId;// 監聽執行緒控制代碼
  27. };  
  28. #endif

cepollserver.cpp

  1. #include "cepollserver.h"
  2. usingnamespace std;  
  3. CEpollServer::CEpollServer()  
  4. {  
  5. }  
  6. CEpollServer::~CEpollServer()  
  7. {  
  8.     close(m_isock);  
  9. }  
  10. bool CEpollServer::InitServer(constchar* pIp, int iPort)  
  11. {  
  12.     m_iEpollFd = epoll_create(_MAX_SOCKFD_COUNT);  
  13.     //設定非阻塞模式
  14.     int opts = O_NONBLOCK;  
  15.     if(fcntl(m_iEpollFd,F_SETFL,opts)<0)  
  16.     {  
  17.         printf("設定非阻塞模式失敗!\n");  
  18.         returnfalse;  
  19.     }  
  20.     m_isock = socket(AF_INET,SOCK_STREAM,0);  
  21.     if ( 0 > m_isock )  
  22.     {  
  23.         printf("socket error!\n");  
  24.         returnfalse;  
  25.   }  
  26.   sockaddr_in listen_addr;  
  27.       listen_addr.sin_family=AF_INET;  
  28.       listen_addr.sin_port=htons ( iPort );  
  29.       listen_addr.sin_addr.s_addr=htonl(INADDR_ANY);  
  30.       listen_addr.sin_addr.s_addr=inet_addr(pIp);  
  31.       int ireuseadd_on = 1;//支援埠複用
  32.       setsockopt(m_isock, SOL_SOCKET, SO_REUSEADDR, &ireuseadd_on, sizeof(ireuseadd_on) );  
  33.       if ( bind ( m_isock, ( sockaddr * ) &listen_addr,sizeof ( listen_addr ) ) !=0 )  
  34.       {  
  35.           printf("bind error\n");  
  36.           returnfalse;  
  37.       }  
  38.       if ( listen ( m_isock, 20) <0 )  
  39.       {  
  40.           printf("listen error!\n");  
  41.           returnfalse;  
  42.       }  
  43.       else
  44.       {  
  45.           printf("服務端監聽中...\n");  
  46.       }  
  47.       // 監聽執行緒,此執行緒負責接收客戶端連線,加入到epoll中
  48.       if ( pthread_create( &m_ListenThreadId, 0, ( void * ( * ) ( void * ) ) ListenThread, this ) != 0 )  
  49.       {  
  50.           printf("Server 監聽執行緒建立失敗!!!");  
  51.           returnfalse;  
  52.       }  
  53.   }  
  54.   // 監聽執行緒
  55.   void CEpollServer::ListenThread( void* lpVoid )  
  56.   {  
  57.       CEpollServer *pTerminalServer = (CEpollServer*)lpVoid;  
  58.       sockaddr_in remote_addr;  
  59.       int len = sizeof (remote_addr);  
  60.       while ( true )  
  61.       {  
  62.           int client_socket = accept (pTerminalServer->m_isock, ( sockaddr * ) &remote_addr,(socklen_t*)&len );  
  63.           if ( client_socket < 0 )  
  64.           {  
  65.               printf("Server Accept失敗!, client_socket: %d\n", client_socket);  
  66.               continue;  
  67.           }  
  68.           else
  69.           {  
  70.               struct epoll_event    ev;  
  71.               ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;  
  72.               ev.data.fd = client_socket;     //記錄socket控制代碼
  73.               epoll_ctl(pTerminalServer->m_iEpollFd, EPOLL_CTL_ADD, client_socket, &ev);  
  74.           }  
  75.       }  
  76.   }  
  77.   void CEpollServer::Run()  
  78.   {  
  79.       while ( true )  
  80.       {  
  81.           struct epoll_event    events[_MAX_SOCKFD_COUNT];  
  82.           int nfds = epoll_wait( m_iEpollFd, events,  _MAX_SOCKFD_COUNT, -1 );  
  83.           for (int i = 0; i < nfds; i++)  
  84.           {  
  85.               int client_socket = events[i].data.fd;  
  86.               char buffer[1024];//每次收發的位元組數小於1024位元組
  87.               memset(buffer, 0, 1024);  
  88.               if (events[i].events & EPOLLIN)//監聽到讀事件,接收資料
  89.               {  
  90.                   int rev_size = recv(events[i].data.fd,buffer, 1024,0);  
  91.                   if( rev_size <= 0 )  
  92.                   {  
  93.                       cout << "recv error: recv size: " << rev_size << endl;  
  94.                       struct epoll_event event_del;  
  95.                       event_del.data.fd = events[i].data.fd;  
  96.                       event_del.events = 0;  
  97.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, event_del.data.fd, &event_del);  
  98.                   }  
  99.                   else
  100.                   {  
  101.                       printf("Terminal Received Msg Content:%s\n",buffer);  
  102.                       struct epoll_event    ev;  
  103.                       ev.events = EPOLLOUT | EPOLLERR | EPOLLHUP;  
  104.                       ev.data.fd = client_socket;     //記錄socket控制代碼
  105.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, client_socket, &ev);  
  106.                   }  
  107.               }  
  108.   elseif(events[i].events & EPOLLOUT)//監聽到寫事件,傳送資料
  109.               {  
  110.                   char sendbuff[1024];  
  111.                   sprintf(sendbuff, "Hello, client fd: %d\n", client_socket);  
  112.                   int sendsize = send(client_socket, sendbuff, strlen(sendbuff)+1, MSG_NOSIGNAL);  
  113.                   if(sendsize <= 0)  
  114.                   {  
  115.                       struct epoll_event event_del;  
  116.                       event_del.data.fd = events[i].data.fd;  
  117.                       event_del.events = 0;  
  118.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, event_del.data.fd, &event_del);  
  119.                   }  
  120.                   else
  121.                   {  
  122.                       printf("Server reply msg ok! buffer: %s\n", sendbuff);  
  123.                       struct epoll_event    ev;  
  124.                       ev.events = EPOLLIN | EPOLLERR | EPOLLHUP;  
  125.                       ev.data.fd = client_socket;     //記錄socket控制代碼
  126.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, client_socket, &ev);  
  127.                   }  
  128.               }  
  129.               else
  130.               {  
  131.                   cout << "EPOLL ERROR\n" <<endl;  
  132.                   epoll_ctl(m_iEpollFd, EPOLL_CTL_DEL, events[i].data.fd, &events[i]);  
  133.               }  
  134.           }  
  135.       }  
  136.   }  

main.cpp

  1. #include <iostream>
  2. #include "cepollserver.h"
  3. usingnamespace std;  
  4. int main()  
  5. {  
  6.         CEpollServer  theApp;  
  7.         theApp.InitServer("127.0.0.1", 8000);  
  8.         theApp.Run();  
  9.         return 0;  
  10. }  

客戶端程式碼:

說明:測試是兩個併發進行測試,每一個客戶端都是一個長連線。程式碼中在連線伺服器(ConnectToServer)時將使用者IDsocketid關聯起來。使用者IDsocketid是一一對應的關係。

cepollclient.h

  1. #ifndef _DEFINE_EPOLLCLIENT_H_
  2. #define _DEFINE_EPOLLCLIENT_H_
  3. #define _MAX_SOCKFD_COUNT 65535
  4. #include<iostream>
  5. #include <sys/epoll.h>
  6. #include <sys/socket.h>
  7. #include <netinet/in.h>
  8. #include <fcntl.h>
  9. #include <arpa/inet.h>
  10. #include <errno.h>
  11. #include <sys/ioctl.h>
  12. #include <sys/time.h>
  13. #include <string>
  14. usingnamespace std;  
  15. /** 
  16.  * @brief 使用者狀態 
  17.  */
  18. typedefenum _EPOLL_USER_STATUS_EM  
  19. {  
  20.         FREE = 0,  
  21.         CONNECT_OK = 1,//連線成功
  22.         SEND_OK = 2,//傳送成功
  23.         RECV_OK = 3,//接收成功
  24. }EPOLL_USER_STATUS_EM;  
  25. /*@brief 
  26.  *@CEpollClient class 使用者狀態結構體 
  27.  */
  28. struct UserStatus  
  29. {  
  30.         EPOLL_USER_STATUS_EM iUserStatus;//使用者狀態
  31.         int iSockFd;//使用者狀態關聯的socketfd
  32.         char cSendbuff[1024];//傳送的資料內容
  33.         int iBuffLen;//傳送資料內容的長度
  34.         unsigned int uEpollEvents;//Epoll events
  35. };  
  36. class CEpollClient  
  37. {  
  38.         public:  
  39.                 /** 
  40.                  * @brief 
  41.                  * 函式名:CEpollClient 
  42.                  * 描述:建構函式 
  43.                  * @param [in] iUserCount  
  44.                  * @param [in] pIP IP地址 
  45.                  * @param [in] iPort 埠號 
  46.                  * @return 無返回 
  47.                  */
  48.                 CEpollClient(int iUserCount, constchar* pIP, int iPort);  
  49. /** 
  50.                  * @brief 
  51.                  * 函式名:CEpollClient 
  52.                  * 描述:解構函式 
  53.                  * @return 無返回 
  54.                  */
  55.                 ~CEpollClient();  
  56.                 /** 
  57.                  * @brief 
  58.                  * 函式名:RunFun 
  59.                  * 描述:對外提供的介面,執行epoll類 
  60.                  * @return 無返回值 
  61.                  */
  62.                 int RunFun();  
  63.         private:  
  64.                 /** 
  65.                  * @brief 
  66.                  * 函式名:ConnectToServer 
  67.                  * 描述:連線到伺服器 
  68.                  * @param [in] iUserId 使用者ID 
  69.                  * @param [in] pServerIp 連線的伺服器IP 
  70.                  * @param [in] uServerPort 連線的伺服器埠號 
  71.                  * @return 成功返回socketfd,失敗返回的socketfd為-1 
  72.                  */
  73.                 int ConnectToServer(int iUserId,constchar *pServerIp,unsigned short uServerPort);  
  74. /** 
  75.                  * @brief 
  76.                  * 函式名:SendToServerData 
  77.                  * 描述:給伺服器傳送使用者(iUserId)的資料 
  78.                  * @param [in] iUserId 使用者ID 
  79.                  * @return 成功返回傳送資料長度 
  80.                  */
  81.                 int SendToServerData(int iUserId);  
  82.                 /** 
  83.                  * @brief 
  84.                  * 函式名:RecvFromServer 
  85.                  * 描述:接收使用者回覆訊息 
  86.                  * @param [in] iUserId 使用者ID 
  87.                  * @param [in] pRecvBuff 接收的資料內容 
  88.                  * @param [in] iBuffLen 接收的資料長度 
  89.                  * @return 成功返回接收的資料長度,失敗返回長度為-1 
  90.                  */
  91.                 int RecvFromServer(int iUserid,char *pRecvBuff,int iBuffLen);  
  92.                 /** 
  93.                  * @brief 
  94.                  * 函式名:CloseUser 
  95.                  * 描述:關閉使用者 
  96.                  * @param [in] iUserId 使用者ID 
  97.                  * @return 成功返回true 
  98.                  */
  99.                 bool CloseUser(int iUserId);  
  100. /** 
  101.                  * @brief 
  102.                  * 函式名:DelEpoll 
  103.                  * 描述:刪除epoll事件 
  104.                  * @param [in] iSockFd socket FD 
  105.                  * @return 成功返回true 
  106.                  */
  107.                 bool DelEpoll(int iSockFd);  
  108.         private:  
  109.                 int    m_iUserCount;//使用者數量;
  110.                 struct UserStatus *m_pAllUserStatus;//使用者狀態陣列
  111.                 int    m_iEpollFd;//需要建立epollfd
  112.                 int    m_iSockFd_UserId[_MAX_SOCKFD_COUNT];//將使用者ID和socketid關聯起來
  113.                 int    m_iPort;//埠號
  114.                 char   m_ip[100];//IP地址
  115. };  
  116. #endif

cepollclient.cpp

  1. #include "cepollclient.h"
  2. CEpollClient::CEpollClient(int iUserCount, constchar* pIP, int iPort)  
  3. {  
  4.     strcpy(m_ip, pIP);  
  5.     m_iPort = iPort;  
  6.     m_iUserCount = iUserCount;  
  7.     m_iEpollFd = epoll_create(_MAX_SOCKFD_COUNT);  
  8.     m_pAllUserStatus = (struct UserStatus*)malloc(iUserCount*sizeof(struct UserStatus));  
  9.     for(int iuserid=0; iuserid<iUserCount ; iuserid++)  
  10.     {  
  11.         m_pAllUserStatus[iuserid].iUserStatus = FREE;  
  12.         sprintf(m_pAllUserStatus[iuserid].cSendbuff, "Client: %d send message \"Hello Server!\"\r\n", iuserid);  
  13.         m_pAllUserStatus[iuserid].iBuffLen = strlen(m_pAllUserStatus[iuserid].cSendbuff) + 1;  
  14.         m_pAllUserStatus[iuserid].iSockFd = -1;  
  15.     }  
  16.     memset(m_iSockFd_UserId, 0xFF, sizeof(m_iSockFd_UserId));  
  17. }  
  18. CEpollClient::~CEpollClient()  
  19. {  
  20.     free(m_pAllUserStatus);  
  21. }  
  22. int CEpollClient::ConnectToServer(int iUserId,constchar *pServerIp,unsigned short uServerPort)  
  23. {  
  24.     if( (m_pAllUserStatus[iUserId].iSockFd = socket(AF_INET,SOCK_STREAM,0) ) < 0 )  
  25.     {  
  26.         cout <<"[CEpollClient error]: init socket fail, reason is:"<<strerror(errno)<<",errno is:"<<errno<<endl;  
  27.         m_pAllUserStatus[iUserId].iSockFd = -1;  
  28.         return  m_pAllUserStatus[iUserId].iSockFd;  
  29.     }  
  30.     struct sockaddr_in addr;  
  31.     bzero(&addr, sizeof(addr));  
  32.     addr.sin_family = AF_INET;  
  33.     addr.sin_port = htons(uServerPort);  
  34.     addr.sin_addr.s_addr = inet_addr(pServerIp);  
  35.     int ireuseadd_on = 1;//支援埠複用
  36.     setsockopt(m_pAllUserStatus[iUserId].iSockFd, SOL_SOCKET, SO_REUSEADDR, &ireuseadd_on, sizeof(ireuseadd_on));  
  37.     unsigned long ul = 1;  
  38.     ioctl(m_pAllUserStatus[iUserId].iSockFd, FIONBIO, &ul); //設定為非阻塞模式
  39.     connect(m_pAllUserStatus[iUserId].iSockFd, (const sockaddr*)&addr, sizeof(addr));  
  40.     m_pAllUserStatus[iUserId].iUserStatus = CONNECT_OK;  
  41.     m_pAllUserStatus[iUserId].iSockFd = m_pAllUserStatus[iUserId].iSockFd;  
  42.     return m_pAllUserStatus[iUserId].iSockFd;  
  43. }  
  44. int CEpollClient::SendToServerData(int iUserId)  
  45. {  
  46.     sleep(1);//此處控制傳送頻率,避免狂打日誌,正常使用中需要去掉
  47.     int isendsize = -1;  
  48.     if( CONNECT_OK == m_pAllUserStatus[iUserId].iUserStatus || RECV_OK == m_pAllUserStatus[iUserId].iUserStatus)  
  49.     {  
  50.         isendsize = send(m_pAllUserStatus[iUserId].iSockFd, m_pAllUserStatus[iUserId].cSendbuff, m_pAllUserStatus[iUserId  
  51. ].iBuffLen, MSG_NOSIGNAL);  
  52.         if(isendsize < 0)  
  53.         {  
  54.             cout <<"[CEpollClient error]: SendToServerData, send fail, reason is:"<<strerror(errno)<<",errno is:"<<errno<  
  55. <endl;  
  56.         }  
  57.         else
  58.         {  
  59.             printf("[CEpollClient info]: iUserId: %d Send Msg Content:%s\n", iUserId, m_pAllUserStatus[iUserId].cSendbuff  
  60. );  
  61.             m_pAllUserStatus[iUserId].iUserStatus = SEND_OK;  
  62.         }  
  63.     }  
  64.     return isendsize;  
  65. }  
  66. int CEpollClient::RecvFromServer(int iUserId,char *pRecvBuff,int iBuffLen)  
  67. {  
  68.     int irecvsize = -1;  
  69.     if(SEND_OK == m_pAllUserStatus[iUserId].iUserStatus)  
  70.     {  
  71.         irecvsize = recv(m_pAllUserStatus[iUserId].iSockFd, pRecvBuff, iBuffLen, 0);  
  72.         if(0 > irecvsize)  
  73.         {  
  74.             cout <<"[CEpollClient error]: iUserId: " << iUserId << "RecvFromServer, recv fail, reason is:"<<strerror(errn  
  75. o)<<",errno is:"<<errno<<endl;  
  76.         }  
  77.         elseif(0 == irecvsize)  
  78.         {  
  79.             cout <<"[warning:] iUserId: "<< iUserId << "RecvFromServer, STB收到資料為0,表示對方斷開連線,irecvsize:"<<ire  
  80. cvsize<<",iSockFd:"<< m_pAllUserStatus[iUserId].iSockFd << endl;  
  81.         }  
  82.         else
  83.         {  
  84.             printf("Recv Server Msg Content:%s\n", pRecvBuff);  
  85.             m_pAllUserStatus[iUserId].iUserStatus = RECV_OK;  
  86.         }  
  87.     }  
  88.     return irecvsize;  
  89. }  
  90. bool CEpollClient::CloseUser(int iUserId)  
  91. {  
  92.     close(m_pAllUserStatus[iUserId].iSockFd);  
  93.     m_pAllUserStatus[iUserId].iUserStatus = FREE;  
  94.     m_pAllUserStatus[iUserId].iSockFd = -1;  
  95.     returntrue;  
  96. }  
  97. int CEpollClient::RunFun()  
  98. {  
  99.     int isocketfd = -1;  
  100.     for(int iuserid=0; iuserid<m_iUserCount; iuserid++)  
  101.     {  
  102.         struct epoll_event event;  
  103.         isocketfd = ConnectToServer(iuserid, m_ip, m_iPort);  
  104.         if(isocketfd < 0)  
  105.             cout <<"[CEpollClient error]: RunFun, connect fail" <<endl;  
  106.         m_iSockFd_UserId[isocketfd] = iuserid;//將使用者ID和socketid關聯起來
  107.         event.data.fd = isocketfd;  
  108.         event.events = EPOLLIN|EPOLLOUT|EPOLLERR|EPOLLHUP;  
  109.         m_pAllUserStatus[iuserid].uEpollEvents = event.events;  
  110.         epoll_ctl(m_iEpollFd, EPOLL_CTL_ADD, event.data.fd, &event);  
  111.   }  
  112.   while(1)  
  113.       {  
  114.           struct epoll_event events[_MAX_SOCKFD_COUNT];  
  115.           char buffer[1024];  
  116.           memset(buffer,0,1024);  
  117.           int nfds = epoll_wait(m_iEpollFd, events, _MAX_SOCKFD_COUNT, 100 );//等待epoll事件的產生
  118.           for (int ifd=0; ifd<nfds; ifd++)//處理所發生的所有事件
  119.           {  
  120.               struct epoll_event event_nfds;  
  121.               int iclientsockfd = events[ifd].data.fd;  
  122.               cout << "events[ifd].data.fd: " << events[ifd].data.fd << endl;  
  123.               int iuserid = m_iSockFd_UserId[iclientsockfd];//根據socketfd得到使用者ID
  124.               if( events[ifd].events & EPOLLOUT )  
  125.               {  
  126.                   int iret = SendToServerData(iuserid);  
  127.                   if( 0 < iret )  
  128.                   {  
  129.                       event_nfds.events = EPOLLIN|EPOLLERR|EPOLLHUP;  
  130.                       event_nfds.data.fd = iclientsockfd;  
  131.                       epoll_ctl(m_iEpollFd, EPOLL_CTL_MOD, event_nfds.data.fd, &event_nfds);  
  132.                   }  
  133.                   else
  134.                   {  
  135.                       cout <<"[CEpollClient error:] EpollWait, SendToServerData fail, send iret:"<<iret<<",iuserid:"<<iuser  
  136.   id<<",fd:"<<events[ifd].data.fd<<endl;  
  137.                       DelEpoll(events[ifd].data.fd);  
  138.                       CloseUser(iuserid);  
  139.                   }  
  140.               }  
  141.   elseif( events[ifd].events &