1. 程式人生 > >Linux網路程式設計 -- select/epoll得知socket有資料可讀,如何判斷資料全部被讀取完畢?

Linux網路程式設計 -- select/epoll得知socket有資料可讀,如何判斷資料全部被讀取完畢?

http://blog.csdn.net/ldd909/article/details/6168077

 

補充一點:只有在使用epoll ET(Edge Trigger)模式的時候,才需要關注資料是否讀取完畢了。使用select或者epoll的LT模式,其實根本不用關注資料是否讀完了,select/epoll檢測到有資料可讀去讀就OK了。

 

這裡有兩種做法:

 

1. 針對TCP,呼叫recv方法,根據recv方法的返回值,如果返回值小於我們指定的recv buffer的大小,則認為資料已經全部接收完成。在Linux epoll的manual中,也有類似的描述:

 

For stream-oriented files (e.g., pipe, FIFO, stream socket), the condition that the read/write I/O space is exhausted can also be detected  by  checking the  amount  of data read from / written to the target file descriptor.  For example, if you call read(2) by asking to read a certain amount of data and read(2) returns a lower number of bytes, you can be sure of having exhausted the read I/O space for the file descriptor.  The same is true when  writing using write(2).  (Avoid this latter technique if you cannot guarantee that the monitored file descriptor always refers to a stream-oriented file.)

 

2. TCP和UDP都適用。將socket設成NONBLOCK(使用fcntl函式),然後select到該socket可讀之後,使用read/recv來讀取資料。當函式返回-1,同時errno是EAGAIN或EWOULDBLOCK的時候,表示資料已經全部讀取完畢。

 

實驗結論:

 

第一種方法是錯誤的。簡單來說,如果傳送了4K位元組,recv的時候使用一個2K的buffer,那麼,recv兩次之後就再也沒有資料可以recv了,此時recv就會block。永遠不會出現recv返回值小於2K的情況(注:recv/read返回0表示對端socket已經關閉)。

 

所以推薦使用第二種方法,第二種方法正確而且對TCP和UDP都管用。事實上,不論什麼平臺編寫網路程式,我認為都應該使用select+NONBLOCK socket的方式。這樣可以保證你的程式至少不會在recv/send/accept/connect這些操作上發生block從而將整個網路服務都停下來。不好的地方就是不太利於Debug,如果是block的socket,那麼GDB一跟就能知道阻塞在什麼地方了。。。

 

其實所謂讀取完畢指的是kernel中該socket對應的input data queue中的資料全部被讀取了出來,從而該socket在kernel中被設定成了unreadable的狀態。所以如果比如在區域網內,sender一直不斷髮送資料,則select到recv socket可讀之後,我們就可以一直不停的讀取到資料。所以,如果一個網路程式接收端想一次把資料全部接收完並且將所有接收到的資料都儲存在記憶體中的話,就需要考慮到這種情況,避免佔用過多的記憶體。

 

下面是測試程式碼,程式碼中client讀取了4K了之後就退出了,因為sender每次傳送4K,所以client select到一次readable之後,就只會讀取到4K。

Client.c:

#include  < stdio.h > 
#include  < stdlib.h > 
#include  < errno.h > 
#include  < string .h > 
#include  < netdb.h > 
#include  < sys / types.h > 
#include  < netinet / in .h > 
#include  < sys / socket.h > 
#include  < fcntl.h > 
#include  < unistd.h > 
#include  < sys / select.h > 

#define  SERVPORT 3333 
#define  RECV_BUF_SIZE 1024 

void  setnonblocking( int  sock)
{
     int  opts;
    opts = fcntl(sock,F_GETFL);
     if (opts < 0 )
    {
        perror( " fcntl(sock,GETFL) " );
        exit( 1 );
    }
    opts  =  opts | O_NONBLOCK;
     if (fcntl(sock,F_SETFL,opts) < 0 )
    {
        perror( " fcntl(sock,SETFL,opts) " );
        exit( 1 );
    }
}

int  main( int  argc,  char   * argv[])
{
     int  sockfd, iResult;
     char  buf[RECV_BUF_SIZE];
     struct  sockaddr_in serv_addr;
    fd_set readset, testset;

    sockfd  =  socket(AF_INET, SOCK_STREAM,  0 );
    setnonblocking(sockfd);

    memset( & serv_addr,  0 ,  sizeof (serv_addr));
    serv_addr.sin_family = AF_INET;
    serv_addr.sin_port = htons(SERVPORT);
    serv_addr.sin_addr.s_addr  =  inet_addr( " 127.0.0.1 " );

    connect(sockfd, ( struct  sockaddr  * ) & serv_addr,  sizeof (serv_addr));

    FD_ZERO( & readset);
    FD_SET(sockfd,  & readset);

    testset  =  readset;
    iResult  =  select(sockfd  +   1 ,  & testset, NULL, NULL, NULL);

     while  ( 1 ) {
        iResult  =  recv(sockfd, buf, RECV_BUF_SIZE,  0 );
         if  (iResult  ==   - 1 ) {
             if  (errno  ==  EAGAIN  ||  errno  ==  EWOULDBLOCK) {
                printf( " recv finish detected, quit.../n " );
                 break ;
            }
        }
        printf( " Received %d bytes/n " , iResult);
    }

    printf( " Final iResult: %d/n " , iResult);
     return   0 ;
}

 

 

Server.c:

#include  < stdio.h > 
#include  < stdlib.h > 
#include  < errno.h > 
#include  < string .h > 
#include  < sys / types.h > 
#include  < netinet / in .h > 
#include  < sys / socket.h > 
#include  < sys / wait.h > 

#define  SERVPORT 3333 
#define  BACKLOG 10 
#define  SEND_BUF_SIZE 4096 

int  main( int  argc,  char   * argv[])
{
     int  sockfd, client_fd, i;
     struct  sockaddr_in my_addr;
     char   * buffer  =  NULL;

    sockfd  =  socket(AF_INET, SOCK_STREAM,  0 );
    memset( & my_addr,  0 ,  sizeof (my_addr));
    my_addr.sin_family = AF_INET;
    my_addr.sin_port = htons(SERVPORT);
    my_addr.sin_addr.s_addr  =  inet_addr( " 127.0.0.1 " );

    bind(sockfd, ( struct  sockaddr  * ) & my_addr,  sizeof ( struct  sockaddr));
    listen(sockfd, BACKLOG);

    client_fd  =  accept(sockfd, NULL, NULL);

    buffer  =  malloc(SEND_BUF_SIZE); 

     for  (i  =   0 ; i  <   100 ; i ++ ) {
        send(client_fd, buffer, SEND_BUF_SIZE,  0 );
        sleep( 1 );
    }

    sleep( 10 );
    close(client_fd);
    close(sockfd);
    free(buffer);
     return   0 ;