1. 程式人生 > >netty4粘包/拆包/斷包 解決方案

netty4粘包/拆包/斷包 解決方案

粘包、拆包表現形式

現在假設客戶端向服務端連續傳送了兩個資料包,用packet1和packet2來表示,那麼服務端收到的資料可以分為三種,現列舉如下:

第一種情況,接收端正常收到兩個資料包,即沒有發生拆包和粘包的現象,此種情況不在本文的討論範圍內。normal

第二種情況,接收端只收到一個數據包,由於TCP是不會出現丟包的,所以這一個資料包中包含了傳送端傳送的兩個資料包的資訊,這種現象即為粘包。這種情況由於接收端不知道這兩個資料包的界限,所以對於接收端來說很難處理。one

第三種情況,這種情況有兩種表現形式,如下圖。接收端收到了兩個資料包,但是這兩個資料包要麼是不完整的,要麼就是多出來一塊,這種情況即發生了拆包和粘包。這兩種情況如果不加特殊處理,對於接收端同樣是不好處理的。half_one

one_half

粘包問題的解決策略

由於底層的TCP無法理解上層的業務資料,所以在底層是無法保證資料包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決。業界的主流協議的解決方案,可以歸納如下: 
1. 訊息定長,報文大小固定長度,例如每個報文的長度固定為200位元組,如果不夠空位補空格; 
2. 包尾新增特殊分隔符,例如每條報文結束都添加回車換行符(例如FTP協議)或者指定特殊字元作為報文分隔符,接收方通過特殊分隔符切分報文區分; 
3. 將訊息分為訊息頭和訊息體,訊息頭中包含表示資訊的總長度(或者訊息體長度)的欄位; 
4. 更復雜的自定義應用層協議。


Netty粘包和拆包解決方案

Netty提供了多個解碼器,可以進行分包的操作,分別是: 
* LineBasedFrameDecoder (換行)

   LineBasedFrameDecoder是回車換行解碼器,如果使用者傳送的訊息以回車換行符作為訊息結束的標識,則可以直接使用Netty的LineBasedFrameDecoder對訊息進行解碼,只需要在初始化Netty服務端或者客戶端時將LineBasedFrameDecoder正確的新增到ChannelPipeline中即可,不需要自己重新實現一套換行解碼器。
   LineBasedFrameDecoder的工作原理是它依次遍歷ByteBuf中的可讀位元組,判斷看是否有“\n”或者“\r\n”,如果有,就以此位置為結束位置,從可讀索引到結束位置區間的位元組就組成了一行。它是以換行符為結束標誌的解碼器,支援攜帶結束符或者不攜帶結束符兩種解碼方式,同時支援配置單行的最大長度。如果連續讀取到最大長度後仍然沒有發現換行符,就會丟擲異常,同時忽略掉之前讀到的異常碼流。防止由於資料報沒有攜帶換行符導致接收到ByteBuf無限制積壓,引起系統記憶體溢位。


* DelimiterBasedFrameDecoder(新增特殊分隔符報文來分包)
 
   DelimiterBasedFrameDecoder是分隔符解碼器,使用者可以指定訊息結束的分隔符,它可以自動完成以分隔符作為碼流結束標識的訊息的解碼。
   回車換行解碼器實際上是一種特殊的DelimiterBasedFrameDecoder解碼器。


* FixedLengthFrameDecoder(使用定長的報文來分包) 
    FixedLengthFrameDecoder是固定長度解碼器,它能夠按照指定的長度對訊息進行自動解碼,開發者不需要考慮TCP的粘包/拆包等問題,非常實用。
    對於定長訊息,如果訊息實際長度小於定長,則往往會進行補位操作,它在一定程度上導致了空間和資源的浪費。但是它的優點也是非常明顯的,編解碼比較簡單,因此在實際專案中仍然有一定的應用場景。


