Netty實現自定義協議和原始碼分析
本篇 主要講的是自定義協議是如何實現的,以及自定義協議中會出現的問題和Netty是如何支援的。
分為4個部分
|– 粘包 拆包 資料包不全 和解決方案
|– 程式碼實現
|– ByteToMessageDecoder的原始碼分析
|– 過程流程圖
粘包
TCP是以位元組流流的方式來傳輸的,資料是儲存在緩衝區。雖然傳送資料是以每個包傳送的,但如果網路出現延遲,在第一個包的資料還儲存在緩衝區的時候,第二個包就傳送過來了。此時第二個包的資料也儲存到緩衝區,這時候,就不知道第一個包在哪兒結束,第二個包的資料是從哪裡開始讀取了。這就是TCP粘包。
拆包
因為存在粘包問題,所以拆包是解決粘包的問題的。我們需要將第一個包的資料和第二個包的資料拆分開來,否則我們就無法獲取到準確的資料。
資料包不全
在傳輸資料太多的時候,TCP是會將資料塊分包傳送的,也就是在網路延遲的問題,本來一個完整資料塊分成兩個包,在開始讀取資料的時候,只收到了一個數據包,這個時候,怎麼辦?如果先處理一個數據包,這樣資料不完整。等待?那如何知道資料完整了呢?
解決方案
方案一:
解決方案其實就是,如何去自定義定義這個協議包,去解決粘包的問題和資料包不全的問題。
自定義協議包括如下:
- 一個開始標誌:比如定義一個Int型別,4個位元組的標誌。那麼在讀到這個開始標誌的時候就判斷為是一個數據包的開始。
- 資料的長度:也是Int4個位元組,表明這個資料塊的大小是多小個位元組,這樣根據這個長度就可以知道資料包是否已經接受完畢,如果還沒有,那麼就等待。
- 資料:真正的傳輸資料
程式碼實現
協議包物件類
/**
*
* 自己定義的協議
* 資料包格式
* +——----——+——-----——+——----——+
* |協議開始標誌| 長度 | 資料 |
* +——----——+——-----——+——----——+
* 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76
* 2.傳輸資料的長度contentLength,int型別
* 3.要傳輸的資料
*
*/
public class CustomDate {
/**
* 訊息開頭的資訊標誌
* 是一個常量 X077
*/
private final int head_Date = Costom.HEAD_DATA.getVaule();
/**
* 訊息的長度
*/
private int contentLength;
/**
* 訊息的內容
*/
private byte[] conctent;
public CustomDate() {
super();
}
public CustomDate(int contentLength, byte[] conctent) {
this.contentLength = contentLength;
this.conctent = conctent;
}
public int getContentLength() {
return contentLength;
}
public void setContentLength(int contentLength) {
this.contentLength = contentLength;
}
public byte[] getConctent() {
return conctent;
}
public void setConctent(byte[] conctent) {
this.conctent = conctent;
}
public int getHead_Date() {
return head_Date;
}
}
解碼類
核心思想:
- 1 在開始讀取資料的時候先判斷位元組大小是否基本資料長度 (標誌+資料長度)
- 2 如果緩衝區資料太大,這種情況不正常,應該移動2048個位元組,直接處理後面的位元組。因為,可能是網路延遲導致,或者是惡意傳送大量資料。
- 3 開始讀取緩衝區了,對緩衝區的操作。首先標記一下閱讀標記點,然後開始尋找開始標記,如果不是開始標記,那麼就跳過一個標記節點。
- 4 如果找到了開始標記,那麼就繼續獲取長度。如果長度大小大於緩衝區的可讀長度,那麼就證明還有資料還沒到。就回滾到閱讀標記點。繼續等待資料。
- 5 如果資料已經到達了,那麼就開始讀取資料區。
繼承 ByteToMessageDecoder 類。該類主要作用是將從網路緩衝區讀取的位元組轉換成有意義的訊息物件的
/**
*
* 自己定義的協議
* 資料包格式
* +——----——+——-----——+——----——+
* |協議開始標誌| 長度 | 資料 |
* +——----——+——-----——+——----——+
* 1.協議開始標誌head_data,為int型別的資料,16進製表示為0X76
* 2.傳輸資料的長度contentLength,int型別
* 3.要傳輸的資料,長度不應該超過2048,防止socket流的攻擊
*
*/
public class CustomDecoder extends ByteToMessageDecoder {
/**
* 協議開始的標準head_data,int型別,佔據4個位元組.
* 表示資料的長度contentLength,int型別,佔據4個位元組.
*/
private final int BASE_LENGTH = 4 + 4;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
//1. 首先確認可讀長度大於基本長度
if (buffer.readableBytes() > BASE_LENGTH) {
//2.
// 防止socket位元組流攻擊
// 防止,客戶端傳來的資料過大
// 因為,太大的資料,是不合理的
if (buffer.readableBytes() > 2048) {
//將readerIndex移動
buffer = buffer.skipBytes(buffer.readableBytes());
}
//3. 記錄閱讀開始
int beginRead;
while (true) {
//獲取包頭開始的index;
beginRead = buffer.readerIndex();
// 標記包頭開始的index
buffer.markReaderIndex();
//如果讀到了資料包的協議開頭,那麼就結束迴圈
if (buffer.readInt() == Costom.HEAD_DATA.getVaule()) {
break;
}
//沒讀到協議開頭,退回到標記
buffer.resetReaderIndex();
//跳過一個位元組
buffer.readByte();
//如果可讀長度小於基本長度
//
if (buffer.readableBytes() < BASE_LENGTH) {
return;
}
}
//獲取訊息的長度
int length = buffer.readInt();
//判斷請求資料包是否到齊
if (buffer.readableBytes() < length) {
buffer.resetReaderIndex();
return;
}
byte[] date = new byte[length];
buffer.readBytes(date);
CustomDate customDate = new CustomDate(length, date);
out.add(customDate);
}
}
}
編碼類
- 其實就是往緩衝區裡面寫資料。
- 繼承 MessageToByteEncoder
public class CustomEncoder extends MessageToByteEncoder<CustomDate> {
@Override
protected void encode(ChannelHandlerContext ctx, CustomDate msg, ByteBuf out) throws Exception {
out.writeInt(msg.getHead_Date());
out.writeInt(msg.getContentLength());
out.writeBytes(msg.getConctent());
}
}
新增到管道
public class ServerHandlerInitializer extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new CustomDecoder());
pipeline.addLast(new CustomEncoder());
pipeline.addLast(new ServerHandler());
}
}
輸出協議資料
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof CustomDate) {
CustomDate customDate= (CustomDate) msg;
byte[] conctent = customDate.getConctent();
System.out.println("獲取到的內容"+new String(conctent));
ReferenceCountUtil.release(msg);
}
}
}
過程分析
我們研究一下解碼的過程。
1 自定義的解碼類是繼承ByteToMessageDecoder類。先看下ByteToMessageDecoder類
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{}
可以看到ByteToMessageDecoder 是繼承ChannelInboundHandlerAdapter,那也就是說,資料處理應該是通過重寫channelRead()類了。
2 那就繼續看ByteToMessageDecoder 的channelRead() 方法
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
CodecOutputList out = CodecOutputList.newInstance();
try {
// 獲取到緩衝區
ByteBuf data = (ByteBuf) msg;
first = cumulation == null;
if (first) {
cumulation = data;
} else {
cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
}
// 2 開始解碼
callDecode(ctx, cumulation, out);
} finally {
// 資源釋放程式碼
}
} else {
ctx.fireChannelRead(msg);
}
}
重點是2 callDecode()方法。該方法是開始解碼。繼續往下看該方法
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
try {
// 1
while (in.isReadable()) {
int outSize = out.size();
// 2
if (outSize > 0) {
fireChannelRead(ctx, out, outSize);
out.clear();
if (ctx.isRemoved()) {
break;
}
outSize = 0;
}
//3
int oldInputLength = in.readableBytes();
//4
decodeRemovalReentryProtection(ctx, in, out);
if (ctx.isRemoved()) {
break;
}
//5
if (outSize == out.size()) {
if (oldInputLength == in.readableBytes()) {
break;
} else {
continue;
}
}
if (oldInputLength == in.readableBytes()) {
throw new DecoderException( );
}
if (isSingleDecode()) {
break;
}
}
}
}
1 用while迴圈不斷處理緩衝區,判斷條件是如果緩衝區還有可讀資料,就繼續執行。
2 這個是非常好的設計,Out變數儲存的是解碼生成的物件。如果out裡面已經有物件,那麼就把該物件通過fireChannelRead()方法傳到下一個handler(也就是本程式中的輸出handler)。
出現這種情況是因為:粘包!!!!! 當處理完一個數據包的資料後,緩衝區還有下一個資料包的資料,所以先把處理完的資料包交給下一個handler處理後,再進行緩衝區的讀取。
3 做一個標記。記錄這次解碼對緩衝區資料有沒有被讀取(也就是有沒有讀取資料)。如果沒有,下面就會結束while迴圈。
為什麼會要做這個標記呢?為什麼要結束迴圈呢?
因為:緩衝區的資料沒有讀取,也就是說資料還沒全部到齊,需要等待資料完整再處理。所以就需要結束while迴圈。等待下一次的處理。
4 decodeRemovalReentryProtection() 就是呼叫自己重寫的decode()方法了。
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
decodeState = STATE_CALLING_CHILD_DECODE;
try {
// 自己重寫的decode
decode(ctx, in, out);
} finally {
//省略
}
}
5 這裡就是判斷3 中的標記,是否退出迴圈。
過程流程圖
看完程式碼分析還是一頭霧水? 那就再看一下流程圖吧