1. 程式人生 > >socket通訊之五:select多路複用的客戶/伺服器模型

socket通訊之五:select多路複用的客戶/伺服器模型

前面一篇介紹了伺服器端使用多執行緒的方式來處理多個客戶端的請求的,但是當客戶端數量增多時執行緒數量會急劇增加,導致消耗大量的資源。

於是就引出了伺服器端的一種新的模型。

1. 阻塞與非阻塞

首先介紹幾個基本的概念。

阻塞方式( block ),顧名思義,就是程序或是執行緒執行到這些函式時必須等待某個事件的發生,假如事件沒有發生,程序或執行緒就被阻塞,函式不能立即返回。

非阻塞方式( non-block ),就是程序或執行緒執行此函式時不必非要等待事件的發生,一旦執行肯定返回,以返回值的不同來反映函式的執行情況,假如事件發生則和阻塞方式相同,若事件沒有發生則返回一個程式碼來告知事件未發生,而程序或執行緒繼續執行,所以效率較高。

套接字相關函式預設時採用阻塞方式操作,而大多數情況下,程式不僅僅只擁有一個套接字。當程序以阻塞方式操作其中之一時,必將導致不能使用其他的套接字。如果希望這些套接字同時工作,就必須設計併發的套接字程式,即在一個套接字讀寫的同時保證另一個套接字也能正常地操作。

多路複用函式 select 把一些檔案描述符集合在一起,如果某個檔案描述符的狀態發生變化,比如進入“寫就緒”或者“讀就緒”狀態,函式 select 會立即返回,並且通知程序讀取或寫入資料;如果沒有 I/O 到達,程序將進入阻塞,直到函式 select 超時退出為止。


套接字也是檔案,函式 select 也支援套接字描述符,並且可以應用在 TCP 協議和 UDP 協議的套接字中。利用多路複用,程序就可以同時監控多個套接字資訊,在多個套接字上併發地執行操作。

2.select介紹

select的大概思想:將多個套接字放在一個集合裡,然後統一檢查這些套接字的狀態(可讀、可寫、異常等),呼叫select後,會更新這些套接字的狀態,然後做判斷,如果套接字可讀,就執行read操作。這樣就巧妙地避免了阻塞,達到同時處理多個連線的目的。當然如果沒有事件發生,select會一直阻塞,如果不想一直讓它等待,想去處理其它事情,可以設定一個最大的等待時間。

  1. int select(int maxfdp,fd_set *readfds,fd_set *writefds,fd_set *errorfds,struct timeval *timeout);  
  2. /*引數列表
     
  3. int maxfdp是一個整數值,是指集合中所有檔案描述符的範圍,即所有檔案描述符的最大值加1,不能錯!在Windows中這個引數的值無所謂,可以設定不正確。  
  4. fd_set *readfds是指向fd_set結構的指標,這個集合中應該包括檔案描述符,我們是要監視這些檔案描述符的讀變化的,即我們關心是否可以從這些檔案中讀取資料了,如果這個集合中有一個檔案可讀,select就會返回一個大於0的值,表示有檔案可讀,如果沒有可讀的檔案,則根據timeout引數再判斷是否超時,若超出timeout的時間,select返回0,若發生錯誤返回負值。可以傳入NULL值,表示不關心任何檔案的讀變化。  
  5. fd_set *writefds是指向fd_set結構的指標,這個集合中應該包括檔案描述符,我們是要監視這些檔案描述符的寫變化的,即我們關心是否可以向這些檔案中寫入資料了,如果這個集合中有一個檔案可寫,select就會返回一個大於0的值,表示有檔案可寫,如果沒有可寫的檔案,則根據timeout引數再判斷是否超時,若超出timeout的時間,select返回0,若發生錯誤返回負值。可以傳入NULL值,表示不關心任何檔案的寫變化。  
  6. fd_set *errorfds同上面兩個引數的意圖,用來監視檔案錯誤異常。  
  7. struct timeval* timeout是select的超時時間,這個引數至關重要,它可以使select處於三種狀態: 
  8. 第一,若將NULL以形參傳入,即不傳入時間結構,就是將select置於阻塞狀態,一定等到監視檔案描述符集合中某個檔案描述符發生變化為止; 
  9. 第二,若將時間值設為0秒0毫秒,就變成一個純粹的非阻塞函式,不管檔案描述符是否有變化,都立刻返回繼續執行,檔案無變化返回0,有變化返回一個正值; 
  10. 第三,timeout的值大於0,這就是等待的超時時間,即 select在timeout時間內阻塞,超時時間之內有事件到來就返回了,否則在超時後不管怎樣一定返回,返回值同上述。 
  11. */  
  12. /* 
  13. 返回值:  
  14. 負值:select錯誤 
  15. 正值:某些檔案可讀寫或出錯 
  16. 0:等待超時,沒有可讀寫或錯誤的檔案 
  17. */  

