1. 程式人生 > >Linux I/O多路複用技術-epoll

Linux I/O多路複用技術-epoll

 Linux I/O多路複用技術在比較多的TCP網路伺服器中有使用,即比較多的用到select函式。Linux 2.6核心中有提高網路I/O效能的新方法,即epoll 。

1、為什麼select落後
    首先,在Linux核心中,select所用到的FD_SET是有限的,即核心中有個引數__FD_SETSIZE定義了每個FD_SET的控制代碼個數,在我用的2.6.15-25-386核心中,該值預設是1024,搜尋核心原始碼得到:
include/linux/posix_types.h:
#define __FD_SETSIZE         1024
    也就是說,如果想要同時檢測1025個控制代碼的可讀狀態是不可能用select實現的。或者同時檢測1025個控制代碼的可寫狀態也是不可能的。

    其次,核心中實現select是使用輪詢方法,即每次檢測都會遍歷所有FD_SET中的控制代碼,顯然,select函式的執行時間與FD_SET中控制代碼的個數有一個比例關係,即select要檢測的控制代碼數越多就會越費時。當然,在前文中我並沒有提及poll方法,事實上用select的朋友一定也試過poll,我個人覺得select和poll大同小異,個人偏好於用select而已。

2、核心中提高I/O效能的新方法 epoll
    epoll是什麼?按照man手冊的說法:是為處理大批量控制代碼而作了改進的poll。要使用epoll只需要以下的三個系統函式呼叫:epoll_create(2), epoll_ctl(2), epoll_wait(2)。

Linux2.6核心epoll介紹
    先介紹2本書《The

 Linux Networking Architecture--Design and Implementation of Network Protocols in the Linux Kernel》,以2.4核心講解Linux TCP/IP實現,相當不錯。作為一個現實世界中的實現,很多時候你必須作很多權衡,這時候參考一個久經考驗的系統更有實際意義。舉個例子,linux核心中sk_buff結構為了追求速度和安全,犧牲了部分記憶體,所以在傳送TCP包的時候,無論應用層資料多大,sk_buff最小也有272的位元組。其實對於socket應用層程式來說,另外一本書《UNIX Network Programming Volume 1》
意義更大一點。2003年的時候,這本書出了最新的第3版本,不過主要還是修訂第2版本。其中第6章《I/O Multiplexing》是最重要的,Stevens給出了網路IO的基本模型。在這裡最重要的莫過於select模型和Asynchronous I/O模型。從理論上說,AIO似乎是最高效的,你的IO操作可以立即返回,然後等待os告訴你IO操作完成。但是一直以來,如何實現就沒有一個完美的方案。最著名的windows完成埠實現的AIO,實際上也只是內部用執行緒池實現的罷了,最後的結果是IO有個執行緒池,你的應用程式也需要一個執行緒池...... 很多文件其實已經指出了這引發的執行緒context-switch所帶來的代價。在linux 平臺上,關於網路AIO一直是改動最多的地方,2.4的年代就有很多AIO核心patch,最著名的應該算是SGI。但是一直到2.6核心釋出,網路模組的AIO一直沒有進入穩定核心版本(大部分都是使用使用者執行緒模擬方法,在使用了NPTL的linux上面其實和windows的完成埠基本上差不多了)。2.6核心所支援的AIO特指磁碟的AIO---支援io_submit(),io_getevents()以及對Direct IO的支援(即:就是繞過VFS系統buffer直接寫硬碟,對於流伺服器在記憶體平穩性上有相當的幫助)。
    所以,剩下的select模型基本上就成為我們在linux上面的唯一選擇,其實,如果加上no-block socket的配置,可以完成一個"偽"AIO的實現,只不過推動力在於你而不是os而已。不過傳統的select/poll函式有著一些無法忍受的缺點,所以改進一直是2.4-2.5開發版本核心的任務,包括/dev/poll,realtime signal等等。最終,Davide Libenzi開發的epoll進入2.6核心成為正式的解決方案。