* LengthFieldBasedFrameDecoder (自定義解碼器跟編碼器)

   本文介紹的重點LengthFieldBasedFrameDecoder,一般包含了訊息頭(head)、訊息體(body):訊息頭是固定的長度,一般有有以下資訊 -> 是否壓縮(zip)、訊息型別(type or cmdid)、訊息體長度(body length);訊息體長度不是固定的,其大小由訊息頭記載,一般記載業務互動資訊。

  netty對應來說就是編碼器(Encoder)跟解碼器(Decoder),一般其中會有一個基本訊息類對外輸出,egg:

/**
 * @describe 訊息快取區
 * @author zhikai.chen
 * @date 2018年4月28日 上午10:13:35
 */
public class Message {
	
	/**
	 * 要傳送的資料
	 */
	private String data;
	
	/**
	 * 業務編號
	 */
	private short cmdId;
	
	/**
	 * 訊息型別  0xAF 表示心跳包    0xBF 表示超時包  0xCF 業務資訊包
	 */
	private byte type;
	
	/**
	 * 是否壓縮,1是,0不是
	 */
	private	byte zip = 0 ;
	
    /**
     * 封裝要傳送的資料包
     * @param data 業務資料
     * @param cmdId 業務標識號
     * @param type 訊息型別
     */
	public Message(String data,short cmdId,byte type){
		this.data=data;
		this.cmdId=cmdId;
		this.type=type;
	}

	public String getData() {
		return data;
	}

	public void setData(String data) {
		this.data = data;
	}

	public short getCmdId() {
		return cmdId;
	}

	public void setCmdId(short cmdId) {
		this.cmdId = cmdId;
	}

	public byte getType() {
		return type;
	}

	public void setType(byte type) {
		this.type = type;
	}

	public byte getZip() {
		return zip;
	}

	public void setZip(byte zip) {
		this.zip = zip;
	}
Encoder:
/**
 * @describe 訊息編碼器,封裝
 * @author zhikai.chen
 * @date 2018年4月28日 上午10:17:52
 */
public class MessageEncoder extends MessageToByteEncoder<Message> {
	
    // 編碼格式
	private final Charset charset = Charset.forName("UTF-8");
	// 需要壓縮的長度
	private final int compressLength=1024;
	
    @Override
    protected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
    	String source=msg.getData();
		byte[] body=source.getBytes(charset);
		if(body.length > compressLength){
			msg.setZip((byte)1);
			// 加壓
			body=ZipTool.compress(body);
		}
		
		//cmdId(2)+type(1)+zip(1)+body(4)=8
		//out = Unpooled.directBuffer(8+body.length);
		
		//cmdId
		out.writeShort(msg.getCmdId());
		//type
		out.writeByte(msg.getType());
		//是否加壓
		out.writeByte(msg.getZip());
		//長度
		out.writeInt(body.length);
		//內容
		out.writeBytes(body);
    }
    
}

NioSocketServerInitializer(服務端跟客戶端都一致):

    //TODO 參考Message
    //body(4)+zip(1)+cmdId(2)+type(1)=8
    //最大長度
    private static final int MAX_FRAME_LENGTH = 1024 * 1024;  
    //這個值就是MessageEncoder body.length(4)
    private static final int LENGTH_FIELD_LENGTH = 4; 
    //這個值就是MessageEncoder zip(1)+cmdId(2)+type(1)
    private static final int LENGTH_FIELD_OFFSET = 4;  
    private static final int LENGTH_ADJUSTMENT = 0;  
    private static final int INITIAL_BYTES_TO_STRIP = 0;
        //TODO 粘包處理
        //解碼器
        ch.pipeline().addLast(new MessageDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP));
        //編碼器
        ch.pipeline().addLast(new MessageEncoder());
稍微解釋一下:
1)LENGTH_FIELD_LENGTH指的就是我們這邊CustomMsg中length這個屬性的大小,我們這邊是int型,所以是4
2)LENGTH_FIELD_OFFSET指的就是我們這邊length欄位的起始位置,因為前面有zip(1)+cmdId(2)+type(1)=4,所以是4
3)LENGTH_ADJUSTMENT指的是length這個屬性的值,假如我們的body長度是40,有時候,有些人喜歡將length寫成44,因為length本身還佔有4個位元組,

    這樣就需要調整一下,那麼就需要-4,我們這邊沒有這樣做,所以寫0就可以了   

