1. 程式人生 > >非阻塞套接字資料收集

非阻塞套接字資料收集



1、 兩種I/O模式

    * 阻塞模式:  執行I/O操作完成前會一直進行等待,不會將控制權交給程式。套接字預設為阻塞模式。可以通過多執行緒技術進行處理。
    * 非阻塞模式:執行I/O操作時,Winsock函式會返回並交出控制權。這種模式使用起來比較複雜,因為函式在沒有執行完成就進行返回,會不斷地返回 WSAEWOULDBLOCK錯誤。但功能強大。
                 為了解決這個問題,提出了進行I/O操作的一些I/O模型,下面介紹最常見的三種:

◆、select模型:

  通過呼叫select函式可以確定一個或多個套接字的狀態,判斷套接字上是否有資料,或者能否向一個套接字寫入資料。

      int select(
        int nfds,                          
        fd_set FAR *readfds,              
        fd_set FAR *writefds,             
        fd_set FAR *exceptfds,            
        const struct timeval FAR *timeout 
      );

   
    nfds
        [in] Ignored. The nfds parameter is included only for compatibility with Berkeley sockets.
    readfds
        [in, out] Optional pointer to a set of sockets to be checked for readability.
    writefds
        [in, out] Optional pointer to a set of sockets to be checked for writability
    exceptfds
        [in, out] Optional pointer to a set of sockets to be checked for errors.
    timeout
        [in] Maximum time for select to wait, provided in the form of a TIMEVAL structure. Set the timeout parameter to NULL for blocking operation.


readfds:

    If listen has been called and a connection is pending, accept will succeed.
    Data is available for reading (includes OOB data if SO_OOBINLINE is enabled).
    Connection has been closed/reset/terminated.
   
writefds:

    If processing a connect call (nonblocking), connection has succeeded.
    Data can be sent.
   
exceptfds:
    If processing a connect call (nonblocking), connection attempt failed.
    OOB data is available for reading (only if SO_OOBINLINE is disabled).




The select function returns the total number of socket handles that are ready and contained in the fd_set structures,
zero if the time limit expired, or SOCKET_ERROR if an error occurred.


The fd_set structure is used by various Windows Sockets functions and service providers, such as the select function,
to place sockets into a "set" for various purposes, such as testing a given socket for readability using the readfds parameter of the select function.

typedef struct fd_set {
  u_int    fd_count;                 // how many are SET?
  SOCKET   fd_array[FD_SETSIZE];     // an array of SOCKETs
} fd_set;


Members:
    fd_count
        Number of sockets in the set.
    fd_array
        Array of sockets that are in the set.


The macros to manipulate and check fd_set contents are:

FD_CLR(s, *set)
    Removes the descriptor s from set.
FD_ISSET(s, *set)
    Nonzero if s is a member of the set. Otherwise, zero.
FD_SET(s, *set)
    Adds descriptor s to set.
FD_ZERO(*set)
    Initializes the set to the NULL set.



readfds、writefds、exceptfds三個變數至少有一個不為空,同時這個不為空的套接字組種至少有一個socket,道理很簡單,否則要select幹什麼呢。
舉例:測試一個套接字是否可讀:

fd_set fdread;

FD_ZERO(&fdread);
FD_SET(s,&fdread); //加入套接字

