1. 程式人生 > >網絡編程解決黏包現象

網絡編程解決黏包現象

end one deb block () 保護 img shel 一起

個人感悟:
首先我覺的要想學好一門語言不論是是什麽語言:java , C ,python ,PHP,都要弄明白網絡請求與響應,過程,osi模型等,我覺的這個最基礎的 ,但是今天我需要解決的是 網絡交互時發生的"黏包"問題,行了不啰嗦了讓我們進入正題吧。

黏包現象

讓我們基於tcp先制作一個遠程執行命令的程序(命令ls -l ; lllllll ; pwd)

這塊我們需要註意下

res=subprocess.Popen(cmd.decode(‘utf-8‘),
shell=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)

的結果的編碼是以當前所在的系統為準的,如果是windows,那麽res.stdout.read()讀出的就是GBK編碼的,在接收端需要用GBK解碼

且只能從管道裏讀一次結果

同時執行多條命令之後,得到的結果很可能只有一部分,在執行其他命令的時候又接收到之前執行的另外一部分結果,這種顯現就是黏包


那麽我們現在來實現個黏包現象吧 看我是怎麽解決的的
基於tcp協議實現的黏包
tcp - server

#_*_coding:utf-8_*_
from socket import *
import subprocess

ip_port=(‘127.0.0.1‘,8888)
BUFSIZE=1024

tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(5)

while True:
    conn,addr=tcp_socket_server.accept()
    print(‘客戶端‘,addr)

    while True:
        cmd=conn.recv(BUFSIZE)
        if len(cmd) == 0:break

        res=subprocess.Popen(cmd.decode(‘utf-8‘),shell=True,
                         stdout=subprocess.PIPE,
                         stdin=subprocess.PIPE,
                         stderr=subprocess.PIPE)

        stderr=res.stderr.read()
        stdout=res.stdout.read()
        conn.send(stderr)
        conn.send(stdout)

tcp - client

#_*_coding:utf-8_*_
import socket
BUFSIZE=1024
ip_port=(‘127.0.0.1‘,8888)

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(ip_port)

while True:
    msg=input(‘>>: ‘).strip()
    if len(msg) == 0:continue
    if msg == ‘quit‘:break

    s.send(msg.encode(‘utf-8‘))
    act_res=s.recv(BUFSIZE)

    print(act_res.decode(‘utf-8‘),end=‘‘)

基於udp協議實現的黏包
udp - server

#_*_coding:utf-8_*_
from socket import *
import subprocess

ip_port=(‘127.0.0.1‘,9000)
bufsize=1024

udp_server=socket(AF_INET,SOCK_DGRAM)
udp_server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
udp_server.bind(ip_port)

while True:
    #收消息
    cmd,addr=udp_server.recvfrom(bufsize)
    print(‘用戶命令----->‘,cmd)

    #邏輯處理
    res=subprocess.Popen(cmd.decode(‘utf-8‘),shell=True,stderr=subprocess.PIPE,stdin=subprocess.PIPE,stdout=subprocess.PIPE)
    stderr=res.stderr.read()
    stdout=res.stdout.read()

    #發消息
    udp_server.sendto(stderr,addr)
    udp_server.sendto(stdout,addr)
udp_server.close()

udp - client

from socket import *
ip_port=(‘127.0.0.1‘,9000)
bufsize=1024

udp_client=socket(AF_INET,SOCK_DGRAM)

while True:
    msg=input(‘>>: ‘).strip()
    udp_client.sendto(msg.encode(‘utf-8‘),ip_port)
    err,addr=udp_client.recvfrom(bufsize)
    out,addr=udp_client.recvfrom(bufsize)
    if err:
        print(‘error : %s‘%err.decode(‘utf-8‘),end=‘‘)
    if out:
        print(out.decode(‘utf-8‘), end=‘‘)
"註意:只有TCP有粘包現象,UDP永遠不會粘包"

為什麽黏包

TCP協議中的數據傳遞
tcp協議的拆包機制

當發送端緩沖區的長度大於網卡的MTU時,tcp會將這次發送的數據拆成幾個數據包發送出去。
MTU是Maximum Transmission Unit的縮寫。意思是網絡上傳送的最大數據包。MTU的單位是字節。 大部分網絡設備的MTU都是1500。如果本機的MTU比網關的MTU大,大的數據包就會被拆開來傳送,這樣會產生很多數據包碎片,增加丟包率,降低網絡速度。

