1. 程式人生 > >TCP/IP實現(九) 插口I/O

TCP/IP實現(九) 插口I/O

一.插口快取(套接字快取)

struct	sockbuf {
	u_long	sb_cc;		// 快取中的資料大小
	u_long	sb_hiwat;	/* max actual char count */
	u_long	sb_mbcnt;	// 快取mbuf的數量
	u_long	sb_mbmax;	/* max chars of mbufs to use */
	long	sb_lowat;	/* low water mark */
	struct	mbuf *sb_mb;	// 快取連結串列
	struct	selinfo sb_sel;	// 用於記錄用select監聽該插口的程序
	short	sb_flags;	// 快取的一些狀態標誌,比如該快取是否已上鎖,是否有程序在等待上鎖等等
	short	sb_timeo;	// 用於限制一個程序讀寫套接字快取時的超時時限,預設為0,即無限等待
                        // 可以使用setsockopt函式通過SO_SNDTIMEO和SO_RCVTIMEO選項進行修改
} so_rcv, so_snd;

         注意,程序訪問套接字快取時是加鎖的,因此多個程序訪問套接字快取是安全的。

 

二.寫系統呼叫

       寫系統呼叫有write,writev,send,sendto,sendmsg,所有的這些系統呼叫都會都會直接或間接呼叫sosend函式,該函式會將程序傳來的資料複製到核心,並傳給與插口相關的協議。且前四個系統呼叫都可以用sendmsg函式來進行替換(但是隻有前兩個函式呼叫可以作用於其它描述符,後三個只能用於介面描述符),因此,此處只對sendmsg函式進行說明。

1.sendMsg的實現概述

     sendmsg會間接呼叫sosend函式將資料交付給相應的協議層,當絕不將資料直接新增到套接字快取中,因為這是資料層該做的工作,比如UDP就不會將資料放入快取。sosend函式首先會sblock函式獲取socket傳送快取的鎖,接著根據協議型別來進行不同的交付方式,對於有邊界的報文協議(如UDP),必須等到有足夠的快取時,一次性拷貝到核心的儲存空間mbuf中,再交付給協議層,否則(如TCP),每次交付一部分資料(一個mbuf)至協議層(只有當套接字快取可用空間高於低水位時才交付,否則等待套接字快取空閒)。當若設定了非阻塞模式,當空間不夠時,立刻返回EWOULDBLOCK(即EAGAIN,請求資源不足

)。對於邊界報文的協議而言,若一次性通過writev寫入的資料過大(查過了套接字結構中的高水位sb_hiwat),則也立刻返回EMSGSIZE,因為對於資料報協議而言,呼叫一次writev就是傳送一個數據報

    另外,sosend函式會首先檢查套接字是否被禁止(已關閉寫so->so_state & SS_CANTSENDMORE為真),若是則返回EPIPE並向所屬程序傳送SIGPIPE訊號(該訊號的預設行為是中止程序,muduo對該訊號做了忽略處理)。接著檢查套接字是否已連線,若不是則返回ENOTCONN。對於無連線協議若未指定目的地址則返回EDESTADDRREQ

 

三.讀系統呼叫

1.recvmsg函式的使用

    recvmsg函式的第二個引數比較複雜,在次對其進行講解,並對控制資訊引數的使用進行舉例說明:

    

       下面舉一個獲取UDP資料報首部目的地址的例子(一般用於獲取廣播報文):.h在前,.cpp在後

#include <sys/socket.h>
#include <string>
#include <arpa/inet.h>
#include <iostream>

class UDP_Server
{
public:
    UDP_Server(std::string ip, int port);

    void recvData();

private:
    int _socket;
    int _port;
    std::string _ip;

    sockaddr_in _servAddr;

};

.cpp:

#include <string.h>
#include <netinet/in.h>
#include <sys/param.h>

