1. 程式人生 > >Linux下套接字詳解(十)---epoll模式下的IO多路複用伺服器

Linux下套接字詳解(十)---epoll模式下的IO多路複用伺服器

1 epoll模型簡介

epoll可是當前在Linux下開發大規模併發網路程式的熱門人選,epoll 在Linux2.6核心中正式引入,和select相似,其實都I/O多路複用技術而已,並沒有什麼神祕的。

其實在Linux下設計併發網路程式,向來不缺少方法,比如典型的Apache模型(Process Per Connection,簡稱PPC),TPC(Thread PerConnection)模型,以及select模型和poll模型,那為何還要再引入Epoll這個東東呢?那還是有得說說的…

2 常用模型的缺點

如果不擺出來其他模型的缺點,怎麼能對比出Epoll的優點呢。

2.1 多程序PPC/多執行緒TPC模型

這兩種模型思想類似,就是讓每一個到來的連線一邊自己做事去,別再來煩我。只是PPC是為它開了一個程序,而TPC開了一個執行緒。可是別煩我是有代價的,它要時間和空間啊,連線多了之後,那麼多的程序/執行緒切換,這開銷就上來了;

因此這類模型能接受的最大連線數都不會高,一般在幾百個左右。

2.2 select模型-O(n)

多程序多執行緒的模型龐大而且繁瑣,因此我們出現了select模型

  int select(int nfds, fd_set *readfds, fd_set *writefds,
                  fd_set *exceptfds, struct
timeval *timeout); void FD_CLR(int fd, fd_set *set); int FD_ISSET(int fd, fd_set *set); void FD_SET(int fd, fd_set *set); void FD_ZERO(fd_set *set);

select系統呼叫是用來讓我們的程式監視多個檔案控制代碼(file descrīptor)的狀態變化的。通過select()系統呼叫來監視多個檔案描述符的陣列,當select()返回後,該陣列中就緒的檔案描述符便會被核心修改標誌位,使得程序可以獲得這些檔案描述符從而進行後續的讀寫操作。

select系統呼叫是用來讓我們的程式監視多個檔案描述符的狀態變化的。程式會停在select這裡等待,直到被監視的檔案描述符有某一個或多個發生了狀態改變。

select()的機制中提供一fd_set的資料結構,實際上是一long型別的陣列,每一個數組元素都能與一開啟的檔案控制代碼建立聯絡,建立聯絡的工作由程式設計師完成,當呼叫select()時,由核心根據IO狀態修改fd_set的內容,由此來通知執行了select()的程序哪些Socket或檔案可讀可寫。

當某些描述符可以讀寫之後,select返回資料(沒有資料讀寫時,select也會返回,因為select是同步)時就掃描一遍描述符fd_set來查詢那些有資料請求的描述符,並進行處理。時間複雜度為O(n)

因此效能比那些阻塞的多程序或者多執行緒模型效能提高不少,但是仍然不夠。因為select有很多限制

  1. 最大併發數限制,因為一個程序所開啟的FD(檔案描述符)是有限制的,由FD_SETSIZE設定(可以檢視深入解析為何select最多隻能監聽1024個),預設值是1024/2048,因此Select模型的最大併發數就被相應限制了。使用者可以自己修改FD_SETSIZE,然後重新編譯,但是其實,並不推薦這麼做

    linux 下 fd_set 是個 1024 位的點陣圖,每個位代表一個 fd 的值,返回後需要掃描點陣圖,這也是效率低的原因。效能問題且不提,正確性問題則更值得重視。

    因為這是一個 1024 位的點陣圖,因此當程序內的 fd 值 >= 1024 時,就會越界,可能會造成崩潰。對於伺服器程式,fd >= 1024 很容易達到,只要連線數 + 開啟的檔案數足夠大即可發生。

    include/linux/posix_types.h:
    
    #define __FD_SETSIZE         1024
    
  2. 效率問題,select每次呼叫都會線性掃描全部的FD集合,這樣效率就會呈現線性下降,把FD_SETSIZE改大的後果就是,大家都慢慢來,什麼?都超時了??!!

  3. 核心/使用者空間 記憶體拷貝問題,如何讓核心把FD訊息通知給使用者空間呢?在這個問題上select採取了記憶體拷貝方法。

2.3 poll模型

poll的實現和select非常相似,只是描述fd集合的方式不同,poll使用pollfd結構而不是select的fd_set結構,其他的都差不多。