面向流的通信特點和Nagle算法

TCP(transport control protocol,傳輸控制協議)是面向連接的,面向流的,提供高可靠性服務。
收發兩端(客戶端和服務器端)都要有一一成對的socket,因此,發送端為了將多個發往接收端的包,更有效的發到對方,使用了優化方法(Nagle算法),將多次間隔較小且數據量小的數據,合並成一個大的數據塊,然後進行封包。
這樣,接收端,就難於分辨出來了,必須提供科學的拆包機制。 即面向流的通信是無消息保護邊界的。
對於空消息:tcp是基於數據流的,於是收發的消息不能為空,這就需要在客戶端和服務端都添加空消息的處理機制,防止程序卡住,而udp是基於數據報的,即便是你輸入的是空內容(直接回車),也可以被發送,udp協議會幫你封裝上消息頭發送過去。
可靠黏包的tcp協議:tcp的協議數據不會丟,沒有收完包,下次接收,會繼續上次繼續接收,己端總是在收到ack時才會清除緩沖區內容。數據是可靠的,但是會粘包。

基於tcp協議特點的黏包現象成因

技術分享圖片

socket數據傳輸過程中的用戶態與內核態說明

發送端可以是一K一K地發送數據,而接收端的應用程序可以兩K兩K地提走數據,當然也有可能一次提走3K或6K數據,或者一次只提走幾個字節的數據。
也就是說,應用程序所看到的數據是一個整體,或說是一個流(stream),一條消息有多少字節對應用程序是不可見的,因此TCP協議是面向流的協議,這也是容易出現粘包問題的原因。
而UDP是面向消息的協議,每個UDP段都是一條消息,應用程序必須以消息為單位提取數據,不能一次提取任意字節的數據,這一點和TCP是很不同的。
怎樣定義消息呢?可以認為對方一次性write/send的數據為一個消息,需要明白的是當對方send一條信息的時候,無論底層怎樣分段分片,TCP協議層會把構成整條消息的數據段排序完成後才呈現在內核緩沖區。

例如基於tcp的套接字客戶端往服務端上傳文件,發送時文件內容是按照一段一段的字節流發送的,在接收方看了,根本不知道該文件的字節流從何處開始,在何處結束

此外,發送方引起的粘包是由TCP協議本身造成的,TCP為提高傳輸效率,發送方往往要收集到足夠多的數據後才發送一個TCP段。若連續幾次需要send的數據都很少,通常TCP會根據優化算法把這些數據合成一個TCP段後一次發送出去,這樣接收方就收到了粘包數據。

UDP不會發生黏包

UDP(user datagram protocol,用戶數據報協議)是無連接的,面向消息的,提供高效率服務。
不會使用塊的合並優化算法,, 由於UDP支持的是一對多的模式,所以接收端的skbuff(套接字緩沖區)采用了鏈式結構來記錄每一個到達的UDP包,在每個UDP包中就有了消息頭(消息來源地址,端口等信息),這樣,對於接收端來說,就容易進行區分處理了。 即面向消息的通信是有消息保護邊界的。
對於空消息:tcp是基於數據流的,於是收發的消息不能為空,這就需要在客戶端和服務端都添加空消息的處理機制,防止程序卡住,而udp是基於數據報的,即便是你輸入的是空內容(直接回車),也可以被發送,udp協議會幫你封裝上消息頭發送過去。
不可靠不黏包的udp協議:udp的recvfrom是阻塞的,一個recvfrom(x)必須對唯一一個sendinto(y),收完了x個字節的數

補充說明:
udp和tcp一次發送數據長度的限制

用UDP協議發送時,用sendto函數最大能發送數據的長度為:65535- IP頭(20) – UDP頭(8)=65507字節。用sendto函數發送數據時,如果發送數據長度大於該值,則函數會返回錯誤。(丟棄這個包,不進行發送)

會發生黏包的兩種情況

情況一 發送方的緩存機制
發送端需要等緩沖區滿才發送出去,造成粘包(發送數據時間間隔很短,數據了很小,會合到一起,產生粘包)
服務端

