1. 程式人生 > >I/O多路複用之 epoll 系統呼叫

I/O多路複用之 epoll 系統呼叫

I/O多路複用除了之前我們提到的selectpoll外,epoll 也可以檢查多個檔案描述符的就緒狀態,以達到I/O多路複用的目的。
epoll 系統呼叫是 Linux 系統專有的,在 Linux 核心 2.6 版本新增,epoll 的主要優點有:

  • 當檢查大量的檔案描述符時,epoll 的效能比selectpoll高很多
  • epoll 既支援水平觸發也支援邊緣觸發,selectpoll只支援水平觸發

epoll 程式設計介面的核心資料結構為 epoll 例項,它和一個開啟的檔案描述符相關聯。這個檔案描述符是核心資料結構的控制代碼,該核心資料結構的作用主要有兩個:

  • 記錄在程序中宣告過的感興趣的檔案描述符列表,即 interest list
  • 維護處於I/O就緒狀態中檔案描述符列表,即 ready list

其中,ready list 是 interest list 的子集。

epoll 程式設計介面由以下3個系統呼叫組成:

  • epoll_create建立一個 epoll 例項,返回程式碼該例項的檔案描述符
  • epoll_ctl增刪改 epoll 例項的 interest list
  • epoll_wait返回與 epoll 例項相關聯的就緒列表中的成員

建立 epoll 例項: epoll_create

系統呼叫epoll_create建立一個新的 epoll 例項,其對應的 interest list 初始化為空。

#include <sys/epoll.h>
int epoll_create(int size);

引數size指定了我們想要通過 epoll 例項來檢查的檔案描述符個數,該引數並不是一個上限,而是告訴核心應該如何為內部資料結構劃分初始大小。epoll_create返回新建立 epoll 例項的檔案描述符,這個檔案描述符在其他幾個 epoll 系統呼叫中會被用來表示 epoll 例項。當這個檔案描述符不再使用時,應該通過close來關閉。

從 Linux 2.6.27 版核心以來,Linux 支援了一個新的系統呼叫 epoll_create1。該系統呼叫執行的任務同epoll_create

,但是去掉了無用的引數size,並增加了一個可用來修改系統呼叫行為的flag標誌。

修改 epoll 例項: epoll_ctl

系統呼叫epoll_ctl能夠修改由檔案描述符epfd所代表的 epoll 例項中的 interest list。

#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
  • 引數epfd指定 epoll 例項的檔案描述符,即對哪個 epoll 例項進行操作
  • 引數fd指明要修改 interest list 中的哪一個檔案描述符。
  • 引數op用來指定需要執行的操作,下文我們還會對op操作型別進行進一步描述
  • 引數ev是指向結構體epoll_event的指標,關於結構體epoll_event的定義,我們也在下文描述

epoll_ctlop支援的操作包括以下以種:

  • EPOLL_CTL_ADD
    將描述符fd新增到 epoll 例項的 interest list 中去。對於fd上我們感興趣的事件,在ev所指向的結構體中指定。

  • EPOLL_CTL_MOD
    修改描述符fd上設定的事件,需用到由ev所指向的結構體中的資訊。

  • EPOLL_CTL_DEL
    將描述符fd從 epoll 例項的 interest list 中移除,該操作忽略ev引數。

上面我們多處提到了evev是指向結構體epoll_event的指標,該結構體的定義如下:

struct epoll_event {
    uint32_t events;  // epoll 事件
    epoll_data data;  // 使用者資料
};

結構體epoll_event中的data欄位的型別為epoll_data,其定義以下:

typedef union epoll_data {
    void *ptr;    // 使用者自定義資料的指標
    int fd;       // 檔案描述符
    uint32_t u32; // 32位整型
    uint64_t u64; // 64位整型
} epoll_data_t;

引數ev為檔案描述符fd所做的設定如下:

  • 結構體epoll_event中的events欄位是一個位掩碼,它指定了 epoll 例項監控的事件集合
  • data欄位是一個聯合體,當fd就緒時,聯合體的成員可用來指定傳回給呼叫程序的資訊

就緒等待: epoll_wait