if(select(0,&fdread,NULL,NULL,NULL)>0
{
        //成功
        if(FD_ISSET(s,&fread) //是否存在fread中,詳細定義請看winsock2.h
        {
                //是可讀的
        }
}

◆I/O操作函式:主要用於獲取與套接字相關的操作引數。

The ioctlsocket function controls the I/O mode of a socket.

int ioctlsocket(
  SOCKET s,        
  long cmd,        
  u_long FAR *argp 
);
 

s為I/O操作的套接字。
cmd為對套接字的操作命令。
argp為命令所帶引數的指標。

The ioctlsocket function can be used on any socket in any state. It is used to set or retrieve operating parameters associated with the socket
,independent of the protocol and communications subsystem. Here are the supported commands to use in the cmd parameter and their semantics:

FIONBIO
    Use with a nonzero argp parameter to enable the nonblocking mode of socket s.
    The argp parameter is zero if nonblocking is to be disabled.
    When a socket is created, it operates in blocking mode by default (nonblocking mode is disabled).
   
    To set the socket back to blocking mode, an application must first disable WSAAsyncSelect by calling WSAAsyncSelect with the lEvent parameter equal to zero,
    or disable WSAEventSelect by calling WSAEventSelect with the lNetworkEvents parameter equal to zero.
   
FIONREAD
    Use to determine the amount of data pending in the network's input buffer that can be read from socket s.
    The argp parameter points to an unsigned long value in which ioctlsocket stores the result.
    FIONREAD returns the amount of data that can be read in a single call to the recv function,
    which may not be the same as the total amount of data queued on the socket.
   
    If s is message oriented (for example, type SOCK_DGRAM), FIONREAD still returns the amount of pending data in the network buffer
    , however, the amount that can actually be read in a single call to the recv function is limited to the data size written in the send or sendto function call.

SIOCATMARK

    Use to determine whether or not all OOB data has been read. (See section Windows Sockets 1.1 Blocking Routines and EINPROGRESS for a discussion on out of band (OOB) data.)
    This applies only to a stream oriented socket (for example, type SOCK_STREAM) that has been configured for in-line reception of any OOB data (SO_OOBINLINE). If no OOB data is waiting to be read, the operation returns TRUE. Otherwise, it returns FALSE, and the next recv or recvfrom performed on the socket will retrieve some or all of the data preceding the mark. The application should use the SIOCATMARK operation to determine whether any data remains. If there is any normal data preceding the urgent (out of band) data, it will be received in order. (A recv or recvfrom will never mix OOB and normal data in the same call.)
    The argp parameter points to an unsigned long value in which ioctlsocket stores the Boolean result.




◆、WSAAsynSelect模型:
WSAAsynSelect模型也是一個常用的非同步I/O模型。應用程式可以在一個套接字上接收以
WINDOWS訊息為基礎的網路事件通知。該模型的實現方法是通過呼叫WSAAsynSelect函
數 自動將套接字設定為非阻塞模式,並向WINDOWS註冊一個或多個網路時間,並提供一
個通知時使用的視窗控制代碼。當註冊的事件發生時,對應的視窗將收到一個基於訊息的通知。

      int  WSAAsyncSelect( SOCKET s, HWND hWnd, u_int wMsg, long lEvent);      

s為需要事件通知的套接字
hWnd為接收訊息的視窗控制代碼
wMsg為要接收的訊息
lEvent為掩碼,指定應用程式感興趣的網路事件組合,主要如下:

#define FD_READ_BIT 0
#define FD_READ (1 << FD_READ_BIT)
#define FD_WRITE_BIT 1
#define FD_WRITE (1 << FD_WRITE_BIT)
#define FD_OOB_BIT 2
#define FD_OOB (1 << FD_OOB_BIT)
#define FD_ACCEPT_BIT 3
#define FD_ACCEPT (1 << FD_ACCEPT_BIT)
#define FD_CONNECT_BIT 4
#define FD_CONNECT (1 << FD_CONNECT_BIT)
#define FD_CLOSE_BIT 5
#define FD_CLOSE (1 << FD_CLOSE_BIT)

用法:要接收讀寫通知:

int nResult= WSAAsyncSelect(s,hWnd,wMsg,FD_READ|FD_WRITE);
if(nResult==SOCKET_ERROR)
{
        //錯誤處理
}

取消通知:

      int nResult= WSAAsyncSelect(s,hWnd,0,0);

當應用程式視窗hWnd收到訊息時,wMsg.wParam引數標識了套接字,lParam的低字標明
了網路事件,高字則包含錯誤程式碼。

◆、WSAEventSelect模型
WSAEventSelect模型類似WSAAsynSelect模型,但最主要的區別是網路事件發生時會被髮
送到一個事件物件控制代碼,而不是傳送到一個視窗。

使用步驟如下:
a、 建立事件物件來接收網路事件:

#define WSAEVENT HANDLE
#define LPWSAEVENT LPHANDLE
WSAEVENT WSACreateEvent( void );

該函式的返回值為一個事件物件控制代碼,它具有兩種工作狀態:已傳信(signaled)和未傳信
(nonsignaled)以及兩種工作模式:人工重設(manual reset)和自動重設(auto reset)。預設未
未傳信的工作狀態和人工重設模式。

b、將事件物件與套接字關聯,同時註冊事件,使事件物件的工作狀態從未傳信轉變未
已傳信。

      int  WSAEventSelect( SOCKET s,WSAEVENT hEventObject,long lNetworkEvents ); 

s為套接字
hEventObject為剛才建立的事件物件控制代碼
lNetworkEvents為掩碼,定義如上面所述

c、I/O處理後,設定事件物件為未傳信

BOOL WSAResetEvent( WSAEVENT hEvent );

Hevent為事件物件

成功返回TRUE,失敗返回FALSE。

d、等待網路事件來觸發事件控制代碼的工作狀態:

DWORD WSAWaitForMultipleEvents( DWORD cEvents,
const WSAEVENT FAR * lphEvents, BOOL fWaitAll,
DWORD dwTimeout, BOOL fAlertable );

lpEvent為事件控制代碼陣列的指標
cEvent為為事件控制代碼的數目,其最大值為WSA_MAXIMUM_WAIT_EVENTS
fWaitAll指定等待型別:TRUE:當lphEvent陣列重所有事件物件同時有訊號時返回;
FALSE:任一事件有訊號就返回。
dwTimeout為等待超時(毫秒)
fAlertable為指定函式返回時是否執行完成例程

對事件陣列中的事件進行引用時,應該用WSAWaitForMultipleEvents的返回值,減去
預宣告值WSA_WAIT_EVENT_0,得到具體的引用值。例如:

nIndex=WSAWaitForMultipleEvents(…);
MyEvent=EventArray[Index- WSA_WAIT_EVENT_0];

e、判斷網路事件型別:

int WSAEnumNetworkEvents( SOCKET s,
WSAEVENT hEventObject, LPWSANETWORKEVENTS lpNetworkEvents );

s為套接字
hEventObject為需要重設的事件物件
lpNetworkEvents為記錄網路事件和錯誤程式碼,其結構定義如下:

typedef struct _WSANETWORKEVENTS {
        long lNetworkEvents;
        int iErrorCode[FD_MAX_EVENTS];
} WSANETWORKEVENTS, FAR * LPWSANETWORKEVENTS;

f、關閉事件物件控制代碼:

BOOL WSACloseEvent(WSAEVENT hEvent);

呼叫成功返回TRUE,否則返回FALSE




使用阻塞套接字。阻塞套接字可能會對應用程式的效能造成影響,在阻塞套接字時,程式必須等待SOCKET處理完畢或者出錯後才返回,這段時間CPU就無聊的等待,對於非阻塞可以在這段時間做一些資料準備工作,一旦SOCKET可以用就立即傳送資料,並行性更好

非阻塞在單socket的情況下操作起來比阻塞要複雜點,但是對於多個socket並且要求實時性比較高的場合下基本上阻塞就不能用。譬如,socket A先進行了一個阻塞操作,那麼不超時/不結束呼叫的話,程式碼的控制權不會返回,那麼B socket上面就算有資料需要被接受並且處理,由於已經阻塞在A上面了,此時沒有辦法去處理B上面的資料。而非阻塞的時候呼叫立即返回,B就可以被處理。



非陰塞的伺服器程式: 
  #include   <winsock2.h> 
  #include   <stdio.h> 
  
  
  #define   MYPORT   5550         //   定義預設通訊埠 
  
  void   main(void) 
  { 
        WSADATA                             wsaData; 
        SOCKET                               ListeningSocket;     //   伺服器套接字 
        SOCKET                               NewConnection;       //   客戶機套接字 
        SOCKADDR_IN                     ServerAddr;     
        SOCKADDR_IN                     ClientAddr;     
        int                                     Ret,   ClientAddrLen; 
        char                                   DataBuffer[1024];     //   接收資料的緩衝區 
        fd_set                               readfds;                 //   等待可讀性檢查的套介面結構體 
        unsigned   long                 ul   =   1;               
        struct   timeval               timeout;               //   最多等待時間 
  
        printf("/n------------------非阻塞模式的套接字(伺服器)------------------/n/n"); 
  
        //   初始化   Winsock   2.2   版本 
        if   ((Ret   =   WSAStartup(MAKEWORD(2,2),   &wsaData))   !=   0) 
        { 
              printf("WSAStartup   failed   with   error   %d/n",   Ret); 
              return; 
        } 
    
        //   建立一個新套接字來監聽客戶機的連線 
        if   ((ListeningSocket   =   socket(AF_INET,   SOCK_STREAM,   IPPROTO_TCP))   ==   INVALID_SOCKET) 
        { 
              printf("socket   failed   with   error   %d/n",   WSAGetLastError()); 
              WSACleanup(); 
              return; 
        }   
  
        //   初始化一個   SOCKADDR_IN   結構 
        ServerAddr.sin_family   =   AF_INET;       //   使用IP地址族 
        ServerAddr.sin_port   =   htons(MYPORT);     //   通訊埠(5550)     
        ServerAddr.sin_addr.s_addr   =   htonl(INADDR_ANY);     //   使用自己的IP地址,實際預設為0 
  
        //   將這個地址資訊和套接字關聯起來 
        if   (bind(ListeningSocket,   (SOCKADDR   *)&ServerAddr,   sizeof(ServerAddr))   ==   SOCKET_ERROR) 
        { 
              printf("bind   failed   with   error   %d/n",   WSAGetLastError()); 
              closesocket(ListeningSocket); 
              WSACleanup(); 
              return; 
        } 
  
        //   將套接字轉入監聽模式,監聽客戶機的連線,指定最大佇列長度為5 
        if   (listen(ListeningSocket,   5)   ==   SOCKET_ERROR) 
        { 
              printf("listen   failed   with   error   %d/n",   WSAGetLastError()); 
              closesocket(ListeningSocket); 
              WSACleanup(); 
              return; 
        }   
  
        //   設定套接字為非阻塞模式 
        Ret   =   ioctlsocket(ListeningSocket,   FIONBIO,   (unsigned   long   *)   &ul); 
  
        if   (Ret   ==   SOCKET_ERROR) 
        { 
        printf("ioctlsocket   failed   with   error   %d/n",   WSAGetLastError()); 
        } 
        else 
        { 
        printf("set   nonblock   mode   successed,   return   value   %d/n",   Ret); 
        } 
  
        printf("waiting   for   connection   on   port   %d..../n",   MYPORT); 
  
        //   定義Select()   的最多等待時間 
        timeout.tv_sec   =   0; 
        timeout.tv_usec   =   500; 
  
        while   (1) 
        { 
        FD_ZERO(&readfds);       //   對結構體進行初始化 
        FD_SET(ListeningSocket,   &readfds);       //   把套接字加入集合 
  
        //   查詢套介面的可讀性 
        Ret   =   select(0,   &readfds,   NULL,   NULL,   &timeout); 
        
        if   (Ret   >   0) 
        { 
        if   (FD_ISSET(ListeningSocket,   &readfds)) 
        {       
        ClientAddrLen   =   sizeof(ClientAddr); 
        
        //   當有連線請求到達時,接受一個新連線 
        if   ((NewConnection   =   accept(ListeningSocket,   (SOCKADDR   *)   &ClientAddr, 
        &ClientAddrLen))   ==   INVALID_SOCKET) 
        { 
        printf("accept   failed   with   error   %d/n",   WSAGetLastError()); 
        closesocket(ListeningSocket); 
        WSACleanup(); 
        exit(1); 
        } 
        
        printf("got   a   connection   from   %s   ./n",   inet_ntoa(ClientAddr.sin_addr)); 
        
        //   迴圈 
        while(1) 
        { 
        //       FD_ZERO(&readfds); 
        FD_SET(NewConnection,   &readfds); 
  
        //   第二次檢查套介面的可讀性,等待時間設為空,否則會出現超時錯誤 
        Ret   =   select(0,   &readfds,   NULL,   NULL,   NULL); 
        
        if   (FD_ISSET(NewConnection,   &readfds)) 
        {       
        //   接收資料 
        Ret   =   recv(NewConnection,   DataBuffer,   sizeof(DataBuffer)-1,   0);   
        
        if   (Ret   ==   SOCKET_ERROR   ||   Ret   ==   0)   
        { 
        //   接收錯誤 
        printf("recv   failed   with   error   %d/n",   WSAGetLastError()); 
        closesocket(NewConnection); 
        WSACleanup(); 
        return   ; 
        }   
        else 
        { 
            DataBuffer[Ret]   =   '/0'; 
        printf("received   %d   byte:   %s",   (Ret-1),   DataBuffer);       
        } 
        
        } 
        } 
        } 
        } 
        } 
        
        //   關閉套接字 
        closesocket(NewConnection); 
        
        //   釋放由   winsock   分配的資源   
        WSACleanup(); 
  }
 
 
 
 
  非阻塞的客戶機程式: 
    #include   <winsock2.h> 
    #include   <stdio.h> 
    
    #define   MYPORT   5550       //   定義預設通訊埠 
    #define   LINELEN       128       //   定義一行資料的最大長度 
    
    void   main(int   argc,   char   **argv) 
    { 
          WSADATA                     wsaData; 
          SOCKET                       s; 
          SOCKADDR_IN             ServerAddr; 
          int                             Ret,   length; 
          char                           buf[LINELEN];       
          fd_set                       writefds;             //   等待可寫性檢查的套介面結構體 
          unsigned   long         ul   =   1; 
          struct   timeval       timeout;               //   最多等待時間 
    
          //   對主函式的引數進行處理 
          switch(argc) 
          { 
          case   1: 
          argv[1]   =   "127.0.0.1";     //   定義一個預設的IP地址 
          break; 
    
          case   2: 
          argv[1] 
          argv[2] 
          break; 
    
          default: 
          printf("argument   error!/n"); 
          exit(1); 
          } 
    
          printf("/n------------------非阻塞模式的套接字(客戶機)------------------/n/n"); 
    
          //   初始化   Winsock   2.2   版本 
          if   ((Ret   =   WSAStartup(MAKEWORD(2,2),   &wsaData))   !=   0) 
          { 
                
                printf("WSAStartup   failed   with   error   %d/n",   Ret); 
                return; 
          } 
          
          //   建立一個新套接字來建立客戶機連線 
          if   ((s   =   socket(AF_INET,   SOCK_STREAM,   IPPROTO_TCP))   ==   INVALID_SOCKET) 
          { 
                printf("socket   failed   with   error   %d/n",   WSAGetLastError()); 
                WSACleanup(); 
                return; 
          } 
      
          //   設定套接字為非阻塞模式 
          Ret   =   ioctlsocket(s,   FIONBIO,   (unsigned   long   *)   &ul); 
    
          if   (Ret   ==   SOCKET_ERROR) 
          { 
          printf("ioctlsocket   failed   with   error   %d/n",   WSAGetLastError()); 
          } 
          else 
          { 
          printf("set   nonblock   mode   successed,   return   value   %d/n",   Ret); 
          } 
    
          //   初始化一個   SOCKADDR_IN   結構 
          ServerAddr.sin_family   =   AF_INET; 
          ServerAddr.sin_port   =   htons(MYPORT);         
          ServerAddr.sin_addr.s_addr   =   inet_addr(argv[1]);     //   定義伺服器地址   
    
          //   用套接字   s   來建立一個到伺服器的連線 
          Ret   =   connect(s,   (SOCKADDR   *)   &ServerAddr,   sizeof(ServerAddr)); 
          if   (WSAGetLastError()   !=   WSAEWOULDBLOCK) 
          { 
          printf("connect   failed   with   error   %d/n",   WSAGetLastError()); 
          closesocket(s); 
          WSACleanup(); 
          return; 
          }   
          else 
          { 
          //   和伺服器的連線成功 
          printf("connect   to   %s   on   %d   succeeded./n",   argv[1],   MYPORT); 
          printf("please   input   send   data..../n"); 
    
          //   定義Select()   的最多等待時間 
          timeout.tv_sec   =   0; 
          timeout.tv_usec   =   500; 
          
          while   (1) 
          { 
          FD_ZERO(&writefds); 
          FD_SET(s,   &writefds); 
    
          //   查詢套介面的可寫性 
          Ret   =   select(   0,   NULL,   &writefds,   NULL,   &timeout); 
          
          if   (Ret   >   0) 
          { 
          if   (FD_ISSET(s,   &writefds)) 
          { 
          //   迴圈:從輸入流中取資料 
          while   (fgets(buf,   sizeof(buf),   stdin)) 
          { 
          buf[LINELEN]   =   '/0';         //   在字串最後加終止符   
          length   =   strlen(buf);       //   實際傳送位元組數   
          
          //   如果輸入是回車,則結束程式 
          if   (buf[0]   ==   '/n') 
          { 
          closesocket(s); 
          WSACleanup(); 
          return; 
          } 
          
          //   傳送資料 
          if   ((Ret   =   send(s,   buf,   length,   0))   ==   SOCKET_ERROR) 
          { 
          printf("send   failed   with   error   %d/n",   WSAGetLastError()); 
          closesocket(s); 
          WSACleanup(); 
          return; 
          } 
          } 
          } 
          } 
          } 
          } 
          //   關閉套接字 
          closesocket(s); 
          
          //   釋放由   winsock   分配的資源 
          WSACleanup(); 
  }