1. 程式人生 > >Netty 中的訊息解析和編解碼器

Netty 中的訊息解析和編解碼器

本篇內容主要梳理一下 Netty 中編解碼器的邏輯和編解碼器在 Netty 整個鏈路中的位置。 前面我們在分析 ChannelPipeline 的時候說到入站和出站事件的處理都在 pipeline 中維護著,通過list的形式將處理事件的 handler 按照先後關係儲存為一個列表,有對應的事件過來就按照列表順序取出 handler 來處理事件。 如果是入站事件按照 list 自然順序呼叫 handler 來處理,如果是出站事件則反序呼叫 handler 來處理。所有的入站事件處理器都繼承自 ChannelInboundHandler,出站事件處理器都繼承自 ChannelOutboundHandler。channelPipeline 上的註釋有說明 inbound 事件的傳播順序是: ```txt * 入棧事件傳播方法 *
  • {@link ChannelHandlerContext#fireChannelRegistered()}
  • *
  • {@link ChannelHandlerContext#fireChannelActive()}
  • *
  • {@link ChannelHandlerContext#fireChannelRead(Object)}
  • *
  • {@link ChannelHandlerContext#fireChannelReadComplete()}
  • *
  • {@link ChannelHandlerContext#fireExceptionCaught(Throwable)}
  • *
  • {@link ChannelHandlerContext#fireUserEventTriggered(Object)}
  • *
  • {@link ChannelHandlerContext#fireChannelWritabilityChanged()}
  • *
  • {@link ChannelHandlerContext#fireChannelInactive()}
  • *
  • {@link ChannelHandlerContext#fireChannelUnregistered()}
  • * * ``` 即 handler 中的方法呼叫順序是如上所示,我們主要關注的點在 `channelRead()` 方法上。下面就由 `channelRead()` 出發,去看看編解碼器的使用。 #### 1. channelRead 解析 inbound 事件的入口在 NioEventLoop #run() 方法#processSelectedKeys()#processSelectedKeysPlain()#processSelectedKey()#unsafe.read()。 這裡的 UnSafe 是定義在 Channel 介面中的子介面,並不是 JDK 的 UnSafe 類。UnSafe作為 channel 的內部類承擔著 channel 網路讀寫相關的功能,這裡可以抽出一節討論,不是本篇的重點。我們繼續看 UnSafe 的子類 NioByteUnsafe 重寫的 read() 方法: ```java @Override public final void read() { final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); //allocator負責建立緩衝區 final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { //分配記憶體 byteBuf = allocHandle.allocate(allocator); //讀取socketChannel資料到分配的byteBuf,對寫入的大小進行一個累計疊加 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // nothing was read. release the buffer. byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; break; } allocHandle.incMessagesRead(1); readPending = false; //觸發pipeline的ChannelRead事件來對byteBuf進行後續處理 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); // 記錄總共讀取的大小 allocHandle.readComplete(); pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } } } ``` read()方法從記憶體讀取資料給到 ByteBuf,上一節我們提到了ByteBuf,Netty 自己實現的 byte 位元組累加器。下面有一個while迴圈,每次讀取的 bytebuf 會給到 `pipeline.fireChannelRead(byteBuf)`方法去處理。繼續看 ChannelPipeline 的預設實現類 DefaultChannelPipeline 中的實現: ```java @Override public final ChannelPipeline fireChannelRead(Object msg) { AbstractChannelHandlerContext.invokeChannelRead(head, msg); return this; } ``` 呼叫了 `AbstractChannelHandlerContext#invokeChannelRead()`方法: ```java static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRead(m); } }); } } private void invokeChannelRead(Object msg) { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelRead(this, msg); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRead(msg); } } ``` 重點就在 `invokeChannelRead()` 的這一句: ```java ((ChannelInboundHandler) handler()).channelRead(this, msg); ``` 最終觸發了 `ChannelInboundHandler#channelRead(ChannelHandlerContext ctx, Object msg)` 方法。 所有的入站事件都實現了 ChannelInboundHandler 介面,不難理解我們的 handler 就是這樣接收到 bytebuf 然後進行下一步處理的。 #### 2. Read 事件一次可以讀多少位元組 說編解碼器之前我們先解決一個問題,如果不使用任何的編解碼器,預設的傳輸物件應該是 byteBuf,那麼 Netty 預設一次是讀取多少位元組呢?前面在講粘包的文章裡我在 packageEvent1工程示例中演示了不使用任何編解碼工具讀取資料,預設一次會話會讀取1024位元組,大家有興趣可以回到上一篇看看 [Netty 中的粘包和拆包](https://www.cnblogs.com/rickiyang/p/12904552.html),在 handler 中打上斷點就知道當前一次讀取包的長度。既然知道是1024,就好奇到底是在哪裡設定的,出發點肯定還是上面提到的 read() 方法: ```java byteBuf = allocHandle.allocate(allocator); ``` 這一句就是從記憶體中拿出位元組分配到 bytebuf,`allocate()` 是 RecvByteBufAllocator 介面中的方法,這個介面有很多實現類,那到底預設是哪個實現類生效呢? 我們再回到 NioSocetChannel ,看他的構造方法: ```java public NioSocketChannel(Channel parent, SocketChannel socket) { super(parent, socket); config = new NioSocketChannelConfig(this, socket.socket()); } private final class NioSocketChannelConfig extends DefaultSocketChannelConfig { private NioSocketChannelConfig(NioSocketChannel channel, Socket javaSocket) { super(channel, javaSocket); } @Override protected void autoReadCleared() { clearReadPending(); } } ``` 這裡會生成一些配置資訊,主要是一些 socket 預設引數以供初始化連線使用。NioSocketChannelConfig 構造方法裡面呼叫了父類 DefaultSocketChannelConfig 的構造方法: ```java public DefaultSocketChannelConfig(SocketChannel channel, Socket javaSocket) { super(channel); if (javaSocket == null) { throw new NullPointerException("javaSocket"); } this.javaSocket = javaSocket; // Enable TCP_NODELAY by default if possible. if (PlatformDependent.canEnableTcpNoDelayByDefault()) { try { setTcpNoDelay(true); } catch (Exception e) { // Ignore. } } } ``` 同樣這裡又往上呼叫了父類 DefaultChannelConfig : ```java public DefaultChannelConfig(Channel channel) { this(channel, new AdaptiveRecvByteBufAllocator()); } protected DefaultChannelConfig(Channel channel, RecvByteBufAllocator allocator) { setRecvByteBufAllocator(allocator, channel.metadata()); this.channel = channel; } ``` 怎樣,是不是看到了 AdaptiveRecvByteBufAllocator, 他就是 RecvByteBufAllocator 的實現類之一。所以我們只要看它是怎樣設定預設值即可。 AdaptiveRecvByteBufAllocator 的預設構造方法: ```java public AdaptiveRecvByteBufAllocator() { this(DEFAULT_MINIMUM, DEFAULT_INITIAL, DEFAULT_MAXIMUM); } ``` 這3個引數的預設值為: ```java static final int DEFAULT_MINIMUM = 64; static final int DEFAULT_INITIAL = 1024; static final int DEFAULT_MAXIMUM = 65536; ``` `DEFAULT_MINIMUM` 是緩衝區最小值,`DEFAULT_INITIAL` 是緩衝區預設值,`DEFAULT_MAXIMUM`是緩衝區最大值,到這裡我們就找到了預設值是從哪裡來的了。 預設大小是1024,但是並不是固定不變,它會有一個動態調整的動作。除了這三個欄位外,還定義了兩個動態調整容量的步長索引引數: ```java private static final int INDEX_INCREMENT = 4; private static final int INDEX_DECREMENT = 1; ``` 擴張的步進索引為4,收縮的步進索引為1。 ```java private static final int[] SIZE_TABLE; static { List sizeTable = new ArrayList(); for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } SIZE_TABLE = new int[sizeTable.size()]; for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); } } ``` SIZE_TABLE 為長度向量表,作用就是儲存步長。上面的 static 修飾的程式碼塊作用就是初始化長度向量表。從16開始,每次遞增16,直到512,這裡陣列的下標為30。下標31的初始值為512, i遞增的值為左移一位,左移一位相當於乘以2,所以每次遞增是以當前值的倍數增加的,最終增加到的值直到 Integer 能達到的最大值。 長度向量表的值可以得出: ```txt 0-->16 1-->32 2-->48 3-->64 4-->80 5-->96 6-->112 7-->128 8-->144 9-->160 10-->176 11-->192 12-->208 13-->224 14-->240 15-->256 16-->272 17-->288 18-->304 19-->320 20-->336 21-->352 22-->368 23-->384 24-->400 25-->416 26-->432 27-->448 28-->464 29-->480 30-->496 31-->512 32-->1024 33-->2048 34-->4096 35-->8192 36-->16384 37-->32768 38-->65536 39-->131072 40-->262144 41-->524288 42-->1048576 43-->2097152 44-->4194304 45-->8388608 46-->16777216 47-->33554432 48-->67108864 49-->134217728 50-->268435456 51-->536870912 52-->1073741824 ``` SIZE_TABLE 裡面的值是幹啥用的呢,剛才提到會將 byte 資料先預讀到緩衝區,初始預設大小為1024,當目前沒有這麼多位元組需要讀的時候,會動態縮小緩衝區,而預判待讀取的位元組有很多的時候會擴大緩衝區。 動態預估下一次可能會有多少資料待讀取的操作在哪裡呢?還是回到 read()方法,while 迴圈完一輪之後,會執行一句: ```java allocHandle.readComplete(); ``` 對應到 `AdaptiveRecvByteBufAllocator` 中: ```java @Override public void readComplete() { record(totalBytesRead()); } //根據當前的actualReadBytes大小,對nextReceiveBufferSize進行更新 private void record(int actualReadBytes) { //如果actualReadBytes 小於 當前索引-INDEX_DECREMENT-1 的值,說明容量需要縮減 if (actualReadBytes <= SIZE_TABLE[Math.max(0, index - INDEX_DECREMENT - 1)]) { if (decreaseNow) { //則取 當前索引-INDEX_DECREMENT 與 minIndex的最大值 index = Math.max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } //讀到的值大於緩衝大小 } else if (actualReadBytes >
    = nextReceiveBufferSize) { // INDEX_INCREMENT=4 index前進4 index = Math.min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } } ``` 通過上一次的流大小來預測下一次的流大小,可針對不同的應用場景來進行緩衝區的分配。像IM訊息可能是幾K ,檔案傳輸可能是幾百M,不同的場景用到的記憶體緩衝大小不一樣對效能的影響也不同。如果所有的場景都是同一種記憶體空間分配,客戶端連線多的情況下,執行緒數過多可能導致記憶體溢位。 #### 3. Netty 中的編解碼器 上面兩小節聊到訊息從哪裡來,預設訊息格式為 ByteBuf,緩衝區大小預設為1024,會動態預估下次緩衝區大小。下面我們就正式來說一下編解碼相關的內容,編解碼相關的原始碼都在 codec 包中: ![](https://img2020.cnblogs.com/blog/1607781/202005/1607781-20200523102852563-1539772634.png) 因為編碼器要實現的是對輸出的內容編碼,都是實現 ChannelOutboundHandler 介面,解碼器對接收的內容解碼,都是實現 ChannelInboundHandler 介面,所以可以完全適配 ChannelPipeline 將編解碼器作為一種外掛的形式做一些靈活的搭配。 ##### 3.1 decoder 解碼器負責將輸入的訊息解析為指定的格式。訊息輸入都來自inbound,即繼承 ChannelInboundHandler 介面,頂級的解碼器有兩種型別: - 將位元組解碼為訊息:`ByteToMessageDecoder` - 將一種訊息型別解碼為另一種 型別:`MessageToMessageDecoder`。 ![](https://img2020.cnblogs.com/blog/1607781/202005/1607781-20200523102905155-1246356571.png) 位元組碼解析為訊息這應該是最普通,最基本的使用方式,這裡所謂的位元組碼就是上面我們講到的 ByteBuf 序列,預設包含1024位元組的位元組陣列。關於 ByteToMessageDecoder 的分析上一節在講粘包的時候順帶提及,大家有興趣可以回去看看:[ByteToMessageDecoder 分析](https://www.cnblogs.com/rickiyang/p/12904552.html)。 MessageToMessageDecoder 更好理解,比如訊息的型別為Integer,需要將 Integer 轉為 String。那麼就可以繼承 MessageToMessageDecoder 實現自己的轉換方法。我們先簡單看一下它的實現: ```java @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { CodecOutputList out = CodecOutputList.newInstance(); try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked") I cast = (I) msg; try { decode(ctx, cast, out); } finally { ReferenceCountUtil.release(cast); } } else { out.add(msg); } } catch (DecoderException e) { throw e; } catch (Exception e) { throw new DecoderException(e); } finally { int size = out.size(); for (int i = 0; i < size; i ++) { ctx.fireChannelRead(out.getUnsafe(i)); } out.recycle(); } } protected abstract void decode(ChannelHandlerContext ctx, I msg, List out) throws Exception; ``` 上面的 `channelRead()`方法中將 msg 轉為訊息原本的型別,然後進入 `decode()`方法。 `decode()` 是一個抽象方法,言意之下你想轉為啥型別,你就實現該方法去轉便是。 ##### 3.2 encoder 編碼器主要的作用是將出站事件的訊息按照指定格式編碼輸出。那麼編碼器應該是繼承 outBound 事件,看一下主要的類圖: ![](https://img2020.cnblogs.com/blog/1607781/202005/1607781-20200523102917141-1645224302.png) 編碼器的基本型別與解碼器相反:將物件拆解為位元組,將物件編碼為另一種物件。 關於基本編解碼器的使用和自定義編解碼器上一節我們已經講過,這裡就不再複述。下一篇單獨看看在 Netty 中使用protobuf編碼格式進行資料