Netty 原始碼深度解析(八) - 解碼
就像很多標準的架構模式都被各種專用框架所支援一樣,常見的資料處理模式往往也是目標實現的很好的候選物件,它可以節省開發人員大量的時間和精力。
當然這也適應於本文的主題:編碼和解碼,或者資料從一種特定協議的格式到另一種格式的轉 換。這些任務將由通常稱為 編解碼器
的元件來處理
Netty 提供了多種元件,簡化了為了支援廣泛 的協議而建立自定義的編解碼器的過程
例如,如果你正在構建一個基於 Netty 的郵件伺服器,那 麼你將會發現 Netty 對於編解碼器的支援對於實現 POP3、IMAP 和 SMTP 協議來說是多麼的寶貴
0 什麼是編解碼器
每個網路應用程式都必須定義
- 如何解析在兩個節點之間來回傳輸的原始位元組
- 如何將其和目標應用程式的資料格式做相互轉換
這種轉換邏輯由編解碼器處理,編解碼器由編碼器和解碼器組成,它們每種都可以將位元組流從一種格式轉換為另一種格式
那麼它們的區別是什麼呢?
如果將訊息看作是對於特定的應用程式具有具體含義的結構化的位元組序列— 它的資料。那 麼編碼器是將訊息轉換為適合於傳輸的格式(最有可能的就是位元組流);而對應的解碼器則是將 網路位元組流轉換回應用程式的訊息格式。因此,編碼器操作出站資料,而解碼器處理入站資料。
記住這些背景資訊,接下來讓我們研究一下 Netty 所提供的用於實現這兩種元件的類。
1 Netty解碼概述

1.1 兩個問題

在這一節中,我們將研究 Netty 所提供的解碼器類,這些類覆蓋了兩個不同的用例
- 將位元組解碼為訊息——ByteToMessageDecoder 和 ReplayingDecoder
- 將一種訊息型別解碼為另一種——MessageToMessageDecoder
因為解碼器是負責 將入站資料從一種格式轉換到另一種格式
,所以知道 Netty 的解碼器實
現了 ChannelInboundHandler
也不會讓你感到意外
什麼時候會用到解碼器呢?很簡單:每當需要為 ChannelPipeline
中的下一個 Channel- InboundHandler
轉換入站資料時會用到
此外,得益於 ChannelPipeline
的設計,可以將多個解碼器連線在一起,以實現任意複雜的轉換邏輯,這也是 Netty 是如何支援程式碼的模組化以及複用的一個很好的例子



2 抽象解碼器ByteToMessageDecoder
2.1 示例
將位元組解碼為訊息(或者另一個位元組序列)是一項如此常見的任務,以至於 Netty 特地為它提供了一個抽象的基類:ByteToMessageDecoder
由於 你不可能知道遠端節點是否會一次性地傳送一個完整的訊息
,所以這個類會 對入站資料進行緩衝
,直到它準備好處理

ByteToMessageDecoderAPI
假設你接收了一個包含簡單 int 的位元組流,每個 int 都需要被單獨處理
在這種情況下,你需要從入站ByteBuf
中讀取每個 int,並將它傳遞給
ChannelPipeline
中的下一個
ChannelInboundHandler
為了解碼這個位元組流,你要擴充套件 ByteToMessageDecoder
類(原子型別的 int 在被新增到 List 中時,會被自動裝箱為 Integer)

ToIntegerDecoder
每次從入站 ByteBuf 中讀取 4 位元組,將其解碼為一個 int,然後將它新增到一個 List 中
當沒有更多的元素可以被新增到該 List 中時,它的內容將會被髮送給下一個 Channel- InboundHandler

ToIntegerDecoder類擴充套件了ByteToMessageDecoder
雖然 ByteToMessageDecoder
可以很簡單地實現這種模式,但是你可能會發現,在呼叫 readInt()
前不得不驗證所輸入的 ByteBuf 是否具有足夠的資料有點繁瑣
在下一節中, 我們將討論 ReplayingDecoder,它是一個特殊的解碼器,以少量的開銷消除了這個步驟
2.2 原始碼解析