他通過註冊一堆事件組,當有事件請求時返回,然後仍然需要輪詢一遍pollfd才能知道查詢到對應的檔案描述符,資料也需要在核心空間和使用者空間來回拷貝。時間複雜度為O(n)

因此他只解決了select的問題1,但是問題2,3仍然得不帶解決。

3 epoll模型

這裡寫圖片描述

3.1 epoll的效能提升

把其他模型逐個批判了一下,再來看看Epoll的改進之處吧,其實把select的缺點反過來那就是Epoll的優點了。

  1. epoll沒有最大併發連線的限制,上限是最大可以開啟檔案的數目,這個數字一般遠大於2048, 一般來說這個數目和系統記憶體關係很大,具體數目可以cat /proc/sys/fs/file-max察看。

  2. 效率提升,Epoll最大的優點就在於它只管你“活躍”的連線,而跟連線總數無關,因此在實際的網路環境中,Epoll的效率就會遠遠高於select和poll。

  3. 記憶體拷貝,Epoll在這點上使用了“共享記憶體”,這個記憶體拷貝也省略了。

3.2 如何解決上述的3個缺點

epoll既然是對select和poll的改進,就避免上述的三個缺點。那epoll都是怎麼解決的呢?

在此之前,我們先看一下epoll和select和poll的呼叫介面上的不同,select和poll都只提供了一個函式——select或者poll函式。

而epoll提供了三個函式,epoll_create,epoll_ctlepoll_wait

  • epoll_create是建立一個epoll控制代碼;

  • epoll_ctl是註冊要監聽的事件型別;

  • epoll_wait則是等待事件的產生。

3.2.1 支援一個程序開啟大數 目的socket描述符(FD)

對於第一個缺點併發數目限制

epoll沒有這個限制,它所支援的FD上限是最大可以開啟檔案的數目,這個數字一般遠大於2048,舉個例子,在1GB記憶體的機器上大約是10萬左右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統記憶體關係很大。

這裡寫圖片描述

select 最不能忍受的是一個程序所開啟的FD是有一定限制的,由FD_SETSIZE設定,預設值是2048。對於那些需要支援的上萬連線數目的IM伺服器來說顯 然太少了。這時候你一是可以選擇修改這個巨集然後重新編譯核心,不過資料也同時指出這樣會帶來網路效率的下降,二是可以選擇多程序的解決方案(傳統的 Apache方案),不過雖然linux上面建立程序的代價比較小,但仍舊是不可忽視的,加上程序間資料同步遠比不上執行緒間同步的高效,所以也不是一種完 美的方案。不過 epoll則沒有這個限制,它所支援的FD上限是最大可以開啟檔案的數目,這個數字一般遠大於2048,舉個例子,在1GB記憶體的機器上大約是10萬左 右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統記憶體關係很大。

3.2.2 IO效率不隨FD數目增加而線性下降

對於第二個缺點輪詢描述符的線性複雜度

epoll的解決方案不像select或poll一樣每次都把current輪流加入fd對應的裝置等待佇列中,而只在epoll_ctl時把current掛一遍(這一遍必不可少)併為每個fd指定一個回撥函式,當裝置就緒,喚醒等待佇列上的等待者時,就會呼叫這個回撥函式,而這個回撥函式會把就緒的fd加入一個就緒連結串列)。epoll_wait的工作實際上就是在這個就緒連結串列中檢視有沒有就緒的f

傳統的select/poll另一個致命弱點就是當你擁有一個很大的socket集合,不過由於網路延時,任一時間只有部分的socket是”活躍”的, 但是select/poll每次呼叫都會線性掃描全部的集合,導致效率呈現線性下降。但是epoll不存在這個問題,它只會對”活躍”的socket進行 操作—這是因為在核心實現中epoll是根據每個fd上面的callback函式實現的。那麼,只有”活躍”的socket才會主動的去呼叫 callback函式,其他idle狀態socket則不會,在這點上,epoll實現了一個”偽”AIO,因為這時候推動力在os核心。在一些 benchmark中,如果所有的socket基本上都是活躍的—比如一個高速LAN環境,epoll並不比select/poll有什麼效率,相 反,如果過多使用epoll_ctl,效率相比還有稍微的下降。但是一旦使用idle connections模擬WAN環境,epoll的效率就遠在select/poll之上了。