#_*_coding:utf-8_*_
from socket import *
ip_port=(‘127.0.0.1‘,8080)

tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(5)

conn,addr=tcp_socket_server.accept()

data1=conn.recv(10)
data2=conn.recv(10)

print(‘----->‘,data1.decode(‘utf-8‘))
print(‘----->‘,data2.decode(‘utf-8‘))

conn.close()

客戶端

#_*_coding:utf-8_*_
import socket
BUFSIZE=1024
ip_port=(‘127.0.0.1‘,8080)

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(ip_port)

s.send(‘hello‘.encode(‘utf-8‘))
s.send(‘egg‘.encode(‘utf-8‘))

情況二 接收方的緩存機制

接收方不及時接收緩沖區的包,造成多個包接收(客戶端發送了一段數據,服務端只收了一小部分,服務端下次再收的時候還是從緩沖區拿上次遺留的數據,產生粘包)

服務端

#_*_coding:utf-8_*_
from socket import *
ip_port=(‘127.0.0.1‘,8080)

tcp_socket_server=socket(AF_INET,SOCK_STREAM)
tcp_socket_server.bind(ip_port)
tcp_socket_server.listen(5)

conn,addr=tcp_socket_server.accept()

data1=conn.recv(2) #一次沒有收完整
data2=conn.recv(10)#下次收的時候,會先取舊的數據,然後取新的

print(‘----->‘,data1.decode(‘utf-8‘))
print(‘----->‘,data2.decode(‘utf-8‘))

conn.close()

客戶端

#_*_coding:utf-8_*_
import socket
BUFSIZE=1024
ip_port=(‘127.0.0.1‘,8080)

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex(ip_port)

s.send(‘hello egg‘.encode(‘utf-8‘))

總結

黏包現象只發生在tcp協議中:

1.從表面上看,黏包問題主要是因為發送方和接收方的緩存機制、tcp協議面向流通信的特點。

2.實際上,主要還是因為接收方不知道消息之間的界限,不知道一次性提取多少字節的數據所造成的

黏包的解決方案

解決方案一
問題的根源在於,接收端不知道發送端將要傳送的字節流的長度,所以解決粘包的方法就是圍繞,如何讓發送端在發送數據前,把自己將要發送的字節流總大小讓接收端知曉,然後接收端來一個死循環接收完所有數據。

技術分享圖片

服務端

#_*_coding:utf-8_*_
import socket,subprocess
ip_port=(‘127.0.0.1‘,8080)
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

s.bind(ip_port)
s.listen(5)

while True:
    conn,addr=s.accept()
    print(‘客戶端‘,addr)
    while True:
        msg=conn.recv(1024)
        if not msg:break
        res=subprocess.Popen(msg.decode(‘utf-8‘),shell=True,                            stdin=subprocess.PIPE,                         stderr=subprocess.PIPE,                         stdout=subprocess.PIPE)
        err=res.stderr.read()
        if err:
            ret=err
        else:
            ret=res.stdout.read()
        data_length=len(ret)
        conn.send(str(data_length).encode(‘utf-8‘))
        data=conn.recv(1024).decode(‘utf-8‘)
        if data == ‘recv_ready‘:
            conn.sendall(ret)
    conn.close()

客戶端

    _*_coding:utf-8_*_
import socket,time
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex((‘127.0.0.1‘,8080))

while True:
    msg=input(‘>>: ‘).strip()
    if len(msg) == 0:continue
    if msg == ‘quit‘:break

    s.send(msg.encode(‘utf-8‘))
    length=int(s.recv(1024).decode(‘utf-8‘))
    s.send(‘recv_ready‘.encode(‘utf-8‘))
    send_size=0
    recv_size=0
    data=b‘‘
    while recv_size < length:
        data+=s.recv(1024)
        recv_size+=len(data)

    print(data.decode(‘utf-8‘))

存在的問題:
程序的運行速度遠快於網絡傳輸速度,所以在發送一段字節前,先用send去發送該字節流長度,這種方式會放大網絡延遲帶來的性能損耗

解決方案進階
剛剛的方法,問題在於我們我們在發送

