1. 程式人生 > >網路程式設計-I/O複用

# 網路程式設計-I/O複用

## I/O模型

Unix下可用的I/O模型有五種:

+ 阻塞式I/O
+ 非阻塞式I/O
+ I/O複用(select和poll、epoll)
+ 訊號驅動式I/O(SIGIO)
+ 非同步I/O(POSIX的aio_系列函式)

> 詳見Unix網路程式設計卷一第六章

select()和poll()在Unix系統中存在時間長,主要優勢在於可移植性,主要缺點在於當同時檢查大量的檔案描述符時效能拓展性不佳。

epoll API的關鍵優勢在於能讓應用高效地檢查大量的檔案描述符,主要缺點是專屬於Linux系統的API。

![](https://img2020.cnblogs.com/blog/1925550/202101/1925550-20210127233923785-1996613087.png)

## I/O複用-select

select()首次出現在BSD系統的套接字API中。

select()系統呼叫的用途:在一段指定的時間內,監聽使用者感興趣的檔案描述符上的可讀、可寫和異常事件。

系統呼叫select()會一直阻塞,直到檔案描述符集合中的一個或多個檔案描述符成為就緒態(或者超時、被訊號中斷)。

```c
#include <sys/select.h>
#include <sys/time.h>

/*
 * maxfdp1: highest-numbered descriptor in any of the three sets, plus 1.
 * Returns the number of ready descriptors, 0 on timeout, -1 on error.
 * On return each set is overwritten so that it holds only the ready
 * descriptors; the sets must therefore be rebuilt before every call.
 * timeout is not const in POSIX: Linux writes the time not slept back
 * into *timeout (the example program below relies on this).
 */
int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
           struct timeval *timeout);
```

**探究下fd_set的結構**

```c
/* typesizes.h */
#define __FD_SETSIZE 1024

/* select.h */
typedef long int __fd_mask;
/* number of bits in one long int */
#define __NFDBITS (8 * (int) sizeof (__fd_mask))
typedef struct {
    /* array of long int: element count = max descriptors / bits per long int,
       i.e. the array holds __FD_SETSIZE bits in total */
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
} fd_set;
```

因此一個fd_set最多隻能容納FD_SETSIZE(1024)個描述符,對小於0或大於等於FD_SETSIZE的描述符呼叫FD_SET()屬於未定義行為。

**select()程式示例:**

```c
#include <sys/select.h>
#include <sys/time.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Print a usage message for this program and exit with status 1. */
static void usageError(const char* progName){
    fprintf(stderr, "Usage: %s {timeout | -} fd-num[rw]...\n", progName);
    fprintf(stderr, " - means infinite timeout; \n");
    fprintf(stderr, " r = monitor for read\n");
    fprintf(stderr, " w = monitor for write\n\n");
    fprintf(stderr, " e.g.: %s - 0rw 1w\n", progName);
    exit(1);
}

/* Report a printf-style command-line error on stderr and exit with EXIT_FAILURE. */
static void cmdLineErr(const char *format, ...){
    va_list argList;

    fflush(stdout);            /* Flush any pending stdout */
    fprintf(stderr, "Command-line usage error: ");
    va_start(argList, format);
    vfprintf(stderr, format, argList);
    va_end(argList);
    fflush(stderr);            /* In case stderr is not line-buffered */
    exit(EXIT_FAILURE);
}

/*
 * Usage: prog {timeout | -} fd-num[rw]...
 * Builds a read set and a write set from the arguments, calls select() once
 * and prints which descriptors are ready. Example: `prog - 0rw 1w`.
 */
int main(int argc, char* argv[]){
    fd_set readfds, writefds;
    int ready, nfds, fd, numRead, j;
    struct timeval timeout;
    struct timeval *pto;
    char buf[10];                  /* receives the "r" / "w" / "rw" suffix */

    if(argc < 2 || strcmp(argv[1], "--help") == 0){
        usageError(argv[0]);
    }

    /* "-" means block indefinitely (NULL timeout); otherwise argv[1] is seconds */
    if(strcmp(argv[1], "-") == 0){
        pto = NULL;
    } else {
        pto = &timeout;
        timeout.tv_sec = strtol(argv[1], NULL, 0);
        timeout.tv_usec = 0;
    }

    /* Build the descriptor sets; nfds tracks the highest descriptor + 1 */
    nfds = 0;
    FD_ZERO(&readfds);
    FD_ZERO(&writefds);
    for(j = 2; j < argc; j++){
        numRead = sscanf(argv[j], "%d%2[rw]", &fd, buf);
        if(numRead != 2){
            usageError(argv[0]);
        }
        /* FD_SET() with a descriptor outside [0, FD_SETSIZE) is undefined behaviour */
        if(fd < 0){
            cmdLineErr("invalid file descriptor (%d)\n", fd);
        }
        if(fd >= FD_SETSIZE){
            cmdLineErr("file descriptor exceeds limit (%d)\n", FD_SETSIZE);
        }
        if(fd >= nfds){
            nfds = fd + 1;
        }
        if(strchr(buf, 'r') != NULL){
            FD_SET(fd, &readfds);
        }
        if(strchr(buf, 'w') != NULL){
            FD_SET(fd, &writefds);
        }
    }

    ready = select(nfds, &readfds, &writefds, NULL, pto);
    if(ready == -1){
        perror("select");
        exit(1);
    }

    /* select() left only the ready descriptors in each set */
    printf("ready = %d\n", ready);
    for(fd = 0; fd < nfds; fd++){
        printf("%d: %s%s\n", fd, FD_ISSET(fd, &readfds) ? "r" : "",
               FD_ISSET(fd, &writefds) ? "w" : "");
    }

    /* On Linux, select() has written the time not slept back into timeout */
    if(pto != NULL){
        printf("timeout after select(): %ld.%03ld\n",
               (long) timeout.tv_sec, (long) timeout.tv_usec / 1000);
    }
    exit(0);
}
```

**select處理正常資料和帶外資料:**

```c
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/select.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <libgen.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/*
 * Usage: prog ip_address port_number
 * Accepts one TCP connection and uses select() to report ordinary data
 * (read set) and out-of-band data (exception set) separately.
 */
int main(int argc, char* argv[]){
    if(argc <= 2){
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    if(inet_pton(AF_INET, ip, &address.sin_addr) != 1){
        printf("invalid IPv4 address: %s\n", ip);
        return 1;
    }
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);
    ret = listen(listenfd, 5);
    assert(ret != -1);

    struct sockaddr_in client_address;
    socklen_t client_addrlength = sizeof(client_address);
    int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
    if(connfd < 0){
        printf("error is: %d\n", errno);
        close(listenfd);
        return 1;          /* without a connection there is nothing to select() on */
    }

    char buf[1024];
    fd_set read_fds;
    fd_set exception_fds;
    FD_ZERO(&read_fds);
    FD_ZERO(&exception_fds);

    for(;;){
        /* select() clears descriptors that are not ready, so re-add connfd each time */
        FD_SET(connfd, &read_fds);
        FD_SET(connfd, &exception_fds);
        ret = select(connfd + 1, &read_fds, NULL, &exception_fds, NULL);
        if(ret < 0){
            printf("selection failure\n");
            break;
        }

        /* Readable: ordinary data */
        if(FD_ISSET(connfd, &read_fds)){
            memset(buf, '\0', sizeof(buf));
            ret = recv(connfd, buf, sizeof(buf)-1, 0);
            if(ret <= 0){
                break;
            }
            printf("get %d bytes of normal data: %s\n", ret, buf);
        }
        /* Exceptional condition: out-of-band data. Checked independently of the
           read set so that a steady stream of normal data cannot starve it. */
        if(FD_ISSET(connfd, &exception_fds)){
            memset(buf, '\0', sizeof(buf));
            ret = recv(connfd, buf, sizeof(buf)-1, MSG_OOB);
            if(ret <= 0){
                break;
            }
            printf("get %d bytes of oob data: %s\n", ret, buf);
        }
    }
    close(connfd);
    close(listenfd);
    return 0;
}
```

## I/O複用-poll

poll函式起源於SVR3,最初侷限於流裝置,SVR4取消了這種限制,允許poll工作在任何描述符上。

poll提供的功能與select類似,不過在處理流裝置時,它能夠提供額外的資訊。

```c
#include <poll.h>

struct pollfd {
    int   fd;       /* descriptor to check */
    short events;   /* conditions to test for */
    short revents;  /* returned status of the descriptor */
};

/*
 * Returns the number of ready descriptors, 0 on timeout, -1 on error.
 * UNP writes nfds as unsigned long; POSIX declares it as nfds_t.
 * timeout is in milliseconds: -1 blocks indefinitely, 0 never blocks.
 */
int poll(struct pollfd *fdarray, nfds_t nfds, int timeout);
```

>
> select()同poll()返回正整數的區別:如果一個檔案描述符在返回的集合中出現了不止一次,系統呼叫select()會將同一個檔案描述符計數多次。而系統呼叫poll()返回的是就緒態檔案描述符個數,且一個檔案描述符只會統計一次,就算在相應的revents欄位中設定了多個位掩碼也是如此。

**poll示例程式:**

```c
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>

/*
 * Usage: prog num-pipes [num-writes]
 * Creates num-pipes pipes, writes one byte to num-writes randomly chosen
 * pipes (default 1), then uses poll() to report which read ends are readable.
 */
int main(int argc, char* argv[]){
    int numPipes, j, ready, randPipe, numWrites;
    int (*pfds)[2];            /* pointer to arrays of two ints: one pipe per element */
    struct pollfd *pollFd;

    if(argc < 2 || strcmp(argv[1], "--help") == 0){
        printf("%s num-pipes [num-writes]\n", argv[0]);
        exit(1);
    }

    numPipes = strtol(argv[1], NULL, 10);
    /* At least one pipe is needed: random() % numPipes below would divide by zero */
    if(numPipes <= 0){
        printf("%s num-pipes [num-writes]\n", argv[0]);
        exit(1);
    }

    /* One pipe and one matching pollfd entry per requested pipe */
    pfds = calloc(numPipes, sizeof(*pfds));
    if(pfds == NULL){
        perror("calloc");
        exit(1);
    }
    pollFd = calloc(numPipes, sizeof(*pollFd));
    if(pollFd == NULL){
        perror("calloc");
        exit(1);
    }

    for(j = 0; j < numPipes; j++){
        if(pipe(pfds[j]) == -1){
            printf("error pipe %d\n", j);
            exit(1);
        }
    }

    /* Write a byte to randomly chosen pipes so that some read ends become readable */
    numWrites = (argc > 2) ? strtol(argv[2], NULL, 10) : 1;
    srandom((int)time(NULL));
    for(j = 0; j < numWrites; j++){
        randPipe = random() % numPipes;
        printf("Writing to fd: %3d (read fd: %3d)\n", pfds[randPipe][1], pfds[randPipe][0]);
        if(write(pfds[randPipe][1], "a", 1) == -1){
            printf("write %d\n", pfds[randPipe][1]);
            exit(1);
        }
    }

    /* Monitor the read end of every pipe for input */
    for(j = 0; j < numPipes; j++){
        pollFd[j].fd = pfds[j][0];
        pollFd[j].events = POLLIN;
    }

    ready = poll(pollFd, numPipes, -1);
    if(ready == -1){
        perror("poll");
        exit(1);
    }
    printf("poll() returned: %d\n", ready);

    for(j = 0; j < numPipes; j++){
        if(pollFd[j].revents & POLLIN){
            printf("Readable: %d %3d\n", j, pollFd[j].fd);
        }
    }

    free(pollFd);
    free(pfds);
    return 0;
}
```

## I/O複用-epoll

epoll API由三組系統呼叫組成:

+ `epoll_create()`建立一個epoll例項
+ `epoll_ctl()`操作同epoll例項相關聯的興趣列表
+ `epoll_wait()`返回與epoll例項相關聯的就緒列表中的成員

epoll例項:epoll API的核心資料結構,和一個開啟的檔案描述符相關聯。這個檔案描述符不用來做IO操作,相反它是核心資料結構的控制代碼,這些核心資料結構實現了兩個目的:

+ 記錄興趣列表
+ 維護就緒列表

### epoll_create

```c
#include <sys/epoll.h>

int epoll_create(int size);
```

引數size指定我們想要通過epoll例項來檢查的描述符個數,不是上限,只是告知核心應該如何為內部資料結構劃分初始大小。

函式返回epoll例項的檔案描述符,該檔案描述符不再需要時應呼叫close()關閉。

當所有與epoll例項相關的檔案描述符都被關閉時,例項被銷燬,相關資源釋放。(多個檔案描述符可能引用到相同的epoll例項,這是由於呼叫了fork()或dup()這樣的類似函式所致)。

> 自Linux 2.6.8起,size引數被忽略不用,但仍必須大於0。
>
> 自Linux 2.6.27起,Linux支援一個新的系統呼叫epoll_create1():
>
> + 去掉了無用的引數size
> + 增加了一個可用來修改系統呼叫行為的flags引數
> + flags目前只支援一個標誌:EPOLL_CLOEXEC,使核心為新的檔案描述符啟用執行即關閉(close-on-exec)標誌(FD_CLOEXEC)

### epoll_ctl

```c
#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
```

成功返回0,失敗返回-1並設定errno。

引數fd:指明修改興趣列表中哪一個檔案描述符的設定

引數op:指定需要執行的操作

+ EPOLL_CTL_ADD:新增
+ EPOLL_CTL_MOD:修改
+ EPOLL_CTL_DEL:刪除

引數ev:

```c
struct epoll_event {
    uint32_t events;     /* epoll events (bit mask) */
    epoll_data_t data;   /* user data */
};
```

```c
typedef union epoll_data {
    void *ptr;
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;
```

+ 結構體epoll_event中的events欄位是一個位掩碼,指定待檢查的描述符fd上感興趣的事件集合
+ data欄位是一個聯合體,當描述符fd成為就緒態時,聯合體的成員可用來指定傳回給呼叫程序的資訊
+ 聯合體成員不能一起使用,常用fd
+ 想要將檔案描述符和使用者資料關聯起來,以實現快速的資料訪問,只能使用其它手段,比如放棄使用fd,而在ptr指向的使用者資料中包含fd

> **max_user_watches上限**
>
> 每個註冊到epoll例項上的檔案描述符需要佔用一小段不能被交換的核心記憶體空間,因此核心提供了一個介面用來定義每個使用者可以註冊到epoll例項上的檔案描述符總數。
>
> 這個上限值可以通過max_user_watches來檢視和修改,max_user_watches是專屬於Linux系統的/proc/sys/fs/epoll目錄下的一個檔案。預設上限值根據可用系統記憶體計算得出。

### epoll_wait

```c
#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);
```

成功返回就緒態的檔案描述符的個數(超時則返回0),失敗返回-1並設定errno。

引數evlist指向的結構體陣列中返回的是有關就緒態檔案描述符的資訊。陣列evlist的空間由呼叫者負責申請,所包含的元素個數在引數maxevents中指定。

在陣列evlist中每個元素返回的都是單個就緒態檔案描述符的資訊:

+ events欄位返回在該描述符上已經發生的事件掩碼
+ data欄位返回的是使用epoll_ctl()註冊監聽事件時在ev.data中所指定的值。data欄位是唯一可獲知同這個事件相關的檔案描述符號的途徑,因此,在呼叫epoll_ctl()時要麼將ev.data.fd設為檔案描述符號,要麼將ev.data.ptr設為指向包含檔案描述符號的結構體

引數timeout用來確定epoll_wait()的阻塞行為:

+ timeout為-1,呼叫將一直阻塞,直到興趣列表中的檔案描述符上有事件發生,或者直到捕獲到一個訊號為止
+ timeout為0,執行一次非阻塞式的檢查
+ timeout大於0,呼叫將阻塞至多timeout毫秒,直到檔案描述符上有事件發生,或者直到捕獲到一個訊號為止

在多執行緒程式中,可以在一個執行緒中使用epoll_ctl()將檔案描述符新增到另一個執行緒中由epoll_wait()所監視的epoll例項的興趣列表中去。這些對興趣列表的修改將立刻得到處理,而epoll_wait()呼叫將返回有關新新增的檔案描述符的就緒資訊。

epoll事件:除了有一個額外的字首E外,大多數位掩碼的名稱同poll中對應的事件掩碼名稱相同。例外情況:

+ EPOLLET:採用邊緣觸發(edge-triggered)通知,epoll預設為水平觸發(level-triggered)
+ EPOLLONESHOT:只觸發一次,觸發完標記為非啟用狀態,需要使用EPOLL_CTL_MOD操作重新啟用對這個檔案描述符的檢查

![](https://img2020.cnblogs.com/blog/1925550/202101/1925550-20210127234000230-47009156.png)

**epoll程式示例:**

```c
#include <sys/epoll.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MAX_BUF 1000      /* maximum bytes fetched by a single read() */
#define MAX_EVENTS 5      /* maximum events returned by a single epoll_wait() */

/*
 * Usage: prog file...
 * Opens each file, monitors all of them for input with epoll and echoes what
 * is read until every descriptor has been closed. The files must be something
 * epoll can monitor (terminals, FIFOs, ...): for a regular file epoll_ctl()
 * fails with EPERM.
 */
int main(int argc, char* argv[]){
    int epfd, ready, fd, s, j, numOpenFds;
    struct epoll_event ev;
    struct epoll_event evlist[MAX_EVENTS];
    char buf[MAX_BUF];

    if(argc < 2 || strcmp(argv[1], "--help") == 0){
        printf("usage: %s file...\n", argv[0]);
        exit(1);
    }

    epfd = epoll_create(argc - 1);
    if(epfd == -1){
        perror("epoll_create");
        exit(1);
    }

    /* Open each file and add it to the interest list for input */
    for(j = 1; j < argc; j++){
        fd = open(argv[j], O_RDONLY);
        if(fd == -1){
            perror("open");
            exit(1);
        }
        printf("Opened \"%s\" on fd %d\n", argv[j], fd);

        ev.events = EPOLLIN;
        ev.data.fd = fd;    /* handed back by epoll_wait() to identify the descriptor */
        if(epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) == -1){
            perror("epoll_ctl");
            exit(1);
        }
    }

    numOpenFds = argc - 1;
    while(numOpenFds > 0){
        printf("About to epoll_wait()\n");
        ready = epoll_wait(epfd, evlist, MAX_EVENTS, -1);
        if(ready == -1){
            if(errno == EINTR){
                continue;   /* interrupted by a signal handler: just retry */
            }
            perror("epoll_wait");
            exit(1);
        }
        printf("Ready: %d\n", ready);

        for(j = 0; j < ready; j++){
            printf(" fd = %d; events: %s%s%s\n", evlist[j].data.fd,
                   (evlist[j].events & EPOLLIN) ? "EPOLLIN " : "",
                   (evlist[j].events & EPOLLHUP) ? "EPOLLHUP" : "",
                   (evlist[j].events & EPOLLERR) ? "EPOLLERR" : "");

            /* If EPOLLIN and EPOLLHUP are both set, input may still remain, so
               read first; the fd is closed once only EPOLLHUP/EPOLLERR is reported. */
            if(evlist[j].events & EPOLLIN){
                s = read(evlist[j].data.fd, buf, MAX_BUF);
                if(s == -1){
                    /* s must not reach the %.*s below: a negative precision
                       would print buf, which is not NUL-terminated */
                    perror("read");
                    exit(1);
                }
                printf(" read %d bytes : %.*s", s, s, buf);
            }
            else if(evlist[j].events & (EPOLLHUP | EPOLLERR)){
                printf(" closing fd %d\n", evlist[j].data.fd);
                if(close(evlist[j].data.fd) == -1){
                    perror("close");
                    exit(1);
                }
                numOpenFds--;
            }
        }
    }

    printf("All file descriptors closed; bye\n");
    exit(0);
}
```

**ET模式比LT模式觸發事件的次數更少:**

```c
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <libgen.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 10   /* deliberately small so one message needs several recv() calls */

/* Put fd into non-blocking mode; returns the previous file status flags. */
int setnonblocking(int fd){
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

/* Register fd for EPOLLIN on epollfd (edge-triggered when enable_et) and make it non-blocking. */
void addfd(int epollfd, int fd, bool enable_et){
    struct epoll_event event = { .events = EPOLLIN, .data.fd = fd };
    if(enable_et){
        event.events |= EPOLLET;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

/* Level-triggered handling: read at most BUFFER_SIZE-1 bytes per notification;
   epoll_wait() keeps reporting the socket while unread data remains. */
void lt(struct epoll_event *events, int number, int epollfd, int listenfd){
    char buf[BUFFER_SIZE];
    for(int i = 0; i < number; i++){
        int sockfd = events[i].data.fd;
        if(sockfd == listenfd){
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
            if(connfd < 0){
                continue;
            }
            addfd(epollfd, connfd, false);
        }
        else if(events[i].events & EPOLLIN){
            printf("event trigger once\n");
            memset(buf, '\0', BUFFER_SIZE);
            int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
            if(ret <= 0){
                close(sockfd);
                continue;
            }
            printf("get %d bytes of content: %s\n", ret, buf);
        }
        else{
            printf("something else happened \n");
        }
    }
}

/* Edge-triggered handling: one notification per arrival of new data, so the
   socket is drained until recv() reports EAGAIN/EWOULDBLOCK. */
void et(struct epoll_event *events, int number, int epollfd, int listenfd){
    char buf[BUFFER_SIZE];
    for(int i = 0; i < number; i++){
        int sockfd = events[i].data.fd;
        if(sockfd == listenfd){
            struct sockaddr_in client_address;
            socklen_t client_addrlength = sizeof(client_address);
            int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
            if(connfd < 0){
                continue;
            }
            addfd(epollfd, connfd, true);
        }
        else if(events[i].events & EPOLLIN){
            printf("event trigger once\n");
            while(true){
                memset(buf, '\0', BUFFER_SIZE);
                int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
                if(ret < 0){
                    /* All available data consumed: wait for the next edge */
                    if((errno == EAGAIN) || (errno == EWOULDBLOCK)){
                        printf("read later\n");
                        break;
                    }
                    close(sockfd);
                    break;
                }
                else if(ret == 0){
                    /* Peer closed the connection; stop reading the closed fd */
                    close(sockfd);
                    break;
                }
                else{
                    printf("get %d bytes of content: %s\n", ret, buf);
                }
            }
        }
        else{
            printf("something else happened \n");
        }
    }
}

/* Usage: prog ip_address port_number */
int main(int argc, char* argv[]){
    if(argc <= 2){
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char *ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    if(inet_pton(AF_INET, ip, &address.sin_addr) != 1){
        printf("invalid IPv4 address: %s\n", ip);
        return 1;
    }
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);
    ret = listen(listenfd, 5);
    assert(ret != -1);

    struct epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    /* The listening socket stays level-triggered: lt() and et() accept only one
       connection per notification, and with EPOLLET further connections that
       arrived in the same burst would wait until yet another client connects. */
    addfd(epollfd, listenfd, false);

    while(true){
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if(number < 0){
            if(errno == EINTR){
                continue;
            }
            printf("epoll failure\n");
            break;
        }
        /* Swap the two calls to compare level-triggered and edge-triggered behaviour */
        lt(events, number, epollfd, listenfd);
        //et(events, number, epollfd, listenfd);
    }
    close(epollfd);
    close(listenfd);
    return 0;
}
```

**EPOLLONESHOT示例:**

```c
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <libgen.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define MAX_EVENT_NUMBER 1024
#define BUFFER_SIZE 1024

/* Arguments for one worker thread: heap-allocated by main(), freed by worker(). */
struct fds{
    int epollfd;
    int sockfd;
};

/* Put fd into non-blocking mode; returns the previous file status flags. */
int setnonblocking(int fd){
    int old_option = fcntl(fd, F_GETFL);
    int new_option = old_option | O_NONBLOCK;
    fcntl(fd, F_SETFL, new_option);
    return old_option;
}

/* Register fd for edge-triggered input and make it non-blocking. With oneshot,
   the fd is disarmed after its first notification until reset_oneshot(). */
void addfd(int epollfd, int fd, bool oneshot){
    struct epoll_event event = { .events = EPOLLIN | EPOLLET, .data.fd = fd };
    if(oneshot){
        event.events |= EPOLLONESHOT;
    }
    epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
    setnonblocking(fd);
}

/* Re-arm an EPOLLONESHOT descriptor so epoll_wait() reports it again. */
void reset_oneshot(int epollfd, int fd){
    struct epoll_event event = { .events = EPOLLIN | EPOLLET | EPOLLONESHOT, .data.fd = fd };
    epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
}

/* Worker thread: drain the socket, then re-arm it (EAGAIN) or close it (EOF/error). */
void *worker(void *arg){
    struct fds *args = arg;
    int sockfd = args->sockfd;
    int epollfd = args->epollfd;
    free(args);        /* values copied out; main() never touches this block again */
    printf("start new thread to receive data on fd: %d\n", sockfd);

    char buf[BUFFER_SIZE];
    while(1){
        memset(buf, '\0', BUFFER_SIZE);   /* clear leftovers of a longer previous read */
        int ret = recv(sockfd, buf, BUFFER_SIZE-1, 0);
        if(ret == 0){
            close(sockfd);
            printf("foreigner closed the connection\n");
            break;
        }
        else if(ret < 0){
            if(errno == EINTR){
                continue;
            }
            if(errno == EAGAIN || errno == EWOULDBLOCK){
                /* Everything read: re-arm so the next data can go to any thread */
                reset_oneshot(epollfd, sockfd);
                printf("read later\n");
                break;
            }
            /* Any other error is fatal for this connection (must not spin here) */
            close(sockfd);
            break;
        }
        else{
            printf("get content: %s\n", buf);
            /* Simulate slow processing; EPOLLONESHOT keeps other threads off this socket */
            sleep(5);
        }
    }
    printf("end thread receiving data on fd : %d\n", sockfd);
    return NULL;
}

/* Usage: prog ip_address port_number */
int main(int argc, char* argv[]){
    if(argc <= 2){
        printf("usage: %s ip_address port_number\n", basename(argv[0]));
        return 1;
    }
    const char* ip = argv[1];
    int port = atoi(argv[2]);

    int ret = 0;
    struct sockaddr_in address;
    memset(&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    if(inet_pton(AF_INET, ip, &address.sin_addr) != 1){
        printf("invalid IPv4 address: %s\n", ip);
        return 1;
    }
    address.sin_port = htons(port);

    int listenfd = socket(PF_INET, SOCK_STREAM, 0);
    assert(listenfd >= 0);
    ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
    assert(ret != -1);
    ret = listen(listenfd, 5);
    assert(ret != -1);

    struct epoll_event events[MAX_EVENT_NUMBER];
    int epollfd = epoll_create(5);
    assert(epollfd != -1);
    /* The listening socket must not be EPOLLONESHOT, or it would stop reporting connections */
    addfd(epollfd, listenfd, false);

    while(1){
        int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
        if(number < 0){
            if(errno == EINTR){
                continue;
            }
            printf("epoll failure\n");
            break;
        }
        for(int i = 0; i < number; i++){
            int sockfd = events[i].data.fd;
            if(sockfd == listenfd){
                /* listenfd is edge-triggered (addfd always sets EPOLLET) and
                   non-blocking, so accept until no connection is pending */
                for(;;){
                    struct sockaddr_in client_address;
                    socklen_t client_addrlength = sizeof(client_address);
                    int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
                    if(connfd < 0){
                        break;
                    }
                    addfd(epollfd, connfd, true);
                }
            }
            else if(events[i].events & EPOLLIN){
                /* Each worker gets its own heap copy of the arguments: a loop-local
                   struct would go out of scope while the new thread still reads it */
                struct fds *fds_for_new_worker = malloc(sizeof(*fds_for_new_worker));
                if(fds_for_new_worker == NULL){
                    close(sockfd);
                    continue;
                }
                fds_for_new_worker->epollfd = epollfd;
                fds_for_new_worker->sockfd = sockfd;

                pthread_t thread;
                if(pthread_create(&thread, NULL, worker, fds_for_new_worker) != 0){
                    free(fds_for_new_worker);
                    close(sockfd);
                    continue;
                }
                pthread_detach(thread);   /* never joined: release its resources on exit */
            }
            else{
                printf("something else happened \n");
            }
        }
    }
    close(epollfd);
    close(listenfd);
    return 0;
}
```