1. 程式人生 > >tcp粘包和拆包的處理方案

tcp粘包和拆包的處理方案

隨著智慧硬體越來越流行,很多後端開發人員都有可能接觸到socket程式設計。而很多情況下,伺服器與端上需要保證資料的有序,穩定到達,自然而然就會選擇基於tcp/ip協議的socekt開發。開發過程中,經常會遇到tcp粘包,拆包的問題,本文將從產生原因,和解決方案以及workerman是如何處理粘包拆包問題的,這幾個層面來說明這個問題。

什麼是粘包拆包

對於什麼是粘包、拆包問題,我想先舉兩個簡單的應用場景:

  1. 客戶端和伺服器建立一個連線,客戶端傳送一條訊息,客戶端關閉與服務端的連線。

  2. 客戶端和伺服器簡歷一個連線,客戶端連續傳送兩條訊息,客戶端關閉與服務端的連線。

對於第一種情況,服務端的處理流程可以是這樣的:當客戶端與服務端的連線建立成功之後,服務端不斷讀取客戶端傳送過來的資料,當客戶端與服務端連線斷開之後,服務端知道已經讀完了一條訊息,然後進行解碼和後續處理...。對於第二種情況,如果按照上面相同的處理邏輯來處理,那就有問題了,我們來看看第二種情況下客戶端傳送的兩條訊息遞交到服務端有可能出現的情況:

第一種情況:

服務端一共讀到兩個資料包,第一個包包含客戶端發出的第一條訊息的完整資訊,第二個包包含客戶端發出的第二條訊息,那這種情況比較好處理,伺服器只需要簡單的從網路緩衝區去讀就好了,第一次讀到第一條訊息的完整資訊,消費完再從網路緩衝區將第二條完整訊息讀出來消費。

沒有發生粘包、拆包示意圖

第二種情況:

服務端一共就讀到一個數據包,這個資料包包含客戶端發出的兩條訊息的完整資訊,這個時候基於之前邏輯實現的服務端就蒙了,因為服務端不知道第一條訊息從哪兒結束和第二條訊息從哪兒開始,這種情況其實是發生了TCP粘包。

TCP粘包示意圖

第三種情況:

服務端一共收到了兩個資料包,第一個資料包只包含了第一條訊息的一部分,第一條訊息的後半部分和第二條訊息都在第二個資料包中,或者是第一個資料包包含了第一條訊息的完整資訊和第二條訊息的一部分資訊,第二個資料包包含了第二條訊息的剩下部分,這種情況其實是傳送了TCP拆,因為發生了一條訊息被拆分在兩個包裡面傳送了,同樣上面的伺服器邏輯對於這種情況是不好處理的。

TCP拆包示意圖

產生tcp粘包和拆包的原因

我們知道tcp是以流動的方式傳輸資料,傳輸的最小單位為一個報文段(segment)。tcp Header中有個Options標識位,常見的標識為mss(Maximum Segment Size)指的是,連線層每次傳輸的資料有個最大限制MTU(Maximum Transmission Unit),一般是1500位元,超過這個量要分成多個報文段,mss則是這個最大限制減去TCP的header,光是要傳輸的資料的大小,一般為1460位元。換算成位元組,也就是180多位元組。

tcp為提高效能,傳送端會將需要傳送的資料傳送到緩衝區,等待緩衝區滿了之後,再將緩衝中的資料傳送到接收方。同理,接收方也有緩衝區這樣的機制,來接收資料。

發生TCP粘包、拆包主要是由於下面一些原因:

  1. 應用程式寫入的資料大於套接字緩衝區大小,這將會發生拆包。

  2. 應用程式寫入資料小於套接字緩衝區大小,網絡卡將應用多次寫入的資料傳送到網路上,這將會發生粘包。

  3. 進行mss(最大報文長度)大小的TCP分段,當TCP報文長度-TCP頭部長度>mss的時候將發生拆包。

  4. 接收方法不及時讀取套接字緩衝區資料,這將發生粘包。

  5. ……

如何解決拆包粘包

既然知道了tcp是無界的資料流,且協議本身無法避免粘包,拆包的發生,那我們只能在應用層資料協議上,加以控制。通常在制定傳輸資料時,可以使用如下方法:

  1. 使用帶訊息頭的協議、訊息頭儲存訊息開始標識及訊息長度資訊,服務端獲取訊息頭的時候解析出訊息長度,然後向後讀取該長度的內容。

  2. 設定定長訊息,服務端每次讀取既定長度的內容作為一條完整訊息。

  3. 設定訊息邊界,服務端從網路流中按訊息編輯分離出訊息內容。

a)先基於第三種方法,假設區分資料邊界的標識為換行符"\n"(注意請求資料本身內部不能包含換行符),資料格式為Json,例如下面是一個符合這個規則的請求包。

{"type":"message","content":"hello"}\n

注意上面的請求資料末尾有一個換行字元(在PHP中用雙引號字串"\n"表示),代表一個請求的結束。

b)基於第一種方法,可以制定,首部固定10個位元組長度用來儲存整個資料包長度,位數不夠補0的資料協議

0000000036{"type":"message","content":"hello"}

c)基於第一種方法,可以制定,首部4位元組網路位元組序unsigned int,標記整個包的長度

****{"type":"message","content":"hello all"}

其中首部四位元組*號代表一個網路位元組序的unsigned int資料,為不可見字元,緊接著是Json的資料格式的包體資料。

基於workerman的解決方案

制定了資料協議,那我們下面來通過程式碼具體分析一下,php中workerman,是如何解決上述問題的。為了便於理解,可以看下下面的流程圖