3、epoll的優點
 1> 支援一個程序開啟大數目的socket描述符(FD)
    select 最不能忍受的是一個程序所開啟的FD是有一定限制的,由FD_SETSIZE設定,預設值是2048。對於那些需要支援上萬連線數目的IM伺服器來說顯然太少了。這時候你一是可以選擇修改這個巨集然後重新編譯核心,不過資料也同時指出這樣會帶來網路效率的下降;二是可以選擇多程序的解決方案(傳統的Apache方案),不過雖然linux上面建立程序的代價比較小,但仍舊是不可忽視的,加上程序間資料同步遠比不上執行緒間同步高效,所以這也不是一種完美的方案。不過epoll 沒有這個限制,它所支援的FD上限是最大可以開啟檔案的數目,這個數字一般遠大於select 所支援的2048。舉個例子,在1GB記憶體的機器上大約是10萬左右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統記憶體關係很大。

    2> IO效率不隨FD數目增加而線性下降
    傳統select/poll的另一個致命弱點就是當你擁有一個很大的socket集合,由於網路得延時,使得任一時間只有部分的socket是"活躍"的,而select/poll每次呼叫都會線性掃描全部的集合,導致效率呈現線性下降。但是epoll不存在這個問題,它只會對"活躍"的socket進行操作---這是因為在核心實現中epoll是根據每個fd上面的callback函式實現的。於是,只有"活躍"的socket才會主動去呼叫callback函式,其他idle狀態的socket則不會,在這點上,epoll實現了一個"偽"AIO,因為這時候推動力在os核心。在一些 benchmark中,如果所有的socket基本上都是活躍的---比如一個高速LAN環境,epoll也不比select/poll低多少效率,但若過多使用的呼叫epoll_ctl,效率稍微有些下降。然而一旦使用idle connections模擬WAN環境,那麼epoll的效率就遠在select/poll之上了。

 3> 使用mmap加速核心與使用者空間的訊息傳遞
    這點實際上涉及到epoll的具體實現。無論是select,poll還是epoll都需要核心把FD訊息通知給使用者空間,如何避免不必要的記憶體拷貝就顯得很重要,在這點上,epoll是通過核心於使用者空間mmap同一塊記憶體實現的。而如果你像我一樣從2.5核心就開始關注epoll的話,一定不會忘記手工mmap這一步的。

   4> 核心微調
    這一點其實不算epoll的優點,而是整個linux平臺的優點。也許你可以懷疑linux平臺,但是你無法迴避linux平臺賦予你微調核心的能力。比如,核心TCP/IP協議棧使用記憶體池管理sk_buff結構,可以在執行期間動態地調整這個記憶體pool(skb_head_pool)的大小---通過echo XXXX>/proc/sys/net/core/hot_list_length來完成。再比如listen函式的第2個引數(TCP完成3次握手的資料包佇列長度),也可以根據你平臺記憶體大小來動態調整。甚至可以在一個數據包面數目巨大但同時每個資料包本身大小卻很小的特殊系統上嘗試最新的NAPI網絡卡驅動架構。

4、epoll的工作模式
    令人高興的是,linux2.6核心的epoll比其2.5開發版本的/dev/epoll簡潔了許多,所以,大部分情況下,強大的東西往往是簡單的。唯一有點麻煩的是epoll有2種工作方式:LT和ET
    LT(level triggered)是預設的工作方式,並且同時支援block和no-block socket。在這種做法中,核心告訴你一個檔案描述符是否就緒了,然後你可以對這個就緒的fd進行IO操作。如果你不作任何操作,核心還是會繼續通知你的,所以,這種模式程式設計出錯誤可能性要小一點。傳統的select/poll都是這種模型的代表
    ET (edge-triggered) 是高速工作方式,只支援no-block socket。在這種模式下,當描述符從未就緒變為就緒時,核心就通過epoll告訴你,然後它會假設你知道檔案描述符已經就緒,並且不會再為那個檔案描述符傳送更多的就緒通知,直到你做了某些操作而導致那個檔案描述符不再是就緒狀態(比如 你在傳送,接收或是接受請求,或者傳送接收的資料少於一定量時導致了一個EWOULDBLOCK 錯誤)。但是請注意,如果一直不對這個fd作IO操作(從而導致它再次變成未就緒),核心就不會發送更多的通知(only once)。不過在TCP協議中,ET模式的加速效用仍需要更多的benchmark確認。
    epoll只有epoll_create,epoll_ctl,epoll_wait 3個系統呼叫,具體用法請參考http://www.xmailserver.org/linux-patches/nio-improve.html,在http://www.kegel.com/rn/也有一個完整的例子,大家一看就知道如何使用了。

