Netty 原始碼深度解析(九) - 編碼
概述
一個問題


ChannelOutboundHandler
,並將出站資料從 一種格式轉換為另一種格式,和我們方才學習的解碼器的功能正好相反。Netty 提供了一組類, 用於幫助你編寫具有以下功能的編碼器:
- 將訊息編碼為位元組
-
將訊息編碼為訊息
我們將首先從抽象基類 MessageToByteEncoder 開始來對這些類進行考察
1 抽象類 MessageToByteEncoder

MessageToByteEncoder API
解碼器通常需要在Channel
關閉之後產生最後一個訊息(因此也就有了decodeLast()
方法)
這顯然不適於編碼器的場景——在連線被關閉之後仍然產生一個訊息是毫無意義的
1.1 ShortToByteEncoder
其接受一Short
型例項作為訊息,編碼為Short
的原子型別值,並寫入ByteBuf
,隨後轉發給ChannelPipeline
中的下一個ChannelOutboundHandler
每個傳出的 Short 值都將會佔用 ByteBuf 中的 2 位元組

ShortToByteEncoder

1.2 Encoder

Netty 提供了一些專門化的MessageToByteEncoder
,可基於此實現自己的編碼器
Socket/">WebSocket08FrameEncoder
類提供了一個很好的例項

2 抽象類 MessageToMessageEncoder
你已經看到了如何將入站資料從一種訊息格式解碼為另一種
為了完善這幅圖,將展示 對於出站資料將如何從一種訊息編碼為另一種。MessageToMessageEncoder
類的encode()
方法提供了這種能力

MessageToMessageEncoderAPI
IntegerToStringEncoder
擴充套件了MessageToMessageEncoder
-
編碼器將每個出站 Integer 的 String 表示新增到了該 List 中
IntegerToStringEncoder的設計
關於有趣的 MessageToMessageEncoder 的專業用法,請檢視io.netty.handler. codec.protobuf.ProtobufEncoder
類,它處理了由 Google 的 Protocol Buffers 規範所定義 的資料格式。
一個java物件最後是如何轉變成位元組流,寫到socket緩衝區中去的

pipeline中的標準連結串列結構
java物件編碼過程
write:寫佇列
flush:重新整理寫佇列
writeAndFlush: 寫佇列並重新整理
pipeline中的標準連結串列結構

標準的pipeline鏈式結構
資料從head節點流入,先拆包,然後解碼成業務物件,最後經過業務Handler
處理,呼叫write
,將結果物件寫出去
tail
節點,然後通過encoder
節點將物件編碼成ByteBuf
,最後將該ByteBuf
物件傳遞到head
節點,呼叫底層的Unsafe寫到JDK底層管道
Java物件編碼過程
為什麼我們在pipeline中添加了encoder節點,java物件就轉換成netty可以處理的ByteBuf,寫到管道里?
我們先看下呼叫write的code

user
-
然後,user在pipeline中傳遞
AbstractChannel#
DefaultChannelPipeline#
AbstractChannelHandlerContext#
AbstractChannelHandlerContext#
-
情形一
AbstractChannelHandlerContext#
AbstractChannelHandlerContext#
-
情形二
AbstractChannelHandlerContext#
AbstractChannelHandlerContext#invokeWrite0
AbstractChannelHandlerContext#invokeFlush0
handler 如果不覆蓋 flush 方法,就會一直向前傳遞直到 head 節點
落到Encoder
節點,下面是Encoder
的處理流程

按照簡單自定義協議,將Java物件 User 寫到傳入的引數 out中,這個out到底是什麼?
需知User
物件,從BizHandler
傳入到MessageToByteEncoder
時,首先傳到write

1. 判斷當前Handelr是否能處理寫入的訊息(匹配物件)



-
判斷該物件是否是該型別引數匹配器例項可匹配到的型別
TypeParameterMatcher#
具體例項
2 分配記憶體


3 編碼實現
-
呼叫
encode
,這裡就調回到Encoder
這個Handler
中
-
其為抽象方法,因此自定義實現類實現編碼方法
4釋放物件
-
既然自定義Java物件轉換成
ByteBuf
了,那麼這個物件就已經無用,釋放掉 (當傳入的msg
型別是ByteBuf
時,就不需要自己手動釋放了)
5 傳播資料
//112 如果buf中寫入了資料,就把buf傳到下一個節點,直到 header 節點