workerman是基於策略模式來設計處理tcp粘包,拆包問題的。具體資料協議的制定在應用目錄Applications/YourApp/Protocols目錄下,實現則是在框架目錄Workerman/Connection/TcpConnection.php中。這樣的好處就是使用者可以隨意定製自己的資料協議格式,而框架程式碼都能處理。

我們現在Applications/YourApp/Protocols目錄下,建一個jsonNL.php,來實現自己制定自己定義的資料協議。

JsonNL.php的實現

namespace Protocols;
class JsonNL
{
    /**
     * 檢查包的完整性
     * 如果能夠得到包長,則返回包的在buffer中的長度,否則返回0繼續等待資料
     * 如果協議有問題,則可以返回false,當前客戶端連線會因此斷開
     * @param string $buffer
     * @return int
     */
    public static function input($buffer)
    {
        // 獲得換行字元"\n"位置
        $pos = strpos($buffer, "\n");
        // 沒有換行符,無法得知包長,返回0繼續等待資料
        if($pos === false)
        {
            return 0;
        }
        // 有換行符,返回當前包長(包含換行符)
        return $pos+1;
    }

    /**
     * 打包,當向客戶端傳送資料的時候會自動呼叫
     * @param string $buffer
     * @return string
     */
    public static function encode($buffer)
    {
        // json序列化,並加上換行符作為請求結束的標記
        return json_encode($buffer)."\n";
    }

    /**
     * 解包,當接收到的資料位元組數等於input返回的值(大於0的值)自動呼叫
     * 並傳遞給onMessage回撥函式的$data引數
     * @param string $buffer
     * @return string
     */
    public static function decode($buffer)
    {
        // 去掉換行,還原成陣列
        return json_decode(trim($buffer), true);
    }
}

再看下TcpConnection.php中,接收資料時,如何處理。

public function baseRead($socket, $check_eof = true)
    {
        $buffer = fread($socket, self::READ_BUFFER_SIZE);

        // Check connection closed.
        if ($buffer === '' || $buffer === false) {
            if ($check_eof && (feof($socket) || !is_resource($socket) || $buffer === false)) {
                $this->destroy();
                return;
            }
        } else {
            $this->_recvBuffer .= $buffer;
        }

        // If the application layer protocol has been set up.
        if ($this->protocol) {
            $parser = $this->protocol;
            while ($this->_recvBuffer !== '' && !$this->_isPaused) {
                // The current packet length is known.
                if ($this->_currentPackageLength) {
                    // Data is not enough for a package.
                    if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
                        break;
                    }
                } else {
                    // Get current package length.
                    $this->_currentPackageLength = $parser::input($this->_recvBuffer, $this);
                    // The packet length is unknown.
                    if ($this->_currentPackageLength === 0) {
                        break;
                    } elseif ($this->_currentPackageLength > 0 && $this->_currentPackageLength <= self::$maxPackageSize) {
                        // Data is not enough for a package.
                        if ($this->_currentPackageLength > strlen($this->_recvBuffer)) {
                            break;
                        }
                    } // Wrong package.
                    else {
                        echo 'error package. package_length=' . var_export($this->_currentPackageLength, true);
                        $this->destroy();
                        return;
                    }
                }

                // The data is enough for a packet.
                self::$statistics['total_request']++;
                // The current packet length is equal to the length of the buffer.
                if (strlen($this->_recvBuffer) === $this->_currentPackageLength) {
                    $one_request_buffer = $this->_recvBuffer;
                    $this->_recvBuffer  = '';
                } else {
                    // Get a full package from the buffer.
                    $one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
                    // Remove the current package from the receive buffer.
                    $this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);
                }
                // Reset the current packet length to 0.
                $this->_currentPackageLength = 0;
                if (!$this->onMessage) {
                    continue;
                }
                try {
                    // Decode request buffer before Emitting onMessage callback.
                    call_user_func($this->onMessage, $this, $parser::decode($one_request_buffer, $this));
                } catch (\Exception $e) {
                    Worker::log($e);
                    exit(250);
                } catch (\Error $e) {
                    Worker::log($e);
                    exit(250);
                }
            }
            return;
        }

        if ($this->_recvBuffer === '' || $this->_isPaused) {
            return;
        }

        // Applications protocol is not set.
        self::$statistics['total_request']++;
        if (!$this->onMessage) {
            $this->_recvBuffer = '';
            return;
        }
        try {
            call_user_func($this->onMessage, $this, $this->_recvBuffer);
        } catch (\Exception $e) {
            Worker::log($e);
            exit(250);
        } catch (\Error $e) {
            Worker::log($e);
            exit(250);
        }
        // Clean receive buffer.
        $this->_recvBuffer = '';
    }

上面的程式碼比較多,不需要細讀,幾個關鍵的地方可以看出處理的思路,先把接收的資料包追加到_recvBuffer變數中,然後呼叫使用者自己定義的資料協議中的input方法。input方法則會判斷資料中是否包含邊界符,如果不包含則返回0,包含則返回當前資料包的大小。框架中接收到input的返回值後,如果接收值為0,則跳出迴圈不做處理,如果接收值不為0,則將擷取的資料包賦值給one_request_buffer,並且重置_recvBuffer

// Get a full package from the buffer.
$one_request_buffer = substr($this->_recvBuffer, 0, $this->_currentPackageLength);
// Remove the current package from the receive buffer.
$this->_recvBuffer = substr($this->_recvBuffer, $this->_currentPackageLength);

最後:tcp雖然是個強大的協議,能保證資料的穩定性,一致性,但在實際開發中,我們還需要根據實際的資料協議,來控制每次獲取的包是客戶端發過來的一個完整的可以解析的包。