5、 epoll的使用方法

epoll用到的所有函式都是在標頭檔案sys/epoll.h中宣告的,下面簡要說明所用到的資料結構和函式:
所用到的資料結構:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;

struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
結構體epoll_event 被用於註冊所感興趣的事件和回傳所發生待處理的事件,而epoll_data 聯合體用來儲存觸發事件的某個檔案描述符相關的資料。例如一個client連線到伺服器,伺服器通過呼叫accept函式可以得到於這個client對應的socket檔案描述符,可以把這檔案描述符賦給epoll_data的fd欄位,以便後面的讀寫操作在這個檔案描述符上進行。epoll_event 結構體的events欄位是表示感興趣的事件和被觸發的事件,可能的取值為:
EPOLLIN:表示對應的檔案描述符可以讀;
EPOLLOUT:表示對應的檔案描述符可以寫;
EPOLLPRI:表示對應的檔案描述符有緊急的資料可讀;
EPOLLERR:表示對應的檔案描述符發生錯誤;
EPOLLHUP:表示對應的檔案描述符被結束通話;
EPOLLET:表示對應的檔案描述符有事件發生;
所用到的函式:
1)、epoll_create函式
函式宣告:int epoll_create(int size)
該函式生成一個epoll專用的檔案描述符,其中的引數是指定生成描述符的最大範圍。
2)、epoll_ctl函式
函式宣告:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
該函式用於控制某個檔案描述符上的事件,可以註冊事件,修改事件,刪除事件。
引數:
epfd:由 epoll_create 生成的epoll專用的檔案描述符;
op:要進行的操作,可能的取值EPOLL_CTL_ADD 註冊、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 刪除;
fd:關聯的檔案描述符;
event:指向epoll_event的指標;
如果呼叫成功則返回0,不成功則返回-1。
3)、epoll_wait函式
函式宣告:int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)
該函式用於輪詢I/O事件的發生。
引數:
epfd:由epoll_create 生成的epoll專用的檔案描述符;
epoll_event:用於回傳代處理事件的陣列;
maxevents:每次能處理的事件數;
timeout:等待I/O事件發生的超時值;
返回發生事件數。

    首先通過create_epoll(int maxfds)來建立一個epoll的控制代碼,其中maxfds為你的epoll所支援的最大控制代碼數。這個函式會返回一個新的epoll控制代碼,之後的所有操作都將通過這個控制代碼來進行操作。在用完之後,記得用close()來關閉這個創建出來的epoll控制代碼

    之後在你的網路主迴圈裡面呼叫epoll_wait(int epfd, epoll_event events, int max_events, int timeout)來查詢所有的網路介面,看哪一個可以讀,哪一個可以寫。基本的語法為: 
nfds = epoll_wait(kdpfd, events, maxevents, -1); 
其中kdpfd為用epoll_create建立之後的控制代碼,events是一個epoll_event*的指標,當epoll_wait函式操作成功之後,events裡面將儲存所有的讀寫事件。max_events是當前需要監聽的所有socket控制代碼數。最後一個timeout引數指示 epoll_wait的超時條件,為0時表示馬上返回;為-1時表示函式會一直等下去直到有事件返回;為任意正整數時表示等這麼長的時間,如果一直沒有事件,則會返回。一般情況下如果網路主迴圈是單執行緒的話,可以用-1來等待,這樣可以保證一些效率,如果是和主迴圈在同一個執行緒的話,則可以用0來保證主迴圈的效率。epoll_wait返回之後,應該進入一個迴圈,以便遍歷所有的事件。

    對epoll 的操作就這麼簡單,總共不過4個API:epoll_create, epoll_ctl, epoll_wait和close。以下是man中的一個例子。

