1. 程式人生 > >linux IO複用之epoll

linux IO複用之epoll

linux IO複用之epoll

這篇文章是我檢視網上各種文章來總結的,為自己學習來做個筆記!!!

大多數來源於:https://www.cnblogs.com/lojunren/p/3856290.html

首先,什麼事是epoll?

epoll是Linux核心為處理大批控制代碼而作改進的poll,是Linux下多路複用IO介面select/poll的增強版本,它能顯著的減少程式在大量併發連線中只有少量活躍的情況下的系統CPU利用率。因為它會複用檔案描述符集合來傳遞結果而不是迫使開發者每次等待事件之前都必須重新準備要被偵聽的檔案描述符集合,另一個原因就是獲取事件的時候,它無須遍歷整個被偵聽的描述符集,只要遍歷那些被核心IO事件非同步喚醒而加入Ready佇列的描述符集合就行了。epoll除了提供select\poll那種IO事件的電平觸發(Level Triggered)外,還提供了邊沿觸發(Edge Triggered),這就使得使用者空間程式有可能快取IO狀態,減少epoll_wait/epoll_pwait的呼叫,提供應用程式的效率。

兩種工作方式:

 LT(level triggered):水平觸發,預設方式,同時支援block和no-block socket,在這種做法中,核心告訴我們一個檔案描述符是否被就緒了,如果就緒了,你就可以對這個就緒的fd進行IO操作。如果你不作任何操作,核心還是會繼續通知你的,所以,這種模式程式設計出錯的可能性較小。傳統的select\poll都是這種模型的代表。

ET(edge-triggered):邊沿觸發,高速工作方式,只支援no-block socket。在這種模式下,當描述符從未就緒變為就緒狀態時,核心通過epoll告訴你。然後它會假設你知道檔案描述符已經就緒,並且不會再為那個描述符傳送更多的就緒通知,直到你做了某些操作導致那個檔案描述符不再為就緒狀態了(比如:你在傳送、接受或者接受請求,或者傳送接受的資料少於一定量時導致了一個EWOULDBLOCK錯誤)。但是請注意,如果一直不對這個fs做IO操作(從而導致它再次變成未就緒狀態),核心不會發送更多的通知。

  區別:LT事件不會丟棄,而是隻要讀buffer裡面有資料可以讓使用者讀取,則不斷的通知你。而ET則只在事件發生之時通知。

epoll的優點:

1.支援一個程序開啟大數目的socket描述符(FD)

select 最不能忍受的是一個程序所開啟的FD是有一定限制的,由FD_SETSIZE設定,預設值是2048。對於那些需要支援的上萬連線數目的IM伺服器來說顯然太少了。這時候你一是可以選擇修改這個巨集然後重新編譯核心,不過資料也同時指出這樣會帶來網路效率的下降,二是可以選擇多程序的解決方案(傳統的 Apache方案),不過雖然linux上面建立程序的代價比較小,但仍舊是不可忽視的,加上程序間資料同步遠比不上執行緒間同步的高效,所以也不是一種完美的方案。不過 epoll則沒有這個限制,它所支援的FD上限是最大可以開啟檔案的數目,這個數字一般遠大於2048,舉個例子,在1GB記憶體的機器上大約是10萬左右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統記憶體關係很大。

2.IO效率不隨FD數目增加而線性下降

傳統的select/poll另一個致命弱點就是當你擁有一個很大的socket集合,不過由於網路延時,任一時間只有部分的socket是"活躍"的,但是select/poll每次呼叫都會線性掃描全部的集合,導致效率呈現線性下降。但是epoll不存在這個問題,它只會對"活躍"的socket進行操作---這是因為在核心實現中epoll是根據每個fd上面的callback函式實現的。那麼,只有"活躍"的socket才會主動的去呼叫 callback函式,其他idle狀態socket則不會,在這點上,epoll實現了一個"偽"AIO,因為這時候推動力在os核心。在一些 benchmark中,如果所有的socket基本上都是活躍的---比如一個高速LAN環境,epoll並不比select/poll有什麼效率,相反,如果過多使用epoll_ctl,效率相比還有稍微的下降。但是一旦使用idle connections模擬WAN環境,epoll的效率就遠在select/poll之上了。

3.使用mmap加速核心與使用者空間的訊息傳遞

這點實際上涉及到epoll的具體實現了。無論是select,poll還是epoll都需要核心把FD訊息通知給使用者空間,如何避免不必要的記憶體拷貝就很重要,在這點上,epoll是通過核心於使用者空間mmap同一塊記憶體實現的。而如果你想我一樣從2.5核心就關注epoll的話,一定不會忘記手工 mmap這一步的。

4.核心微調

