死磕Netty原始碼之Netty編解碼原始碼解析
前言
Netty的底層是基於TCP實現的,TCP協議在傳輸資料的過程中一個完整的業務可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,因此我們需要考慮Netty的粘包拆包問題
訊息解碼器
Netty提供了拆包的基類ByteToMessageDecoder,如果我們為引用程式添加了解碼器每次從TCP緩衝區讀到資料都會呼叫到ByteToMessageDecoder的channelRead方法,它是Netty解碼的入口
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
callDecode(ctx, cumulation, out );
} catch (DecoderException e) {
throw e;
} catch (Throwable t) {
throw new DecoderException(t);
} finally {
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null ;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else {
ctx.fireChannelRead(msg);
}
}
從上訴程式碼可以看出Netty拆包的過程主要分為一下四個流程
1.累加資料
2.將累加到的資料傳遞給業務進行業務拆包
3.清理位元組容器
4.傳遞業務資料包給業務解碼器處理
累加資料
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
累加資料是通過如上程式碼實現的,主要功能就是將讀取到的資料塞到ButeBuf中去。上面涉及到一個累加器cumulator(實現的功能就是往ByteBuf追加資料),在該類中定義瞭如下兩個累加器(預設情況下會使用MERGE_CUMULATOR)
public static final Cumulator MERGE_CUMULATOR
public static final Cumulator COMPOSITE_CUMULATOR
下面我們看一下MERGE_CUMULATOR是如何將新讀取到的資料累加到位元組容器裡的
public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
@Override
public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
final ByteBuf buffer;
if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes() || cumulation.refCnt() > 1 || cumulation.isReadOnly()) {
buffer = expandCumulation(alloc, cumulation, in.readableBytes());
} else {
buffer = cumulation;
}
buffer.writeBytes(in);
in.release();
return buffer;
}
};
Netty中ByteBuf的抽象使得累加非常簡單,通過一個簡單的API呼叫buffer.writeBytes(in)便將新資料累加到位元組容器中,為了防止位元組容器大小不夠在累加之前還進行了擴容處理,擴容也是一個記憶體拷貝操作新增的大小即是新讀取資料的大小
static ByteBuf expandCumulation(ByteBufAllocator alloc, ByteBuf cumulation, int readable) {
ByteBuf oldCumulation = cumulation;
cumulation = alloc.buffer(oldCumulation.readableBytes() + readable);
cumulation.writeBytes(oldCumulation);
oldCumulation.release();
return cumulation;
}
將累加到的資料傳遞給業務進行拆包
到這一步位元組容器裡的資料已是目前未拆包部分的所有的資料了
CodecOutputList out = CodecOutputList.newInstance();
callDecode(ctx, cumulation, out);
callDecode將嘗試將位元組容器的資料拆分成業務資料包塞到業務資料容器out中
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
while (in.isReadable()) {
// 記錄一下位元組容器中有多少位元組待拆
int oldInputLength = in.readableBytes();
decode(ctx, in, out);
if (out.size() == 0) {
// 拆包器未讀取任何資料
if (oldInputLength == in.readableBytes()) {
break;
} else {
// 拆包器已讀取部分資料,還需要繼續
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException(StringUtil.simpleClassName(getClass()) + ".decode() did not read anything but decoded a message.");
}
if (isSingleDecode()) {
break;
}
}
}
在解碼之前先記錄一下位元組容器中有多少位元組待拆,然後呼叫抽象函式decode進行拆包
protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;
Netty中對各種使用者協議的支援就體現在這個抽象函式中,所有的拆包器最終都實現了該抽象方法
業務拆包完成之後如果發現並沒有拆到一個完整的資料包,這個時候又分兩種情況
1.拆包器什麼資料也沒讀取,可能資料還不夠業務拆包器處理,直接break等待新的資料
2.拆包器已讀取部分資料,說明解碼器仍然在工作,繼續解碼
業務拆包完成之後如果發現已經解析到資料包但是並沒有讀取任何資料,這個時候就會丟擲一個Runtime異常,告訴你什麼資料都沒讀取卻解析出一個業務資料包這是有問題的
清理位元組容器
業務拆包完成之後只是從位元組容器中取走了資料,但是這部分空間對於位元組容器來說依然保留著,而位元組容器每次累加位元組資料的時候都是將位元組資料追加到尾部,如果不對位元組容器做清理那麼時間一長就會OOM
正常情況下其實每次讀取完資料,Netty都會在下面這個方法中將位元組容器清理
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes();
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead()) {
ctx.read();
}
}
ctx.fireChannelReadComplete();
}
但是當傳送端傳送資料過快channelReadComplete沒有辦法及時清理可能會引發OOM,所以為防止傳送端傳送資料過快Netty會在每次讀取到一次資料,業務拆包之後對位元組位元組容器做清理,清理部分的程式碼如下
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes();
}
如果位元組容器當前已無資料可讀取,直接釋放該容器並且將cumulation置為null減少下次拆包時計數器累加的工作,如果連續16次(discardAfterReads的預設值)位元組容器中仍然有未被業務拆包器讀取的資料那就做一次壓縮,將有效資料段移到容器首部
discardSomeReadBytes之前,位元組累加器中的資料分佈
+--------------+----------+----------+
| readed | unreaded | writable |
+--------------+----------+----------+
discardSomeReadBytes之後,位元組容器中的資料分佈
+----------+-------------------------+
| unreaded | writable |
+----------+-------------------------+
這樣位元組容器又可以承載更多的資料了
傳遞業務資料包給業務解碼器處理
以上三個步驟完成之後,就可以將拆成的包丟到業務解碼器處理了,程式碼如下
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
最後呼叫fireChannelRead將拆到的業務資料包都傳遞到後續的handler,如果未解析到有效的資料包此處的msgs長度為0,即如果在拆包過程中未解析到有效的資料包,讀事件不會往下傳遞
static void fireChannelRead(ChannelHandlerContext ctx, CodecOutputList msgs, int numElements) {
for (int i = 0; i < numElements; i ++) {
ctx.fireChannelRead(msgs.getUnsafe(i));
}
}
這樣就可以把一個個完整的業務資料包傳遞到後續的業務解碼器進行解碼,隨後處理業務邏輯
訊息編碼器
關於訊息編碼器原理與訊息解碼器類似,不同的是訊息編碼器的抽象是MessageToByteDecoder。所以此處不展開分析