1. 程式人生 > >linux UDP併發伺服器

linux UDP併發伺服器

摘要:
本文將討論UDP的併發實現機制。給出了兩種實現方法。第一種是最為常見的,TFTP傳輸的方式。
第二種是對UDP進一步封裝,以達到併發的可能。主要是採用佇列、多執行緒的方法。後面會給出一個簡單的實現例子,以供大家參考。功能方面較為簡單,以後會慢慢完善。
現將思路整理如下,有興趣的同學可以一起討論。程式碼稍後公佈。

    眾所周知,通常所見的的TCP伺服器都是併發實現的,即服務同時處理多個請求,
而不是等待前一個完成再處理下一個請求,這個實現得益於TCP的listen()與connect()的分工處理機制。

而對於 UDP 沒有這種監聽和連線機制,所以它必須等待前一處理完成才能繼續處理下一個客戶的請求。
但並不是說UDP實現併發伺服器是不可能的,只是與上面的實現稍有不同。
123456

UDP伺服器併發的兩種方法:
一、比較常用的處理方法是:
伺服器(知名埠)等待一下客戶的到來,當一個客戶到來後,記下其IP和port,然後同理,
伺服器fork一個子程序,建立一個socket再bind一個隨機埠,然後建立與客戶的連線,
並處理該客戶的請求。父程序繼續迴圈,等待下一個客戶的到來。在tftpd中就是使用這種技術的。

    大概的實現如下:
        for ( ; ; )
        {
           /* 等待新的客戶端連線 */
            recvform( &from_addr)

            /* 建立一個新的程序,由該程序去處理 */
if (fork() == 0) break; //子程序跳出迴圈 } //child now here peer = socket(AF_INET, SOCK_DGRAM, 0); //繫結一個隨機埠 myaddr.sin_port = htons(0); bind(peer,(struct sockaddr *)&myaddr, / sizeof myaddr) /* 把這個套接字跟客戶端的地址連線起來 這也就意味之後之後套接字使用 send recv這些函式時 都是直接跟指定的客戶端進行通訊的 */
connect(peer, (struct sockaddr *)&from, sizeof from)
以上方式簡單實用,但是每來個客戶端都需要建立一個新的 socket,為每個客戶端分配一個新的臨時埠,然後客戶端,之後的通訊需要跟新的埠進行資料傳輸。
二、如果對上述不滿意。我們可以採用新的策略。對UDP進行封裝,以此實現型別TCP的功能。 我們來看下一個簡單 TCP 伺服器的原型:
        int main()
        {
            /* 初始化socket套接字 */
            sockfd = init_socket(); 
            /* 開始監聽 */
            if(listen(sock_fd, BACKLOG) == -1)
            {
                perror("listen is error\r\n");
                exit(1);
            }
            while(1)
            {
                /* 等待新的客戶端連線 */
                if((new_fd = accept(sock_fd, (struct sockaddr *)&their_addr, &sin_size)) == -1)
                {
                    perror("accept");
                    continue;
                }
                /* fork出一個程序,由該程序去處理這個連線 */
                if(!fork())
                {
                }
            }
        }     
    我們封裝出幾個跟上面的TCP相似的函式介面。使用這些介面,可以很簡單寫出一個UDP併發伺服器。例如:
        /* 主函式 */
        int main(int argc, char *argv[])
        {
            /* 定義一個listen指標。該結構體是自己定義的 */
            struct listen *_listen;
            /* 初始化socket,這個初始化過程跟普通的UDP初始化 socket套接字一樣 */
            sockfd = init_socket(); 

            /*
                開始監聽這個socket. 最大的連線數為10,也就是說最多隻有10個客戶端
                封裝好的一個函式,功能有點類似於 TCP協議中的 listen 函式
            */
            server_listen(&sockfd, 10);

            while(1)
            {
                /* 
                獲得一個連線。類似於TCP的 accept 函式 
                需要注意的是,如果沒有連線, server_accept 函式將進入休眠狀態,直到有一個新的客戶端資料
                客戶端只有在第一次發生資料過來的時候,才會建立一個新的 listen ,並喚醒 server_accept 函式
                之後,這個客戶端的所有資料都將傳送到 這個新的 listen 的資料佇列中。
                所以。通過這個 listen ,我們可以建立一個程序,由該程序去處理這個客戶端之後的請求
                這裡,listen 有點像 TCP 協議中的 accept 函式新建的 sockfd
                */
                _listen = server_accept();

                /* 
                雖然說 server_accept 會進入休眠,但是仍然會被其它訊號喚醒,所以要做個判斷
                判斷下是否為 NULL 。為 NULL 則說明沒有新的連線 
                */
                if(_listen == NULL){
                    continue;
                }

                printf("new client \r\n");
                /* 
                啟動一個 listen_phread 執行緒,並且,由該執行緒去處理這個連線
                類似於TCP 的fork
                */
                listen_pthread(_listen, listen_phread);
            }
        }
    listen_phread 執行緒簡單實現:
12
        void *listen_phread(void *pdata)
        {
            int ret;
            char buf[1204];
            struct sockaddr_in clientaddr;

            /* 獲得 listen */
            struct listen *_listen;
            _listen = (struct listen *)pdata;

            while(1)
            {
                /*
                recv_from_listen 也是一個封裝好的函式,功能是從這個 lsiten 中獲取資料
                最後一個引數表示無資料時休眠的時間
                -1 表示永久休眠。知道有資料為止
                */
                ret = recv_from_listen(_listen, &clientaddr, buf, 1204, -1);
                if(ret == -1)
                {
                    printf("%p recv is err \r\n", _listen);
                }else{
                    printf("%p recv %d byte data is [%s]\r\n", _listen, ret, buf);
                    if((ret = sendto(sockfd, buf, ret, 0, (struct sockaddr *)(&(_listen->addr)), 
                                            sizeof(struct sockaddr))) == -1)
                    {
                        perror("sendto :");
                    }
                    printf("sento [%s]\r\n", buf);
                }
            }
            /* 關閉連線,會釋放記憶體,注意,一個listen 被建立後,需要使用這個函式釋放記憶體 */
            listen_close(_listen);
        }
    lsiten 結構體原型:
12
        struct listen{      
            struct sockaddr addr;       /* 資料包地址資訊 */
            int data_num;               /* 資料包數量 */

            int list_flg;               /* 是否已經被監聽了 */

            pthread_mutex_t mutex;  /* 執行緒鎖 */

            /* 這兩個條件變數相關的 */

            pthread_mutex_t recv_mtx;
            pthread_cond_t recv_cond;

            struct list_head head;      /* 資料包佇列 */

            struct list_head listen_list;       /*接收的執行緒佇列 */
        };
實現原理:
    這個介面函式是基於佇列、多執行緒實現的。這裡簡單地說下原理,稍後有時間我會對程式碼進一步分析


1.  listen 佇列:
    系統會建立一個佇列,該佇列的成員為一個 listen ,每個 listen 的 addr 元素會記錄下自己要接收的
    客戶端。
    之後,server_listen 建立一個執行緒,由該執行緒去接收資料。
    接收到網路資料後,會遍歷 listen 連結串列,找到一個想要接收這個資料的 listen 。
    如果沒有,會建立一個新的 listen ,並將這個 listen 加入到 listen 佇列中去

2   資料包佇列
    找到 listen 後,每個 listen 其實就是一個 資料包佇列頭。系統會把資料放到 這個 listen 資料包佇列中去
    然後喚醒 recv_from_listen 

也就是說,系統的佇列結構如下

listen 佇列
    listen(1) -> listen(2) -> listen(3) -> listen(4) -> .......
        |           |           |
      data(1)     data         data
        |           |
      data(1)     data

每個listen本身就是一個數據包佇列頭


recv_from_listen 函式會試圖去從一個 listen 的資料包佇列中獲取資料,如果沒有資料,則進入休眠狀態。

主要函式介面:

void listen_head_init(struct list_head *head)
初始化一個 連結串列頭

int listen_add(struct list_head *head, listen_t *listen)
將要監聽的 listen 新增到這個連結串列頭

recv_from_listen_head
從連結串列中獲取資料

示例:

    //我們建立兩個 listen_head 
    struct list_head poll_head_1, poll_head_2;

    int main(int argc, char *argv[])
    {
        int poll_num = 0;
        struct listen *_listen;

        /* 初始化socket */
        sockfd = init_socket(); 

        /*
            開始監聽這個socket. 執行最大的連線數為10
            該函式類似於TCP協議中的 int listen(SOCKET sockfd, int backlog)
        */
        server_listen(&sockfd, 10);

        /* 初始化這個poll 機制 */
        listen_head_init(&poll_head_1);
        listen_head_init(&poll_head_2);

        while(1)
        {
            /* 獲得一個連線。類似於TCP的
            int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen); 
            */
            _listen = server_accept();

            if(_listen == NULL){
                continue;
            }

            printf("new client \r\n");

            if(poll_num < 5)
            {
                /* 前面五個連線者新增到 poll_head_1 */
                poll_num ++;
                listen_add(&poll_head_1, _listen);
            }else{
                /* 新增到 poll_head_2 */
                poll_num ++;
                listen_add(&poll_head_2, _listen);
            }
        }
    }
​```
 然後我們就可以從中兩個 listen_head 中讀取資料了
​```
    while(1)
    {
        /*
        從 poll_head_1 中讀取資料。
        此時,前面五個 listen 被掛鉤到這個 poll_head_1,所以這五個listen中任何一個有了資料
        recv_from_listen_head 都會返回,而且將 _listen 指向這個 listen 
        這樣,我們就可以知道是哪個listen有資料了
        */
        ret = recv_from_listen_head(poll_head_1, &_listen, (struct sockaddr *)&clientaddr, buf, 1204, -1);
        if(ret == -1)
        {
            printf("%p recv is err \r\n", _listen);
        }else{
            printf("__ poll %p recv %d byte data is [%s]\r\n", _listen, ret, buf);
            if((ret = sendto(sockfd, buf, ret, 0, (struct sockaddr *)(&(_listen->addr)), 
                                    sizeof(struct sockaddr))) == -1)
            {
                perror("sendto :");
            }
            printf("sento [%s]\r\n", buf);
        }
    }