3.2.3 使用mmap加速核心 與使用者空間的訊息傳遞

對於第三缺點資料在核心空間和使用者空間的拷貝

epoll的解決方案在epoll_ctl函式中。每次註冊新的事件到epoll控制代碼中時(在epoll_ctl中指定EPOLL_CTL_ADD),會把所有的fd拷貝進核心,而不是在epoll_wait的時候重複拷貝。epoll保證了每個fd在整個過程中只會拷貝一次。

這點實際上涉及到epoll的具體實現了。無論是select,poll還是epoll都需要核心把FD訊息通知給使用者空間,如何避免不必要的記憶體拷貝就 很重要,在這點上,epoll是通過核心於使用者空間mmap同一塊記憶體實現的。而如果你想我一樣從2.5核心就關注epoll的話,一定不會忘記手工 mmap這一步的。

3.3 總結

  1. select,poll實現需要自己不斷輪詢所有fd集合,直到裝置就緒,期間可能要睡眠和喚醒多次交替。而epoll其實也需要呼叫epoll_wait不斷輪詢就緒連結串列,期間也可能多次睡眠和喚醒交替,但是它是裝置就緒時,呼叫回撥函式,把就緒fd放入就緒連結串列中,並喚醒在epoll_wait中進入睡眠的程序。雖然都要睡眠和交替,但是select和poll在“醒著”的時候要遍歷整個fd集合,而epoll在“醒著”的時候只要判斷一下就緒連結串列是否為空就行了,這節省了大量的CPU時間。這就是回撥機制帶來的效能提升。

  2. select,poll每次呼叫都要把fd集合從使用者態往核心態拷貝一次,並且要把current往裝置等待佇列中掛一次,而epoll只要一次拷貝,而且把current往等待佇列上掛也只掛一次(在epoll_wait的開始,注意這裡的等待佇列並不是裝置等待佇列,只是一個epoll內部定義的等待佇列)。這也能節省不少的開銷。

4 Epoll的使用

4.1 epoll關鍵資料結構

前面提到Epoll速度快和其資料結構密不可分,其關鍵資料結構就是:

structepoll_event {

    __uint32_t events;      // Epoll events

    epoll_data_t data;      // User datavariable

};

typedef union epoll_data {

    void *ptr;

   int fd;

    __uint32_t u32;

    __uint64_t u64;

} epoll_data_t;

可見epoll_data是一個union結構體,藉助於它應用程式可以儲存很多型別的資訊:fd、指標等等。有了它,應用程式就可以直接定位目標了。

4.2 使用Epoll

首先回憶一下select模型,當有I/O事件到來時,select通知應用程式有事件到了快去處理,而應用程式必須輪詢所有的FD集合,測試每個FD是否有事件發生,並處理事件;程式碼像下面這樣:
Epoll的高效和其資料結構的設計是密不可分的,這個下面就會提到。

首先回憶一下select模型,當有I/O事件到來時,select通知應用程式有事件到了快去處理,而應用程式必須輪詢所有的FD集合,測試每個FD是否有事件發生,並處理事件;

程式碼像下面這樣:


int res = select(maxfd+1, &readfds, NULL, NULL, 120);
if(res > 0)
{

    for(int i = 0; i < MAX_CONNECTION; i++)
    {
        if(FD_ISSET(allConnection[i],&readfds))
        {
            handleEvent(allConnection[i]);
        }
    }
}
// if(res == 0) handle timeout, res < 0 handle error

epoll不僅會告訴應用程式有I/0事件到來,還會告訴應用程式相關的資訊,這些資訊是應用程式填充的,因此根據這些資訊應用程式就能直接定位到事件,而不必遍歷整個FD集合。

intres = epoll_wait(epfd, events, 20, 120);

for(int i = 0; i < res;i++)
{
    handleEvent(events[n]);
}

首先通過create_epoll(int maxfds)來建立一個epoll的控制代碼,其中maxfds為你epoll所支援的最大控制代碼數。這個函式會返回一個新的epoll控制代碼,之後的所有操作 將通過這個控制代碼來進行操作。在用完之後,記得用close()來關閉這個創建出來的epoll控制代碼。之後在你的網路主迴圈裡面,每一幀的呼叫epoll_wait(int epfd, epoll_event events, int max events, int timeout)來查詢所有的網路介面,看哪一個可以讀,哪一個可以寫了。基本的語法為:

nfds = epoll_wait(kdpfd, events, maxevents, -1);

其中kdpfd為用epoll_create建立之後的控制代碼,events是一個 epoll_event*的指標,當epoll_wait這個函式操作成功之後,epoll_events裡面將儲存所有的讀寫事件。 max_events是當前需要監聽的所有socket控制代碼數。最後一個timeout是 epoll_wait的超時,為0的時候表示馬上返回,為-1的時候表示一直等下去,直到有事件範圍,為任意正整數的時候表示等這麼長的時間,如果一直沒 有事件,則範圍。一般如果網路主迴圈是單獨的執行緒的話,可以用-1來等,這樣可以保證一些效率,如果是和主邏輯在同一個執行緒的話,則可以用0來保證主迴圈 的效率。
既然epoll相比select這麼好,那麼用起來如何呢?會不會很繁瑣啊…先看看下面的三個函式吧,就知道epoll的易用了。

intepoll_create(int size);

生成一個Epoll專用的檔案描述符,其實是申請一個核心空間,用來存放你想關注的socket fd上是否發生以及發生了什麼事件。size就是你在這個Epoll fd上能關注的最大socket fd數,大小自定,只要記憶體足夠。

int epoll_ctl(int epfd, intop, int fd, structepoll_event *event);

控制某個Epoll檔案描述符上的事件:註冊、修改、刪除。其中引數epfd是epoll_create()建立Epoll專用的檔案描述符。相對於select模型中的FD_SET和FD_CLR巨集。

int epoll_wait(int epfd,structepoll_event * events,int maxevents,int timeout);

等待I/O事件的發生,返回發生事件數;

功能類似與select函式

引數說明:

引數 描述
epfd 由epoll_create() 生成的Epoll專用的檔案描述符
epoll_event 用於回傳代處理事件的陣列
maxevents 每次能處理的事件數
timeout 等待I/O事件發生的超時值

4.3 epoll的工作模式

4.3.1 LT和ET模式

令人高興的是,2.6核心的epoll比其2.5開發版本的/dev/epoll簡潔了許多,所以,大部分情況下,強大的東西往往是簡單的。

唯一有點麻煩

epoll對檔案描述符的操作有2種模式: LT和ET

模式 名稱 設定 描述
LT Level Trigger, 電平觸發 預設 只有檔案描述符號上有未處理的讀寫事件都會通知, 只要存在著事件就會不斷的觸發,直到處理完成
ET Edge Trigger, 邊沿觸發 通過EPOLLET來設定 當且僅當讀寫事件到來時通知, 只觸發一次相同事件或者說只在從非觸發到觸發兩個狀態轉換的時候兒才觸發


LT(level triggered)是預設的工作方式,並且同時支援block和no-block socket.在這種做法中,核心告訴你一個檔案描述符是否就緒了,然後你可以對這個就緒的fd進行IO操作。如果你不作任何操作,核心還是會繼續通知你的,所以,這種模式程式設計出錯誤可能性要小一點。傳統的select/poll都是這種模型的代表. 這種模式相當於一個效率高的poll

對於採用LT模式工作的檔案描述符, 當epoll_wait檢測到其上有事件發生並將此事件通知應用程式後, 應用程式可以不用立即處理該事件. 這樣, 當應用程式下次呼叫epoll_wait時, epoll_wait還會再次嚮應用程式通告此事件,直到該事件被處理.

ET (edge-triggered)是高速工作方式,只支援no-block socket。在這種模式下,當描述符從未就緒變為就緒時,核心通過epoll告訴你。然後它會假設你知道檔案描述符已經就緒,並且不會再為那個檔案描述 符傳送更多的就緒通知,直到你做了某些操作導致那個檔案描述符不再為就緒狀態了(比如,你在傳送,接收或者接收請求,或者傳送接收的資料少於一定量時導致 了一個EWOULDBLOCK 錯誤)。但是請注意,如果一直不對這個fd作IO操作(從而導致它再次變成未就緒),核心不會發送更多的通知(only once),不過在TCP協議中,ET模式的加速效用仍需要更多的benchmark確認。

而當往epoll核心事件表中註冊一個檔案描述符上的EPOLLET事件時, epoll將以ET模式來操作該檔案描述符, ET模式是epoll的高效工作模式. 對於採用ET模式工作的檔案描述符, 當epoll_wait檢測到其上有事件發生並將此事件通知應用程式後, 應用程式必須立即處理該事件, 因為後續的epoll_wait呼叫將不再向應用程式通知該事件.

