I/O複用(一)——Select
為什麼要有I/O複用
從多程序多執行緒到程序池執行緒池,儘管我們處理事件的效率越來越高,但是卻有一個問題,一直都沒有解決,那就是,當伺服器分配了一個執行緒或程序為某一個客戶端服務時,該程序/執行緒就相當於與這個客戶端綁定了,除非客戶端斷開連線,否則不管有沒有請求事件,這個執行緒/程序都只能為這個客戶端處理事件,我們知道,系統允許我們建立的執行緒/程序是有限的,這樣其實會造成資源的浪費,所以,我們需要一套機制,在客戶端沒有請求事件的時候,該執行緒/程序可以去處理其他客戶端的事件,也就是我們需要將多個檔案描述符同時監聽,當由請求時,去處理髮出請求的客戶端的事件。所以就有了我們的I/O複用。
什麼是I/O複用
I/O複用的本質就是一個執行緒監聽多個檔案描述符,我們可以這樣理解,多執行緒多程序程式設計,就是老師對學生一對一輔導,但是不是所有的是時候學生都有問題,老師就閒了,會浪費很大的師資力量,而我們的I/O複用,就是一個老師輔導多個學生,學生有問題提出問題(發出請求事件),老師處理完就可以處理別的學生的問題了。然後就有人說了,I/O理解起來太麻煩,我用迴圈,輪詢一遍就能找到該處理哪個事件了,但是你見過有老師一個一個問學生你有問題嗎,那老師知道有人問問題就挨個問一遍,他還有力氣處理問題麼,同樣的道理,我們的CPU那麼可愛又忙碌,它時來處理事件的,不是挨個打招呼的,所以我們就需要直接告知CPU處理那個問題。
通過上面這個簡單的小故事,我們可以理解,I/O複用的兩個好處
一 它減少了程序/執行緒的建立數量,節省了我們的記憶體資源,使得資源更加高效的被利用。
二 它減少了CPU的工作壓力,CPU只需要做自己最擅長的時就好。
I/O實現
其實在linux下I/O複用的實現有三個系統呼叫:poll,epoll,select,其中epoll為linux特有,今天我們就先來說說select。
select系統呼叫的用途是:在一段指定時間內,監聽使用者感興趣的檔案描述符上的可讀、可寫和異常等事件。
我們先來看一下select系統呼叫的原型:
(1)nfds 引數指定被監聽的檔案描述符的總數。它通常被設定成監聽的所有檔案描述符中的最大值加一,因為檔案描述符的計數從0開始。
(2)readfds,writefds,exceptfds分別表示可讀,可寫,和異常等事件對應的檔案描述符集合。
(3)timeout用來設定select函式的超時時間。
select成功時返回就緒檔案描述符的總數,如果在超時時間內沒有任何檔案描述符就緒,select返回0,select失敗返回-1並且設定errno。
這裡面值得我們關注的有一個很重要的結構體fd_set,該結構體定義如下:
我們可以看到,其實這個結構體僅僅包含了一個型別為long int的陣列,陣列長度為32;這個陣列的每個元素的每一位標記一個檔案描述符,也就是說做多監聽1024個檔案描述符,檔案描述符最大是1023。
我們都知道,這樣的機制下,位操作相對來說是比較麻煩的,所以系統就很善良的為我們提供了一系列巨集來訪問fd_set中的位:
另外一個值得關注的結構體就是我們的timeval,定義如下:
timeout有三種取值:
- NULL select一直阻塞,直到readfds、writefds、exceptfds集合中至少一個檔案描述符就緒select才返回。
- 0 select不阻塞。
- timeout_value select在timeout_value這個時間段內阻塞,有事件就緒就返回或者超時返回。
select的應用——實現I/O複用
在實現之前我們需要注意幾點問題:
(1) select返回是全部的檔案描述符,我們需要輪詢確定是哪個檔案描述符上的事件發生
(2) select返回時會將fd_set結構的變數更改,所以我們每次select之前需要重置fd_set的值
好了,看程式碼嘍
#include<stdlib.h>
#include<stdio.h>
#include<string.h>
#include<assert.h>
#include<arpa/inet.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<sys/select.h>
void INIT_fds(int *fds,int length)
{
int i=0;
for(;i<length;i++)
{
fds[1]=-1;
}
}
void Insert_fd(int *fds,int fd,int length)
{
int i=0;
for(;i<length;i++)
{
if(fds[i]==-1)
{
fds[i]=fd;
break;
}
}
}
void Delete(int *fds,int fd,int len)
{
int i=0;
for(;i<len;i++)
{
if(fds[i]==fd)
{
fds[i]=-1;
break;
}
}
}
int main()
{
int sockfd=socket(AF_INET,SOCK_STREAM,0);
assert(sockfd!=-1);
struct sockaddr_in ser,cli;
memset(&ser,0,sizeof(ser));
ser.sin_family=AF_INET;
ser.sin_port=htons(6000);
ser.sin_addr.s_addr=inet_addr("127.0.0.1");
int res=bind(sockfd,(struct sockaddr*)&ser,sizeof(ser));
assert(res !=-1);
listen(sockfd,5);
fd_set readfds;
int fds[100];
INIT_fds(fds,100);
Insert_fd(fds,sockfd,100);
while(1)
{
int max=-1;
FD_ZERO(&readfds);
int i=0;
for(;i<100;i++)
{
if(fds[i]!=-1)
{
if(fds[i]>max)
{
max=fds[i];
}
FD_SET(fds[i],&readfds);
}
}
int n=select(max+1,&readfds,NULL,NULL,NULL);
if(n<0)
{
printf("select fail\n");
continue;
}
int c;
for(i=0;i<100;i++)
{
if(fds[i]!=-1&&FD_ISSET(fds[i],&readfds))
{
if(fds[i]==sockfd)
{
int len =sizeof(cli);
c=accept(sockfd,(struct sockaddr*)&cli,&len);
if(c<0)
{
printf("one link error\n");
continue;
}
Insert_fd(fds,c,100);
}
else
{
int fd=fds[i];
char recvbuff[128]={0};
int n = recv(c,recvbuff,127,0);
if(n<=0)
{
close(fd);
Delete(fds,fd,100);
continue;
}
printf("%d: %s\n",c,recvbuff);
send(c,"ok",2,0);
}
}
}
}
}