我們可以借助一個模塊,這個模塊可以把要發送的數據長度轉換成固定長度的字節。這樣客戶端每次接收消息之前只要先接受這個固定長度字節的內容看一看接下來要接收的信息大小,那麽最終接受的數據只要達到這個值就停止,就能剛好不多不少的接收完整的數據了。

struct模塊
該模塊可以把一個類型,如數字,轉成固定長度的bytes

>>> struct.pack(‘i‘,1111111111111)
struct.error: ‘i‘ format requires -2147483648 <= number <= 2147483647 #這個是範圍

技術分享圖片

import json,struct
#假設通過客戶端上傳1T:1073741824000的文件a.txt

#為避免粘包,必須自定制報頭
header={‘file_size‘:1073741824000,‘file_name‘:‘/a/b/c/d/e/a.txt‘,‘md5‘:‘8f6fbf8347faa4924a76856701edb0f3‘} #1T數據,文件路徑和md5值

#為了該報頭能傳送,需要序列化並且轉為bytes
head_bytes=bytes(json.dumps(header),encoding=‘utf-8‘) #序列化並轉成bytes,用於傳輸

#為了讓客戶端知道報頭的長度,用struck將報頭長度這個數字轉成固定長度:4個字節
head_len_bytes=struct.pack(‘i‘,len(head_bytes)) #這4個字節裏只包含了一個數字,該數字是報頭的長度

#客戶端開始發送
conn.send(head_len_bytes) #先發報頭的長度,4個bytes
conn.send(head_bytes) #再發報頭的字節格式
conn.sendall(文件內容) #然後發真實內容的字節格式

#服務端開始接收
head_len_bytes=s.recv(4) #先收報頭4個bytes,得到報頭長度的字節格式
x=struct.unpack(‘i‘,head_len_bytes)[0] #提取報頭的長度

head_bytes=s.recv(x) #按照報頭長度x,收取報頭的bytes格式
header=json.loads(json.dumps(header)) #提取報頭

#最後根據報頭的內容提取真實的數據,比如
real_data_len=s.recv(header[‘file_size‘])
s.recv(real_data_len)

關於struct的詳細用法

#_*_coding:utf-8_*_
#http://www.cnblogs.com/coser/archive/2011/12/17/2291160.html
__author__ = ‘Linhaifeng‘
import struct
import binascii
import ctypes

values1 = (1, ‘abc‘.encode(‘utf-8‘), 2.7)
values2 = (‘defg‘.encode(‘utf-8‘),101)
s1 = struct.Struct(‘I3sf‘)
s2 = struct.Struct(‘4sI‘)

print(s1.size,s2.size)
prebuffer=ctypes.create_string_buffer(s1.size+s2.size)
print(‘Before : ‘,binascii.hexlify(prebuffer))
# t=binascii.hexlify(‘asdfaf‘.encode(‘utf-8‘))
# print(t)

s1.pack_into(prebuffer,0,*values1)
s2.pack_into(prebuffer,s1.size,*values2)

print(‘After pack‘,binascii.hexlify(prebuffer))
print(s1.unpack_from(prebuffer,0))
print(s2.unpack_from(prebuffer,s1.size))

s3=struct.Struct(‘ii‘)
s3.pack_into(prebuffer,0,123,123)
print(‘After pack‘,binascii.hexlify(prebuffer))
print(s3.unpack_from(prebuffer,0))

使用struct解決黏包
借助struct模塊,我們知道長度數字可以被轉換成一個標準大小的4字節數字。因此可以利用這個特點來預先發送數據長度。

發送時 接收時
先發送struct轉換好的數據長度4字節 先接受4個字節使用struct轉換成數字來獲取要接收的數據長度
再發送數據 再按照長度接收數據

服務端(自定制報頭)

import socket,struct,json
import subprocess
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加

phone.bind((‘127.0.0.1‘,8080))

phone.listen(5)

