Netty4.0原始碼解析:TCP粘包半包問題的解決方案
一、引言
TCP是一個基於流的協議,TCP作為傳輸層協議並不知道應用層協議的具體含義,它會根據TCP緩衝區的實際情況進行資料包的劃分,所以在應用層上認為是一個完整的包,可能會被TCP拆分成多個包進行傳送,也有可能把多個小的包封裝成一個大的資料包傳送,這就是所謂的TCP粘包和半包問題。
Netty提供了多個進站處理器來處理這個問題: 1、LineBasedFrameDecoder:通過換行符來區分每個包 2、DelimiterBasedFrameDecoder:通過特殊分隔符來區分每個包 3、FixedLengthFrameDecoder:通過定長的報文來分包 4、LengthFieldBasedFrameDecoder:跟據包頭部定義的長度來區分包
這幾個類都擁有一個共同的父類:ByteToMessageDecoder
需要注意的是,ByteToMessageDecoder的子類不允許使用@Sharable註解,否則在構造階段會丟擲IllegalStateException異常。
二、ByteToMessageDecoder
ByteToMessageDecoder提供了最基本的位元組轉換為可識別訊息的功能,也就是將多個ByteBuf轉換為一個可識別的ByteBuf。一般放在ChannelPipeline管道的頭部。
ByteToMessageDecoder持有以下成員變數:
//每次和其它ByeBuf訊息碎片合併後的緩衝區
ByteBuf cumulation;
//合併策略,這裡預設為通過一次記憶體複製操作來完成cumulation和讀入的ByteBuf的合併
private Cumulator cumulator = MERGE_CUMULATOR;
//是否僅解碼一條訊息
private boolean singleDecode;
//是否在沒有位元組可讀時嘗試進行獲取更多的位元組進行解碼
private boolean decodeWasNull;
//cumulation是否為null
private boolean first;
//本次解碼的狀態
private byte decodeState = STATE_INIT;
//當讀到多少個零碎的ByteBuf時就將當前cumulation作為壞包丟棄
private int discardAfterReads = 16;
//本次解碼讀到的ByteBuf的數量
private int numReads;
1、channelRead方法
要了解ByteToMessageDecoder解碼機制,我們可以從它的channelRead方法開始分析:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) { //接收Channel讀到的資料,此時的ByteBuf資料可能是不可讀的
//構造一個List,用於存放每個ByteBuf解碼的結果
CodecOutputList out = CodecOutputList.newInstance();
try {
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) //如果cumulation為null,就無需進行ByteBuf的合併
cumulation = data;
else //否則呼叫Cumulator的cumulate方法將當前ByteBuf和cumulation合併
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
callDecode(ctx, cumulation, out);
} catch (DecoderException e) {
throw e;
} catch (Exception e) {
throw new DecoderException(e);
} finally {
//如果cumulation中的位元組已經全部解碼成功,那麼將當前ByteToMessageDecoder復位
if (cumulation != null && !cumulation.isReadable()) {
numReads = 0;
cumulation.release();
cumulation = null;
//否則cumulation還仍有零散的訊息碎片
} else if (++ numReads >= discardAfterReads) {
numReads = 0;
discardSomeReadBytes(); //丟棄已經讀取到的位元組
}
int size = out.size();
decodeWasNull = !out.insertSinceRecycled();
fireChannelRead(ctx, out, size);
out.recycle();
}
} else { //如果不是ByteBuf,就忽略這個訊息傳遞給下一個管道
ctx.fireChannelRead(msg);
}
}
channelRead方法的執行可以分為以下幾個步驟:
- 獲取一個CodecOutputList,用於存放每次channelRead方法呼叫完成後的解碼結果
CodecOutputList實現了java.util.List介面,並通過FastThreadLocal存放在InternalThreadLocalMap中,每個執行緒都預設持有16個CodecOutputList例項,通過CodecOutputList池CodecOutputLists來維護,如果所需的CodecOutputList超出16個,那麼會預設例項化一個新的CodecOutputList例項。
- 將當前ByteBuf與之前讀到的ByteBuf(成員變數cumulation)進行合併
如果cumulation為空,那麼直接將當前ByteBuf引用賦給cumulation 如果cumulation不為空,那麼將根據成員變數cumulator定義的合併策略進行ByteBuf的合併。 Cumulator是ByteToMessageDecoder的內部介面,定義了一個方法:
ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in);
引數alloc為ByteBuf分配器,用於分配一個新的ByteBuf,可以通過呼叫ChannelHandlerContext的alloc方法獲得。cumulation為原ByteBuf緩衝區,in為需要被合併的ByteBuf緩衝區,其返回值為合併後的ByteBuf緩衝區。
ByteToMessageDecoder預設定義了2種Cumulator實現類:ByteToMessageDecoder.MERGE_CUMULATOR和ByteToMessageDecoder.COMPOSITE_CUMULATOR。
MERGE_CUMULATOR的合併策略是通過ByteBufAllocator分配一個大小為cumulation加上in的可讀位元組數,然後將cumulation和in的資料複製到緩衝區中,所以MERGE_CUMULATOR需要一次記憶體複製操作。ByteToMessageDecoder預設採用這種策略合併緩衝區。
COMPOSITE_CUMULATOR的合併策略是通過CompositeByteBuf來完成ByteBuf的合併,它可以持有多個ByteBuf例項,所以不需要進行記憶體複製操作。但是CompositeByteBuf的索引演算法實現較為複雜,可能會比MERGE_CUMULATOR要慢。
- 合併完成後,對合並後的ByteBuf緩衝區(cumulation)的資料進行解碼
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
while (in.isReadable()) {
int outSize = out.size(); //注:在第一次迴圈中outSize為0
if (outSize > 0) { //如果List的長度大於0,說明已經有解碼好的訊息
//產生一個ChannelRead事件,並將集合out的每個元素傳播到管道
fireChannelRead(ctx, out, outSize);
out.clear(); //清空這個List
if (ctx.isRemoved()) //如果已經從管道中移除,那麼退出迴圈結束
break;
outSize = 0;
}
//獲取可讀位元組數量
int oldInputLength = in.readableBytes();
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) //如果this已經從管道移除,那麼退出迴圈
break;
if (outSize == out.size()) { //如果集合out元素數量在本次迴圈中沒有改變
//如果在decodeRemovalReentryProtection沒有處理任何資料
if (oldInputLength == in.readableBytes())
break;
else
continue;
}
if (oldInputLength == in.readableBytes()) //一般不會發生
throw new DecoderException(StringUtil.simpleClassName(getClass()) +
".decode() did not read anything but decoded a message.");
if (isSingleDecode()) //如果僅解碼一條訊息,那麼退出迴圈
break;
}
} catch (DecoderException e) {
throw e;
} catch (Exception cause) {
throw new DecoderException(cause);
}
}
callDecode通過一個while迴圈,不斷嘗試從cumulation中讀入資料。對於每次迴圈,可以分為以下幾個步驟: (1)獲取集合out的元素數量,如果集合out元素數量大於0,說明已經有解碼成功的訊息(ByteBuf物件),那麼產生一個channelRead事件並將每個解碼好的訊息傳遞到管道的下一個ChannelHandler,然後清空這個out集合。如果元素數量為0,那麼就意味著沒有解碼完成的訊息。 (2)獲取cumulation的可讀位元組數oldInputLength,並呼叫decodeRemovalReentryProtection方法對cumulation中的二進位制資料進行解碼。如果成功從cumulation中分離出可讀的訊息,那麼該方法會往out集合中新增這個訊息的ByteBuf物件。 (3)檢查this有沒有從當前管道移除,如果移除則退出迴圈,方法結束。 (4)如果集合out中的元素數量在本次迴圈中沒有改變並且decodeRemovalReentryProtection方法沒有處理cumulation中的位元組,那麼退出迴圈方法結束。如果集合out中的元素沒有改變但是處理了cumulation中的位元組,那麼重新開始迴圈。 (5)如果僅僅需要解碼一條訊息,那麼退出迴圈。
decodeRemovalReentryProtection方法負責呼叫decode方法嘗試對合並後訊息進行解碼:
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
//呼叫抽象方法decode進行解碼,由子類進行實現
decode(ctx, in, out);
} finally {
//如果已經成功將多個ByteBuf轉換為一個可識別訊息
boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
decodeState = STATE_INIT; //將解碼狀態修改為STATE_INIT
if (removePending)
handlerRemoved(ctx);
}
}
decode方法為抽象方法,需要子類根據自己的分離訊息的策略進行實現。 ByteToMessageDecoder規定:如果decode方法成功從cumulation中分離出一條可讀的訊息,那麼會將這個訊息新增到集合out中。如果尚未提取出可讀的訊息,則無需改動集合out中的訊息。
2、channelReadComplete方法
當JDK的SocketChannel已經讀完本次客戶端傳送過來的所有位元組後,channelReadComplete方法隨即會被呼叫:
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
numReads = 0;
discardSomeReadBytes(); //丟棄cumulation已經讀完的位元組
if (decodeWasNull) {
decodeWasNull = false;
if (!ctx.channel().config().isAutoRead())
ctx.read(); //嘗試從Channel獲取更多的位元組
}
//將channelReadComplete事件傳遞給下一個Handler
ctx.fireChannelReadComplete();
}
channelReadComplete方法實現比較簡單,首先丟棄cumulation中已經讀取完成的位元組(也就是丟棄0~readIndex範圍內的位元組)。然後,呼叫ChannelHandlerContext的read方法請求從Channel通道獲取更多的位元組。 回顧下DefaultChannelPipeline管道模型,read方法最終會呼叫到管道頭結點HeadContext引用的Unsafe物件的beginRead方法,beginRead方法會將OP_READ事件新增到這個JDK Channel感興趣的事件中。 最後,再將ChannelReadComplete事件傳遞到下一個ChannelHandler。
三、FixedLengthFrameDecoder
FixedLengthFrameDecoder通過定長的報文來分包。在構造FixedLengthFrameDecoder時,需要傳入一個int型的變數,代表單個包的位元組數。 FixedLengthFrameDecoder的實現比較簡單:
public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
private final int frameLength; //每個包的位元組數
public FixedLengthFrameDecoder(int frameLength) {
if (frameLength <= 0)
throw new IllegalArgumentException("frameLength must be a positive integer: " + frameLength);
this.frameLength = frameLength;
}
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null)
out.add(decoded);
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
if (in.readableBytes() < frameLength) {
return null;
} else {
return in.readSlice(frameLength).retain();
}
}
}
FixedLengthFrameDecoder直接通過ByteBuf的readSlice方法來擷取包。 decode方法執行步驟如下: (1)如果傳入的ByteBuf的刻度位元組數小於包的長度,那麼方法返回結束 (2)否則,呼叫這個ByteBuf的readSlice方法將當前ByteBuf的緩衝區陣列的readIndex ~ readIndex+frameLength處的位元組串擷取下來生成一個新的ByteBuf例項,然後將這個新ByteBuf的引用計數器加1。這個ByteBuf例項包含的資料就是一個可讀的包。然後,將這個ByteBuf例項新增到out集合中。解碼完成。
四、LineBasedFrameDecoder
LineBasedFrameDecoder通過換行符"\r\n"或"\n"來劃分每個包。 LineBasedFrameDecoder提供了兩個public構造方法:
public LineBasedFrameDecoder(final int maxLength) {
this(maxLength, true, false);
}
public LineBasedFrameDecoder(final int maxLength, final boolean stripDelimiter, final boolean failFast) {
this.maxLength = maxLength;
this.failFast = failFast;
this.stripDelimiter = stripDelimiter;
}
這三個變數的含義是: maxLength:包的最大長度 stripDelimiter:是否在分包時去除掉換行符 failFast:當位元組流長度超過maxLength並且仍然沒有找到換行符時,是否向管道傳入一個ExceptionCaught事件,並伴隨TooLongFrameException異常物件。
我們來看它的decode方法:
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
final int eol = findEndOfLine(buffer); //找到換行符在緩衝區buffer的位置
if (!discarding) { //如果之前沒有包因為找不到換行符並且超出最大長度而被丟棄
if (eol >= 0) { //如果找到了
final ByteBuf frame;
final int length = eol - buffer.readerIndex(); //獲取訊息的長度
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1; //判斷換行符是"\r\n"還是"\n"
if (length > maxLength) { //如果訊息長度大於允許的最大長度
buffer.readerIndex(eol + delimLength); //將剛才讀到的資料標記為已讀
fail(ctx, length); //向管道傳入ExceptionCaught事件
return null;
}
if (stripDelimiter) { //如果需要移除換行符
frame = buffer.readSlice(length); //將單個包資料從buffer擷取下來
buffer.skipBytes(delimLength); //將buffer的換行符所在的區域標記為已讀
} else { //如果無需移除換行符,則直接從buffer將包資訊擷取下來
frame = buffer.readSlice(length + delimLength);
}
return frame.retain(); //將擷取下來的ByteBuf的引用計數加1
} else { //如果沒有找到換行符
final int length = buffer.readableBytes(); //獲取buffer可讀位元組數
if (length > maxLength) { //如果可讀位元組數超出了最大包長度
discardedBytes = length; //記錄這段資料的長度
buffer.readerIndex(buffer.writerIndex()); //將所有資料標記為已讀,即忽略這段資料
discarding = true;
offset = 0;
if (failFast) //向管道傳入ExceptionCaught事件
fail(ctx, "over " + discardedBytes);
}
return null;
}
} else { //如果有包因為找不到換行符並且超出最大長度而被丟棄
if (eol >= 0) { //如果此時找到了換行符
//丟棄這個換行符之前的所有資料
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
discarding = false;
if (!failFast) //向管道傳入ExceptionCaught事件
fail(ctx, length);
} else { //如果依然沒有找到換行符,那麼仍然丟棄這段資料(通過更改讀索引等於寫索引的方式)
discardedBytes += buffer.readableBytes();
buffer.readerIndex(buffer.writerIndex());
}
return null;
}
}
decode方法的實現其實不難,我們來一步步分析: (1)既然是通過換行符來分包,那麼首先第一步是需要找到換行符在緩衝區中的位置,也就是下標。這裡呼叫了findEndOfLine方法:
private int findEndOfLine(final ByteBuf buffer) {
int totalLength = buffer.readableBytes(); //獲取所有的位元組數
//通過buffer的forEachByte方法找出換行符'\n'的在緩衝區陣列的中的位置
int i = buffer.forEachByte(buffer.readerIndex() + offset, totalLength - offset, ByteBufProcessor.FIND_LF);
if (i >= 0) { //如果大於等於0,說明找到換行符
offset = 0;
//檢查換行符型別是不是"\r\n"型別,如果是,則將下標減1,即回退到'\r'的位置
if (i > 0 && buffer.getByte(i - 1) == '\r') {
i--;
}
} else { //如果小於0,則說明沒有找到換行符
offset = totalLength;
}
return i;
}
如果沒有找到換行符,那麼直接返回-1。如果找到了換行符並且換行符是‘\n’型別,那麼返回這個’\n’字元所在的緩衝區陣列的下標。如果是"\r\n"型別,那麼返回的陣列下標指向’\r’
(2)如果之前沒有包因為找不到換行符並且超出最大長度而被丟棄,並且找到了換行符,那麼判斷這段資料有沒有超過最大包長度,如果超出,那麼丟棄這段資料,向管道傳入ExceptionCaught事件並伴隨TooLongFrameException異常物件。如果沒有超出,那麼將緩衝區的readIndex到換行符這段資料擷取下來生成一個新的ByteBuf。這個新的ByteBuf就是一個完整的包,decode將它新增到集合out中後,方法結束。
如果在緩衝區的readIndex ~ writeIndex範圍內沒有找到換行符,那麼首先判斷writeIndex減去readIndex的值有沒有超出最大包長度,如果沒有超出則什麼也不做,方法返回結束。 如果超出最大長度,那麼會將讀索引readIndex移動到writeIndex位置,即丟棄這段資料,並將變數discarding標記為true,再下次呼叫decode方法的時候做特殊處理。
(3)如果discarding變數為true,就說明上次呼叫decode方法的時候,緩衝區中的可讀資料因為找不到換行符並且超過了最大包長度而被丟棄。因為沒有找到換行符的原因,所以在本次處理的時候,需要將本次傳入的緩衝區資料的readIndex到換行符下標的資料移除,因為無法保證這個包是完整可讀的。最後將discarding標記為false。在下次呼叫decode方法的時候按照第二步的方式正常處理。
但是,如果本次讀取依然沒有找到換行符,那麼繼續丟棄這段資料,discarding變數仍然為true。
五、DelimiterBasedFrameDecoder
DelimiterBasedFrameDecoder可以通過自定義分隔符來區分每個包。 DelimiterBasedFrameDecoder提供了6個public構造方法,可以指定4種類型的引數: maxFrameLength:每個包的最大長度 stripDelimiter:是否不將分隔符加入到每個包中 failFast:當位元組流長度超過maxFrameLength並且仍然沒有找到換行符時,是否向管道傳入一個ExceptionCaught事件,並伴隨TooLongFrameException異常物件 delimiters:持有分隔符資料的ByteBuf物件,可以指定多個ByteBuf,每個ByteBuf對應一種型別的分隔符
public DelimiterBasedFrameDecoder(int maxFrameLength, boolean stripDelimiter, boolean failFast,
ByteBuf... delimiters) {
validateMaxFrameLength(maxFrameLength); //maxFrameLength不能小於等於0
if (delimiters == null)
throw new NullPointerException("delimiters");
if (delimiters.length == 0)
throw new IllegalArgumentException("empty delimiters");
//如果delimiters有基於換行符的分隔符,並且當前類不是DelimiterBasedFrameDecoder的子類
if (isLineBased(delimiters) && !isSubclass()) {
lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
this.delimiters = null;
} else { //將這些分隔符ByteBuf複製一份儲存到delimiters陣列中
this.delimiters = new ByteBuf[delimiters.length];
for (int i = 0; i < delimiters.length; i ++) {
ByteBuf d = delimiters[i];
validateDelimiter(d); //驗證ByteBuf是否可讀
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
}
lineBasedDecoder = null;
}
this.maxFrameLength = maxFrameLength;
this.stripDelimiter = stripDelimiter;
this.failFast = failFast;
}
DelimiterBasedFrameDecoder持有以下成員變數:
//存放分隔符,每個ByteBuf代表一個分隔符
private final ByteBuf[] delimiters;
//包的最大允許長度
private final int maxFrameLength;
//包內容是否不包含分隔符
private final boolean stripDelimiter;
//是否在位元組過長並且沒有讀到換行符時向管道傳入ExceptionCaught事件
private final boolean failFast;
//在上次呼叫decode方法時是否有包因為太長導致被丟棄
private boolean discardingTooLongFrame;
private int tooLongFrameLength;
//只有當基於換行符實現時
private final LineBasedFrameDecoder lineBasedDecoder;
同樣,我們從decode方法開始解析:
@Override
protected final void