1. 程式人生 > >select、poll和epoll的總結對比

select、poll和epoll的總結對比

綜述

首先要搞明白兩個基本概念:I/O複用和(非)阻塞機制。
I/O複用指的是允許計算機執行或者阻塞在一組資料流上,直到某個到達喚醒阻塞的程序,此時的I/O通道不僅僅是通過一個數據流,而是一組,所以是複用。

阻塞和非阻塞:拿I/O為例子,如果是阻塞模型,那麼程式一直會等到有資料來的時候才會繼續向下執行,否則會一直等待資料的到來;如果是非阻塞模型,如果有資料,那麼直接讀取資料向下執行,沒有資料也會繼續向下執行,不過此時可能會進行一些其他的操作,比如Linux中設定一些錯誤的位元位等。

selectpollepoll這三個函式是Linux系統中I/O複用系統呼叫函式。I/O複用使得這三個函式可以同時監聽多個9檔案描述符]()(File Descriptor, FD),因為每個檔案描述符相當於一個需要 I/O的“檔案”,在socket中共用一個埠。但是,三個函式的本身是阻塞的,因此即使是利用了I/O複用技術,如果程式不採用特別的措施,那麼還是隻能順序處理每個檔案描述符到來的I/O請求,因此這樣預設伺服器是序列的。而併發是把上面說的序列處理成同時或者同一時間段,本文暫時不討論併發。

select

select是三者當中最底層的,它的事件的輪訓機制是基於位元位的。每次查詢都要遍歷整個事件列表。
理解select,首先要理解select要處理的fd_set資料結構,每個select都要處理一個fd_set結構。fd_set簡單地理解為一個長度是1024的位元位,每個位元位表示一個需要處理的FD,如果是1,那麼表示這個FD有需要處理的I/O事件,否則沒有。Linux為了簡化位操作,定義了一組巨集函式來處理這個位元位陣列。

void FD_CLR(int fd, fd_set *set);     // 清空fd在fd_set上的對映,說明select不在處理該fd
int
FD_ISSET(int fd, fd_set *set); // 查詢fd指示的fd_set是否是有事件請求 void FD_SET(int fd, fd_set *set); // 把fd指示的fd_set置1 void FD_ZERO(fd_set *set); // 清空整個fd_set,一般用於初始化

從上述可以看出,select能處理fd最大的數量是1024,這是由fd_set的容量決定的。

再看select的呼叫方式:

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *
exceptfds, struct timeval *timeout);
  • nfds:表示表示檔案描述符最大的數目+1,這個數目是指讀事件和寫事件中數目最大的,+1是為了全面檢查
  • readfds:表示需要監視的會發生讀事件的fd,沒有設定為NULL
  • writefds:表示需要監視的會發生寫事件的fd,沒有設定為NULL
  • exceptfds:表示異常處理的,暫時沒用到。。。
  • timeout:表示阻塞的時間,如果是0表示非阻塞模型,NULL表示永遠阻塞,直到有資料來
struct timeval {
   long    tv_sec;         /* seconds */
   long    tv_usec;        /* microseconds */
};

有三個型別的返回值:

  • 正數: 表示readfdswritefds就緒事件的總數
  • 0:超時返回0
  • -1:出現錯誤

給出一個一般的通用模型:

int main() {


  fd_set read_fs, write_fs;
  struct timeval timeout;
  int max_sd = 0;  // 用於記錄最大的fd,在輪詢中時刻更新即可
  
  /*
   * 這裡進行一些初始化的設定,
   * 包括socket建立,地址的設定等,
   * 同時記得初始化max_sd
   */

  // 初始化位元位
  FD_ZERO(&read_fs);
  FD_ZERO(&write_fs);

  int rc = 0;
  int desc_ready = 0; // 記錄就緒的事件,可以減少遍歷的次數
  while (1) {
    // 這裡進行阻塞
    rc = select(max_sd + 1, &read_fd, &write_fd, NULL, &timeout);
    if (rc < 0) {
      // 這裡進行錯誤處理機制
    }
    if (rc == 0) {
      // 這裡進行超時處理機制
    }

    desc_ready = rc;
    // 遍歷所有的位元位,輪詢事件
    for (int i = 0; i <= max_sd && desc_ready; ++i) {
      if (FD_ISSET(i, &read_fd)) {
        --desc_ready;
        // 這裡處理read事件,別忘了更新max_sd
      }
      if (FD_ISSET(i, &write_fd)) {
        // 這裡處理write事件,別忘了更新max_sd
      }
    }
  }
}