6 釋放記憶體
//115否則,釋放buf,將空資料傳到下一個節點
// 120 如果當前節點不能處理傳入的物件,直接扔給下一個節點處理
// 127 當buf在pipeline中處理完之後,釋放

Encoder處理傳入的Java物件
-
判斷當前
Handler
是否能處理寫入的訊息- 如果能處理,進入下面的流程
- 否則,直接扔給下一個節點處理
-
將物件強制轉換成
Encoder
可以處理的Response
物件 -
分配一個
ByteBuf
-
呼叫
encoder
,即進入到 Encoder 的 encode方法,該方法是使用者程式碼,使用者將資料寫入ByteBuf - 既然自定義Java物件轉換成ByteBuf了,那麼這個物件就已經無用了,釋放掉(當傳入的msg型別是ByteBuf時,無需自己手動釋放)
- 如果buf中寫入了資料,就把buf傳到下一個節點,否則,釋放buf,將空資料傳到下一個節點
- 最後,當buf在pipeline中處理完之後,釋放節點
總結就是,Encoder
節點分配一個ByteBuf
,呼叫encode
方法,將Java物件根據自定義協議寫入到ByteBuf,然後再把ByteBuf傳入到下一個節點,在我們的例子中,最終會傳入到head節點
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }
這裡的msg就是前面在Encoder節點中,載有java物件資料的自定義ByteBuf物件
write - 寫buffer佇列


ChannelOutboundInvoker#


write(Object msg, boolean flush, ChannelPromise promise)




HeadContext in DefaultChannelPipeline#write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)

Unsafe in Channel#write(Object msg, ChannelPromise promise)
以下過程分三步講解

direct ByteBuf


AbstractChannel#filterOutboundMessage(Object msg)
-
首先,呼叫
assertEventLoop
確保該方法的呼叫是在reactor
執行緒中 -
然後,呼叫
filterOutboundMessage()
,將待寫入的物件過濾,把非ByteBuf
物件和FileRegion
過濾,把所有的非直接記憶體轉換成直接記憶體DirectBuffer
AbstractNioChannel#newDirectBuffer
插入寫佇列
- 接下來,估算出需要寫入的ByteBuf的size

-
最後,呼叫 ChannelOutboundBuffer 的addMessage(msg, size, promise) 方法,所以,接下來,我們需要重點看一下這個方法幹了什麼事情
ChannelOutboundBuffer
想要理解上面這段程式碼,須掌握寫快取中的幾個訊息指標

ChannelOutboundBuffer 裡面的資料結構是一個單鏈表結構,每個節點是一個 Entry,Entry 裡面包含了待寫出ByteBuf 以及訊息回撥 promise下面分別是
三個指標的作用
-
flushedEntry
表第一個被寫到OS Socket緩衝區中的節點
ChannelOutboundBuffer
-
unFlushedEntry
表第一個未被寫入到OS Socket緩衝區中的節點
ChannelOutboundBuffer
-
tailEntry
表ChannelOutboundBuffer
緩衝區的最後一個節點
ChannelOutboundBuffer
圖解過程
-
初次呼叫write 即
addMessage
後fushedEntry
指向空,unFushedEntry
和tailEntry
都指向新加入節點 -
第二次呼叫
addMessage
後 -
第n次呼叫
addMessage
後
可得,呼叫n次addMessage
後
-
flushedEntry
指標一直指向null
,表此時尚未有節點需寫到Socket緩衝區 -
unFushedEntry
後有n個節點,表當前還有n個節點尚未寫到Socket緩衝區
設定寫狀態

ChannelOutboundBuffer#addMessage
-
統計當前有多少位元組需要需要被寫出
ChannelOutboundBuffer#addMessage(Object msg, int size, ChannelPromise promise)
-
當前緩衝區中有多少待寫位元組
ChannelOutboundBuffer#


ChannelConfig#getWriteBufferHighWaterMark()


-
所以預設不能超過64k
WriteBufferWaterMark

