1. 程式人生 > >樸素、Select、Poll和Epoll網路程式設計模型實現和分析——Select模型

樸素、Select、Poll和Epoll網路程式設計模型實現和分析——Select模型

        和樸素模型一樣,我們首先要建立一個監聽socket,然後呼叫listen去監聽伺服器埠。不同的是,我們要對make_socket方法傳遞1,因為我們要建立一個非同步的socket。

	listen_sock = make_socket(1);
	if (listen(listen_sock, SOMAXCONN) < 0) {
		perror("listen error");
		exit(EXIT_FAILURE);
	}

        下一步我們需要清空Select模型使用的fd_set結構體物件,並把監聽socket設定進去

	FD_ZERO(&active_fd_set);
	FD_SET(listen_sock, &active_fd_set);

        active_fd_set用於記錄活動的socket資訊。之後我們使用到的read_fd_set則是其一個拷貝,因為我們只關心讀行為

	fd_set active_fd_set, read_fd_set;

        和樸素模型類似,我們也需要使用一個死迴圈讓伺服器不要停止

	/* Initialize the set of active sockets. */
	while (1) {
		timeout.tv_sec = 0;
		timeout.tv_usec = 500;

		/* Service all the sockets with input pending. */
		read_fd_set = active_fd_set;
		switch(select(FD_SETSIZE, &read_fd_set, NULL, NULL, &timeout)) {
			case -1 : {
				perror("select error\n");
				exit(EXIT_FAILURE);
			}break;
			case 0 : {
				//perror("select timeout\n");
			}break;
			default: {

        select函式第一個引數我們傳遞了FD_SETSIZE,它在我的系統上是1024,它代表需要關注的socket的最大個數。第二引數是用於記錄需要關注的發生讀事件的fd_set物件。我們讓select函式按非同步方式執行,故最後一個引數設定為500微秒的超時時間。整個select函式意思是我們需要等待socket發生可讀事件,如果等待時間超過超時設定,則函式返回0,如果出錯則返回-1,如果等待到事件則返回其他值。

			default: {
				for (index = 0; index < FD_SETSIZE; ++index) {
					if (FD_ISSET(index, &read_fd_set)) {
						if (listen_sock == index) {
							/* Connection request on original socket. */
							int new_sock;
							new_sock = accept(listen_sock, NULL, NULL);
							if (new_sock < 0) {
								perror("accept error");
								exit(EXIT_FAILURE);
							}
							request_add(1);
							//set_block_filedes_timeout(new_sock);
							FD_SET(new_sock, &active_fd_set);
						} else {
							if (0 == server_read(index)) {
								server_write(index);
							}
							close(index);
							FD_CLR(index, &active_fd_set);
						}
					}
				}
			}
		}
	}
	return 0;
}

        default中才是我們程式的重點。

        我們使用一個for迴圈遍歷每個socket。如果該socket通過FD_ISSET巨集判斷不處於我們關注的可讀事件fd_set中,則忽略它。

        如果處在可讀fd_set中,則看看其是否是監聽socket。
        如果是監聽socket,則使用accpet方法獲取接入的socket。並使用request_add讓請求數量加一。還要使用FD_SET巨集將該socket加入到活動狀態的fd_set中。之後該活動狀態的fd_set將被賦值給需要關注可讀事件的fd_set中。

        如果不是監聽socket,則是接入的socket。於是我們呼叫《樸素、Select、Poll和Epoll網路程式設計模型實現和分析——樸素模型》一文中介紹的server_read和server_write方法讀取內容並回包。最後我們還要關閉socket,並使用FD_CLR巨集將該socket從活動狀態的fd_set中去掉。之後的select函式將不會在關注該socket了。

        整個過程非常簡單。但是這其中卻包含了很多值得思考的問題。

        首先我丟擲一個問題,我在default中使用了一個從0到FD_SETSIZE的遍歷行為。並且將遍歷的遊標——index作為socket去操作——使用server_read和server_write去讀取。於是問題就來了,使用make_socket建立的socket值和使用accept接收到的socket的值怎麼和遊標產生關聯?程式碼中似乎沒有任何讓它們產生關聯的邏輯,而且它們的關係是嚴格的“相等”的關係!那麼只有一個假設,就是make_socket和accept返回的socket值在FD_SETSIZE和0之間。但是目前我沒有找到文件對這個問題進行說明,而我也沒深入研究這兩個函式考證到其值就是在這個範圍之內,那麼為什麼我還要這麼去用呢?

        我們先記下這個問題,深入到linux的原始碼中取解釋這個使用的正確性。

        我們先看下fd_set的定義

/* The fd_set member is required to be an array of longs.  */
typedef long int __fd_mask;