Decoder
/**
 * @describe 訊息解碼器
 * @author zhikai.chen
 * @date 2018年4月28日 上午11:09:15
 */
public class MessageDecoder extends LengthFieldBasedFrameDecoder {
	
    //body(4)+zip(1)+cmdId(2)+type(1)=8
    private static final int HEADER_SIZE = 8;
    
    // 編碼格式
 	private final Charset charset = Charset.forName("UTF-8");

    public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
    }
    
    public MessageDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip) {
        super(maxFrameLength, lengthFieldOffset, lengthFieldLength,lengthAdjustment,initialBytesToStrip);
    }

    @Override
    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        if (in == null) {
            return null;
        }

        // 訊息頭讀取不完整,不做解析返回null,直到讀完整為止
        if (in.readableBytes() <= HEADER_SIZE) {
            return null;
        }

        in.markReaderIndex();

        short cmdId = in.readShort();
        byte type = in.readByte();
        byte zip = in.readByte();
        int dataLength = in.readInt();

        // TODO 網路訊號不好,沒有接收到完整資料
        if (in.readableBytes() < dataLength) {
        	//儲存當前讀到的資料,下一次繼續讀取
        	//斷包處理:檢視ByteToMessageDecoder的channelRead方法,ByteBuf cumulation屬性
            in.resetReaderIndex();
            return null;
        }

        byte[] data = new byte[dataLength];
        in.readBytes(data);
        
        // TODO 手動釋放記憶體
        //in.release(); // or ReferenceCountUtil.release(in);
        
        //判斷是否壓縮
        if(zip==1){
        	data=ZipTool.uncompress(data);
        }
        String body = new String(data, charset);
        Message msg = new Message(body, cmdId, type);
        return msg;
    }
}

Decoder的解碼順序是跟Encoder一致的。

對應的訊息接收handler read處理:

NioSocketServerHandler server=new NioSocketServerHandler();
pipeline.addLast(server);
public class NioSocketServerHandler extends SimpleChannelInboundHandler<Message> {
	
    private final Logger log=LoggerFactory.getLogger(this.getClass());

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Message msg) throws Exception {
    	log.info("server read msg:{}", JSON.toJSONString(msg));
    }}

write處理:

Message msg=new Message("ok",ModuleID.HEART_BEAT,(byte)0xAF);
ctx.channel().writeAndFlush(msg);

斷包處理

眼尖的童靴其實已經發現(Decoder):

//TODO 訊息頭都讀取不完整,不做解析返回null,直到讀完整為之
        if (in.readableBytes() <= HEADER_SIZE) {
            return null;
        }

        in.markReaderIndex();

        short cmdId = in.readShort();
        byte type = in.readByte();
        byte zip = in.readByte();
        int dataLength = in.readInt();

        // TODO 網路訊號不好,沒有接收到完整資料
        if (in.readableBytes() < dataLength) {
        	//儲存當前讀到的資料,下一次繼續讀取
        	//斷包處理:檢視ByteToMessageDecoder的channelRead方法,ByteBuf cumulation屬性
            in.resetReaderIndex();
            return null;
        }

   說明註釋已經講解了,我們來看看訊息體一次讀不完(斷包)了,netty底層是怎麼處理的:解碼器繼承LengthFieldBasedFrameDecoder,而LengthFieldBasedFrameDecoder繼承ByteToMessageDecoder,ByteToMessageDecoder中有一個channelRead方法:

@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof ByteBuf) {
            CodecOutputList out = CodecOutputList.newInstance();
            try {
                ByteBuf data = (ByteBuf) msg;
                first = cumulation == null;
                if (first) {
                    cumulation = data;
                } else {
                    cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                }
                callDecode(ctx, cumulation, out);
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable t) {
                throw new DecoderException(t);
            } finally {
                if (cumulation != null && !cumulation.isReadable()) {
                    numReads = 0;
                    cumulation.release();
                    cumulation = null;
                } else if (++ numReads >= discardAfterReads) {
                    // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                    // See https://github.com/netty/netty/issues/4275
                    numReads = 0;
                    discardSomeReadBytes();
                }

                int size = out.size();
                decodeWasNull = !out.insertSinceRecycled();
                fireChannelRead(ctx, out, size);
                out.recycle();
            }
        } else {
            ctx.fireChannelRead(msg);
        }
    }

