1. 程式人生 > >epoll的水平觸發和邊緣觸發,以及邊緣觸發為什麽要使用非阻塞IO

epoll的水平觸發和邊緣觸發,以及邊緣觸發為什麽要使用非阻塞IO

alt 開啟 本機 另一個 trigger stdio.h 什麽 我們 水平

轉自:http://www.cnblogs.com/yuuyuu/p/5103744.html

一.基本概念

我們通俗一點講:

Level_triggered(水平觸發):當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。如果這次沒有把數據一次性全部讀寫完(如讀寫緩沖區太小),那麽下次調用 epoll_wait()時,它還會通知你在上沒讀寫完的文件描述符上繼續讀寫,當然如果你一直不去讀寫,它會一直通知你!!!如果系統中有大量你不需要讀寫的就緒文件描述符,而它們每次都會返回,這樣會大大降低處理程序檢索自己關心的就緒文件描述符的效率!!!

Edge_triggered(邊緣觸發):當被監控的文件描述符上有可讀寫事件發生時,epoll_wait()會通知處理程序去讀寫。如果這次沒有把數據全部讀寫完(如讀寫緩沖區太小),那麽下次調用epoll_wait()時,它不會通知你,也就是它只會通知你一次,直到該文件描述符上出現第二次可讀寫事件才會通知你!!!這種模式比水平觸發效率高,系統不會充斥大量你不關心的就緒文件描述符!!!

阻塞IO:當你去讀一個阻塞的文件描述符時,如果在該文件描述符上沒有數據可讀,那麽它會一直阻塞(通俗一點就是一直卡在調用函數那裏),直到有數據可讀。當你去寫一個阻塞的文件描述符時,如果在該文件描述符上沒有空間(通常是緩沖區)可寫,那麽它會一直阻塞,直到有空間可寫。以上的讀和寫我們統一指在某個文件描述符進行的操作,不單單指真正的讀數據,寫數據,還包括接收連接accept(),發起連接connect()等操作...

非阻塞IO:當你去讀寫一個非阻塞的文件描述符時,不管可不可以讀寫,它都會立即返回,返回成功說明讀寫操作完成了,返回失敗會設置相應errno狀態碼,根據這個errno可以進一步執行其他處理。它不會像阻塞IO那樣,卡在那裏不動!!!

二.幾種IO模型的觸發方式

select(),poll()模型都是水平觸發模式,信號驅動IO是邊緣觸發模式,epoll()模型即支持水平觸發,也支持邊緣觸發,默認是水平觸發。

這裏我們要探討epoll()的水平觸發和邊緣觸發,以及阻塞IO和非阻塞IO對它們的影響!!!下面稱水平觸發為LT,邊緣觸發為ET。

對於監聽的socket文件描述符我們用sockfd代替,對於accept()返回的文件描述符(即要讀寫的文件描述符)用connfd代替。

我們來驗證以下幾個內容:

1.水平觸發的非阻塞sockfd

2.邊緣觸發的非阻塞sockfd

3.水平觸發的阻塞connfd

4.水平觸發的非阻塞connfd

5.邊緣觸發的阻塞connfd

6.邊緣觸發的非阻塞connfd

以上沒有驗證阻塞的sockfd,因為epoll_wait()返回必定是已就緒的連接,設不設置阻塞accept()都會立即返回。例外:UNP裏面有個例子,在BSD上,使用select()模型。設置阻塞的監聽sockfd時,當客戶端發起連接請求,由於服務器繁忙沒有來得及accept(),此時客戶端自己又斷開,當服務器到達accept()時,會出現阻塞。本機測試epoll()模型沒有出現這種情況,我們就暫且忽略這種情況!!!

三.驗證代碼

文件名:epoll_lt_et.c

/* 
 *url:http://www.cnblogs.com/yuuyuu/p/5103744.html
 *
 */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <fcntl.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/epoll.h>

/* 最大緩存區大小 */
#define MAX_BUFFER_SIZE 5
/* epoll最大監聽數 */
#define MAX_EPOLL_EVENTS 20
/* LT模式 */
#define EPOLL_LT 0
/* ET模式 */
#define EPOLL_ET 1
/* 文件描述符設置阻塞 */
#define FD_BLOCK 0
/* 文件描述符設置非阻塞 */
#define FD_NONBLOCK 1