/* Some versions of <linux/posix_types.h> define this macros.  */
#undef	__NFDBITS
/* It's easier to assume 8-bit bytes than to get CHAR_BIT.  */
#define __NFDBITS	(8 * (int) sizeof (__fd_mask))
#define	__FD_ELT(d)	((d) / __NFDBITS)
#define	__FD_MASK(d)	((__fd_mask) 1 << ((d) % __NFDBITS))

/* fd_set for select and pselect.  */
typedef struct
  {
    /* XPG4.2 requires this member name.  Otherwise avoid the name
       from the global namespace.  */
#ifdef __USE_XOPEN
    __fd_mask fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->fds_bits)
#else
    __fd_mask __fds_bits[__FD_SETSIZE / __NFDBITS];
# define __FDS_BITS(set) ((set)->__fds_bits)
#endif
  } fd_set;

/* Maximum number of file descriptors in `fd_set'.  */
#define	FD_SETSIZE		__FD_SETSIZE

        以上我是在我ubuntu系統的/usr/include/x86_64-linux-gnu/sys/select.h檔案中找到的定義。我們看到fd_set的主體就是一個long int型陣列__fds_bits。該陣列的個數是兩個數的商。被除數__FD_SETSIZE就是我們程式中使用的FD_SETSIZE,也就是1024。除數__NFDBITS是64。於是fd_set中陣列元素的個數是1024/64=16。注意一下這個值是16,而我們程式中關注的socket的最大個數是FD_SETSIZE——1024,這是為什麼?其實這就是該結構設計的一個精妙之處。fd_set的__fds_bits是一個16個元素的long int型陣列,其總長度就是16*64=1024位。於是可以使用每一位表示一個socket。

        我們到/usr/include/x86_64-linux-gnu/bits/select.h 檔案中看看linux是如何讓socket和這個空間中每一位進行對應的。我們檢視FD_SET巨集

#define __FD_SET(d, set) \
  ((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d)))

        __FDS_BITS巨集定義在fd_set定義中。它就是返回fd_set的__fds_bits陣列首地址。陣列的遊標是通過__FD_ELT對socket進行處理的結果。__FD_ELT在上面我們已經見過,它是對socket值除以__NFBITS——64的值。通過遊標我們取到陣列元素值之後,我們再用其與__FD_MASK對socket進行操作的值進行或操作。__FD_MASK的定義也在上面給出,它是將socket的值與__NFBITS——64相除,取得餘數,然後讓1左移該餘數次。這樣我們就將該socket對映到fd_set記憶體的一位中。我們知道,只要在知道除數、商和餘數的情況下,可以很方便的推算出被除數是多少。可以說linux核心對這塊的設計真是做到了極致,不浪費一點點空間。

        有了上面的認識,我們就知道select模型最大隻能支援FD_SETSIZE個數的socket,而且socket的值也只能在FD_SETSIZE之內。如果socket()或accept()函式返回的socket值大於FD_SETSIZE,則select模型將出現錯誤——上面的計算將溢位。基於這種反向推理,我們可以放心大膽的使用0到FD_SETSIZE的值去當socket的值去計算。我看網上有很多select例子需要使用一個數組去維護接入的socket,如果在不考慮效率的前提下是不必要的。但是如果你追求極致的Select模型效能,還是建議使用一個數組去維護Socket,這樣不至於出現大量的浪費操作。這塊分析我們將在後文中給出。

        這兒再多言一句,正是因為這種位操作,我們才需要在使用fd_set之前呼叫FD_ZERO去清空所有空間

# if __WORDSIZE == 64
#  define __FD_ZERO_STOS "stosq"
# else
#  define __FD_ZERO_STOS "stosl"
# endif

# define __FD_ZERO(fdsp) \
  do {									      \
    int __d0, __d1;							      \
    __asm__ __volatile__ ("cld; rep; " __FD_ZERO_STOS			      \
			  : "=c" (__d0), "=D" (__d1)			      \
			  : "a" (0), "0" (sizeof (fd_set)		      \
					  / sizeof (__fd_mask)),	      \
			    "1" (&__FDS_BITS (fdsp)[0])			      \
			  : "memory");					      \
  } while (0)

        如果socket不再需要監測,則我們使用__FD_CLR在fd_set中去除其對應的位

#define __FD_CLR(d, set) \
  ((void) (__FDS_BITS (set)[__FD_ELT (d)] &= ~__FD_MASK (d)))

        再看下客戶端的輸出


        可見當前環境下,select模型的處理能力大概是每秒7000多連線。(和下一章介紹的Poll模型差距不大,而且如果使用陣列維護Socket還可以提高效能)