這一點其實不算epoll的優點了,而是整個linux平臺的優點。也許你可以懷疑linux平臺,但是你無法迴避linux平臺賦予你微調核心的能力。比如,核心TCP/IP協議棧使用記憶體池管理sk_buff結構,那麼可以在執行時期動態調整這個記憶體pool(skb_head_pool)的大小— 通過echo XXXX>/proc/sys/net/core/hot_list_length完成。再比如listen函式的第2個引數(TCP完成3次握手的資料包佇列長度),也可以根據你平臺記憶體大小動態調整。更甚至在一個數據包面數目巨大但同時每個資料包本身大小卻很小的特殊系統上嘗試最新的NAPI網絡卡驅動架構。

linux下epoll怎麼實現高效的功能的

首先,我們現來介紹一下epoll中所使用的函式:

  1、int epoll_create(int size)

建立一個epoll控制代碼,引數size用來告訴核心監聽的數目。

  2、int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)

epoll事件註冊函式,

  引數epfd為epoll的控制代碼;

  引數op表示動作,用3個巨集來表示:EPOLL_CTL_ADD(註冊新的fd到epfd),EPOLL_CTL_MOD(修改已經註冊的fd的監聽事件),EPOLL_CTL_DEL(從epfd刪除一個fd);

  引數fd為需要監聽的標示符;

  引數event告訴核心需要監聽的事件,event的結構如下:

struct epoll_event {
  __uint32_t events;  /* Epoll events */
  epoll_data_t data;  /* User data variable */
};

  其中events可以用以下幾個巨集的集合:

  EPOLLIN :表示對應的檔案描述符可以讀(包括對端SOCKET正常關閉)

  EPOLLOUT:表示對應的檔案描述符可以寫

  EPOLLPRI:表示對應的檔案描述符有緊急的資料可讀(這裡應該表示有帶外資料到來)

  EPOLLERR:表示對應的檔案描述符發生錯誤

  EPOLLHUP:表示對應的檔案描述符被結束通話;

  EPOLLET: 將EPOLL設為邊緣觸發(Edge Triggered)模式,這是相對於水平觸發(Level Triggered)來說的

  EPOLLONESHOT:只監聽一次事件,當監聽完這次事件之後,如果還需要繼續監聽這個socket的話,需要再次把這個socket加入到EPOLL佇列裡

3、 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)

  等待事件的產生,類似於select()呼叫。引數events用來從核心得到事件的集合,maxevents告之核心這個events有多大,這個maxevents的值不能大於建立epoll_create()時的size,引數timeout是超時時間(毫秒,0會立即返回,-1將不確定,也有說法說是永久阻塞)。該函式返回需要處理的事件數目,如返回0表示已超時。

由此可知, epoll精巧的使用了3個方法來實現select方法要做的事:

  1. 新建epoll描述符==epoll_create()
  2. epoll_ctrl(epoll描述符,新增或者刪除所有待監控的連線)
  3. 返回的活躍連線 ==epoll_wait( epoll描述符 )

    與select相比,epoll分清了頻繁呼叫和不頻繁呼叫的操作。例如,epoll_ctrl是不太頻繁呼叫的,而epoll_wait是非常頻繁呼叫的。這時,epoll_wait卻幾乎沒有入參,這比select的效率高出一大截,而且,它也不會隨著併發連線的增加使得入參越發多起來,導致核心執行效率下降。

我們繼續深入看看epoll怎麼實現的高效:

epoll的三大關鍵要素:mmap、紅黑樹、連結串列