/* 設置文件為非阻塞 */
int set_nonblock(int fd)
{
    int old_flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, old_flags | O_NONBLOCK);
    return old_flags;
}

/* 註冊文件描述符到epoll,並設置其事件為EPOLLIN(可讀事件) */
void addfd_to_epoll(int epoll_fd, int fd, int epoll_type, int block_type)
{
    struct epoll_event ep_event;
    ep_event.data.fd = fd;
    ep_event.events = EPOLLIN;

    /* 如果是ET模式,設置EPOLLET */
    if (epoll_type == EPOLL_ET)
        ep_event.events |= EPOLLET;

    /* 設置是否阻塞 */
    if (block_type == FD_NONBLOCK)
        set_nonblock(fd);

    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ep_event);
}

/* LT處理流程 */
void epoll_lt(int sockfd)
{
    char buffer[MAX_BUFFER_SIZE];
    int ret;

    memset(buffer, 0, MAX_BUFFER_SIZE);
    printf("開始recv()...\n");
    ret = recv(sockfd, buffer, MAX_BUFFER_SIZE, 0);
    printf("ret = %d\n", ret);
    if (ret > 0)
        printf("收到消息:%s, 共%d個字節\n", buffer, ret);
    else
    {
        if (ret == 0)
            printf("客戶端主動關閉!!!\n");
        close(sockfd);
    }

    printf("LT處理結束!!!\n");
}

/* 帶循環的ET處理流程 */
void epoll_et_loop(int sockfd)
{
    char buffer[MAX_BUFFER_SIZE];
    int ret;

    printf("帶循環的ET讀取數據開始...\n");
    while (1)
    {
        memset(buffer, 0, MAX_BUFFER_SIZE);
        ret = recv(sockfd, buffer, MAX_BUFFER_SIZE, 0);
        if (ret == -1)
        {
            if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                printf("循環讀完所有數據!!!\n");
                break;
            }
            close(sockfd);
            break;
        }
        else if (ret == 0)
        {
            printf("客戶端主動關閉請求!!!\n");
            close(sockfd);
            break;
        }
        else
            printf("收到消息:%s, 共%d個字節\n", buffer, ret);
    }
    printf("帶循環的ET處理結束!!!\n");
}


/* 不帶循環的ET處理流程,比epoll_et_loop少了一個while循環 */
void epoll_et_nonloop(int sockfd)
{
    char buffer[MAX_BUFFER_SIZE];
    int ret;

    printf("不帶循環的ET模式開始讀取數據...\n");
    memset(buffer, 0, MAX_BUFFER_SIZE);
    ret = recv(sockfd, buffer, MAX_BUFFER_SIZE, 0);
    if (ret > 0)
    {
        printf("收到消息:%s, 共%d個字節\n", buffer, ret);
    }
    else
    {
        if (ret == 0)
            printf("客戶端主動關閉連接!!!\n");
        close(sockfd);
    }

    printf("不帶循環的ET模式處理結束!!!\n");
}

/* 處理epoll的返回結果 */
void epoll_process(int epollfd, struct epoll_event *events, int number, int sockfd, int epoll_type, int block_type)
{
    struct sockaddr_in client_addr;
    socklen_t client_addrlen;
    int newfd, connfd;
    int i;

    for (i = 0; i < number; i++)
    {
        newfd = events[i].data.fd;
        if (newfd == sockfd)
        {
            printf("=================================新一輪accept()===================================\n");
            printf("accept()開始...\n");

            /* 休眠3秒,模擬一個繁忙的服務器,不能立即處理accept連接 */
            printf("開始休眠3秒...\n");
            sleep(3);
            printf("休眠3秒結束!!!\n");

            client_addrlen = sizeof(client_addr);
            connfd = accept(sockfd, (struct sockaddr *)&client_addr, &client_addrlen);
            printf("connfd = %d\n", connfd);

            /* 註冊已鏈接的socket到epoll,並設置是LT還是ET,是阻塞還是非阻塞 */
            addfd_to_epoll(epollfd, connfd, epoll_type, block_type);
            printf("accept()結束!!!\n");
        }
        else if (events[i].events & EPOLLIN)
        {
            /* 可讀事件處理流程 */

            if (epoll_type == EPOLL_LT)    
            {
                printf("============================>水平觸發開始...\n");
                epoll_lt(newfd);
            }
            else if (epoll_type == EPOLL_ET)
            {
                printf("============================>邊緣觸發開始...\n");

                /* 帶循環的ET模式 */
                epoll_et_loop(newfd);

                /* 不帶循環的ET模式 */
                //epoll_et_nonloop(newfd);
            }
        }
        else
            printf("其他事件發生...\n");
    }
}