系統呼叫epoll_wait返回 epoll 例項中處於就緒狀態的檔案描述符的資訊。單個epoll_wait呼叫能返回多個就緒態檔案描述符的資訊,這也正是I/O多路複用的體現。

#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);
  • 引數evlist所指向的結構體陣列中返回就緒狀態檔案描述符的資訊。資料evlist的空間由呼叫者負責申請,所包含的元素個數在引數maxevents中指定。
  • 引數timeout指定epoll_wait的阻塞行為,例如timeout等於-1,呼叫將一直阻塞,走到 interest list 中的檔案描述符上有事件產生。

epoll_wait 呼叫成功後,返回資料evlist中的元素個數,即就緒的描述符個數。

例子

我們以編寫一個 TCP 伺服器為例子,說明 epoll 的用法,該 TCP 伺服器打印出所有接收到的訊息。
我們先來看建立和繫結 TCP 監聽套接字的函式。

static int
create_and_bind (char *port)
{
    struct addrinfo hints;
    struct addrinfo *result, *rp;
    int s, sfd;

    memset (&hints, 0, sizeof (struct addrinfo));
    hints.ai_family = AF_UNSPEC;     // 支援 IPv4 和 IPv6
    hints.ai_socktype = SOCK_STREAM; // TCP socket
    hints.ai_flags = AI_PASSIVE;     // 監聽套接字

    s = getaddrinfo (NULL, port, &hints, &result);
    if (s != 0)
    {
        fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));
        return -1;
    }

    for (rp = result; rp != NULL; rp = rp->ai_next)
    {
        sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
        if (sfd == -1)
            continue;

        s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
        if (s == 0)
        {
            // 已成功繫結套接字
            break;
        }

        close (sfd);
    }

    if (rp == NULL)
    {
        fprintf (stderr, "Could not bind\n");
        return -1;
    }

    freeaddrinfo (result);

    return sfd;
}

create_and_bind接受port引數(表示監聽的埠),其作用是建立並繫結監聽套接字。
getaddrinfo函式既可以用於IPv4,也可以用於IPv6,能夠處理名字到地址以及服務到埠這兩種轉換,它返回addrinfo結構體陣列的指標。關於getaddrinfo詳細介紹,可以參考《UNIX網路程式設計》的有關描述。
create_and_bind返回結構體addrinfo陣列的指標(儲存在reslut指標中)接下來,我們對result進行遍歷,直到將監聽套接字成功繫結為止。

接下來,我們再來看將一個套接字設定為非阻塞套接字的函式。

static int
make_socket_non_blocking (int sfd)
{
    int flags, s;

    flags = fcntl (sfd, F_GETFL, 0);
    if (flags == -1)
    {
        perror ("fcntl");
        return -1;
    }

    flags |= O_NONBLOCK;
    s = fcntl (sfd, F_SETFL, flags);
    if (s == -1)
    {
        perror ("fcntl");
        return -1;
    }

    return 0;
}

最後我們來看下main函式的實現。