epoll是通過核心與使用者空間mmap同一塊記憶體實現的。mmap將使用者空間的一塊地址和核心空間的一塊地址同時對映到相同的一塊實體記憶體地址(不管是使用者空間還是核心空間都是虛擬地址,最終要通過地址對映對映到實體地址),使得這塊實體記憶體對核心和對使用者均可見,減少使用者態和核心態之間的資料交換。核心可以直接看到epoll監聽的控制代碼,效率高。

  紅黑樹將儲存epoll所監聽的套接字。上面mmap出來的記憶體如何儲存epoll所監聽的套接字,必然也得有一套資料結構,epoll在實現上採用紅黑樹去儲存所有套接字,當新增或者刪除一個套接字時(epoll_ctl),都在紅黑樹上去處理,紅黑樹本身插入和刪除效能比較好,時間複雜度O(logN)。

 通過epoll_ctl函式新增進來的事件都會被放在紅黑樹的某個節點內,所以,重複新增是沒有用的。當把事件新增進來的時候時候會完成關鍵的一步,那就是該事件都會與相應的裝置(網絡卡)驅動程式建立回撥關係,當相應的事件發生後,就會呼叫這個回撥函式,該回調函式在核心中被稱為:ep_poll_callback,**這個回撥函式其實就所把這個事件新增到rdllist這個雙向連結串列中**。一旦有事件發生,epoll就會將該事件新增到雙向連結串列中。那麼當我們呼叫epoll_wait時,epoll_wait只需要檢查rdlist雙向連結串列中是否有存在註冊的事件,效率非常可觀。這裡也需要將發生了的事件複製到使用者態記憶體中即可。

  下面幾個關鍵資料結構的定義   
 1 struct epitem
 2 {
 3     struct rb_node rbn;            //用於主結構管理的紅黑樹
 4     struct list_head rdllink;       //事件就緒佇列
 5     struct epitem *next;           //用於主結構體中的連結串列
 6     struct epoll_filefd ffd;         //每個fd生成的一個結構
 7     int nwait;                 
 8     struct list_head pwqlist;     //poll等待佇列
 9     struct eventpoll *ep;          //該項屬於哪個主結構體
10     struct list_head fllink;         //連結fd對應的file連結串列
11     struct epoll_event event;  //註冊的感興趣的事件,也就是使用者空間的epoll_event
12  }
 1 struct eventpoll
 2 {
 3     spin_lock_t lock;            //對本資料結構的訪問
 4     struct mutex mtx;            //防止使用時被刪除
 5     wait_queue_head_t wq;        //sys_epoll_wait() 使用的等待佇列
 6     wait_queue_head_t poll_wait; //file->poll()使用的等待佇列
 7     struct list_head rdllist;    //事件滿足條件的連結串列
 8     struct rb_root rbr;          //用於管理所有fd的紅黑樹
 9     struct epitem *ovflist;      //將事件到達的fd進行連結起來發送至使用者空間
10 }
  epoll_wait的工作流程:
  1. epoll_wait呼叫ep_poll,當rdlist為空(無就緒fd)時掛起當前程序,直到rdlist不空時程序才被喚醒。

  2. 檔案fd狀態改變(buffer由不可讀變為可讀或由不可寫變為可寫),導致相應fd上的回撥函式ep_poll_callback()被呼叫。

  3. ep_poll_callback將相應fd對應epitem加入rdlist,導致rdlist不空,程序被喚醒,epoll_wait得以繼續執行。

  4. ep_events_transfer函式將rdlist中的epitem拷貝到txlist中,並將rdlist清空。

  5. ep_send_events函式(很關鍵),它掃描txlist中的每個epitem,呼叫其關聯fd對用的poll方法。此時對poll的呼叫僅僅是取得fd上較新的events(防止之前events被更新),之後將取得的events和相應的fd傳送到使用者空間(封裝在struct epoll_event,從epoll_wait返回)。

有了這些理論基礎,我們開始實現一個小例子:

https://www.cnblogs.com/lojunren/p/3856290.html

這篇部落格中的例項很有參考價值。

程式碼:摘自:https://github.com/shineyr/Socket/tree/master/epoll_socket

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>


#ifndef CONNECT_SIZE
#define CONNECT_SIZE 256
#endif

#define PORT 7777
#define MAX_LINE 2048
#define LISTENQ 20

void setNonblocking(int sockfd)
{
    int opts;
    opts=fcntl(sockfd,F_GETFL);
    if(opts<0)
    {
        perror("fcntl(sock,GETFL)");
        return;
    }//if

    opts = opts|O_NONBLOCK;
    if(fcntl(sockfd,F_SETFL,opts)<0)
    {
        perror("fcntl(sock,SETFL,opts)");
        return;
    }//if
}