可見, ET模式在很大程式上降低了同一個epoll事件被重複觸發的次數, 因此效率比LT模式高.

注意

每個使用ET模式的檔案描述符都應該是非阻塞的.

如果檔案描述符是阻塞的, 那麼讀寫操作將會因為沒有後續的事件而一直處於阻塞狀態(飢渴狀態)

4.3.2 示例程式碼

//  程式碼清單9-3 LT和ET模式
//  參見Linux高效能伺服器程式設計
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10


#define DEFAULT_SERVER_PORT 6666



int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

void addfd( int epollfd, int fd, bool enable_et )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN;
    if( enable_et )
    {
        event.events |= EPOLLET;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}


/*  LT(Level Trigger, 電平觸發)
 *  相當於一個效率較高的poll
 *  對於採用LT工作模式的檔案描述符
 *  當epoll_wait檢測到其上時間發生並將此事件通知應用程式後
 *  應用程式可以不用立即處理
 *  這樣下次呼叫時, 還會再次嚮應用程式通知此時間
 */
void lt( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, false );
        }
        else if ( events[i].events & EPOLLIN )
        {
            printf( "LT-event trigger once\n" );
            memset( buf, '\0', BUFFER_SIZE );
            int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
            if( ret <= 0 )
            {
                close( sockfd );
                continue;
            }
            printf( "get %d bytes of content: %s\n", ret, buf );
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}

/*  ET模式的工作流程
 *  對於採用ET模式的檔案描述符號
 *  當epoll_wait檢測到其上由事件發生並將此時間通知應用程式後
 *  應用程式應該立即處理該事件
 *  因為後續的epoll_wait不會再向應用程式通知這一事件
 */
void et( epoll_event* events, int number, int epollfd, int listenfd )
{
    char buf[ BUFFER_SIZE ];
    for ( int i = 0; i < number; i++ )
    {
        int sockfd = events[i].data.fd;
        if ( sockfd == listenfd )
        {
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof( client_address );
            int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );
            addfd( epollfd, connfd, true );
        }
        else if ( events[i].events & EPOLLIN )
        {
            /* 這段程式碼不會重複觸發
             * 因此我嫩迴圈讀取資料
             * 以確保把socket讀快取中的所有哦資料讀出
             */
            printf( "ET-event trigger only once\n" );
            while( 1 )
            {
                memset( buf, '\0', BUFFER_SIZE );
                int ret = recv( sockfd, buf, BUFFER_SIZE-1, 0 );
                if( ret < 0 )
                {
                    if( ( errno == EAGAIN ) || ( errno == EWOULDBLOCK ) )
                    {
                        printf( "read later\n" );
                        break;
                    }
                    close( sockfd );
                    break;
                }
                else if( ret == 0 )
                {
                    close( sockfd );
                }
                else
                {
                    printf( "get %d bytes of content: %s\n", ret, buf );
                }
            }
        }
        else
        {
            printf( "something else happened \n" );
        }
    }
}

int main( int argc, char* argv[] )
{
    int     port = DEFAULT_SERVER_PORT;
    char    *ip = NULL;

    if( argc > 3 )
    {
        printf( "usage: %s [port_number [ip_address]]\n", basename( argv[0] ) );
        return 1;
    }
    else if (argc == 2)
    {
        port = atoi( argv[1] );
    }
    else if(argc == 3)
    {
        port = atoi( argv[1] );
        ip = argv[2];
    }

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    if(ip != NULL)
    {
        inet_pton( AF_INET, ip, &address.sin_addr );
    }
    else
    {
        address.sin_addr.s_addr = AF_INET;
    }
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(port);

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );
    addfd( epollfd, listenfd, true );

    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }

        //lt( events, ret, epollfd, listenfd );
        et( events, ret, epollfd, listenfd );
    }

    close( listenfd );
    return 0;
}

4.4 EPOLLONESHOT事件

4.4.1 EPOLLONESHOT事件簡介

在前面說過,epoll有兩種觸發的方式即LT(水平觸發)和ET(邊緣觸發)兩種,在前者,只要存在著事件就會不斷的觸發,直到處理完成,而後者只觸發一次相同事件或者說只在從非觸發到觸發兩個狀態轉換的時候兒才觸發。