int
main (int argc, char *argv[])
{
    int sfd, s;
    int efd;
    struct epoll_event event;
    struct epoll_event *events;

    if (argc != 2)
    {
        fprintf (stderr, "Usage: %s [port]\n", argv[0]);
        exit (EXIT_FAILURE);
    }

    sfd = create_and_bind (argv[1]);
    if (sfd == -1)
        abort ();

    s = make_socket_non_blocking (sfd);
    if (s == -1)
        abort ();

    s = listen (sfd, SOMAXCONN);
    if (s == -1)
    {
        perror ("listen");
        abort ();
    }

    efd = epoll_create1 (0);
    if (efd == -1)
    {
        perror ("epoll_create");
        abort ();
    }

    event.data.fd = sfd;
    // ET 模式
    event.events = EPOLLIN | EPOLLET;
    s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
    if (s == -1)
    {
        perror ("epoll_ctl");
        abort ();
    }

    // 用來儲存epoll_wait返回的就緒檔案描述符列表
    events = calloc (MAXEVENTS, sizeof event);

    // 主迴圈
    while (1)
    {
        int n, i;

        n = epoll_wait (efd, events, MAXEVENTS, -1);
        for (i = 0; i < n; i++)
        {
            if ((events[i].events & EPOLLERR) ||
                (events[i].events & EPOLLHUP) ||
                (!(events[i].events & EPOLLIN)))
            {
                // 監測的檔案描述符出錯了
                fprintf (stderr, "epoll error\n");
                close (events[i].data.fd);
                continue;
            }

            else if (sfd == events[i].data.fd)
            {
                // 監聽套接字就緒,表明有一個或者多個連線進來
                while (1)
                {
                    struct sockaddr in_addr;
                    socklen_t in_len;
                    int infd;
                    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                    in_len = sizeof in_addr;
                    infd = accept (sfd, &in_addr, &in_len);
                    if (infd == -1)
                    {
                        if ((errno == EAGAIN) ||
                            (errno == EWOULDBLOCK))
                        {
                            // 處理完所有的連線
                            break;
                        }
                        else
                        {
                            perror ("accept");
                            break;
                        }
                    }

                    s = getnameinfo (&in_addr, in_len,
                                     hbuf, sizeof hbuf,
                                     sbuf, sizeof sbuf,
                                     NI_NUMERICHOST | NI_NUMERICSERV);
                    if (s == 0)
                    {
                        printf("Accepted connection on descriptor %d "
                                       "(host=%s, port=%s)\n", infd, hbuf, sbuf);
                    }

                    // 設定已連線套接字為非阻塞,並且加入到 epoll 例項監測中
                    s = make_socket_non_blocking (infd);
                    if (s == -1)
                        abort ();

                    event.data.fd = infd;
                    // ET 模式
                    event.events = EPOLLIN | EPOLLET;
                    s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
                    if (s == -1)
                    {
                        perror ("epoll_ctl");
                        abort ();
                    }
                }
                continue;
            }
            else
            {
                // 已連線套接字可讀,我們讀取該套接字所有的資料並打印出來
                // 由於使用了 ET 模式,我們必須將所有可讀資料讀取完畢
                int done = 0;

                while (1)
                {
                    ssize_t count;
                    char buf[512];

                    count = read (events[i].data.fd, buf, sizeof buf);
                    if (count == -1)
                    {
                        // 如果 errno == EAGAIN,說明所有資料已讀取完畢
                        // 如果 errno != EAGAIN,說明讀取出錯
                        if (errno != EAGAIN)
                        {
                            // 讀取出錯
                            perror ("read");
                            done = 1;
                        }
                        break;
                    }
                    else if (count == 0)
                    {
                        // 客戶端斷開了連線
                        done = 1;
                        break;
                    }

                    // 列印到標準輸出
                    s = write (1, buf, count);
                    if (s == -1)
                    {
                        perror ("write");
                        abort ();
                    }
                }

                if (done)
                {
                    printf ("Closed connection on descriptor %d\n",
                            events[i].data.fd);

                    // 關閉連線
                    close (events[i].data.fd);
                }
            }
        }
    }

    free (events);

    close (sfd);

    return EXIT_SUCCESS;
}

main函式首先呼叫create_and_bind建立並繫結監聽套接字,接下來呼叫make_socket_non_blocking設定監聽套接字為非阻塞模式,並呼叫listen系統呼叫監聽客戶端的連線請求。
接下來,我們建立了一個 epoll 例項,並將監聽套接字加入到該 epoll 例項的 interest list,當監聽套接字可讀時,說明有新的客戶端請求連線。
在主迴圈中,我們呼叫epoll_wait等待就緒事件的發生。timeout引數設定為-1說明主執行緒會一直阻塞到事件就緒。這些就緒事件包括以下型別:

  • 客戶端請求到達:當監聽套接字可讀時,說明一個或者多個客戶端連線請求到達,我們設定新的已連線套接字為非阻塞模式並新增到 epoll 例項的 interest list 中。
  • 客戶端資料可讀:已連線套接字就緒時,說明客戶端資料可讀。我們使用read每次讀出512位元組的資料,直接所有的資料讀取完畢。這是由於我們使用了 ET 模式,ET 模式對於資料可讀只會通知一次。讀出的資料通過write系統呼叫列印到標準輸出。

參考資料