1. 程式人生 > >epoll:EPOLLET模式下的正確讀寫方式

epoll:EPOLLET模式下的正確讀寫方式

1.EPOLLLT和EPOLLET最大的區別在於事件的通知機制,看這個文章EPOLLLT和EPOLLET的區別

2.EPOLLET模式下並不意味著要迴圈讀取完緩衝區的所有資料,貼出一段讀取程式碼:

/* Keep calling read() and appending to buf until it stops returning data. */
n = 0;
/* NOTE: each call may store up to BUFSIZ-1 bytes starting at buf + n, and
   nothing in this loop compares n with the capacity of buf (buf is declared
   outside this snippet). */
while ((nread = read(fd, buf + n, BUFSIZ-1)) > 0) {
    n += nread;
}
/* A -1 result with errno == EAGAIN is not reported; any other errno is. */
if (nread == -1 && errno != EAGAIN) {
    perror("read error");
}
這段程式碼的陷阱在於,每次read都以BUFSIZ-1為上限寫入buf + n,卻沒有檢查buf的剩餘空間;如果tcp緩衝區可讀資料比buf大,那麼將會造成緩衝區溢位(buf位於堆疊上時即破壞堆疊),所以不檢查剩餘空間就迴圈讀取緩衝區資料直到遇到EAGAIN並不是正確的做法。

3.需要注意的是,首次增加和每次修改套接字的events時,都需要指明EPOLLET,並不是ADD一次後就會永久是EPOLLET模式

    /* Register sockfd on efd for edge-triggered readability notifications.
       EPOLLET has to be part of the mask on every ADD/MOD call. */
    struct epoll_event ev;
    ev.data.fd = sockfd;
    ev.events = EPOLLET | EPOLLIN; /* edge-triggered */
    if (epoll_ctl(efd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {
        perror("epoll_ctl EPOLL_CTL_ADD fail");
    }
4.EPOLLLT和EPOLLET在讀取資料時的區別在於當資料沒讀取完畢時,EPOLLLT會再次通知可讀,而EPOLLET不會再通知,必須用EPOLL_CTL_MOD重新註冊可讀事件(或繼續讀到EAGAIN)才能再次收到通知
5.EPOLLLT和EPOLLET在傳送資料時的行為是一致的,都需要應用層主動判定是否有資料可傳送

epoll_event結構體的理解

// man 2 epoll_ctl

/* User data attached to a registered descriptor. This is a union: all members
   share the same storage, so only the member that was stored is meaningful
   when the value is read back. */
typedef union epoll_data {
    void        *ptr;   /* pointer payload */
    int          fd;    /* descriptor payload (the member the test server uses) */
    __uint32_t   u32;   /* 32-bit integer payload */
    __uint64_t   u64;   /* 64-bit integer payload */
} epoll_data_t;

/* Argument passed to epoll_ctl() and element type filled in by epoll_wait(). */
struct epoll_event {
    __uint32_t   events;      /* Epoll events: bit mask such as EPOLLIN | EPOLLET */
    epoll_data_t data;        /* User data variable */
};


測試程式碼:

----- server.cpp -----

#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>

#define MAX_EVENTS 1000
#define MAX_LEN 1024

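/*
See man epoll_ctl:
EPOLLIN     data is available to read
EPOLLOUT    the descriptor is writable
EPOLLPRI    urgent (out-of-band) data is available
EPOLLRDHUP  peer closed the connection or shut down its writing half
EPOLLERR    error condition on the descriptor
EPOLLHUP    hang-up on the descriptor
*/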

// Put sockfd into non-blocking mode by OR-ing O_NONBLOCK into its file status
// flags. Failures are only reported through perror(); nothing is returned.
static void setnonblocking(int sockfd) {
    const int current = fcntl(sockfd, F_GETFL, 0);
    if (current >= 0) {
        if (fcntl(sockfd, F_SETFL, current | O_NONBLOCK) < 0) {
            perror("fcntl F_SETFL fail");
        }
        return;
    }
    perror("fcntl F_GETFL fail");
}

// Register sockfd on the epoll instance efd for edge-triggered read events.
// Returns 0 on success, 1 when epoll_ctl() fails (the reason is printed).
static int epoll_add(int efd, int sockfd) {
    struct epoll_event ev;
    ev.data.fd = sockfd;           /* handed back by epoll_wait() */
    ev.events = EPOLLET | EPOLLIN; /* edge-triggered */

    const int rc = epoll_ctl(efd, EPOLL_CTL_ADD, sockfd, &ev);
    if (rc != -1) {
        return 0;
    }
    perror("epoll_ctl EPOLL_CTL_ADD fail");
    return 1;
}

// Re-register sockfd on efd in edge-triggered mode. EPOLLIN is always
// requested; EPOLLOUT is requested only when enable is true. EPOLLET is
// repeated here because every EPOLL_CTL_MOD replaces the whole event mask.
// A failing epoll_ctl() is reported through perror() instead of being ignored.
static void epoll_write(int efd, int sockfd, bool enable) {
    struct epoll_event ev;
    ev.events = EPOLLIN | EPOLLET | (enable ? EPOLLOUT : 0);
    ev.data.fd = sockfd;
    if (-1 == epoll_ctl(efd, EPOLL_CTL_MOD, sockfd, &ev)) {
        perror("epoll_ctl EPOLL_CTL_MOD fail");
    }
}

// Remove sockfd from the epoll instance efd and close it.
// perror("close by") runs first so that it prints the errno left by whatever
// the caller did just before deciding to drop the connection.
static void epoll_del(int efd, int sockfd) {
    struct epoll_event unused;

    perror("close by");

    /* The event argument is ignored for EPOLL_CTL_DEL, but kernels before
       2.6.9 reject a NULL pointer, so pass a dummy object. */
    unused.events = 0;
    unused.data.fd = sockfd;
    if (-1 == epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, &unused)) {
        perror("epoll_ctl EPOLL_CTL_DEL fail");
    }
    if (-1 == close(sockfd)) {
        perror("close fail");
    }
}

// Read up to n bytes from fd into vptr, retrying after EINTR.
// Returns the number of bytes stored when the buffer was filled or EOF was
// reached (so 0 means EOF with nothing read), or -1 when read() fails with
// any errno other than EINTR (EAGAIN included). In the -1 case bytes already
// stored in vptr are not reflected in the return value.
int readn(int fd, void *vptr, size_t n) {
    // see man 2 read
    char *cursor = (char *)vptr;
    size_t remaining = n;

    while (remaining != 0) {
        int got = (int)read(fd, cursor, remaining);
        if (got == 0) {
            break; /* EOF */
        }
        if (got < 0) {
            if (errno != EINTR) {
                return -1; /* maybe errno == EAGAIN */
            }
            continue; /* interrupted: call read() again */
        }
        cursor += got;
        remaining -= got;
    }
    return (int)(n - remaining); /* return >= 0 */
}


// Write exactly n bytes from vptr to fd, retrying after EINTR.
// Returns n once everything was written, or -1 when write() returns 0 or
// fails with any errno other than EINTR (EAGAIN included); in that case the
// number of bytes already written is not reported.
int writen(int fd, const void *vptr, size_t n) {
    // see man 2 write
    const char *cursor = (const char *)vptr;
    size_t remaining = n;

    while (remaining != 0) {
        int sent = write(fd, cursor, remaining);
        if (sent > 0) {
            cursor += sent;
            remaining -= sent;
            continue;
        }
        if (sent == 0 || errno != EINTR) {
            return -1; /* error */
        }
        /* interrupted before anything was written: call write() again */
    }
    return (int)n;
}

// Tells the event loop whether there is data waiting to be sent.
// Placeholder for the demo: it always answers yes.
bool ifsend(void) {
    const bool has_pending_output = true;
    return has_pending_output;
}

/*
 * Demo server: listens on 0.0.0.0:9527 and serves every socket from a single
 * edge-triggered epoll loop.
 *
 *  - listening socket readable: accept connections until accept() would
 *    block, registering each one for edge-triggered reads;
 *  - connection readable: read at most MAX_LEN bytes, then re-register the
 *    socket with EPOLL_CTL_MOD (which re-arms the edge trigger) when the
 *    buffer was filled or when there is something to send;
 *  - connection writable: send a fixed greeting and keep or drop EPOLLOUT
 *    depending on whether the whole message went out.
 *
 * The loop never terminates normally; fatal setup errors exit with
 * EXIT_FAILURE.
 */
int main()
{
    // socket
    int listenfd;
    struct sockaddr_in servaddr;
    short port = 9527;

    memset(&servaddr, 0, sizeof(servaddr)); /* no stack garbage in the padding */
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(port);

    listenfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == listenfd) {
        perror("socket fail");
        exit(EXIT_FAILURE);
    }
    setnonblocking(listenfd); /* accept() must not block */
    int res = bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
    if (0 == res)
        printf("server bind success, 0.0.0.0:%d\n", port);
    else {
        perror("bind fail");
        exit(EXIT_FAILURE);
    }
    res = listen(listenfd, 100);
    if (0 == res)
        printf("server listen success\n");
    else {
        perror("listen fail");
        exit(EXIT_FAILURE);
    }


    // epoll create, see man epoll_create
    struct epoll_event events[MAX_EVENTS];
    int epollfd = epoll_create(10);
    if (-1 == epollfd) {
        perror("epoll_create fail");
        exit(EXIT_FAILURE);
    }

    // epoll add
    if (epoll_add(epollfd, listenfd)) {
        exit(EXIT_FAILURE);
    }

    for (;;) {
        int nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
        if (-1 == nfds) {
            if (EINTR == errno)
                continue; /* interrupted by a signal: just wait again */
            perror("epoll_wait fail");
            exit(EXIT_FAILURE);
        }
        for (int n = 0; n < nfds; ++n) {
            if (events[n].data.fd == listenfd) {
                /* The listening socket is edge-triggered: one notification
                   may stand for several queued connections, so accept until
                   the queue is empty. */
                for (;;) {
                    struct sockaddr_in cliaddr;
                    socklen_t len = sizeof(cliaddr);
                    int connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &len);
                    if (-1 == connfd) {
                        if (EINTR == errno)
                            continue;
                        if (errno != EAGAIN && errno != EWOULDBLOCK)
                            perror("accept fail");
                        break;
                    }
                    setnonblocking(connfd);
                    if (epoll_add(epollfd, connfd)) {
                        close(connfd);
                        continue;
                    }

                    // per-connection processing, e.g. decoding cliaddr
                    char buff[INET_ADDRSTRLEN + 1] = {0};
                    inet_ntop(AF_INET, &cliaddr.sin_addr, buff, INET_ADDRSTRLEN);
                    uint16_t cliport = ntohs(cliaddr.sin_port);
                    printf("connection from %s, port %d\n", buff, cliport);
                }

            } else if (events[n].events & EPOLLIN) {
                char buffer[MAX_LEN + 1] = {0}; /* keep the end '\0' */
                size_t count = MAX_LEN;
                int connfd = events[n].data.fd;
                int nread = readn(connfd, buffer, count);

                // EOF, or a read error other than "would block": drop the peer
                if (0 == nread || (nread < 0 && errno != EAGAIN)) {
                    epoll_del(epollfd, connfd);
                    continue;
                }

                /* nread == -1 with errno == EAGAIN: the socket was drained.
                   nread  >  0: the buffer was filled (or EOF followed some
                   data), so more input may be pending and the socket must be
                   re-registered to get another notification. */
                const bool want_write = ifsend();
                if (nread > 0 || want_write)
                    epoll_write(epollfd, connfd, want_write);

                /* readn() reports -1 on EAGAIN even if bytes were stored, so
                   the received length is taken from the zero-filled buffer. */
                size_t nbytes = strlen(buffer);
                if (nbytes != 0) {
                    printf("fd:%d, read len:%zu\n", connfd, nbytes);
                }


            } else if (events[n].events & EPOLLOUT) {
                const char* vptr = "hi client!"; /* fake payload */
                int connfd = events[n].data.fd;
                size_t count = strlen(vptr);
                int len = writen(connfd, vptr, count);

                // a write error other than "would block": drop the peer
                if (len < 0 && errno != EAGAIN) {
                    epoll_del(epollfd, connfd);
                    continue;
                }

                if (len > 0) {
                    printf("fd:%d, write len:%d\n", connfd, len);
                }

                /* Test the sign before converting: comparing a negative int
                   with a size_t directly would turn -1 into a huge value and
                   wrongly take the "all sent" branch. */
                if (len < 0 || (size_t)len < count) {
                    epoll_write(epollfd, connfd, true); /* data still pending: keep EPOLLOUT */
                } else {
                    epoll_write(epollfd, connfd, false); /* nothing left to send: EPOLLIN only */
                }

            } else if (events[n].events & EPOLLPRI) {
                // pass
            } else if (events[n].events & (EPOLLRDHUP | EPOLLERR | EPOLLHUP)) {
                epoll_del(epollfd, events[n].data.fd);
            } else {
                // pass
            }
        }
    }

}
----- client.cpp -----
#include <stdio.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/epoll.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>

// Switch sockfd to non-blocking I/O by adding O_NONBLOCK to its file status
// flags. Errors are printed with perror() and otherwise ignored.
static void setnonblocking(int sockfd) {
    int mode = fcntl(sockfd, F_GETFL, 0);
    if (mode >= 0) {
        mode |= O_NONBLOCK;
        if (fcntl(sockfd, F_SETFL, mode) < 0) {
            perror("fcntl F_SETFL fail");
        }
    } else {
        perror("fcntl F_GETFL fail");
    }
}

// Read up to n bytes from fd into vptr, retrying after EINTR.
// Returns how many bytes were stored once the buffer is full or EOF is hit
// (0 means EOF with nothing read). Returns -1 when read() fails with an errno
// other than EINTR (EAGAIN included); bytes stored before that failure are
// not counted in the result.
int readn(int fd, void *vptr, size_t n) {
    /* see man 2 read */
    char *dst = (char *)vptr;
    size_t done = 0;

    while (done < n) {
        int rc = (int)read(fd, dst + done, n - done);
        if (rc > 0) {
            done += rc;
            continue;
        }
        if (rc == 0) {
            break; /* EOF */
        }
        if (errno != EINTR) {
            return -1; /* maybe errno == EAGAIN */
        }
        /* EINTR: call read() again */
    }
    return (int)done; /* return >= 0 */
}


// Write exactly n bytes from vptr to fd, retrying after EINTR.
// Returns n on success. Returns -1 when write() returns 0 or fails with an
// errno other than EINTR (EAGAIN included); the amount already written is
// not reported in that case.
int writen(int fd, const void *vptr, size_t n) {
    /* see man 2 write */
    const char *src = (const char *)vptr;
    size_t done = 0;

    while (done < n) {
        int rc = write(fd, src + done, n - done);
        if (rc > 0) {
            done += rc;
        } else if (rc < 0 && errno == EINTR) {
            /* interrupted: call write() again */
        } else {
            return -1; /* error */
        }
    }
    return (int)n;
}

/*
 * Demo client: connects to 127.0.0.1:9527 with a non-blocking socket, then
 * once per second sends "hello server!" and prints the length of whatever
 * the server sent back. The loop ends on EOF or on any I/O error other than
 * EAGAIN, after which the reason is printed and the socket is closed.
 */
int main()
{
    // socket
    struct sockaddr_in servaddr;
    short port = 9527;
    int sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (-1 == sockfd) {
        perror("socket fail");
        exit(EXIT_FAILURE);
    }

    memset(&servaddr, 0, sizeof(servaddr)); /* no stack garbage in the padding */
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = inet_addr("127.0.0.1");
    servaddr.sin_port = htons(port);
    setnonblocking(sockfd);
    if (connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) {
        /* EINPROGRESS is expected for a non-blocking connect; until the
           connection is established the calls below simply report EAGAIN. */
        if (errno != EINPROGRESS) {
            perror("connect fail");
            close(sockfd);
            exit(EXIT_FAILURE);
        }
    }

    const char* sendbuf = "hello server!";
    for (;;) {

        // write
        int len = writen(sockfd, sendbuf, strlen(sendbuf));
        if (len < 0 && errno != EAGAIN) {
            break;
        }
        if (len > 0) {
            printf("fd:%d, write len:%d\n", sockfd, len);
        }

        // read
        char recvbuf[1024+1] = {0};
        int res = readn(sockfd, recvbuf, 1024);
        if (res < 0 && errno != EAGAIN) {
            break;
        } else if (0 == res) {
            break; /* EOF: server closed the connection */
        }

        /* readn() reports -1 on EAGAIN even if bytes were stored, so the
           received length is taken from the zero-filled buffer. */
        size_t got = strlen(recvbuf);
        if (got > 0) {
            printf("fd:%d, read len:%zu\n", sockfd, got);
        }

        sleep(1);
    }
    perror("close by");
    close(sockfd);
    return 0;
}