-
自旋鎖+CAS 操作,通過 pipeline 將事件傳播到channelhandler 中監控
flush:重新整理buffer佇列
新增重新整理標誌並設定寫狀態
-
不管呼叫
channel.flush()
,還是ctx.flush()
,最終都會落地到pipeline
中的head
節點DefaultChannelPipeline#flush
-
之後進入到
AbstractUnsafe
AbstractChannel#flush()
-
flush方法中,先呼叫
ChannelOutboundBuffer#addFlush
ChannelOutboundBuffer#decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability)
和之前那個例項相同,不再贅述
-
結合前面的圖來看,上述過程即
首先拿到
unflushedEntry
指標,然後將flushedEntry
指向unflushedEntry
所指向的節點,呼叫完畢後
遍歷 buffer 佇列,過濾bytebuf
-
接下來,呼叫
flush0()
-
發現這裡的核心程式碼就一個
doWrite
AbstractChannel#
AbstractNioByteChannel
- 繼續跟
protected void doWrite(ChannelOutboundBuffer in) throws Exception { int writeSpinCount = -1; boolean setOpWrite = false; for (;;) { // 拿到第一個需要flush的節點的資料 Object msg = in.current(); if (msg instanceof ByteBuf) { boolean done = false; long flushedAmount = 0; // 拿到自旋鎖迭代次數 if (writeSpinCount == -1) { writeSpinCount = config().getWriteSpinCount(); } // 自旋,將當前節點寫出 for (int i = writeSpinCount - 1; i >= 0; i --) { int localFlushedAmount = doWriteBytes(buf); if (localFlushedAmount == 0) { setOpWrite = true; break; } flushedAmount += localFlushedAmount; if (!buf.isReadable()) { done = true; break; } } in.progress(flushedAmount); // 寫完之後,將當前節點刪除 if (done) { in.remove(); } else { break; } } } }
-
第一步,呼叫
current()
先拿到第一個需要flush
的節點的資料ChannelOutboundBuffer#current
-
第二步,拿到自旋鎖的迭代次數
-
-
拿到自旋鎖迭代次數

image.png
-
在併發程式設計中使用自旋鎖可以提高記憶體使用率和寫的吞吐量,預設值為16
ChannelConfig
-
繼續看原始碼
AbstractNioByteChannel#
-
javaChannel()
,表明 JDK NIO Channel 已介入此次事件NioSocketChannel#
ByteBuf#readBytes(GatheringByteChannel out, int length)
-
得到向JDK 底層已經寫了多少位元組
PooledDirectByteBuf#
-
從 Netty 的 bytebuf 寫到 JDK 底層的 bytebuffer
-
第四步,刪除該節點
節點的資料已經寫入完畢,接下來就需要刪除該節點
flush
掉的節點(flushedEntry
所指)
然後拿到該節點的回撥物件ChannelPromise
, 呼叫removeEntry()
移除該節點
safeSuccess
回撥,使用者程式碼可以在回撥裡面做一些記錄,下面是一段Example
ctx.write(xx).addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { // 回撥 } })
最後,呼叫recycle
,將當前節點回收
writeAndFlush: 寫佇列並重新整理
writeAndFlush
在某個Handler
中被呼叫之後,最終會落到TailContext
節點

public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { write(msg, true, promise); return promise; }

AbstractChannelHandlerContext#

AbstractChannelHandlerContext#
最終,通過一個boolean
變數,表示是呼叫invokeWriteAndFlush
,還是invokeWrite
,invokeWrite
便是我們上文中的write過程

AbstractChannelHandlerContext#
可以看到,最終呼叫的底層方法和單獨呼叫write
和flush
一樣的


invokeWriteAndFlush
基本等價於write
之後再來一次flush
總結
-
呼叫
write
並沒有將資料寫到Socket緩衝區中,而是寫到了一個單向連結串列的資料結構中,flush
才是真正的寫出 -
writeAndFlush
等價於先將資料寫到netty的緩衝區,再將netty緩衝區中的資料寫到Socket緩衝區中,寫的過程與併發程式設計類似,用自旋鎖保證寫成功 -
netty中的緩衝區中的ByteBuf為DirectByteBuf
當 BizHandler 通過 writeAndFlush 方法將自定義物件往前傳播時,其實可以拆分成兩個過程
- 通過 pipeline逐漸往前傳播,傳播到其中的一個 encode 節點後,其負責重寫 write 方法將自定義的物件轉化為 ByteBuf,接著繼續呼叫 write 向前傳播
-
pipeline中的編碼器原理是建立一個
ByteBuf
,將Java物件轉換為ByteBuf
,然後再把ByteBuf
繼續向前傳遞,若沒有再重寫了,最終會傳播到 head 節點,其中緩衝區列表拿到快取寫到 JDK 底層 ByteBuffer