我們來看看cumulation的定義,

    ByteBuf cumulation;
    private Cumulator cumulator = MERGE_CUMULATOR;
    private boolean singleDecode;
    private boolean decodeWasNull;
    private boolean first;
    private int discardAfterReads = 16;
    private int numReads;
cumulation為ByteBuf物件,是一個緩衝區,即如果訊息體一次讀不完,下一次繼續讀取,直到讀完整訊息頭給定的長度為止。

相關推薦

tcp協議下問題的產生及解決方案

ont phone 系統 pac 接收 實的 msg 來源 erro 1、粘包產生原因: (1)TCP為提高傳輸效率,發送方往往要收集到足夠多的數據後才發送一個TCP段。若連續幾次需要send的數據都很少,通常TCP會根據優化算法(Nagle)把這些數據合成一個TCP段後一

Socket問題的3種解決方案,最後一種最完美!

在 Java 語言中,傳統的 Socket 程式設計分為兩種實現方式,這兩種實現方式也對應著兩種不同的傳輸層協議:TCP 協議和 UDP 協議,但作為網際網路中最常用的傳輸層協議 TCP,在使用時卻會導致粘包和半包問題,於是為了徹底的解決此問題,便誕生了此篇文章。 # 什麼是 TCP 協議? TCP 全稱

重新看待Jar沖突問題及解決方案

ani des 聲明 堆棧 變更 and 系列 linux操作 manage Jar包沖突是老生常談的問題,幾乎每一個Java程序猿都不可避免地遇到過,並且也都能想到通常的原因一般是同一個Jar包由於maven傳遞依賴等原因被引進了多個不同的版本而導致,可采用依賴排除、

問題的成因與解決方案

發送數據包 IV 內存 多次 struct 可靠 就會 maximum 讀取文件 原文鏈接地址:https://www.cnblogs.com/kakawith/p/8378425.html 一、黏包成因 tcp協議的拆包機制 當發送端緩沖區的長度大於網卡的MTU時,tc

Maven JAR衝突問題排查及解決方案

前言 寫這篇文章的初衷是因為今天在使用mvn dependency:tree命令時,突然想起一年前面試阿里的一道面試題。面試題是說假設線上發生JAR包衝突,應該怎麼排查?我那時候的回答是IDEA有個Maven Helper的外掛,可以幫忙分析依賴衝突,然後還有

【Maven依賴衝突】Maven jar衝突問題排查及解決方案

前言 寫這篇文章的初衷是因為今天在使用mvn dependency:tree命令時,突然想起一年前面試阿里的一道面試題。面試題是說假設線上發生JAR包衝突,應該怎麼排查?我那時候的回答是IDEA有個Maven Helper的外掛,可以幫忙分析依賴衝突,然後還有一種辦法是如果