typedef union { // 注意這是一個聯合體,便於重新解讀這塊記憶體
    cmsghdr msghdr;  // cmsghdr的首部結構
    // 用於儲存控制資訊的緩衝區
    char control[CMSG_SPACE(sizeof(in_pktinfo))]; // CMSG_SPACE巨集的作用:sizeof(in_pktinfo) + sizeof(cmsghdr);
} cmsg_un;


UDP_Server::UDP_Server(std::string ip, int port) : _port(port),
                                                   _ip(ip)
{
    _socket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
    int on = 1;
    setsockopt(_socket, IPPROTO_IP, IP_PKTINFO, &on,sizeof(int));
    setsockopt(_socket, SOL_SOCKET, SO_BROADCAST, &on, sizeof(int)); // 設定套接字為允許接收廣播報文

    bzero(&_servAddr, sizeof(sockaddr_in));
    if(_ip != "") {
        inet_pton(AF_INET, _ip.c_str(), &_servAddr.sin_addr);
    }
    else {
        _servAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    }
    _servAddr.sin_port = htons(_port);

    if(bind(_socket, (sockaddr*)&_servAddr, sizeof(sockaddr)) < 0) {
        std::cout<< "bind error: " << errno <<std::endl;
        if(errno == EADDRNOTAVAIL) {
            std::cout<< "EADDRNOTAVAIL" <<std::endl;
        }
    }
}

void UDP_Server::recvData()
{
    char recvBuf[65536];
    msghdr msg;
    iovec iov;
    cmsg_un ctrlMsg;
    cmsghdr *cmsgptr;
    in_pktinfo *pi;
    int namelen =sizeof(sockaddr_in);

    iov.iov_base = recvBuf;
    iov.iov_len = sizeof(recvBuf);

    msg.msg_name = NULL;
    msg.msg_namelen =   0;
    msg.msg_control = &ctrlMsg.msghdr; // 指向用於儲存控制資訊的buf
    msg.msg_controllen = sizeof(cmsg_un);
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    msg.msg_flags = 0;
    int n = 0;
    if( (n = recvmsg(_socket, &msg, 0)) < 0) {
        std::cout<< "recv error: " << errno <<std::endl;
    }
    recvBuf[n] = '\0';
    std::cout<<"recv date: "<<recvBuf<<std::endl;

    // CMSG_FIRSTHDR巨集用於獲取緩衝中第一個cmsghdr結構,CMSG_NXTHDR用於指向下一個
    for(cmsgptr = CMSG_FIRSTHDR(&msg); cmsgptr; cmsgptr = CMSG_NXTHDR(&msg,cmsgptr)) {
        // 注意在測試的centOS下,獲取目的地址的型別是IP_PKTINFO,而非IP_RECVDSTADDR
        if(cmsgptr->cmsg_level == IPPROTO_IP && cmsgptr->cmsg_type == IP_PKTINFO) {
            // CMSG_DATA返回指向資料部分的指標
            pi = (in_pktinfo*)CMSG_DATA(cmsgptr);
            sockaddr_in lAddr;
            memcpy(&lAddr.sin_addr, &pi->ipi_addr, sizeof(in_addr));
            inet_ntop(AF_INET, &lAddr.sin_addr, recvBuf, sizeof(recvBuf));
            std::cout<<"des ip: "<<recvBuf<<std::endl;
        }
        //break;
    }
    //if()
}

2.帶外資料的讀取實現

      當要讀取帶外資料時,插口層只是為帶外資料分配一塊額外的快取,並向相關協議進行起碼請求。因此關於TCP帶外資料的讀取實現到討論TCP協議時進行說明

四.帶外資料與緊急模式

    帶外資料即OOB資料,一般用於通知重要事件,其擁有比普通資料更高的優先順序。許多運輸層確實提供了真正的帶外資料:使用同一個連線的獨立的邏輯資料通道作為正常的資料通道。但TCP沒有真正的帶外資料,而應該稱之為緊急模式。UDP無任何帶外資料。