/* 出錯處理 */
void err_exit(char *msg)
{
    perror(msg);
    exit(1);
}

/* 創建socket */
int create_socket(const char *ip, const int port_number)
{
    struct sockaddr_in server_addr;
    int sockfd, reuse = 1;

    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons(port_number);

    if (inet_pton(PF_INET, ip, &server_addr.sin_addr) == -1)
        err_exit("inet_pton() error");

    if ((sockfd = socket(PF_INET, SOCK_STREAM, 0)) == -1)
        err_exit("socket() error");

    /* 設置復用socket地址 */
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) == -1)
        err_exit("setsockopt() error");

    if (bind(sockfd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
        err_exit("bind() error");

    if (listen(sockfd, 5) == -1)
        err_exit("listen() error");

    return sockfd;
}

/* main函數 */
int main(int argc, const char *argv[])
{
    if (argc < 3)
    {
        fprintf(stderr, "usage:%s ip_address port_number\n", argv[0]);
        exit(1);
    }

    int sockfd, epollfd, number;

    sockfd = create_socket(argv[1], atoi(argv[2]));
    struct epoll_event events[MAX_EPOLL_EVENTS];

    /* linux內核2.6.27版的新函數,和epoll_create(int size)一樣的功能,並去掉了無用的size參數 */
    if ((epollfd = epoll_create1(0)) == -1)
        err_exit("epoll_create1() error");

    /* 以下設置是針對監聽的sockfd,當epoll_wait返回時,必定有事件發生,
     * 所以這裏我們忽略罕見的情況外設置阻塞IO沒意義,我們設置為非阻塞IO */

    /* sockfd:非阻塞的LT模式 */
    addfd_to_epoll(epollfd, sockfd, EPOLL_LT, FD_NONBLOCK);

    /* sockfd:非阻塞的ET模式 */
    //addfd_to_epoll(epollfd, sockfd, EPOLL_ET, FD_NONBLOCK);

   
    while (1)
    {
        number = epoll_wait(epollfd, events, MAX_EPOLL_EVENTS, -1);
        if (number == -1)
            err_exit("epoll_wait() error");
        else
        {
            /* 以下的LT,ET,以及是否阻塞都是是針對accept()函數返回的文件描述符,即函數裏面的connfd */

            /* connfd:阻塞的LT模式 */
            epoll_process(epollfd, events, number, sockfd, EPOLL_LT, FD_BLOCK);

            /* connfd:非阻塞的LT模式 */
            //epoll_process(epollfd, events, number, sockfd, EPOLL_LT, FD_NONBLOCK);

            /* connfd:阻塞的ET模式 */
            //epoll_process(epollfd, events, number, sockfd, EPOLL_ET, FD_BLOCK);

            /* connfd:非阻塞的ET模式 */
            //epoll_process(epollfd, events, number, sockfd, EPOLL_ET, FD_NONBLOCK);
        }
    }

    close(sockfd);
    return 0;
}

  

四.驗證

1.驗證水平觸發的非阻塞sockfd,關鍵代碼在247行。編譯運行

技術分享

代碼裏面休眠了3秒,模擬繁忙服務器不能很快處理accept()請求。這裏,我們開另一個終端快速用5個連接連到服務器:

技術分享

我們再看看服務器的反映,可以看到5個終端連接都處理完成了,返回的新connfd依次為5,6,7,8,9:

技術分享

上面測試完畢後,我們批量kill掉那5個客戶端,方便後面的測試:

1 $:for i in {1..5};do kill %$i;done

 2. 邊緣觸發的非阻塞sockfd,我們註釋掉247行的代碼,放開250行的代碼。編譯運行後,用同樣的方法,快速創建5個客戶端連接,或者測試5個後再測試10個。再看服務器的反映,5個客戶端只處理了2個。說明高並發時,會出現客戶端連接不上的問題:

技術分享

3.水平觸發的阻塞connfd,我們先把sockfd改回到水平觸發,註釋250行的代碼,放開247行。重點代碼在263行。

編譯運行後,用一個客戶端連接,並發送1-9這幾個數:

技術分享

再看服務器的反映,可以看到水平觸發觸發了2次。因為我們代碼裏面設置的緩沖區是5字節,處理代碼一次接收不完,水平觸發一直觸發,直到數據全部讀取完畢:

技術分享

4.水平觸發的非阻塞connfd。註釋263行的代碼,放開266行的代碼。同上面那樣測試,我們可以看到服務器反饋的消息跟上面測試一樣。這裏我就不再截圖。

5.邊緣觸發的阻塞connfd,註釋其他測試代碼,放開269行的代碼。先測試不帶循環的ET模式(即不循環讀取數據,跟水平觸發讀取一樣),註釋178行的代碼,放開181行的代碼。

編譯運行後,開啟一個客戶端連接,並發送1-9這幾個數字,再看看服務器的反映,可以看到邊緣觸發只觸發了一次,只讀取了5個字節:

技術分享

我們繼續在剛才的客戶端發送一個字符a,告訴epoll_wait(),有新的可讀事件發生:

技術分享

再看看服務器,服務器又觸發了一次新的邊緣觸發,並繼續讀取上次沒讀完的6789加一個回車符:

技術分享

這個時候,如果繼續在剛剛的客戶端再發送一個a,客戶端這個時候就會讀取上次沒讀完的a加上次的回車符,2個字節,還剩3個字節的緩沖區就可以讀取本次的a加本次的回車符共4個字節:

技術分享

我們可以看到,阻塞的邊緣觸發,如果不一次性讀取一個事件上的數據,會幹擾下一個事件!!!

接下來,我們就一次性讀取數據,即帶循環的ET模式。註意:我們這裏測試的還是邊緣觸發的阻塞connfd,只是換個讀取數據的方式。

註釋181行代碼,放開178的代碼。編譯運行,依然用一個客戶端連接,發送1-9。看看服務器,可以看到數據全部讀取完畢:

技術分享

細心的朋友肯定發現了問題,程序沒有輸出"帶循環的ET處理結束",是因為程序一直卡在了88行的recv()函數上,因為是阻塞IO,如果沒數據可讀,它會一直等在那裏,直到有數據可讀。如果這個時候,用另一個客戶端去連接,服務器不能受理這個新的客戶端!!!

6.邊緣觸發的非阻塞connfd,不帶循環的ET測試同上面一樣,數據不會讀取完。這裏我們就只需要測試帶循環的ET處理,即正規的邊緣觸發用法。註釋其他測試代碼,放開272行代碼。編譯運行,用一個客戶端連接,並發送1-9。再觀測服務器的反映,可以看到數據全部讀取完畢,處理函數也退出了,因為非阻塞IO如果沒有數據可讀時,會立即返回,並設置error,這裏我們根據EAGAIN和EWOULDBLOCK來判斷數據全部讀取完畢了,可以退出循環了:

技術分享

這個時候,我們用另一個客戶端去連接,服務器依然可以正常接收請求:

技術分享

五.總結

1.對於監聽的sockfd,最好使用水平觸發模式,邊緣觸發模式會導致高並發情況下,有的客戶端會連接不上。如果非要使用邊緣觸發,網上有的方案是用while來循環accept()。

2.對於讀寫的connfd,水平觸發模式下,阻塞和非阻塞效果都一樣,不過為了防止特殊情況,還是建議設置非阻塞。

3.對於讀寫的connfd,邊緣觸發模式下,必須使用非阻塞IO,並要一次性全部讀寫完數據。

轉自:http://www.cnblogs.com/yuuyuu/p/5103744.html

epoll的水平觸發和邊緣觸發,以及邊緣觸發為什麽要使用非阻塞IO