解碼步驟
2.2.1 累加位元組流
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //基於 ByteBuf 進行解碼的,如果不是直接將當前物件向下傳播 if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; //若當前累加器為空,說明是第一次從 IO 流中讀取資料 first = cumulation == null; if (first) { //第一次會將累加器賦值為剛讀進來的 ByteBuf 物件資料 cumulation = data; } else { //非第一次,則將當前累加器中的資料和讀取進來的資料進行累加 cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } //呼叫子類的解碼方法去解析 callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } int size = out.size(); decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } }
其中的 cumulator
為

MERGE_CUMULATOR
public static final Cumulator MERGE_CUMULATOR = new Cumulator() { @Override public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) { ByteBuf buffer; //當前的寫指標後移一定位元組,若超過最大容量,則進行擴容 if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1) { // Expand cumulation (by replace it) when either there is not more room in the buffer // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or // duplicate().retain(). // // See: // - https://github.com/netty/netty/issues/2327 // - https://github.com/netty/netty/issues/1764 buffer = expandCumulation(alloc, cumulation, in.readableBytes()); } else { buffer = cumulation; } //將當前資料寫到累加器中 buffer.writeBytes(in); //將讀進的資料物件釋放 in.release(); return buffer; } };
2.2.2 呼叫子類的 decode方法進行解析

進入該方法檢視原始碼
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { try { // 只要累加器有資料,迴圈就會繼續執行下去 while (in.isReadable()) { int outSize = out.size(); // 判斷當前list 裡是否已經有物件(首次執行時,肯定是不會執行此段程式碼的) if (outSize > 0) { // 有,則通過事件傳播機制向下傳播 fireChannelRead(ctx, out, outSize); out.clear(); // Check if this handler was removed before continuing with decoding. // If it was removed, it is not safe to continue to operate on the buffer. // // See: // - https://github.com/netty/netty/issues/4635 if (ctx.isRemoved()) { break; } outSize = 0; } // 記錄當前可讀資料長度 int oldInputLength = in.readableBytes(); decode(ctx, in, out); // Check if this handler was removed before continuing the loop. // If it was removed, it is not safe to continue to operate on the buffer. // // See https://github.com/netty/netty/issues/1664 if (ctx.isRemoved()) { break; } //說明什麼物件都沒解析出來 if (outSize == out.size()) { if (oldInputLength == in.readableBytes()) { break; } else { continue; } } //說明沒有從當前累加器中讀取資料 if (oldInputLength == in.readableBytes()) { throw new DecoderException( StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message."); } if (isSingleDecode()) { break; } } } catch (DecoderException e) { throw e; } catch (Throwable cause) { throw new DecoderException(cause); } }
2.2.2 將解析到的 ByteBuf 向下傳播
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof ByteBuf) { CodecOutputList out = CodecOutputList.newInstance(); try { ByteBuf data = (ByteBuf) msg; first = cumulation == null; if (first) { cumulation = data; } else { cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data); } callDecode(ctx, cumulation, out); } catch (DecoderException e) { throw e; } catch (Throwable t) { throw new DecoderException(t); } finally { if (cumulation != null && !cumulation.isReadable()) { numReads = 0; cumulation.release(); cumulation = null; } else if (++ numReads >= discardAfterReads) { // We did enough reads already try to discard some bytes so we not risk to see a OOME. // See https://github.com/netty/netty/issues/4275 numReads = 0; discardSomeReadBytes(); } // 記錄當前 list 的長度 int size = out.size(); // 將解析到的一個物件向下進行傳播 decodeWasNull = !out.insertSinceRecycled(); fireChannelRead(ctx, out, size); out.recycle(); } } else { ctx.fireChannelRead(msg); } }


編解碼器中的引用計數
對於編碼器和解碼器來說:一旦訊息被編碼或者解碼,它就會被 ReferenceCountUtil.release(message)
呼叫自動釋放
如果你需要保留引用以便稍後使用,那麼你可以呼叫 ReferenceCountUtil.retain(message)
這將會增加該引用計數,從而防止該訊息被釋放
3 基於固定長度解碼器分析
/** * A decoder that splits the received {@link ByteBuf}s by the fixed number * of bytes. For example, if you received the following four fragmented packets: * <pre> * +---+----+------+----+ * | A | BC | DEFG | HI | * +---+----+------+----+ * </pre> * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the * following three packets with the fixed length: * <pre> * +-----+-----+-----+ * | ABC | DEF | GHI | * +-----+-----+-----+ * </pre> */ public class FixedLengthFrameDecoder extends ByteToMessageDecoder { private final int frameLength; /** * Creates a new instance. * * @param frameLength the length of the frame */ public FixedLengthFrameDecoder(int frameLength) { if (frameLength <= 0) { throw new IllegalArgumentException( "frameLength must be a positive integer: " + frameLength); } this.frameLength = frameLength; } @Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } } /** * Create a frame out of the {@link ByteBuf} and return it. * * @paramctxthe {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to * @paraminthe {@link ByteBuf} from which to read data * @returnframethe {@link ByteBuf} which represent the frame or {@code null} if no frame could *be created. */ protected Object decode( @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception { //判斷當前累加器裡的位元組是否小於frameLength if (in.readableBytes() < frameLength) { return null; } else { return in.readRetainedSlice(frameLength); } } }
4 行解碼器分析

非丟棄模式處理
4.1 定位行尾



4.2 非丟棄模式


4.2.1 找到換行符情況下

4.2.2 找不到換行符情況下


解析出長度超過最大可解析長度
直接進入丟棄模式,讀指標移到寫指標位(即丟棄)
並傳播異常
4.3 丟棄模式

找到換行符

記錄當前丟棄了多少位元組(已丟棄 + 本次將丟棄的)
鎖定換行符型別
將讀指標直接移到換行符後
丟棄位元組置零
重置為非丟棄狀態
所有位元組丟棄後才觸發快速失敗機制
找不到換行符

直接記錄當前丟棄位元組(已丟棄 + 當前可讀位元組數)
將讀指標直接移到寫指標
5 基於分隔符解碼器分析

-
構造器
傳入一系列分隔符,通過解碼器將二進位制流分成完整資料包
-
decode 方法
5.1 分析解碼步驟
5.1.1 行處理器
-
行處理器決斷
-
定義位置
-
初始化位置
-
判斷分隔符
5.1.2 找到最小分隔符


遍歷所有分隔符,計算以每一個分隔符分割的資料包的長度
5.1.3 解碼
5.1.3.1 找到分隔符

非空,說明已經找到分隔符
和之前一樣,在此先判斷當前是否處於丟棄模式

非丟棄模式
顯然第一次時為 false, 因此非丟棄模式

當前資料包大於允許解析最大資料長度時,直接將該段資料包連同最小分隔符跳過(丟棄)

沒有超過的就是正常合理邏輯的資料包的長度,判斷解析出的資料包是否包含分隔符

丟棄模式


5.1.3.2 未找到分隔符


5.1.3.2.1 非丟棄模式

當前可讀位元組長大於允許解析最大資料長度時,記錄該丟棄位元組數
5.1.3.2.2 丟棄模式

6 基於長度域解碼器引數分析

重要引數

-
maxFrameLength (包的最大長度)
防止太大導致記憶體溢位,超出包的最大長度 Netty 將會做一些特殊處理
-
lengthFieldOffset (訊息體長度)
ByteBuf
的什麼位置開始就是length
欄位
-
lengthFieldLength
length
欄位的長度
-
lengthAdjustment
有些情況可能會把header也包含到length長度中,或者length欄位後面還有一些不包括在length長度內的,可以通過lengthAdjustment調節
-
initialBytesToStrip
起始截掉的部分,如果傳遞給後面的Handler的資料不需要訊息頭了,可以通過這個設定
可以通過訊息中的一個表示訊息長度的欄位值動態分割收到的ByteBuf
6.1 基於長度

這類資料包協議比較常見,前幾個位元組表示資料包長度(不包括長度域),後面為具體資料
拆完後資料包是一個完整的帶有長度域的資料包(之後即可傳遞到應用層解碼器進行解碼),
建立一個如下方式的 LengthFieldBasedFrameDecoder
即可實現這類協議

6.2 基於長度截斷
若應用層解碼器不需用到長度欄位,那麼我們希望 Netty 拆包後,如此

長度域被截掉,我們只需指定另一個引數 initialBytesToStrip
即可實現
表 Netty 拿到一個完整資料包後向業務解碼器傳遞之前,應該跳過多少位元組

initialBytesToStrip
為4,表獲取一個完整資料包後,忽略前面4個位元組,應用解碼器拿到的就是
不帶長度域
的資料包
6.3 基於偏移長度

此方式二進位制協議更為普遍,前幾個固定位元組表示協議頭,通常包含一些 magicNumber
, protocol version
之類的 meta
資訊,緊跟著後面的是一個長度域,表示包體有多少位元組的資料
只需要基於第一種情況,調整第二個引數既可以實現

lengthFieldOffset
為4,表示跳過4個位元組才是長度域
6.4 基於可調整長度的拆包
有些時候,二進位制協議可能會設計成如下方式

header
在後
- 長度域在資料包最前面表示無偏移,
lengthFieldOffset
為 0 - 長度域的長度為3,即
lengthFieldLength
為3 - 長度域表示的包體的長度略過了header,這裡有另外一個引數
lengthAdjustment
,包體長度調整的大小,長度域的數值表示的長度加上這個修正值表示的就是帶header的包,這裡是12+2
,header和包體一共佔14位元組
6.5 基於偏移可調整長度的截斷
二進位制協議帶有兩個header

拆完後, HDR1
丟棄,長度域丟棄,只剩下第二個 header
和有效包體
HDR1
可以表示
magicNumber
,表示應用只接受以該
magicNumber
開頭的二進位制資料,RPC 裡面用的較多
引數設定
- 長度域偏移為1,即
lengthFieldOffset
為1 - 長度域長度為2,即
lengthFieldLength
為2 - 長度域表示的包體的長度略過
HDR2
,但拆包時HDR2
也被 Netty 當作包體的一部分來拆,HDR2
的長度為1,即lengthAdjustment
為1 - 拆完後,截掉前面三個位元組,即
initialBytesToStrip
為 3
6.6 基於偏移可調整變異長度的截斷
前面所有的長度域表示的都是不帶 header
的包體的長度
如果讓長度域表示的含義包含整個資料包的長度,如下

HDR1
的長度為1,
HDR2
的長度為1,包體的長度為12,
1+1+2+12=16
引數設定
除長度域表示的含義和上一種情況不一樣外,其他都相同,因為 Netty 不瞭解業務情況,需告訴 Netty ,長度域後再跟多少位元組就可形成一個完整資料包,這裡顯然是13位元組,長度域為16,因此減掉3才是真是的拆包所需要的長度, lengthAdjustment
為-3
若你的協議基於長度,即可考慮不用位元組來實現,而是直接拿來用,或者繼承他,簡單修改即可
7 基於長度域解碼器分析
7.1 構造方法

public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { // 省略引數校驗 this.byteOrder = byteOrder; this.maxFrameLength = maxFrameLength; this.lengthFieldOffset = lengthFieldOffset; this.lengthFieldLength = lengthFieldLength; this.lengthAdjustment = lengthAdjustment; lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength; this.initialBytesToStrip = initialBytesToStrip; this.failFast = failFast; }
把傳引數儲存在 field即可
- byteOrder
位元組流表示的資料是大端還是小端,用於長度域的讀取 - lengthFieldEndOffset
緊跟長度域欄位後面的第一個位元組的在整個資料包中的偏移量 - failFast
- 為true 表讀取到長度域,TA的值的超過
maxFrameLength
,就拋TooLongFrameException
- 為
false
表只有當真正讀取完長度域的值表示的位元組之後,才拋TooLongFrameException
,預設設為true
,建議不要修改,否則可能會造成記憶體溢位
- 為true 表讀取到長度域,TA的值的超過
7.2 實現拆包抽象
具體的拆包協議只需要實現
void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
in
表目前為止還未拆的資料,拆完之後的包新增到 out
這個list中即可實現包向下傳遞
-
第一層實現
過載的 protected
方法 decode
實現真正的拆包,以下三步走

1 計算需要抽取的資料包的長度
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { // 拿到實際的未調整過的包長度 long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder); if (frameLength < lengthFieldEndOffset) { failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset); } if (frameLength > maxFrameLength) { exceededFrameLength(in, frameLength); return null; } }
-
拿到長度域的實際位元組偏移
-
調整包的長度
-
如果當前可讀位元組還未達到長度長度域的偏移,那說明肯定是讀不到長度域的,直接不讀
上面有個 getUnadjustedFrameLength
,若你的長度域代表的值表達的含義不是基本的int,short等基本型別,可重寫該方法

比如,有的奇葩的長度域裡面雖然是4個位元組,比如 0x1234,但是TA的含義是10進位制,即長度就是十進位制的1234,那麼覆蓋這個函式即可實現奇葩長度域拆包
- 長度校驗
-
整個資料包的長度還沒有長度域長,直接拋異常
-
資料包長度超出最大包長度,進入丟棄模式
- 當前可讀位元組已達到
frameLength
,直接跳過frameLength
個位元組,丟棄之後,後面有可能就是一個合法的資料包 - 當前可讀位元組未達到
frameLength
,說明後面未讀到的位元組也需丟棄,進入丟棄模式,先把當前累積的位元組全部丟棄
- 當前可讀位元組已達到
bytesToDiscard
表還需丟棄多少位元組

- 最後,呼叫
failIfNecessary
判斷是否需要丟擲異常- 不需要再丟棄後面的未讀位元組(
bytesToDiscard == 0
),重置丟棄狀態- 如果沒有設定快速失敗(
!failFast
),或者設定了快速失敗並且是第一次檢測到大包錯誤(firstDetectionOfTooLongFrame
),丟擲異常,讓handler處理 -
如果設定了快速失敗,並且是第一次檢測到打包錯誤,丟擲異常,讓handler去處理
- 如果沒有設定快速失敗(
- 不需要再丟棄後面的未讀位元組(
前面我們可以知道 failFast
預設為 true
,而這裡 firstDetectionOfTooLongFrame
為 true
,所以,第一次檢測到大包肯定會丟擲異常

3 丟棄模式的處理
LengthFieldBasedFrameDecoder.decoder
方法入口處還有一段程式碼

-
若當前處在丟棄模式,先計算需要丟棄多少位元組,取當前還需可丟棄位元組和可讀位元組的最小值,丟棄後,進入
failIfNecessary
,對照著這個函式看,預設情況下是不會繼續丟擲異常,而如果設定了 failFast為false,那麼等丟棄完之後,才會丟擲異常
2 跳過指定位元組長度的邏輯處理
在丟棄模式的處理及長度校驗都通過後
-
initialBytesToStrip
CorruptedFrameException
先驗證當前是否已讀到足夠的位元組,若讀到了,在下一步抽取一個完整的資料包之前,需根據
initialBytesToStrip
的設定來跳過某些位元組,當然,跳過的位元組不能大於資料包的長度,否則拋CorruptedFrameException
異常
抽取frame
-
拿到當前累積資料的讀指標,然後拿到待抽取資料包的實際長度進行抽取,抽取之後,移動讀指標
- 抽取的過程即呼叫了一下
ByteBuf
的retainedSlice
API,該API無記憶體copy的開銷
小結
LengthFieldBasedFrameDecoder LengthFieldBasedFrameDecoder
8 解碼器總結
8.1 ByteToMessageDecoder 解碼步驟

8.2 基於長度解碼器步驟

8.3 兩個問題