1.TCP緊急模式

       TCP緊急模式中對用於描述緊急資料的欄位很少,只有首部中URG位元與一個16bit的緊急指標被置為一個正的偏移量。URG位元用於通知對端,緊急模式已啟動。當套接字傳送快取中存在一個OOB資料時,便會將下一個傳送分節的TCP首部的URG標誌置位,但OOB資料不一定隨下一分節發出,當接收端收到URG後便會進入緊急模式,直至越過緊急資料。而16bit緊急指標是一個偏移量,用於計算緊急欄位的最後一個位元組的序號(緊急指標 + 首部的32位序號)。

      當緊急指標所指資料到達TCP接收端時,該位元組資料即可能被拉出帶外,也可能被留在帶內,即線上留存。當設定了SO_OOBINLINE套接字選項時(預設情況下該選項是禁止的),會將該位元組資料留在套接字快取中,否則將被放至該連線的一個獨立的單位元組帶外緩衝區

2.幾種讀取緊急資料的方式

1)讀取放在套接字快取中的緊急資料

       當將緊急資料放在帶內時,即套接字快取中時,只能通過sockatmark或者ioclt函式先檢查帶外標記,插口層確保當一個套接字快取中存在緊急資料時,進行一次讀系統呼叫最多隻會讀到緊急資料位元組之前的資料,這是可用通過呼叫sockatmark或ioctl函式來檢查帶外資料標記,若返回真,則說明之後一個數據是緊急資料。程式碼如下:

void OOBSErver::recvOOBInline()
{
    int n = 0, on = 1;
    pollfd listenFds[10];
    char recvBuf[65536];

    listenFds[0].fd = _connFd;
    listenFds[0].events = POLLRDNORM | POLLPRI;

    setsockopt(_connFd, SOL_SOCKET, SO_OOBINLINE, &on, sizeof(on)); // 設定為將緊急資料放在帶內

    while(1) {
        int activeNum = ::poll(listenFds, 2, 10000); // 相較於UNP中的資料在此處添加了poll,這是需要的,原因在下面給出
        if(sockatmark(_connFd)) { // 檢查帶外標記
            std::cout<< "at OOB mark"<<std::endl;
        }

        if( (n = read(_connFd, recvBuf, sizeof(recvBuf) - 1)) == 0 ) {
            std::cout<<errno;
            std::cout<<"recv FIN"<<std::endl;
            exit(0);
        }
        recvBuf[n] = 0;
        std::cout << "read " <<n<<" bytes date: " << recvBuf << std::endl;
    }
}

    在該段程式中新增poll的原因是存在以下這種情況:前一次read呼叫將套接字快取的資料多空,接下來呼叫sockatmark進行判斷帶外標記為假,之後準備呼叫read,但這時到達了一條帶外標記,則不會打印出“at OOB mark”,程序也無法得知這是一條什麼資料。

    而且經測試發現,若傳送端連續多次呼叫send(,,,MSG_OOB)傳送帶外資料,接收端並未將舊對帶外資料丟棄,而是將舊的緊急資料做為普通資料讀入。若將緊急資料留在帶外單位元組快取中則不會出現該問題,此時若舊的緊急資料未讀又來了新的則會將舊的丟棄。因此不推薦使用SO_OOBINLINE選項。

2)使用poll函式關注高優先順序資料POLLPRI

      可以使用poll關注某個套接字上的POLLPRI(高優先順序資料可讀事件)和POLLRDNORM(普通資料可讀事件),當某個套接字可讀時根據返回的事件型別進行判斷是普通資料(POLLRDNORM)還是緊急資料(POLLPRI),程式碼如下:

void OOBSErver::usePollRecv()
{
    pollfd listenFds[10];
    int n = 0;
    char recvBuf[65536];

    listenFds[0].fd = _connFd;
    listenFds[0].events = POLLRDNORM | POLLPRI;

    while(1) {
        int activeNum = ::poll(listenFds, 2, 10000);

        if(listenFds[0].revents > 0) {
            if(listenFds[0].revents & POLLPRI) { // 高優先順序資料可讀
                // 若緊急資料放在帶外單位元組快取,需要通過引數MSG_OOB進行讀取
                n = recv(_connFd, recvBuf, sizeof(recvBuf), MSG_OOB); 
                if(n < 0) {
                    if(errno == EINVAL) {
                        std::cout << "EINVAL"<<std::endl; // 帶外資料尚未到達
                        continue;
                    }
                }
                recvBuf[n] = 0;
                std::cout << "read " <<n<<" bytes OOB date: " << recvBuf << std::endl;
            }
            if(listenFds[0].revents & POLLRDNORM) { // 普通資料可讀
                n = recv(_connFd, recvBuf, sizeof(recvBuf), 0);
                if(n == 0) {
                    std::cout<<"recv FIN"<<std::endl;
                    exit(0);
                }
                recvBuf[n] = 0;
                std::cout << "read " <<n<<" bytes normal date: " << recvBuf << std::endl;
            }
        }
    }
}

3)使用select

       通過select等待普通資料(將套接字描述符新增到集合ret,即可讀集合)或帶外資料(將描述符新增到xset,即異常集合)。由於select是水平觸發,因此當程序進入緊急狀態後,便會一直觸發異常,直到程序讀入越過帶外資料。UNP中給出了一種解決辦法,即只在讀入普通資料後才select異常條件。這樣是可行的,因為若前一次沒讀到帶外資料則說明帶外資料還未到,帶外資料之前肯定有普通資料(否則第一個含URG的報文就應該含有OOB資料),則應先讀普通資料,讀後再嘗試關注帶外資料。

void OOBSErver::useSelectRecv()
{
    fd_set rset, xset;
    int n = 0;
    bool justreadoob = false;
    char recvBuf[65536];
    while(1) {
        FD_SET(_connFd, &rset); // 新增至讀集合
        if(!justreadoob)
            FD_SET(_connFd, &xset); // 新增至寫集合

        select(_connFd + 1, &rset, NULL, &xset, NULL);

        if(FD_ISSET(_connFd, &xset)) {
            n = recv(_connFd, recvBuf, sizeof(recvBuf), MSG_OOB);
            justreadoob = true;
            FD_CLR(_connFd, &xset);
            if(n < 0) {
                if(errno == EINVAL) {
                    std::cout << "EINVAL"<<std::endl;
                    continue;
                }
            }
            recvBuf[n] = 0;
            std::cout << "read " <<n<<" bytes OOB date: " << recvBuf << std::endl;
        }

        if(FD_ISSET(_connFd, &rset)) {
            n = recv(_connFd, recvBuf, sizeof(recvBuf), 0);
            if(n == 0) {
                std::cout<<"recv FIN"<<std::endl;
                exit(0);
            }
            recvBuf[n] = 0;
            std::cout << "read " <<n<<" bytes normal date: " << recvBuf << std::endl;
            justreadoob = false;
        }
    }
}

4)使用SIGURG訊號

       主要程式碼如下:

typedef std::function<void()> SigCallBack;
SigCallBack sigurgCb;

void sig_urg(int signo) // 關聯到訊號的函式
{
    static int num = 0;
    std::cout << "recv URG: "<<++num<<std::endl;
    sigurgCb();  // 回撥handerSIGURG函式
}

void OOBSErver::useSigUrgRecv()
{
    sigurgCb = std::bind(&OOBSErver::handerSIGURG, this);
    fcntl(_connFd, F_SETOWN, getpid());
    signal(SIGURG, sig_urg); // 關聯訊號的處理函式
}

void OOBSErver::handerSIGURG() // 真正處理SIGURG訊號
{
    int n = 0;
    char recvBuf[65536];
    n = recv(_connFd, recvBuf, sizeof(recvBuf), MSG_OOB);
    recvBuf[n] = 0;
    std::cout << "read " <<n<<" bytes OOB date: " << recvBuf << std::endl;
}