上面有兩個結構體。

第一, struct timeval 代表時間值

  1. struct timeval{  
  2.       long tv_sec;/* 秒 */  
  3.       long tv_usec;/* 毫秒 */  
  4. };  

第二, struct fd_set 能夠理解為檔案描述符的集合

  1. #ifndef FD_SETSIZE  
  2. #define FD_SETSIZE      64  
  3. #endif /* FD_SETSIZE */  
  4. typedef struct fd_set {  
  5.         u_int fd_count;               /* how many are SET? */  
  6.         SOCKET  fd_array[FD_SETSIZE];   /* an array of SOCKETs */  
  7. } fd_set;  


fd_set 集合能夠通過一些巨集由人為來操作,比如清空集合 FD_ZERO(fd_set *) ,將一個給定的檔案描述符加入集合之中 FD_SET(int ,fd_set*) ,將一個給定的檔案描述符從集合中刪除 FD_CLR(int,fd_set*),檢查集合中指定的檔案描述符是否能夠讀寫 FD_ISSET(int ,fd_set* ) 。

3.操作fd_set的巨集

下面是這些巨集的詳細描述:

  1. FD_ZERO(fd_set * set),是把集合清空(初始化為0,確切的說,是把集合中的元素個數初始化為0,並不修改描述符陣列).使用集合前,必須用FD_ZERO初始化,否則集合在棧上作為自動變數分配時,fd_set分配的將是隨機值,導致不可預測的問題。
  2. FD_SET(int s,fd_set * set),向集合中加入一個套介面描述符(如果該套介面描述符s沒在集合中,並且陣列中已經設定的個數小於最大個數時,就把該描述符加入到集合中,集合元素個數加1)。這裡是將s的值直接放入陣列中。
  3. FD_ISSET(int s,fd_set * set),檢查描述符是否在集合中,如果在集合中返回非0值,否則返回0. 它的巨集定義並沒有給出具體實現,但實現的思路很簡單,就是搜尋集合,判斷套接字s是否在陣列中。
  4. FD_CLR(int s,fd_set * set),從集合中移出一個套介面描述符(比如一個套接字連線中斷後,就應該移除它)。實現思路是,在陣列集合中找到對應的描述符,然後把後面的描述依次前移一個位置,最後把描述符的個數減1。

下面是使用這些巨集的基本方式。

  1. 呼叫FD_ZERO來初始化fd_set;
  2. 呼叫FD_SET將感興趣的套接字描述符加入fd_set集合中(每次迴圈都要重新加入,因為select更新後,會將一些沒有滿足條件的套接字移除佇列);
  3. 設定等待時間後,呼叫select函式--更新套接字的狀態;
  4. 呼叫FD_ISSET,來判斷套接字是否有相應狀態,然後做相應操作,比如,如果套接字可讀,就呼叫recv函式去接收資料。

4.自定義管理套接字的集合類SocketList

從上面可以看出,我們需要維護一個套接字集合,這個套接字的集合裡存放的是和伺服器建立連線的套接字,伺服器每次迴圈呼叫select()時首先需要使用FD_ZERO巨集來初始化fd_set物件,然後呼叫FD_SET將我們維護的這個套接字集合中的套接字加入fd_set這個集合中,然後就可以呼叫select函數了,呼叫完select函式後,再次遍歷我們維護的這個套接字集合,通過FD_ISSET巨集來判斷這個套接字是否是我們需要進行處理的,如果是需要進行處理的,那麼就對它進行處理。

所以現在這裡的關鍵就是要維護一個套接字的集合,我把它定義為SocketList這個類,下面是我實現的C++類。這個套接字的集合類可以很方便地實現下面的功能:

  1. 新增socket
  2. 刪除socket
  3. 將前類中的所有感興趣的socket加入到fd_set中

5.伺服器端的實現

仍然是在原來服務端的第6步進行更改,但是在這之前啟動了一個新的執行緒workThread,並且服務端也只需要啟動一個這樣的執行緒用來訪問SocketList的物件。此時第6步就只需要將當前建立的socket新增到SocketList這個物件中。在workThread這個執行緒中訪問SocketList的物件,在這個執行緒中呼叫select()函式即可。

