1. 程式人生 > >樸素、Select、Poll和Epoll網路程式設計模型實現和分析——樸素模型

樸素、Select、Poll和Epoll網路程式設計模型實現和分析——樸素模型

        做Linux網路開發,一般繞不開標題中幾種網路程式設計模型。網上已有很多寫的不錯的分析文章,它們的基本論點是差不多的。但是我覺得他們講的還不夠詳細,在一些關鍵論點上缺乏資料支援。所以我決定好好研究這幾個模型。(轉載請指明出於breaksoftware的csdn部落格)

        在研究這些模型前,我決定按如下步驟去做:

  1. 實現樸素模型
  2. 實現發請求的測試程式
  3. 實現Select模型,測試其效率
  4. 實現Poll模型,測試其效率
  5. 實現Epoll模型,測試其效率
  6. 分析各模型效能,分析和對比其原始碼
  7. 針對各模型特點,修改上述程式進行測試和分析

        樸素模型是我們程式設計時可以使用的最簡單的一種模型。因為沒有一個確切的名字可以稱呼,我索性叫它樸素模型。我選擇先實現它,一是為了由易而難,二是為了遵循模型發展的過程、體會技術發展的歷程。在實現完樸素模型之後,我們要去實現一個用於傳送請求的測試程式,它將幫助我們傳送大量的請求,以便於之後我們對各個模型進行可用性測試。之後我們再去實現Select、Poll和Epoll網路模型。這個順序也是技術發展的順序,我們可以在實現前一個模型時分析其優缺點,然後在後一個模型分析中,看到其對這些缺點的改進方案,體會技術進步的過程。

        為了便於之後各個模型的對比,我會盡可能的重用程式碼,即各個模型功能相同的模組將使用相同的函式去實現,如果實在不可以重用,則使用引數進行區分,但是區分的程式碼片段將足夠的小。所以,我們將在本文看到大部分重要的程式碼實現片段。

        為了比較直觀的觀察各個模型的執行,我們將在各個模型執行前,啟動一個列印統計資訊的執行緒

         err = init_print_thread();
         if (err < 0) {
                 perror("create print thread error");
                 exit(EXIT_FAILURE);
         }

        init_print_thread函式將被各個模型使用,wait_print_thread是用於等待該列印結果的執行緒退出。由於我並不準備讓這個執行緒退出,所以wait_print_thread往往用來阻塞主執行緒。

 pthread_t g_print_thread;
 
 int
 init_print_thread() {
         return pthread_create(&g_print_thread, NULL, print_count, NULL);
 }
 
 void
 wait_print_thread() {
         pthread_join(g_print_thread, NULL);
 }

        print_count函式是用於執行緒執行的實體,它每隔一秒鐘列印一條記錄

static int g_request_count = 0;

static int g_server_suc = 0;
static int g_client_suc = 0;
static int g_read_suc = 0;
static int g_write_suc = 0;

static int g_server_fai = 0;
static int g_client_fai = 0;
static int g_read_fai = 0;
static int g_write_fai = 0;