int main(int argc , char **argv)
{
    int i, listenfd, connfd, sockfd, epfd, nfds;

    ssize_t n, ret;

    char buf[MAX_LINE];

    socklen_t clilen;

    struct sockaddr_in servaddr , cliaddr;

    /*宣告epoll_event結構體變數,ev用於註冊事件,陣列用於回傳要處理的事件*/
    struct epoll_event ev, events[20];

    /*(1) 得到監聽描述符*/
    listenfd = socket(AF_INET , SOCK_STREAM , 0);
    setNonblocking(listenfd);

    /*生成用於處理accept的epoll專用檔案描述符*/   
    epfd = epoll_create(CONNECT_SIZE);
    /*設定監聽描述符*/
    ev.data.fd = listenfd;
    /*設定處理事件型別*/
    ev.events = EPOLLIN | EPOLLET;
    /*註冊事件*/
    epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);      

    /*(2) 繫結套接字*/   
    bzero(&servaddr , sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(PORT);

    bind(listenfd , (struct sockaddr *)&servaddr , sizeof(servaddr));

    /*(3) 監聽*/
    listen(listenfd , LISTENQ);

    /*(4) 進入伺服器接收請求死迴圈*/
    while(1)
    {
        /*等待事件發生*/
        nfds = epoll_wait(epfd , events , CONNECT_SIZE , -1);
        if(nfds <= 0)
            continue;

        printf("nfds = %d\n" , nfds);
        /*處理髮生的事件*/
        for(i=0 ; i<nfds ; ++i)
        {
            /*檢測到使用者連結*/
            if(events[i].data.fd == listenfd)
            {   
                /*接收客戶端的請求*/
                clilen = sizeof(cliaddr);

                if((connfd = accept(listenfd , (struct sockaddr *)&cliaddr , &clilen)) < 0)
                {
                    perror("accept error.\n");
                    exit(1);
                }//if       

                printf("accpet a new client: %s:%d\n", inet_ntoa(cliaddr.sin_addr) , cliaddr.sin_port);

                /*設定為非阻塞*/
                setNonblocking(connfd);
                ev.data.fd = connfd;
                ev.events = EPOLLIN | EPOLLET;
                epoll_ctl(epfd , EPOLL_CTL_ADD , connfd , &ev);
            }//if
            /*如果是已連結使用者,並且收到資料,進行讀入*/
            else if(events[i].events & EPOLLIN){

                if((sockfd = events[i].data.fd) < 0)
                    continue;
                bzero(buf , MAX_LINE);
                printf("reading the socket~~~\n");
                if((n = read(sockfd , buf , MAX_LINE)) <= 0)
                {
                    close(sockfd);
                    events[i].data.fd = -1;
                }//if
                else{
                    buf[n] = '\0';
                    printf("clint[%d] send message: %s\n", i , buf);

                    /*設定用於註冊寫操作檔案描述符和事件*/
                    ev.data.fd = sockfd;
                    ev.events = EPOLLOUT| EPOLLET;  
                    epoll_ctl(epfd , EPOLL_CTL_MOD , sockfd , &ev);         
                }//else                                         
            }//else
            else if(events[i].events & EPOLLOUT)
            {
                if((sockfd = events[i].data.fd) < 0)
                continue;
                if((ret = write(sockfd , buf , n)) != n)    
                {
                    printf("error writing to the sockfd!\n");
                    break;
                }//if
                /*設定用於讀的檔案描述符和事件*/
                ev.data.fd = sockfd;
                ev.events = EPOLLIN | EPOLLET;
                /*修改*/
                epoll_ctl(epfd , EPOLL_CTL_MOD , sockfd , &ev);             
            }//else
        }//for
    }//while
    free(events);
    close(epfd);
    exit(0);
}

cli.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <time.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <fcntl.h>

#define PORT 7777
#define MAX_LINE 2048

int max(int a , int b)
{
    return a > b ? a : b;
}

/*readline函式實現*/
ssize_t readline(int fd, char *vptr, size_t maxlen)
{
    ssize_t n, rc;
    char    c, *ptr;

    ptr = vptr;
    for (n = 1; n < maxlen; n++) {
        if ( (rc = read(fd, &c,1)) == 1) {
            *ptr++ = c;
            if (c == '\n')
                break;  /* newline is stored, like fgets() */
        } else if (rc == 0) {
            *ptr = 0;
            return(n - 1);  /* EOF, n - 1 bytes were read */
        } else
            return(-1);     /* error, errno set by read() */
    }

    *ptr = 0;   /* null terminate like fgets() */
    return(n);
}

/*普通客戶端訊息處理函式*/
void str_cli(int sockfd)
{
    /*傳送和接收緩衝區*/
    char sendline[MAX_LINE] , recvline[MAX_LINE];
    while(fgets(sendline , MAX_LINE , stdin) != NULL)   
    {
        write(sockfd , sendline , strlen(sendline));

        bzero(recvline , MAX_LINE);
        if(readline(sockfd , recvline , MAX_LINE) == 0)
        {
            perror("server terminated prematurely");
            exit(1);
        }//if

        if(fputs(recvline , stdout) == EOF)
        {
            perror("fputs error");
            exit(1);
        }//if

        bzero(sendline , MAX_LINE);
    }//while
}

int main(int argc , char **argv)
{
    /*宣告套接字和連結伺服器地址*/
    int sockfd;
    struct sockaddr_in servaddr;

    /*判斷是否為合法輸入*/
    if(argc != 2)
    {
        perror("usage:tcpcli <IPaddress>");
        exit(1);
    }//if

    /*(1) 建立套接字*/
    if((sockfd = socket(AF_INET , SOCK_STREAM , 0)) == -1)
    {
        perror("socket error");
        exit(1);
    }//if

    /*(2) 設定連結伺服器地址結構*/
    bzero(&servaddr , sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port = htons(PORT);
    if(inet_pton(AF_INET , argv[1] , &servaddr.sin_addr) < 0)
    {
        printf("inet_pton error for %s\n",argv[1]);
        exit(1);
    }//if

    /*(3) 傳送連結伺服器請求*/
    if(connect(sockfd , (struct sockaddr *)&servaddr , sizeof(servaddr)) < 0)
    {
        perror("connect error");
        exit(1);
    }//if

    /*呼叫訊息處理函式*/
    str_cli(sockfd);    
    exit(0);