這只是一個簡單的模型,有時候還可能需要使用FD_CTLFD_SET增加或者減少fd,根據實際情況靈活處理即可。

poll

可以認為poll是一個增強版本的select,因為select的位元位操作決定了一次性最多處理的讀或者寫事件只有1024個,而poll使用一個新的方式優化了這個模型。

還是先了解poll底層操作的資料結構pollfd

struct pollfd {
	int fd;          // 需要監視的檔案描述符
	short events;    // 需要核心監視的事件
	short revents;   // 實際發生的事件
};

在使用該結構的時候,不用進行位元位的操作,而是對事件本身進行操作就行。同時還可以自定義事件的型別。具體可以參考手冊

同樣的,事件預設初始化全部都是0,通過bzero或者memset統一初始化即可,之後在events上註冊感興趣的事件,監聽的時候在revents上監聽即可。註冊事件使用|操作,查詢事件使用&操作。比如想要註冊POLLIN資料到來的事件,需要pfd.events |= POLLIN,註冊多個事件進行多次|操作即可。取消事件進行~操作,比如pfd.events ~= POLLIN。查詢事件:pfd.revents & POLLIN

使用poll函式進行操作:

#include <poll.h>
int poll(struct pollfd* fds, nfds_t nfds, int timeout);

引數說明:

  • fds:一個pollfd佇列的隊頭指標,我們先把需要監視的檔案描述符和他們上面的事件放到這個佇列中
  • nfds:佇列的長度
  • timeout:事件操作,設定指定正數的阻塞事件,0表示非阻塞模式,-1表示永久阻塞。
    時間的資料結構:
struct timespec {
	long    tv_sec;         /* seconds */
    long    tv_nsec;        /* nanoseconds */
};

給出一個常用的模型:

// 先巨集定義長度
#define MAX_POLLFD_LEN 200  

int main() {
  /*
   * 在這裡進行一些初始化的操作,
   * 比如初始化資料和socket等。
   */

  int rc = 0;
  pollfd fds[MAX_POLL_LEN];
  memset(fds, 0, sizeof(fds));
  int ndfs  = 1;  // 佇列的實際長度,是一個隨時更新的,也可以自定義其他的
  int timeout = 0;
  /*
   * 在這裡進行一些感興趣事件的註冊,
   * 每個pollfd可以註冊多個型別的事件,
   * 使用 | 操作即可,就行博文提到的那樣。
   * 根據需要設定阻塞時間
   */

  int current_size = ndfs;
  int compress_array = 0;  // 壓縮佇列的標記
  while (1) {
    rc = poll(fds, nfds, timeout);
    if (rc < 0) {
    // 這裡進行錯誤處理
    }
    if (rc == 0) {
    // 這裡進行超時處理
    }

    for (int i = 0; i < current_size; ++i) {
      if (fds[i].revents == 0){  // 沒有事件可以處理
        continue;
      }
      if (fds[i].revents & POLLIN) {  // 簡單的例子,比如處理寫事件
      
      }
      /*
       * current_size 是為了降低複雜度的,可以隨時進行更新
       * ndfs如果要更新,應該是最後統一進行
       */
    }

    if (compress_array) {  // 如果需要壓縮佇列
      compress_array = 0;
      for (int i = 0; i < ndfs; ++i) {
        for (int j = i; j < ndfs; ++j) {
          fds[i].fd = fds[j + i].fd;
        }
        --i;
        --ndfs;
      }
    }
  }
}

程式碼中涉及到了一些壓縮佇列的操作,也可以不用這些。。。

epoll

epoll是一個更加高階的操作,上述的select或者poll操作都需要輪詢所有的候選佇列逐一判斷是否有事件,而且事件佇列是直接暴露給呼叫者的,比如上面selectwrite_fdpollfds,這樣複雜度高,而且容易誤操作。epoll給出了一個新的模式,直接申請一個epollfd的檔案,對這些進行統一的管理,初步具有了面向物件的思維模式。

還是先了解底層的資料結構:

typedef union epoll_data {
	void *ptr;
	int fd;
	uint32_t u32;
	uint64_t u64;
} epoll_data_t;

struct epoll_event {
	uint32_t events;
	epoll_data_t data;
};

注意到,epoll_data是一個union型別。fd很容易理解,是檔案描述符;而檔案描述符本質上是一個索引核心中資源地址的一個下標描述,因此也可以用ptr指標代替;同樣的這些資料可以用整數代替。
再來看epoll_event,有一個data用於表示fd,之後又有一個events表示註冊的事件。

epoll通過一組函式進行。
建立epollfd

#include <sys/epoll.h>
int epoll_create(int size);

size用於指定核心維護的佇列大小,不過在2.6.8之後這個引數就沒有實際價值了,因為核心維護一個動態的隊列了。
函式返回的是一個epoll的fd,之後的事件操作通過這個epollfd進行。

還有另一個建立的函式:

#include <sys/epoll.h>
int epoll_create1(int flag);

flag==0時,功能同上,另一個選項是EPOLL_CLOEXEC。這個選項的作用是當父程序fork出一個子程序的時候,子程序不會包含epollfd,這在多程序程式設計時十分有用。

處理事件:

#include <sys.epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
  • epfd是建立的epollfd
  • op表示操作的型別
    • EPOLL_CTL_ADD :註冊事件
    • EPOLL_CTL_MOD:更改事件
    • EPOLL_CTL_DEL:刪除事件
  • fd是相應的檔案描述符
  • event是事件佇列

等待事件

int epoll_wait(int epfd, struct epoll_event* evlist, int maxevents, int timeout);
  • epfdepoll的檔案描述符
  • evlist是發生的事件佇列
  • maxevents是佇列最長的長度
  • timeout是時間限制,正整數時間,0是非阻塞,-1永久阻塞直到事件發生。
    返回就緒的個數,0表示沒有,-1表示出錯。

給出官網上的一個模板:

#define MAX_EVENTS 10
int main() {
	struct epoll_event ev, events[MAX_EVENTS];
    int listen_sock, conn_sock, nfds, epollfd;

    /* Code to set up listening socket, 'listen_sock',
     (socket(), bind(), listen()) omitted */

	epollfd = epoll_create1(0);
	if (epollfd == -1) {
		perror("epoll_create1");
      	exit(EXIT_FAILURE);
	}

	ev.events = EPOLLIN;
	ev.data.fd = listen_sock;
	if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_sock, &ev) == -1) {
		perror("epoll_ctl: listen_sock");
		exit(EXIT_FAILURE);
	}

	for (;;) {
	    // 永久阻塞,直到有事件
		nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);
		if (nfds == -1) {  // 處理錯誤
			perror("epoll_wait");
			exit(EXIT_FAILURE);
		}

		for (n = 0; n < nfds; ++n) {
			if (events[n].data.fd == listen_sock) {
				conn_sock = accept(listen_sock, (struct sockaddr *) &addr, &addrlen);
				if (conn_sock == -1) {
					perror("accept");
					exit(EXIT_FAILURE);
				}
				setnonblocking(conn_sock);
				ev.events = EPOLLIN | EPOLLET;
				ev.data.fd = conn_sock;
				if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock, &ev) == -1) {
					perror("epoll_ctl: conn_sock");
					exit(EXIT_FAILURE);
				}
			} else {
				do_use_fd(events[n].data.fd);
			}
		}
	}
	return 0;
}

epoll的ET和LT工作模式
epoll的ET模式是預設模式,這也是select和poll的模式,即只要有事件發生,那麼就會被epoll_wait所捕獲,如果一次讀寫沒有完成,那麼會在下一次epoll_wait呼叫時接著被捕獲;而ET邊沿觸發模式是讀寫沒完成,下次不會被捕獲,之後新的資料到達時才會觸發。

EPOLLONESHOT事件
epoll特有的事件,作業系統上最多觸發檔案描述符上註冊的一個可讀、可寫或者異常事件,只能觸發一次,除非使用epoll_ctl重置該描述符。這在多執行緒程式設計時常用到,處理完畢後需要重新復原。

總結


但是,如果是連線數量不是特別多,但是經常會有連線加入或者退出的時候,就要考慮poll或者select了。