void* print_count(void* arg) {
        struct timeval cur_time;
        int index = 0;
        fprintf(stderr, "index\tseconds_micro_seconds\tac\tst\tsr\tsw\tft\tfr\tfw\n");
        while (1) {
                sleep(1);
                gettimeofday(&cur_time, NULL);
                fprintf(stderr, "%d\t%ld\t%d\t%d\t%d\t%d\t%d\t%d\t%d\n",
                                index,
                                cur_time.tv_sec * 1000000 + cur_time.tv_usec,
                                g_request_count,
                                g_server_suc > g_client_suc ? g_server_suc : g_client_suc,
                                g_read_suc,
                                g_write_suc,
                                g_server_fai > g_client_fai ? g_server_fai : g_client_fai,
                                g_read_fai,
                                g_write_fai);
                index++;
        }
}

        上述各資料的定義如下:        

  • g_request_count用於記錄總請求數;
  • g_server_suc是用於記錄服務行為成功數,其場景為:讀取客戶端成功且傳送回包成功
  • g_server_fai是記錄服務其行為失敗數,其場景為:1 讀取客戶端失敗;2 讀取客戶端成功但是傳送回包失敗;
  • g_client_suc用於記錄客戶端行為成功數,其場景為:傳送包成功且讀取伺服器回包成功;
  • g_client_fai用於記錄客戶端行為失敗數,其場景為:1 傳送包失敗; 2 傳送包成功但是接收伺服器回包失敗;
  • g_read_suc用於記錄讀取行為成功數,其場景為: 1 伺服器讀取客戶端請求包成功; 2 客戶端讀取伺服器回包成功;
  • g_read_fai用於記錄讀取行為失敗數,其場景為: 1 伺服器讀取客戶端請求包失敗; 2 客戶端讀取伺服器回包失敗;
  • g_write_suc用於記錄傳送行為成功數,其場景為: 1 客戶端向伺服器傳送請求包成功; 2 伺服器向客戶端回包成功;
  • g_write_fai用於記錄傳送行為失敗數,其場景為: 1 客戶端向伺服器傳送請求包失敗; 2 伺服器向客戶端回包失敗;

        通過資料的列印,我們將知道伺服器和客戶端執行執行的過程,以及出問題的環節,還有伺服器的丟包情況。

        下一步,我們需要建立一個供客戶端連線的Socket。

	listen_sock = make_socket(0);

        我們對make_socket傳入了引數0,是因為我們不要求建立的監聽Socket具有非同步屬性。

