Netty原始碼分析 (十一)----- 拆包器之LengthFieldBasedFrameDecoder
本篇文章主要是介紹使用LengthFieldBasedFrameDecoder解碼器自定義協議。通常,協議的格式如下:
LengthFieldBasedFrameDecoder是netty解決拆包粘包問題的一個重要的類,主要結構就是header+body結構。我們只需要傳入正確的引數就可以傳送和接收正確的資料,那麼重點就在於這幾個引數的意義。下面我們就具體瞭解一下這幾個引數的意義。先來看一下LengthFieldBasedFrameDecoder主要的構造方法:
public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip)
那麼這幾個重要的引數如下:
- maxFrameLength:最大幀長度。也就是可以接收的資料的最大長度。如果超過,此次資料會被丟棄。
- lengthFieldOffset:長度域偏移。就是說資料開始的幾個位元組可能不是表示資料長度,需要後移幾個位元組才是長度域。
- lengthFieldLength:長度域位元組數。用幾個位元組來表示資料長度。
- lengthAdjustment:資料長度修正。因為長度域指定的長度可以使header+body的整個長度,也可以只是body的長度。如果表示header+body的整個長度,那麼我們需要修正資料長度。
- initialBytesToStrip:跳過的位元組數。如果你需要接收header+body的所有資料,此值就是0,如果你只想接收body資料,那麼需要跳過header所佔用的位元組數。
下面我們根據幾個例子的使用來具體說明這幾個引數的使用。
LengthFieldBasedFrameDecoder 的用法
需求1
長度域為2個位元組,我們要求傳送和接收的資料如下所示:
傳送的資料 (14 bytes) 接收到資料 (14 bytes) +--------+----------------+ +--------+----------------+ | Length | Actual Content |----->| Length | Actual Content | | 12 | "HELLO, WORLD" | | 12 | "HELLO, WORLD" | +--------+----------------+ +--------+----------------+
留心的你肯定發現了,長度域只是實際內容的長度,不包括長度域的長度。下面是引數的值:
- lengthFieldOffset=0:開始的2個位元組就是長度域,所以不需要長度域偏移。
- lengthFieldLength=2:長度域2個位元組。
- lengthAdjustment=0:資料長度修正為0,因為長度域只包含資料的長度,所以不需要修正。
- initialBytesToStrip=0:傳送和接收的資料完全一致,所以不需要跳過任何位元組。
需求2
長度域為2個位元組,我們要求傳送和接收的資料如下所示:
傳送的資料 (14 bytes) 接收到資料 (12 bytes) +--------+----------------+ +----------------+ | Length | Actual Content |----->| Actual Content | | 12 | "HELLO, WORLD" | | "HELLO, WORLD" | +--------+----------------+ +----------------+
引數值如下:
- lengthFieldOffset=0:開始的2個位元組就是長度域,所以不需要長度域偏移。
- lengthFieldLength=2:長度域2個位元組。
- lengthAdjustment=0:資料長度修正為0,因為長度域只包含資料的長度,所以不需要修正。
- initialBytesToStrip=2:我們發現接收的資料沒有長度域的資料,所以要跳過長度域的2個位元組。
需求3
長度域為2個位元組,我們要求傳送和接收的資料如下所示:
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes) +--------+----------------+ +--------+----------------+ | Length | Actual Content |----->| Length | Actual Content | | 14 | "HELLO, WORLD" | | 14 | "HELLO, WORLD" | +--------+----------------+ +--------+----------------+
留心的你肯定又發現了,長度域表示的長度是總長度 也就是header+body的總長度。引數如下:
- lengthFieldOffset=0:開始的2個位元組就是長度域,所以不需要長度域偏移。
- lengthFieldLength=2:長度域2個位元組。
- lengthAdjustment=-2:因為長度域為總長度,所以我們需要修正資料長度,也就是減去2。
- initialBytesToStrip=0:我們發現接收的資料沒有長度域的資料,所以要跳過長度域的2個位元組。
需求4
長度域為2個位元組,我們要求傳送和接收的資料如下所示:
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) +----------+----------+----------------+ +----------+----------+----------------+ | meta | Length | Actual Content |----->| meta | Length | Actual Content | | 0xCAFE | 12 | "HELLO, WORLD" | | 0xCAFE | 12 | "HELLO, WORLD" | +----------+----------+----------------+ +----------+----------+----------------+
我們發現,資料的結構有點變化,變成了 meta+header+body的結構。meta一般表示元資料,魔數等。我們定義這裡meta有三個位元組。引數如下:
- lengthFieldOffset=3:開始的3個位元組是meta,然後才是長度域,所以長度域偏移為3。
- lengthFieldLength=2:長度域2個位元組。
- lengthAdjustment=0:長度域指定的長度位資料長度,所以資料長度不需要修正。
- initialBytesToStrip=0:傳送和接收資料相同,不需要跳過資料。
需求5
長度域為2個位元組,我們要求傳送和接收的資料如下所示:
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes) +----------+----------+----------------+ +----------+----------+----------------+ | Length | meta | Actual Content |----->| Length | meta | Actual Content | | 12 | 0xCAFE | "HELLO, WORLD" | | 12 | 0xCAFE | "HELLO, WORLD" | +----------+----------+----------------+ +----------+----------+----------------+
我們發現,資料的結構有點變化,變成了 header+meta+body的結構。meta一般表示元資料,魔數等。我們定義這裡meta有三個位元組。引數如下:
- lengthFieldOffset=0:開始的2個位元組就是長度域,所以不需要長度域偏移。
- lengthFieldLength=2:長度域2個位元組。
- lengthAdjustment=3:我們需要把meta+body當做body處理,所以資料長度需要加3。
- initialBytesToStrip=0:傳送和接收資料相同,不需要跳過資料。
需求6
長度域為2個位元組,我們要求傳送和接收的資料如下所示:
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes) +------+--------+------+----------------+ +------+----------------+ | HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content | | 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" | +------+--------+------+----------------+ +------+----------------+
我們發現,資料的結構有點變化,變成了 hdr1+header+hdr2+body的結構。我們定義這裡hdr1和hdr2都只有1個位元組。引數如下:
- lengthFieldOffset=1:開始的1個位元組是長度域,所以需要設定長度域偏移為1。
- lengthFieldLength=2:長度域2個位元組。
- lengthAdjustment=1:我們需要把hdr2+body當做body處理,所以資料長度需要加1。
- initialBytesToStrip=3:接收資料不包括hdr1和長度域相同,所以需要跳過3個位元組。
LengthFieldBasedFrameDecoder 原始碼剖析
實現拆包抽象
在前面的文章中我們知道,具體的拆包協議只需要實現
void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
其中 in 表示目前為止還未拆的資料,拆完之後的包新增到 out這個list中即可實現包向下傳遞,第一層實現比較簡單
@Override protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
過載的protected函式decode做真正的拆包動作
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { if (this.discardingTooLongFrame) { long bytesToDiscard = this.bytesToDiscard; int localBytesToDiscard = (int)Math.min(bytesToDiscard, (long)in.readableBytes()); in.skipBytes(localBytesToDiscard); bytesToDiscard -= (long)localBytesToDiscard; this.bytesToDiscard = bytesToDiscard; this.failIfNecessary(false); } // 如果當前可讀位元組還未達到長度長度域的偏移,那說明肯定是讀不到長度域的,直接不讀 if (in.readableBytes() < this.lengthFieldEndOffset) { return null; } else { // 拿到長度域的實際位元組偏移,就是長度域的開始下標 // 這裡就是需求4,開始的幾個位元組並不是長度域 int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset; // 拿到實際的未調整過的包長度 // 就是讀取長度域的十進位制值,最原始傳過來的包的長度 long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder); // 如果拿到的長度為負數,直接跳過長度域並丟擲異常 if (frameLength < 0L) { in.skipBytes(this.lengthFieldEndOffset); throw new CorruptedFrameException("negative pre-adjustment length field: " + frameLength); } else { // 調整包的長度 frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset); // 整個資料包的長度還沒有長度域長,直接丟擲異常 if (frameLength < (long)this.lengthFieldEndOffset) { in.skipBytes(this.lengthFieldEndOffset); throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than lengthFieldEndOffset: " + this.lengthFieldEndOffset); // 資料包長度超出最大包長度,進入丟棄模式 } else if (frameLength > (long)this.maxFrameLength) { long discard = frameLength - (long)in.readableBytes(); this.tooLongFrameLength = frameLength; if (discard < 0L) { in.skipBytes((int)frameLength); } else { this.discardingTooLongFrame = true; this.bytesToDiscard = discard; in.skipBytes(in.readableBytes()); } this.failIfNecessary(true); return null; } else { int frameLengthInt = (int)frameLength; //當前可讀的位元組數小於包中的length,什麼都不做,等待下一次解碼 if (in.readableBytes() < frameLengthInt) { return null; //跳過的位元組不能大於資料包的長度,否則就丟擲 CorruptedFrameException 的異常 } else if (this.initialBytesToStrip > frameLengthInt) { in.skipBytes(frameLengthInt); throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip); } else { //根據initialBytesToStrip的設定來跳過某些位元組 in.skipBytes(this.initialBytesToStrip); //拿到當前累積資料的讀指標 int readerIndex = in.readerIndex(); //拿到待抽取資料包的實際長度 int actualFrameLength = frameLengthInt - this.initialBytesToStrip; //進行抽取 ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength); //移動讀指標 in.readerIndex(readerIndex + actualFrameLength); return frame; } } } } }
下面分幾個部分來分析一下這個重量級函式
獲取frame長度
獲取需要待拆包的包大小
// 拿到長度域的實際位元組偏移,就是長度域的開始下標 // 這裡就是需求4,開始的幾個位元組並不是長度域 int actualLengthFieldOffset = in.readerIndex() + this.lengthFieldOffset; // 拿到實際的未調整過的包長度 // 就是讀取長度域的十進位制值,最原始傳過來的包的長度 long frameLength = this.getUnadjustedFrameLength(in, actualLengthFieldOffset, this.lengthFieldLength, this.byteOrder); // 調整包的長度 frameLength += (long)(this.lengthAdjustment + this.lengthFieldEndOffset);
上面這一段內容有個擴充套件點 getUnadjustedFrameLength,如果你的長度域代表的值表達的含義不是正常的int,short等基本型別,你可以重寫這個函式
protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) { buf = buf.order(order); long frameLength; switch (length) { case 1: frameLength = buf.getUnsignedByte(offset); break; case 2: frameLength = buf.getUnsignedShort(offset); break; case 3: frameLength = buf.getUnsignedMedium(offset); break; case 4: frameLength = buf.getUnsignedInt(offset); break; case 8: frameLength = buf.getLong(offset); break; default: throw new DecoderException( "unsupported lengthFieldLength: " + lengthFieldLength + " (expected: 1, 2, 3, 4, or 8)"); } return frameLength; }
跳過指定位元組長度
int frameLengthInt = (int)frameLength; //當前可讀的位元組數小於包中的length,什麼都不做,等待下一次解碼 if (in.readableBytes() < frameLengthInt) { return null; //跳過的位元組不能大於資料包的長度,否則就丟擲 CorruptedFrameException 的異常 } else if (this.initialBytesToStrip > frameLengthInt) { in.skipBytes(frameLengthInt); throw new CorruptedFrameException("Adjusted frame length (" + frameLength + ") is less " + "than initialBytesToStrip: " + this.initialBytesToStrip); } //根據initialBytesToStrip的設定來跳過某些位元組 in.skipBytes(this.initialBytesToStrip);
先驗證當前是否已經讀到足夠的位元組,如果讀到了,在下一步抽取一個完整的資料包之前,需要根據initialBytesToStrip的設定來跳過某些位元組(見文章開篇),當然,跳過的位元組不能大於資料包的長度,否則就丟擲 CorruptedFrameException 的異常
抽取frame
//根據initialBytesToStrip的設定來跳過某些位元組 in.skipBytes(this.initialBytesToStrip); //拿到當前累積資料的讀指標 int readerIndex = in.readerIndex(); //拿到待抽取資料包的實際長度 int actualFrameLength = frameLengthInt - this.initialBytesToStrip; //進行抽取 ByteBuf frame = this.extractFrame(ctx, in, readerIndex, actualFrameLength); //移動讀指標 in.readerIndex(readerIndex + actualFrameLength); return frame;
到了最後抽取資料包其實就很簡單了,拿到當前累積資料的讀指標,然後拿到待抽取資料包的實際長度進行抽取,抽取之後,移動讀指標
protected ByteBuf extractFrame(ChannelHandlerContext ctx, ByteBuf buffer, int index, int length) { return buffer.retainedSlice(index, length); }
抽取的過程是簡單的呼叫了一下 ByteBuf 的retainedSliceapi,該api無記憶體copy開銷
自定義解碼器
協議實體的定義
public class MyProtocolBean { //型別 系統編號 0xA 表示A系統,0xB 表示B系統 private byte type; //資訊標誌 0xA 表示心跳包 0xB 表示超時包 0xC 業務資訊包 private byte flag; //內容長度 private int length; //內容 private String content; //省略get/set }
伺服器端
服務端的實現
public class Server { private static final int MAX_FRAME_LENGTH = 1024 * 1024; //最大長度 private static final int LENGTH_FIELD_LENGTH = 4; //長度欄位所佔的位元組數 private static final int LENGTH_FIELD_OFFSET = 2; //長度偏移 private static final int LENGTH_ADJUSTMENT = 0; private static final int INITIAL_BYTES_TO_STRIP = 0; private int port; public Server(int port) { this.port = port; } public void start(){ EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap sbs = new ServerBootstrap().group(bossGroup,workerGroup).channel(NioServerSocketChannel.class).localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MyProtocolDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_FIELD_LENGTH,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false)); ch.pipeline().addLast(new ServerHandler()); }; }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); // 繫結埠,開始接收進來的連線 ChannelFuture future = sbs.bind(port).sync(); System.out.println("Server start listen at " + port ); future.channel().closeFuture().sync(); } catch (Exception e) { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port; if (args.length > 0) { port = Integer.parseInt(args[0]); } else { port = 8080; } new Server(port).start(); } }
自定義解碼器MyProtocolDecoder
public class MyProtocolDecoder extends LengthFieldBasedFrameDecoder { private static final int HEADER_SIZE = 6; /** * @param maxFrameLength 幀的最大長度 * @param lengthFieldOffset length欄位偏移的地址 * @param lengthFieldLength length欄位所佔的位元組長 * @param lengthAdjustment 修改幀資料長度欄位中定義的值,可以為負數 因為有時候我們習慣把頭部記入長度,若為負數,則說明要推後多少個欄位 * @param initialBytesToStrip 解析時候跳過多少個長度 * @param failFast 為true,當frame長度超過maxFrameLength時立即報TooLongFrameException異常,為false,讀取完整個幀再報異 */ public MyProtocolDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip, failFast); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { //在這裡呼叫父類的方法,實現指得到想要的部分,我在這裡全部都要,也可以只要body部分 in = (ByteBuf) super.decode(ctx,in); if(in == null){ return null; } if(in.readableBytes()<HEADER_SIZE){ throw new Exception("位元組數不足"); } //讀取type欄位 byte type = in.readByte(); //讀取flag欄位 byte flag = in.readByte(); //讀取length欄位 int length = in.readInt(); if(in.readableBytes()!=length){ throw new Exception("標記的長度不符合實際長度"); } //讀取body byte []bytes = new byte[in.readableBytes()]; in.readBytes(bytes); return new MyProtocolBean(type,flag,length,new String(bytes,"UTF-8")); } }
服務端Hanlder
public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { MyProtocolBean myProtocolBean = (MyProtocolBean)msg; //直接轉化成協議訊息實體 System.out.println(myProtocolBean.getContent()); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); } }
客戶端和客戶端Handler
public class Client { static final String HOST = System.getProperty("host", "127.0.0.1"); static final int PORT = Integer.parseInt(System.getProperty("port", "8080")); static final int SIZE = Integer.parseInt(System.getProperty("size", "256")); public static void main(String[] args) throws Exception { // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new MyProtocolEncoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture future = b.connect(HOST, PORT).sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } }
客戶端編碼器
public class MyProtocolEncoder extends MessageToByteEncoder<MyProtocolBean> { @Override protected void encode(ChannelHandlerContext ctx, MyProtocolBean msg, ByteBuf out) throws Exception { if(msg == null){ throw new Exception("msg is null"); } out.writeByte(msg.getType()); out.writeByte(msg.getFlag()); out.writeInt(msg.getLength()); out.writeBytes(msg.getContent().getBytes(Charset.forName("UTF-8"))); } }
- 編碼的時候,只需要按照定義的順序依次寫入到ByteBuf中.
客戶端Handler
public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { MyProtocolBean myProtocolBean = new MyProtocolBean((byte)0xA, (byte)0xC, "Hello,Netty".length(), "Hello,Netty"); ctx.writeAndFlush(myProtocolBean); } }
&n