struct epoll_event ev, *events;
for(;;) {
 nfds = epoll_wait(kdpfd, events, maxevents, -1); //等待I/O事件
 for(n = 0; n < nfds; ++n) {
  if(events[n].data.fd == listener) { //如果是主socket的事件,則表示有新連線進入,需要進行新連線的處理。
    client = accept(listener, (struct sockaddr *) &local,  &addrlen);
    if(client < 0){
      perror("accept error");
      continue;
    }
    setnonblocking(client); // 將新連線置於非阻塞模式
    ev.events = EPOLLIN | EPOLLET; 
                                   //注意這裡的引數EPOLLIN | EPOLLET並沒有設定對寫socket的監聽,
                                   //如果有寫操作的話,這個時候epoll是不會返回事件的,
                                   //如果要對寫操作也監聽的話,應該是EPOLLIN | EPOLLOUT | EPOLLET。
    ev.data.fd = client; // 並且將新連線也加入EPOLL的監聽佇列
    if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {  // 設定好event之後,將這個新的event通過epoll_ctl
                                                             //加入到epoll的監聽佇列裡,這裡用EPOLL_CTL_ADD
                                                             //來加一個新的 epoll事件。可以通過EPOLL_CTL_DEL來減少
                                                             //一個epoll事件,通過EPOLL_CTL_MOD來改變一個事件的監聽方式。
      fprintf(stderr, "epoll set insertion error: fd=%d"0, client);
      return -1;
    }
  }  else // 如果不是主socket的事件的話,則代表這是一個使用者的socket的事件,
            // 則用來處理這個使用者的socket的事情是,比如說read(fd,xxx)之類,或者一些其他的處理。
    do_use_fd(events[n].data.fd);
 }
}

6、Linux下epoll程式設計例項
    epoll 模型似乎只有一種格式,所以大家只要參考下面的程式碼,就能夠對epoll有所瞭解了。

while (TRUE) {
  int nfds = epoll_wait (m_epoll_fd, m_events, MAX_EVENTS, EPOLL_TIME_OUT); //等待EPOLL事件的發生
                                                                                    //至於相關的埠,則需要在初始化EPOLL的時候繫結。
  if (nfds <= 0)    continue;
  m_bOnTimeChecking = FALSE;
  g_CurTime = time(NULL);
  for (int i=0; i<nfds; i++) {
    try {
      if (m_events[i].data.fd == m_listen_http_fd) //如果新監測到一個HTTP使用者連線到繫結的HTTP埠則建立新連線。
      {
        OnAcceptHttpEpoll ();
      } else if (m_events[i].data.fd == m_listen_sock_fd) //如果新監測到一個SOCKET使用者連線到了繫結的SOCKET埠則
                                                                                     //建立新的連線。
      {
        OnAcceptSockEpoll ();
      } else if (m_events[i].events & EPOLLIN) //如果是已經連線的使用者,並且收到資料,那麼進行讀入操作。
      {
        OnReadEpoll (i);
      }
      OnWriteEpoll (i); //檢視當前的活動連線是否有需要寫出的資料。
    } catch (int) {
      PRINTF ("CATCH捕獲錯誤\n");
      continue;
    }
  }
  m_bOnTimeChecking = TRUE;
  OnTimer (); //進行一些定時的操作,主要就是刪除一些斷線使用者等。
}


 *****************************************************************************************************************************************




    Epoll模型主要負責對大量併發使用者的請求進行及時處理,完成伺服器與客戶端的資料互動。其具體的實現步驟如下:
(a) 使用epoll_create()函式建立檔案描述,設定可管理的最大socket描述符數目。
(b) 建立與epoll關聯的接收執行緒,應用程式可以建立多個接收執行緒來處理epoll上的讀通知事件,執行緒的數量依賴於程式的具體需要。
(c) 建立一個偵聽socket的描述符ListenSock,並將該描述符設定為非阻塞模式,呼叫Listen()函式在該套接字上偵聽有無新的連線請求,在epoll_event結構中設定要處理的事件型別EPOLLIN,工作方式為 epoll_ET,以提高工作效率,同時使用epoll_ctl()來註冊事件,最後啟動網路監視執行緒。
(d) 網路監視執行緒啟動迴圈,epoll_wait()等待epoll事件發生。
(e) 如果epoll事件表明有新的連線請求,則呼叫accept()函式,將使用者socket描述符新增到epoll_data聯合體,同時設定該描述符為非阻塞,並在epoll_event結構中設定要處理的事件型別為讀和寫,工作方式為epoll_ET。
(f) 如果epoll事件表明socket描述符上有資料可讀,則將該socket描述符加入可讀佇列,通知接收執行緒讀入資料,並將接收到的資料放入到接收資料的連結串列中,經邏輯處理後,將反饋的資料包放入到傳送資料鏈表中,等待由傳送執行緒傳送。