這會出現一種情況,就是即使我們使用ET模式, 一個socket上的某個事件還是可能被觸發多次, 這在併發程式中會引起一個問題.

比如, 一個執行緒(或者程序)在讀取完某個socket上的資料後開始處理這些資料, 而在處理資料的過程中該socket上又有新資料可讀(EPOLLIN再次被觸發), 此時如果應用程式排程另外一個執行緒來讀取這些資料, 就會出現兩個執行緒同時操作一個socket的局面, 這會使程式的健壯性大降低而程式設計的複雜度大大增加. 這顯然不是我們所期望的.

解決這種現象有兩種方法

  • 一種是在單獨的執行緒或程序裡解析資料,也就是說,接收資料的執行緒接收到資料後立刻將資料轉移至另外的執行緒

*第二種方法就是使用EPOLLONESHOT事件. 對於註冊了EPOLLONESHOT事件的檔案描述符, 作業系統最多觸發其上註冊的一個可讀, 可寫或者異常事件, 且只觸發一次, 除非我們使用epoll_ctl和函式重置該檔案描述符上註冊的EPOLLONESHOT事件.

這樣, 當一個執行緒在處理某個socket的時候, 其他執行緒就不可能有機會操作該socket

但是反過來思考, 註冊了EPOLLONESHOT事件的socket一旦被某個執行緒處理完畢, 該執行緒就有責任立即重置這個socket上的EPOLLONESHOT事件, 以確保這個socket下一次可讀的時候, 其EPOLLIN事件能被再次觸發, 進而讓其他執行緒有機會處理這個socket.

4.4.2 EPOLLONESHOT事件示例程式

//  程式碼清單9-4 使用EPOLLONESHOT事件
//  參見Linux高效能伺服器程式設計
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/epoll.h>
#include <pthread.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024

#define DEFAULT_SERVER_PORT 6666


typedef struct fds_pthread_args
{
   int epollfd;
   int sockfd;
}fds_pthread_args;

int setnonblocking( int fd )
{
    int old_option = fcntl( fd, F_GETFL );
    int new_option = old_option | O_NONBLOCK;
    fcntl( fd, F_SETFL, new_option );
    return old_option;
}

/*  將fd上的EPOLLIN和EPOLLET事件註冊到epollfd指示的epoll核心事件表中
 *  引數oneshot用來指定是否註冊fd上的EPOLLONESHOT事件
 * */
void addfd( int epollfd, int fd, bool oneshot )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET;
    /*  對於註冊了EPOLLONESHOT時間的檔案描述符
     *  作業系統最多觸發其上註冊的一個可讀, 可寫或者異常事件
     *  且只觸發一次  */
    if( oneshot )
    {
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, &event );
    setnonblocking( fd );
}

/*  重置fd上的事件, 這樣操作後, 儘管fd上的EPOLLONESHOT事件被註冊後,
 *  但是作業系統仍然會觸發fd上的EPOLLIN事件
 *  且只觸發一次
 * */
void reset_oneshot( int epollfd, int fd )
{
    epoll_event event;
    event.data.fd = fd;
    event.events = EPOLLIN | EPOLLET | EPOLLONESHOT;
    epoll_ctl( epollfd, EPOLL_CTL_MOD, fd, &event );
}