使用Charles進行HTTPS抓(包括安裝信任證書以及抓 出現無法抓unknown和證書無效解決方案

背景: 在進行App測試或定位線上問題時,經常會遇到抓取HTTPS資料包的需求。一般在windows上會使用fiddler,Mac上使用Charles。對於https請求,抓到的資料因為經過了加密,只能看到亂碼。 本文介紹如何使用Charles來抓取https網路報文

Android專案gradle依賴以及新增的jar過多出現的問題解決方案

隨著專案引用的庫以及jar包越來越多,今天需要給客戶發apk時,出現了一個問題。執行沒問題,但是打包就出現問題。 於是各種百度,發現說引用衝突,感覺是即時通訊的v4包跟自己的v4包衝突了,於是就把app的v4引用刪了,結果還是報錯。 於是我仔細看了看錯誤

go語言,第三方相對路徑匯入引起的問題及解決方案(goquery)

對go語言而言,跟蹤init很顯然包有且僅有一次被匯入的可能。 但是重複引用了goquery包,後編譯出現問題  專案涉及相關目錄 ├── main.go└── parse    └── parse.go parse包和main.go都匯入了 goquery包 main

Charles的安裝破解與https抓完整指引(含VPN解決方案)

Charles的安裝與破解 app抓包工具在Mac常用Charles,window用fiddler, 本文詳細介紹Charles安裝破解以及配合手機抓取https資訊的使用方法,所有步驟均本人親測 (本文以window環境/iOS手機為例,Mac步驟類似) 首

Intellij IDEA建立(package)問題解決方案

問題 在使用IDEA建立包時會出現這樣一種場景,就是當一個空包很長時,比如com.secbro.drools.model。這個時候如果你想給drools或model建立同級的包,你會發現,預設建立的包不是同級的,而是在model下面的。 如下圖,如果建立dr

asp.net core 3.1 引用的元dll版本相容性問題解決方案

自從.netcore 3.1出來後,大家都想立馬升級到最新版本。我也是如此,微軟也對.netcore 3.1的官方元件不斷升級,幾乎每隔幾天就會有部分元包可以升級。每次開啟Nuget包管理器,“更新”的tab處總會有個數字暗示著你快點升級!一向有程式碼潔癖的同學(包括我),都會毫不猶豫

centos6&7 ssh 經常解決方案

1.首先編輯ssh配置/etc/ssh/sshd_config以下幾項必須要設定(1)關閉DNS反向解析UseDNS no(2)關閉ssh的gssapi認證GSSAPIAuthentication noGSSAPICleanupCredentials no(3)伺服器端向客戶

mac電腦iTerm2連結linux伺服器解決方案

問題 用windows連結linux伺服器,方式很多,我最經常用的是xshell。但是xshell沒有mac版,那用mac電腦,比較好的命令列軟體是什麼呢?我用的是iTerm2 ,這個功能確實蠻強大,很多人都在用。但是,你用iTerm2 連結遠端linux伺服器,假如你

netty4// 解決方案

粘包、拆包表現形式現在假設客戶端向服務端連續傳送了兩個資料包,用packet1和packet2來表示,那麼服務端收到的資料可以分為三種,現列舉如下:第一種情況,接收端正常收到兩個資料包,即沒有發生拆包和粘包的現象,此種情況不在本文的討論範圍內。第二種情況,接收端只收到一個數據

Netty4.x】Netty TCP/問題的解決辦法(二)

一、什麼是TCP粘包/拆包  如圖所示,假如客戶端分別傳送兩個資料包D1和D2給服務端,由於服務端一次讀取到的位元組數是不確定的,故可能存在以下4中情況:第一種情況:Server端分別讀取到D1和D2,沒有產生粘包和拆包的情況。第二種情況:Server端一次接收到兩個資料包,

Netty2:/問題與使用LineBasedFrameDecoder的解決方案

substr string 技術分享 query coder 消息頭 handle decode sync 什麽是粘包、拆包 粘包、拆包是Socket編程中最常遇見的一個問題,本文來研究一下Netty是如何解決粘包、拆包的,首先我們從什麽是粘包、拆包開始說起: TCP是個

TCP基本解決方案

scu fonts println mar 是我 perf throws 自己 切割 上個小節我們淺析了在Netty的使用的時候TCP的粘包和拆包的現象,Netty對此問題提供了相對比較豐富的解決方案 Netty提供了幾個常用的解碼器,幫助我們解決這些問題,其實上述

netty權威指南學習筆記四——TCP/問題解決

方法 pan 對象 protect row 學習 ddl .get font   發生了粘包,我們需要將其清晰的進行拆包處理,這裏采用LineBasedFrameDecoder來解決 LineBasedFrameDecoder的工作原理是它依次遍歷ByteBuf中的可讀字節

Netty權威指南_札記04_TCP/問題解決

文章目錄 Netty權威指南_札記04_TCP粘包/拆包問題解決 1. TCP粘包/拆包 1.1 TCP粘包/拆包問題說明 1.2 TCP粘包/拆包發生的原因 1.3 粘包問題解決策略