例子程式碼:

#include <iostream>
#include <sys/socket.h> 
#include <sys/epoll.h> 
#include <netinet/in.h> 
#include <arpa/inet.h> 
#include <fcntl.h> 
#include <unistd.h> 
#include <stdio.h>

#define MAXLINE 10 
#define OPEN_MAX 100 
#define LISTENQ 20 
#define SERV_PORT 5555 
#define INFTIM 1000

void setnonblocking(int sock) 

  int opts; 
  opts=fcntl(sock,F_GETFL);

  if(opts<0) 
  { 
    perror("fcntl(sock,GETFL)"); 
    exit(1); 
  }

  opts = opts | O_NONBLOCK;

  if(fcntl(sock,F_SETFL,opts)<0) 
  { 
    perror("fcntl(sock,SETFL,opts)"); 
    exit(1); 
  } 
}


int main() 

  int i, maxi, listenfd, connfd, sockfd, epfd, nfds; 
  ssize_t n; 
  char line[MAXLINE]; 
  socklen_t clilen;

  struct epoll_event ev,events[20]; //宣告epoll_event結構體的變數, ev用於註冊事件, events陣列用於回傳要處理的事件 
  epfd=epoll_create(256); //生成用於處理accept的epoll專用的檔案描述符, 指定生成描述符的最大範圍為256 

  struct sockaddr_in clientaddr; 
  struct sockaddr_in serveraddr;

  listenfd = socket(AF_INET, SOCK_STREAM, 0);

  setnonblocking(listenfd); //把用於監聽的socket設定為非阻塞方式

  ev.data.fd=listenfd; //設定與要處理的事件相關的檔案描述符 
  ev.events=EPOLLIN | EPOLLET; //設定要處理的事件型別 
  epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); //註冊epoll事件

  bzero(&serveraddr, sizeof(serveraddr)); 
  serveraddr.sin_family = AF_INET; 
  char *local_addr="200.200.200.204"; 
  inet_aton(local_addr,&(serveraddr.sin_addr));
  serveraddr.sin_port=htons(SERV_PORT);  //或者htons(SERV_PORT);

  bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));

  listen(listenfd, LISTENQ);

  maxi = 0;

  for( ; ; ) { 
    nfds=epoll_wait(epfd,events,20,500); //等待epoll事件的發生

    for(i=0;i<nfds;++i) //處理所發生的所有事件 
      { 
       if(events[i].data.fd==listenfd)    /**監聽事件**/
        { 
           connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); 
           if(connfd<0){ 
             perror("connfd<0"); 
             exit(1); 
           }

         setnonblocking(connfd); //把客戶端的socket設定為非阻塞方式

         char *str = inet_ntoa(clientaddr.sin_addr); 
         std::cout<<"connect from "<_u115 ? tr<<std::endl;

         ev.data.fd=connfd; //設定用於讀操作的檔案描述符 
         ev.events=EPOLLIN | EPOLLET; //設定用於注測的讀操作事件 
         epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //註冊ev事件 
       } 
      else if(events[i].events&EPOLLIN)     /**讀事件**/
        { 
           if ( (sockfd = events[i].data.fd) < 0) continue; 
           if ( (n = read(sockfd, line, MAXLINE)) < 0) { 
              if (errno == ECONNRESET) { 
                close(sockfd); 
                events[i].data.fd = -1; 
                } else
                  { 
                    std::cout<<"readline error"<<std::endl; 
                  }
             } else if (n == 0) {
                close(sockfd); 
                events[i].data.fd = -1; 
              }

          ev.data.fd=sockfd; //設定用於寫操作的檔案描述符 
          ev.events=EPOLLOUT | EPOLLET; //設定用於注測的寫操作事件 
          epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改sockfd上要處理的事件為EPOLLOUT 
       } 
      else if(events[i].events&EPOLLOUT)    /**寫事件**/
        { 
          sockfd = events[i].data.fd; 
          write(sockfd, line, n);

          ev.data.fd=sockfd; //設定用於讀操作的檔案描述符 
          ev.events=EPOLLIN | EPOLLET; //設定用於註冊的讀操作事件 
          epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改sockfd上要處理的事件為EPOLIN 
        } 
     } 
  } 
}