int
make_socket(int asyn) {
	int listen_sock = -1;
	int rc = -1;
	int on = 1;
	struct sockaddr_in name;
	listen_sock = socket(AF_INET, SOCK_STREAM, 0);
	if (listen_sock < 0) {
		perror("create socket error");
		exit(EXIT_FAILURE);
	}

	rc = setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (char*)&on, sizeof(on));
	if (rc < 0) {
		perror("setsockopt error");
		exit(EXIT_FAILURE);
	}
	
	if (asyn) {
		rc = ioctl(listen_sock, FIONBIO, (char*)&on);
		if (rc < 0) {
			perror("ioctl failed");
			exit(EXIT_FAILURE);
		}
	}

	name.sin_family = AF_INET;
	name.sin_port = htons(PORT);
	name.sin_addr.s_addr = htonl(INADDR_ANY);
	if (bind(listen_sock, (struct sockaddr*)&name, sizeof(name)) < 0) {
		perror("bind error");
		exit(EXIT_FAILURE);
	}
	return listen_sock;
}

        這個函式中我們使用了socket函式建立了一個TCP的Socket。並使用bind函式將該socket繫結到本機特定的埠上。

        在樸素模型中,我們讓伺服器是一個同步處理過程。於是不要求之後的連線具有非同步屬性,所以我們建立該Socket時傳了引數0——讓監聽Socket不具有非同步特性。在之後介紹的Select、Poll和Epoll模型中,我們需要客戶端接入的連線是非同步的,於是我們就傳遞了引數1,讓監聽Socket具有非同步特性,這樣通過它接入的連線也是非同步的。

        Socket繫結之後,伺服器就要開始監聽客戶端的接入

	if (listen(listen_sock, SOMAXCONN) < 0) {
		perror("listen error");
		exit(EXIT_FAILURE);
	}

        SOMAXCONN是可以同時處理的最大連線數,它是一個系統巨集。在我係統上它的值是128。

        最後,我們在一個死迴圈中接收並處理客戶端的請求

	while (1) {
		int new_sock;
		new_sock = accept(listen_sock, NULL, NULL);
		if (new_sock < 0) {
			perror("accept error");
			exit(EXIT_FAILURE);
		}

        通過accept我們將獲得接入的socket。如果socket值合法,我們則需要讓接受的請求數自增1

		request_add(1);

        request_add函式將在之後不同模型以及測試程式中被呼叫,而且會是在不同的執行緒中呼叫。於是這兒就引入一個多執行緒的問題。我並不打算使用鎖等方法,而是利用簡單的原子操作來實現。

void 
request_add(int count) {
	__sync_fetch_and_add(&g_request_count, count);
}

        由於我們設計的樸素模式是一個同步過程,所以接入的socket不是非同步的。當一些特殊情況發生時,之後的讀取socket內容的行為或者往socket中寫入內容的行為可能會卡住。這樣將導致整個服務都卡住,這是我們不希望看到的。於是我們需要對該同步socket設定操作超時屬性。

		set_block_filedes_timeout(new_sock);
void
set_block_filedes_timeout(int filedes) {
 	struct timeval tv_out, tv_in;

	tv_in.tv_sec = READ_TIMEOUT_S;
    	tv_in.tv_usec = READ_TIMEOUT_US;
	if (setsockopt(filedes, SOL_SOCKET, SO_RCVTIMEO, &tv_in, sizeof(tv_in)) < 0) {
		perror("set rcv timeout error");
		exit(EXIT_FAILURE);
	}
	
	tv_out.tv_sec = WRITE_TIMEOUT_S;
    	tv_out.tv_usec = WRITE_TIMEOUT_US;
	if (setsockopt(filedes, SOL_SOCKET, SO_SNDTIMEO, &tv_out, sizeof(tv_out)) < 0) {
		perror("set rcv timeout error");
		exit(EXIT_FAILURE);
	}
}

        這兒要說明下,我在網上看過很多人提問說通過上述方法設定超時屬性無效。其實是他們犯了一個錯誤,就是將socket設定為非同步屬性。如果socket既設定為非同步屬性,又設定了超時,socket當然是按非同步特點去執行的,超時設定也就無效了。

        還有一個問題,就是有些同學在自己設計伺服器和客戶端時發生了“死鎖”問題(非嚴格定義意義上的死鎖)。那是因為設計的伺服器和客戶端都是同步的,而且socket都沒有設定超時。這樣在客戶端呼叫完write之後進入read時,伺服器此時也是read狀態,導致了“死鎖”。但是這個問題並不是經常發生,因為大部分同學實現read時給了一個很大的快取,並認為讀取的內容一次性可以讀完。而沒有考慮到一次read操作可能讀不完全部資料的情況,比如下面的實現

	while (nbytes > 0) {
		nbytes = recv(filedes, buffer, sizeof(buffer) - 1, 0);
		if (nbytes > 0) {
			total_length_recv += nbytes;
		}
		//buffer[nbytes] = 0;
		//fprintf(stderr, "%s", buffer);
	}

        這段伺服器read操作考慮到了一次性可能讀不完全部資料的問題。但是如果客戶端傳送完資料後,伺服器第一次recv可以把全部資料讀取出來了。由於讀取的資料大於0,於是再次進入讀取操作,這個時候,客戶端已經處於讀取伺服器返回的階段。由於socket是同步的,且未設定超時,導致伺服器一直卡在再次讀取的操作中,這樣就發生了“死鎖”。其實這個過程非常有意思,當我們對一段不健壯的程式碼進行加固時,往往會掉到另外一個坑裡。但是隻要我們努力的從坑裡跳出來,就會豁然開朗且認識到很多別人忽視的問題。

        我們再回到正題,我們設定好socket超時屬性後,就開始讓伺服器讀取客戶端的輸入內容,如果輸入內容讀取成功,則往客戶端回包。最後伺服器將該次連線關閉

		if (0 == server_read(new_sock)) {
			server_write(new_sock);
		}
		close(new_sock);

        server_read方法在底層呼叫了read_data方法,read_data方法是我們整個程式碼的兩個關鍵行為之一

int
is_nonblock(int fd) {
	int flags = fcntl(fd, F_GETFL);
	if (flags == -1) {
		perror("get fd flags error");
		exit(EXIT_FAILURE);
	}
	return (flags & O_NONBLOCK) ? 1 : 0;
}

int
read_data(int filedes, int from_server) {
	char buffer[MAXMSG];
	int nbytes;
	int total_len_recv;
	int wait_count = 0;
	int rec_suc = 0;
	total_len_recv = 0;
	
	while (1) {
		nbytes = recv(filedes, buffer, sizeof(buffer) - 1, 0);
		if (nbytes < 0) {
			if (is_nonblock(filedes)) {
				if (EAGAIN == errno || EWOULDBLOCK == errno || EINTR == errno) {
					if (wait_count < WAIT_COUNT_MAX) {
						wait_count++;
						usleep(wait_count);
						continue;
					}
				}
			}
			break;
		}
		if (nbytes == 0) {
			//fprintf(stderr, "read end\n");
			break;
		}
		else if (nbytes > 0) {
			total_len_recv += nbytes;
			//buffer[nbytes] = 0;
			//fprintf(stderr, "%s", buffer);
		}

		if ((from_server && is_server_recv_finish(total_len_recv))
			|| (!from_server && is_client_recv_finish(total_len_recv))) {
			rec_suc = 1;
			break;
		}
	}

        read_data行為分為客戶端和伺服器兩個版本實現,其基本邏輯是一樣的。我們考慮到讀取操作可能一次性讀不完,所以我們使用while迴圈持續嘗試讀取。如果是一個非同步的socket,我們則考慮recv函式返回小於0時各種錯誤值的場景,並使用漸長等待的方式進行多次嘗試。如果是同步的socket,一旦recv返回值小於0,則退出讀取操作。total_len_recv函式用於統計一共讀取的長度,之後通過這個長度結合是否是伺服器還是客戶端的標識,判斷讀取操作是否完成。

        當讀取操作結束後,我們要統計讀取操作的行為及其標識的整個過程的行為。

	if (from_server) {
		if (rec_suc) {
			__sync_fetch_and_add(&g_read_suc, 1);
			return 0;
		} else {
			__sync_fetch_and_add(&g_read_fai, 1);
			__sync_fetch_and_add(&g_server_fai, 1);
			return -1;
		}
	} else {
		if (rec_suc) {
			__sync_fetch_and_add(&g_read_suc, 1);
			__sync_fetch_and_add(&g_client_suc, 1);
			return 0;
		} else {
			__sync_fetch_and_add(&g_read_fai, 1);
			__sync_fetch_and_add(&g_client_fai, 1);
			return -1;
		}
	}

}

        如果讀取操作成功,則進行傳送操作。server_write方法在底層呼叫了write_data方法

int
write_data(int filedes, int from_server) {
	int nbytes;
	int total_len_send;
	int wait_count = 0;
	int index;
	int send_suc = 0;

	total_len_send = 0;
	index = 0;
	while (1) {
		if (from_server) {
			nbytes = send(filedes, get_server_send_ptr(index), get_server_send_len(index), 0);
		}
		else {
			nbytes = send(filedes, get_client_send_ptr(index), get_client_send_len(index), 0);
		}

		if (nbytes < 0) {
			if (is_nonblock(filedes)) {
				if (EAGAIN == errno || EWOULDBLOCK == errno || EINTR == errno) {
					if (wait_count < WAIT_COUNT_MAX) {
						wait_count++;
						usleep(wait_count);
						continue;
					}
				}
			}
			break;
		}
		else if (nbytes == 0) {
			break;
		}
		else if (nbytes > 0) {
			total_len_send += nbytes;
		}
		
		if ((from_server && is_server_send_finish(total_len_send)) 
			||(!from_server && is_client_send_finish(total_len_send))){
			send_suc = 1;
			break;
		}
	}

        其實現和read_data思路一致,也考慮到一次性寫不完的情況和同步非同步socket問題。寫入操作完成後再去統計相關行為

	if (from_server) {
		if (send_suc) {
			__sync_fetch_and_add(&g_write_suc, 1);
			__sync_fetch_and_add(&g_server_suc, 1);
			return 0;
		} else {
			__sync_fetch_and_add(&g_write_fai, 1);
			__sync_fetch_and_add(&g_server_fai, 1);
			return -1;
		}
	} else {
		if (send_suc) {
			__sync_fetch_and_add(&g_write_suc, 1);
			return 0;
		} else {
			__sync_fetch_and_add(&g_write_fai, 1);
			__sync_fetch_and_add(&g_client_fai, 1);
			return -1;
		}
	}
}

        最後我們講下測試程式的實現。為了便於測試,我要求測試程式可以接受至少2個引數,第一個引數是用於標識啟動多少個執行緒傳送請求;第二個引數用於指定執行緒中等待多少毫秒傳送一次請求;第三個引數是可選的,標識一共傳送多少次請求。這樣我們可以通過這些引數控制測試程式的行為

#define MAXREQUESTCOUNT 100000

static int g_total = 0;
static int g_max_total = 0;

void* send_data(void* arg) {
	int wait_time;
	int client_sock;
	wait_time = *(int*)arg;
	
	while (__sync_fetch_and_add(&g_total, 1) < g_max_total) {
		usleep(wait_time);
		client_sock = make_client_socket();
		connect_server(client_sock);
		request_add(1);
		set_block_filedes_timeout(client_sock);
		if (0 == client_write(client_sock)) {
			client_read(client_sock);
		}
		close(client_sock);
		client_sock = 0;
	}
}

int 
main(int argc, char* argv[]) {
	int thread_count;
	int index;
	int err;
	int wait_time;
	pthread_t thread_id;

	if (argc < 3) {
		fprintf(stderr, "error! example: client 10 50\n");
		return 0;
	}

	err = init_print_thread();
	if (err < 0) {
		perror("create print thread error");
		exit(EXIT_FAILURE);
	}

	thread_count = atoi(argv[1]);
	wait_time = atoi(argv[2]);
	
	g_max_total = MAXREQUESTCOUNT;
	if (argc > 3) {
		g_max_total = atoi(argv[3]);
	}	

	for (index = 0; index < thread_count; index++) {
		err = pthread_create(&thread_id, NULL, send_data, &wait_time);
		if (err != 0) {
			perror("can't create send thread");
			exit(EXIT_FAILURE);
		}
		
	}

	wait_print_thread();
	return 0;
}

        執行緒中,首先通過make_client_socket建立socket並繫結到本地埠上

int
make_client_socket() {
	int client_sock = -1;
	struct sockaddr_in client_addr;
	
	client_sock = socket(AF_INET, SOCK_STREAM, 0);
	if (client_sock < 0) {
		perror("create socket error");
		exit(EXIT_FAILURE);
	}

	bzero(&client_addr, sizeof(client_addr));
	client_addr.sin_family = AF_INET;
	client_addr.sin_addr.s_addr = htons(INADDR_ANY);
	client_addr.sin_port = htons(0);
	if (bind(client_sock, (struct sockaddr*)&client_addr, sizeof(client_addr)) < 0) {
		perror("bind error");
		exit(EXIT_FAILURE);
	}
	return client_sock;
}

        然後通過connect_server連線伺服器

void 
connect_server(int client_sock) {
	struct sockaddr_in server_addr;
	bzero(&server_addr, sizeof(server_addr));
	server_addr.sin_family = AF_INET;
	if (inet_aton("127.0.0.1", &server_addr.sin_addr) == 0) {
		perror("set server ip error");
		exit(EXIT_FAILURE);
	}
	server_addr.sin_port = htons(PORT);
	if (connect(client_sock, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {
		perror("client connect server error");
		exit(EXIT_FAILURE);
	}
}

        最後通過client_write和client_read和伺服器通訊。這兩個函式都是呼叫上面介紹的write_data和read_data,所以沒什麼好講的。

int
client_read(int filedes) {
	return read_data(filedes, 0);
}

int
client_write(int filedes) {
	return write_data(filedes, 0);
}

        我們啟動一千個執行緒,傳送30萬次請求。看看樸素模型的處理能力。

        首先我們看看伺服器的結果列印

        可以發現穩定的處理能力大概在每秒14000~15000左右。

        我們再看看客戶端的列印


        我們發現其傳送頻率差不多也是14000~16000。這兒要說明下,因為客戶端是同步模型,伺服器也是同步模型,所以這個速率是伺服器處理的峰值。否則按照設定的1微秒的等待時間,1000個執行緒一秒鐘傳送的請求數肯定不止15000。我使用過兩個測試程序同時去壓,也驗證了其最大的處理能力也就是在14000~15000左右(在我的配置環境下)。

        我們發現,使用樸素模型實現網路通訊是非常方便的。但是這個模型有個明顯的缺點,就是一次只能處理一個請求——即接收請求、讀socket、寫socket是序列執行的。除非使用執行緒池去優化這個流程,否則在單執行緒的情況下,似乎就不能解決這個問題了。科技總是進步的,我們將在下一節講解Select模型,它就可以解決這個問題。