服務端程式碼:

  1. #include <stdio.h>  
  2. #include <stdlib.h>  
  3. #include <WinSock2.h>  
  4. #include <iostream>  
  5. #pragma comment(lib, "ws2_32.lib")  
  6. using namespace std;  
  7. #define  PORT 6000  
  8. //#define  IP_ADDRESS "10.11.163.113"  //表示伺服器端的地址  
  9. #define  IP_ADDRESS "127.0.0.1"  //直接使用本機地址  
  10. class SocketList  
  11. {  
  12.     private:  
  13.     int num;//記錄socket的真實數目  
  14.     SOCKET socketArray[FD_SETSIZE];//存放socket的陣列  
  15.     public:  
  16.     SOCKET getSocket(int i)  
  17.     {  
  18.         return socketArray[i];  
  19.     }  
  20.     //建構函式中對兩個成員變數進行初始化  
  21.     SocketList()  
  22.     {  
  23.         num=0;  
  24.         for (int i=0;i<FD_SETSIZE;i++)  
  25.         {  
  26.             //因為socket的值是一個非負整數值,所以可以將socketArray初始化為0,讓它來表示陣列中的這一個元素有沒有被使用  
  27.             socketArray[i]=0;  
  28.         }  
  29.     }  
  30.     //往socketArray中新增一個socket  
  31.     void insertSocket(SOCKET s)  
  32.     {  
  33.         for (int i=0;i<FD_SETSIZE;i++)  
  34.         {  
  35.             //如果某一個socketArray[i]為0,表示哪一個位可以放入socket  
  36.             if (socketArray[i]==0)  
  37.             {  
  38.                 socketArray[i]=s;  
  39.                 num++;  
  40.                 break;//這裡一定要加上break,不然一個socket會放在socketArray的多個位置上  
  41.             }  
  42.         }  
  43.     }  
  44.     //從socketArray中刪除一個socket  
  45.     void deleteSocket(SOCKET s)  
  46.     {  
  47.         for (int i=0;i<FD_SETSIZE;i++)  
  48.         {  
  49.             if (socketArray[i]==s)  
  50.             {  
  51.                 socketArray[i]=0;  
  52.                 num--;  
  53.                 break;  
  54.             }  
  55.         }  
  56.     }  
  57.     //將socketArray中的套接字放入fd_list這個結構體中  
  58.     void makefd(fd_set * fd_list)  
  59.     {  
  60.             FD_ZERO(fd_list);//首先將fd_list清0  
  61.             //將每一個socket加入fd_list中  
  62.             for (int i=0;i<FD_SETSIZE;i++)  
  63.             {  
  64.                 if (socketArray[i]>0)  
  65.                 {  
  66.                     FD_SET(socketArray[i],fd_list);  
  67.                 }  
  68.             }  
  69.     }  
  70. };  
  71. //使用這一個執行緒來通過select來處理多個socket  
  72. DWORD WINAPI workThread(LPVOID lpParam)  
  73. {  
  74.         //傳遞進來的socketList指標  
  75.         SocketList * socketList=(SocketList *)lpParam;  
  76.         int err=0;  
  77.         fd_set fdread;//存在讀檔案的set,select會檢測這個set中是否可以從某些socket中讀入資訊  
  78.         struct timeval timeout;//設定select超時的時間  
  79.         timeout.tv_sec=6;  
  80.         timeout.tv_usec=0;  
  81.         //輸入輸出緩衝區  
  82.         char receBuff[MAX_PATH];  
  83.         char sendBuf[MAX_PATH];  
  84.         SOCKET socket;  
  85.         while(true)  
  86.         {  
  87.                 socketList->makefd(&fdread);  
  88.                 err=select(0,&fdread,NULL,NULL,&timeout);  
  89.                 if (err==0)//select返回0表示超時  
  90.                 {  
  91.                     cout<<"select() is time-out!"<<endl;  
  92.                     continue;  
  93.                 }  
  94.                 else  
  95.                 {  
  96.                     //遍歷socketList中的每一個socket,檢視那些socket是可讀的,處理可讀的socket  
  97.                     //從中讀取資料到緩衝區,併發送資料給客戶端  
  98.                         for (int i=0;i<FD_SETSIZE;i++)  
  99.                         {  
  100.                             //讀取有效的socket  
  101.                             if (socketList->getSocket(i)==0)  
  102.                                     continue;  
  103.                             socket=socketList->getSocket(i);  
  104.                             //判斷哪些socket是可讀的,如果這個socket是可讀的,從它裡面讀取資料  
  105.                             if (FD_ISSET(socket,&fdread))  
  106.                             {  
  107.                                 err=recv(socket,receBuff,MAX_PATH,0);  
  108.                                 //如果返回值表示要關閉這個連線,那麼關閉它,並將它從sockeList中去掉  
  109.                                 if (err==0||err==SOCKET_ERROR)  
  110.                                 {  
  111.                                     closesocket(socket);  
  112.                                     cout<<"斷開連線!"<<endl;  
  113.                                     socketList->deleteSocket(socket);  
  114.                                 }  
  115.                                 else  
  116.                                 {  
  117.                                     cout<<"message from client:"<<receBuff<<endl;  
  118.                                     strcpy(sendBuf,"server receive a message:");  
  119.                                     strcat(sendBuf,receBuff);  
  120.                                     //傳送資料  
  121.                                     send(socket,sendBuf,strlen(sendBuf)+1,0);   //第三個引數加上1是為了將字串結束符'\0'也傳送過去     
  122.                                 }  
  123.                             }//end if  
  124.                         }//end for  
  125.                 }//end if (err==0)  
  126.         }//end while  
  127.         return 0;  
  128. }  
  129. void main()  
  130. {  
  131.     WSADATA wsaData;  
  132.     int err;  
  133.     //1.載入套接字型檔  
  134.     err=WSAStartup(MAKEWORD(1,1),&wsaData);  
  135.     if (err!=0)  
  136.     {  
  137.         cout<<"Init Windows Socket Failed::"<<GetLastError()<<endl;  
  138.         return ;  
  139.     }  
  140.     //2.建立socket  
  141.     //套接字描述符,SOCKET實際上是unsigned int  
  142.     SOCKET serverSocket;  
  143.     serverSocket=socket(AF_INET,SOCK_STREAM,0);  
  144.     if (serverSocket==INVALID_SOCKET)  
  145.     {  
  146.         cout<<"Create Socket Failed::"<<GetLastError()<<endl;  
  147.         return ;  
  148.     }  
  149.     //伺服器端的地址和埠號  
  150.     struct sockaddr_in serverAddr,clientAdd;  
  151.     serverAddr.sin_addr.s_addr=inet_addr(IP_ADDRESS);  
  152.     serverAddr.sin_family=AF_INET;  
  153.     serverAddr.sin_port=htons(PORT);  
  154.     //3.繫結Socket,將Socket與某個協議的某個地址繫結  
  155.     err=bind(serverSocket,(struct sockaddr*)&serverAddr,sizeof(serverAddr));  
  156.     if (err!=0)  
  157.     {  
  158.         cout<<"Bind Socket Failed::"<<GetLastError()<<endl;  
  159.         return ;  
  160.     }  
  161.     //4.監聽,將套接字由預設的主動套接字轉換成被動套接字  
  162.     err=listen(serverSocket,10);  
  163.     if (err!=0)  
  164.     {  
  165.         cout<<"listen Socket Failed::"<<GetLastError()<<endl;  
  166.         return ;  
  167.     }  
  168.     cout<<"伺服器端已啟動......"<<endl;  
  169.     int addrLen=sizeof(clientAdd);  
  170.     SOCKET sockConn;  
  171.     SocketList socketList;  
  172.     HANDLE hThread=CreateThread(NULL,0,workThread,&socketList,0,NULL);  
  173.     if (hThread==NULL)  
  174.     {  
  175.         cout<<"Create Thread Failed!"<<endl;  
  176.     }  
  177.     CloseHandle(hThread);  
  178.     while(true)  
  179.     {  
  180.         //5.接收請求,當收到請求後,會將客戶端的資訊存入clientAdd這個結構體中,並返回描述這個TCP連線的Socket  
  181.         sockConn=accept(serverSocket,(struct sockaddr*)&clientAdd,&addrLen);  
  182.         if (sockConn==INVALID_SOCKET)  
  183.         {  
  184.             cout<<"Accpet Failed::"<<GetLastError()<<endl;  
  185.             return ;  
  186.         }  
  187.         cout<<"客戶端連線:"<<inet_ntoa(clientAdd.sin_addr)<<":"<<clientAdd.sin_port<<endl;  
  188.         //將之前的第6步替換成了上面啟動workThread這個執行緒函式和下面這一行程式碼  
  189.         //將socket放入socketList中  
  190.         socketList.insertSocket(sockConn);  
  191.     }  
  192.     closesocket(serverSocket);  
  193.     //清理Windows Socket庫  
  194.     WSACleanup();  
  195. }  

客戶端的程式碼和前面是一樣的,這裡就不再列出來了。

程式執行的效果:

上面伺服器端顯示selecet() is time-out!是指超過了你設定的傳遞給select的timeout時間後仍然沒有socket傳送資料過來,此時select會返回,但是可以從這個返回的狀態碼中判斷是否有socket可讀。

可執行檔案可以在這裡下載,工程檔案可以在這裡下載。