while True:
    conn,addr=phone.accept()
    while True:
        cmd=conn.recv(1024)
        if not cmd:break
        print(‘cmd: %s‘ %cmd)

        res=subprocess.Popen(cmd.decode(‘utf-8‘),
                             shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        err=res.stderr.read()
        print(err)
        if err:
            back_msg=err
        else:
            back_msg=res.stdout.read()

        conn.send(struct.pack(‘i‘,len(back_msg))) #先發back_msg的長度
        conn.sendall(back_msg) #在發真實的內容

    conn.close()

客戶端(自定制報頭)

#_*_coding:utf-8_*_
import socket,time,struct

s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
res=s.connect_ex((‘127.0.0.1‘,8080))

while True:
    msg=input(‘>>: ‘).strip()
    if len(msg) == 0:continue
    if msg == ‘quit‘:break

    s.send(msg.encode(‘utf-8‘))

    l=s.recv(4)
    x=struct.unpack(‘i‘,l)[0]
    print(type(x),x)
    # print(struct.unpack(‘I‘,l))
    r_s=0
    data=b‘‘
    while r_s < x:
        r_d=s.recv(1024)
        data+=r_d
        r_s+=len(r_d)

    # print(data.decode(‘utf-8‘))
    print(data.decode(‘gbk‘)) #windows默認gbk編碼
我們還可以把報頭做成字典,字典裏包含將要發送的真實數據的詳細信息,然後json序列化,然後用struck將序列化後的數據長度打包成4個字節(4個自己足夠用了)
發送時 接收時
先發報頭長度 先收報頭長度,用struct取出來
再編碼報頭內容然後發送 根據取出的長度收取報頭內容,然後解碼,反序列化
最後發真實內容 從反序列化的結果中取出待取數據的詳細信息,然後去取真實的數據內容

服務端:定制稍微復雜一點的報頭

import socket,struct,json
import subprocess
phone=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
phone.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) #就是它,在bind前加

phone.bind((‘127.0.0.1‘,8080))

phone.listen(5)

while True:
    conn,addr=phone.accept()
    while True:
        cmd=conn.recv(1024)
        if not cmd:break
        print(‘cmd: %s‘ %cmd)

        res=subprocess.Popen(cmd.decode(‘utf-8‘),
                             shell=True,
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        err=res.stderr.read()
        print(err)
        if err:
            back_msg=err
        else:
            back_msg=res.stdout.read()

        headers={‘data_size‘:len(back_msg)}
        head_json=json.dumps(headers)
        head_json_bytes=bytes(head_json,encoding=‘utf-8‘)

        conn.send(struct.pack(‘i‘,len(head_json_bytes))) #先發報頭的長度
        conn.send(head_json_bytes) #再發報頭
        conn.sendall(back_msg) #在發真實的內容

    conn.close()

客戶端

import socket
import struct
import json
import os

class MYTCPClient:
    address_family = socket.AF_INET

    socket_type = socket.SOCK_STREAM

    allow_reuse_address = False

    max_packet_size = 8192

    coding=‘utf-8‘

    request_queue_size = 5

    def __init__(self, server_address, connect=True):
        self.server_address=server_address
        self.socket = socket.socket(self.address_family,
                                    self.socket_type)
        if connect:
            try:
                self.client_connect()
            except:
                self.client_close()
                raise

    def client_connect(self):
        self.socket.connect(self.server_address)

    def client_close(self):
        self.socket.close()

    def run(self):
        while True:
            inp=input(">>: ").strip()
            if not inp:continue
            l=inp.split()
            cmd=l[0]
            if hasattr(self,cmd):
                func=getattr(self,cmd)
                func(l)

    def put(self,args):
        cmd=args[0]
        filename=args[1]
        if not os.path.isfile(filename):
            print(‘file:%s is not exists‘ %filename)
            return
        else:
            filesize=os.path.getsize(filename)

        head_dic={‘cmd‘:cmd,‘filename‘:os.path.basename(filename),‘filesize‘:filesize}
        print(head_dic)
        head_json=json.dumps(head_dic)
        head_json_bytes=bytes(head_json,encoding=self.coding)

        head_struct=struct.pack(‘i‘,len(head_json_bytes))
        self.socket.send(head_struct)
        self.socket.send(head_json_bytes)
        send_size=0
        with open(filename,‘rb‘) as f:
            for line in f:
                self.socket.send(line)
                send_size+=len(line)
                print(send_size)
            else:
                print(‘upload successful‘)

client=MYTCPClient((‘127.0.0.1‘,8080))

client.run()

老鐵們今天就到這裏明天 再來一炮soket進階。。。。

網絡編程解決黏包現象