/*  工作執行緒  */
void* worker( void* args )
{
    fds_pthread_args *_args = (fds_pthread_args *)args;

    int sockfd = _args->sockfd;
    int epollfd = _args->epollfd;
    printf( "start new thread to receive data on fd: %d\n", sockfd );

    char buf[ BUFFER_SIZE ];
    memset( buf, '\0', BUFFER_SIZE );

    /*  迴圈讀取sockfd上的資料, 直到遇見EAGAIN錯誤  */
    while( 1 )
    {
        int ret = recv( sockfd, buf, BUFFER_SIZE - 1, 0 );
        if( ret == 0 )
        {
            close( sockfd );
            printf( "foreiner closed the connection\n" );
            break;
        }
        else if( ret < 0 )
        {
            /*  首先我們看看recv的返回值:
             *  EAGAIN、EWOULDBLOCK、EINTR與非阻塞 長連線
             *  EWOULDBLOCK     用於非阻塞模式,不需要重新讀或者寫
             *  EINTR           指操作被中斷喚醒,需要重新讀/寫
             *  在Linux環境下開發經常會碰到很多錯誤(設定errno),
             *  其中EAGAIN是其中比較常見的一個錯誤(比如用在非阻塞操作中)
             *  從字面上來看, 是提示再試一次.
             *  這個錯誤經常出現在當應用程式進行一些非阻塞(non-blocking)操作
             *  (對檔案或socket)的時候
             *  例如,以 O_NONBLOCK的標誌開啟檔案/socket/FIFO,
             *  如果你連續做read操作而沒有資料可讀.
             *  此時程式不會阻塞起來等待資料準備就緒返回,
             *  read函式會返回一個錯誤EAGAIN,
             *  提示你的應用程式現在沒有資料可讀請稍後再試重新讀資料,
             *  對非阻塞socket而言, EAGAIN不是一種錯誤。在VxWorks和Windows上,
             *  EAGAIN的名字叫做EWOULDBLOCK
             */
            if( errno == EAGAIN )
            {
                reset_oneshot( epollfd, sockfd );
                printf( "read later\n" );
                break;
            }
        }
        else
        {
            printf( "get content: %s\n", buf );
            /*  休眠5s, 模擬資料處理過程    */
            sleep( 5 );
        }
    }
    printf( "end thread receiving data on fd: %d\n", sockfd );

    return NULL;
}

int main( int argc, char* argv[] )
{
    int port = DEFAULT_SERVER_PORT;
    char *ip = NULL;

    if( argc > 3)
    {
        printf( "usage: %s port_number ip_address\n", basename( argv[0] ) );
        return 1;
    }
    else if( argc == 2 )
    {
        port = atoi(argv[1]);
    }
    else if(argc == 3)
    {
        port = atoi(argv[1]);
        ip = argv[2];
    }

    int ret = 0;
    struct sockaddr_in address;
    bzero( &address, sizeof( address ) );
    address.sin_family = AF_INET;
    if(ip != NULL)
    {
        inet_pton( AF_INET, ip, &address.sin_addr );
    }
    else
    {
        address.sin_addr.s_addr = INADDR_ANY;
    }
    address.sin_port = htons( port );

    int listenfd = socket( PF_INET, SOCK_STREAM, 0 );
    assert( listenfd >= 0 );

    ret = bind( listenfd, ( struct sockaddr* )&address, sizeof( address ) );
    assert( ret != -1 );

    ret = listen( listenfd, 5 );
    assert( ret != -1 );

    epoll_event events[ MAX_EVENT_NUMBER ];
    int epollfd = epoll_create( 5 );
    assert( epollfd != -1 );

    /*   注意, 監聽套接字listen上不能註冊EPOLLONESHOT事件,
     *   否則應用程式只能處理一個客戶端連線
     *   因為由於EPOLLONESHOT被設定
     *   後續的客戶端連線請求將不再觸發listenfd的EPOLLIN事件
     */
    addfd( epollfd, listenfd, false );

    while( 1 )
    {
        int ret = epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );
        if ( ret < 0 )
        {
            printf( "epoll failure\n" );
            break;
        }

        for ( int i = 0; i < ret; i++ )
        {
            int sockfd = events[i].data.fd;
            if ( sockfd == listenfd )
            {
                struct sockaddr_in client_address;
                socklen_t client_addrlength = sizeof( client_address );
                int connfd = accept( listenfd, ( struct sockaddr* )&client_address, &client_addrlength );

                /*  對每個非監聽檔案描述符都註冊EPOLLONEHOT事件  */
                addfd( epollfd, connfd, true );
            }
            else if ( events[i].events & EPOLLIN )
            {
                pthread_t           thread;
                fds_pthread_args    fds_for_new_worker;

                fds_for_new_worker.epollfd = epollfd;
                fds_for_new_worker.sockfd = sockfd;

                /*  新啟動一個工作縣城為sockfd服務  */
                pthread_create( &thread, NULL, worker, ( void* )&fds_for_new_worker );
            }
            else
            {
                printf( "something else happened \n" );
            }
        }
    }

    close( listenfd );
    return 0;
}

5 參考

epoll只有epoll_create,epoll_ctl,epoll_wait 3個系統呼叫

Leader/follower模式執行緒 pool實現,以及和epoll的配合。

Epoll的高效和其資料結構的設計是密不可分的,這個下面就會提到。