1. 程式人生 > >Netty2:粘包/拆包問題與使用LineBasedFrameDecoder的解決方案

Netty2:粘包/拆包問題與使用LineBasedFrameDecoder的解決方案

substr string 技術分享 query coder 消息頭 handle decode sync

什麽是粘包、拆包

粘包、拆包是Socket編程中最常遇見的一個問題,本文來研究一下Netty是如何解決粘包、拆包的,首先我們從什麽是粘包、拆包開始說起:

TCP是個"流"協議,所謂流,就是沒有界限的一串數據,TCP底層並不了解上層業務的具體含義,它會根據TCP緩沖區的實際情況進行包的劃分,所以在業務上:
  • 一個完整的包可能會被TCP拆分為多個包進行發送(拆包)
  • 多個小的包也有可能被封裝成一個大的包進行發送(粘包)

這就是所謂的TCP粘包與拆包

下圖演示了粘包、拆包的場景:

技術分享圖片

基本上有四種情況:

  • Data1、Data2都分開發送到了Server端,沒有產生粘包與拆包的情況
  • Data1、Data2數據粘在了一起,打成了一個大的包發送到了Server端,這種情況就是粘包
  • Data1被分成Data1_1與Data1_2,Data1_1先到服務端,Data1_2與Data2再到服務端,這種情況就是拆包
  • Data2被分成Data2_1與Data2_2,Data1與Data2_1先到服務端,Data2_2再到服務端,同上,這也是一種拆包的場景

粘包、拆包產生的原因

上面我們詳細了解了TCP粘包與拆包,那麽粘包與拆包為什麽會發生呢,大致上有三種原因:

  • 應用程序寫入的字節大小大於Socket發送緩沖區大小
  • 進行MSS大小的TCP,MSS是最大報文段長度的縮寫,是TCP報文段中的數據字段最大長度,MSS=TCP報文段長度-TCP首部長度
  • 以太網的Payload大於MTU,進行IP分片,MTU是最大傳輸單元的縮寫,以太網的MTU為1500字節

粘包、拆包解決策略

由於底層的TCP無法理解上層的業務數據,所以在底層是無法保證數據包不被拆分和重組的,這個問題只能通過上層的應用協議棧設計來解決,根據業界的主流協議的解決方案,可以歸納如下:

  • 消息定長,例如每個報文的大小固定為200字節,如果不夠空位補空格
  • 包尾增加回車換行符進行分割,例如FTP協議
  • 將消息分為消息頭和消息體,消息頭中包含表示長度的字段,通常涉及思路為消息頭的第一個字段使用int32來表示消息的總長度
  • 更復雜的應用層協議

未考慮TCP粘包導致功能異常演示

基於Netty的第一篇文章《Netty1:初識Netty》,TimeServer與TimeClient不變,簡單修改一下TimeServerHandler與TimeClientHandler即可以模擬出TCP粘包的情況,首先修改TimeClientHandler:

 1 public class TimeClientHandler extends ChannelHandlerAdapter {
 2 
 3     private static final Logger LOGGER = LoggerFactory.getLogger(TimeClientHandler.class);
 4     
 5     private int counter;
 6     
 7     private byte[] req;
 8     
 9     public TimeClientHandler() {
10         req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
11     }
12     
13     @Override
14     public void channelActive(ChannelHandlerContext ctx) throws Exception {
15         ByteBuf message = null;
16         for (int i = 0; i < 100; i++) {
17             message = Unpooled.buffer(req.length);
18             message.writeBytes(req);
19             ctx.writeAndFlush(message);
20         }
21     }
22     
23     @Override
24     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
25         ByteBuf buf = (ByteBuf)msg;
26         byte[] req = new byte[buf.readableBytes()];
27         buf.readBytes(req);
28         
29         String body = new String(req, "UTF-8");
30         System.out.println("Now is:" + body + "; the counter is:" + ++counter);
31     }
32     
33     @Override
34     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
35         LOGGER.warn("Unexcepted exception from downstream:" + cause.getMessage());
36         ctx.close();
37     }
38     
39 }

TimeClientHandler的變化是,之前是發送一次"QUERY TIME ORDER"到服務端,現在變為發送100次"QUERY TIME ORDER"+標準換行符到服務端,並在客戶端增加一個計數器,記錄從服務端收到的響應次數。

服務單TimeServerHandler也簡單改造一下,增加一個計數器記錄一下從客戶端收到的請求次數:

 1 public class TimeServerHandler extends ChannelHandlerAdapter {
 2 
 3     private int counter;
 4     
 5     @Override
 6     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 7         ByteBuf buf = (ByteBuf)msg;
 8         byte[] req = new byte[buf.readableBytes()];
 9         buf.readBytes(req);
10         
11         String body = new String(req, "UTF-8").substring(0, req.length - System.getProperty("line.separator").length());
12         System.out.println("The time server receive order:" + body + "; the counter is:" + ++counter);
13         
14         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
15         currentTime = currentTime + System.getProperty("line.separator");
16         
17         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
18         ctx.writeAndFlush(resp);
19     }
20     
21     @Override
22     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
23         ctx.close();
24     }
25     
26 }

按照設計,服務端應該會打印出100次"Time time server...",客戶端應當會打印出100次"Now is ...",因為客戶端向服務端發送了100次"QUERY TIME ORDER"的請求,實際運行起來呢?先看一下服務端的打印:

The time server receive order:QUERY TIME ORDER
QUERY TIME ORDER
...省略,這裏有55個
QUERY TIME ORD; the counter is:1
The time server receive order:
...省略,這裏有42個
QUERY TIME ORDER; the counter is:2

counter最終等於2,表明服務端實際上只收到了2條請求,很顯然這裏發生了粘包,即多個客戶端的包合成了一個發送到了服務端,服務端每收到一個包的大小為1024字節。

接著看一下客戶端的打印:

Now is:BAD ORDER
BAD ORDER
; the counter is:1

因為服務端只收到了2條消息,因此客戶端也只會收到2條消息,因為服務端兩次收到的內容都不滿足"QUERY TIME ORDER",因此返回"BAD ORDER"到客戶端,但是為什麽客戶端的counter=1呢?回過頭來仔細想想,因此服務端發送給客戶端的消息也發生了粘包。因此這裏簡單得出一個結論:粘包/拆包不僅僅發生在客戶端給服務端發送數據,服務端回數據給客戶端同樣有可能發生粘包/拆包

上面的例子演示了粘包,拆包其實一樣的,既然可以知道服務端每收到一個包的大小為1024字節,那客戶端每次發送一個大於1024字節的數據給服務端就可以了,有興趣的朋友可以自己嘗試一下。

利用LineBasedFrameDecoder解決粘包問題

為了解決TCP粘包/拆包導致的半包讀寫問題,Netty默認提供了多種編解碼器用於處理半包,針對上面發送"QUERY TIME ORDER"+標準換行符的這種場景,簡單使用LineBasedFrameDecoder就可以解決上面發生的粘包問題。

首先對TimeServer進行改造,加入LineBasedFrameDecoder與StringDecoder:

 1 public class TimeServer {
 2 
 3     public void bind(int port) throws Exception {
 4         // NIO線程組
 5         EventLoopGroup bossGroup = new NioEventLoopGroup();
 6         EventLoopGroup workerGroup = new NioEventLoopGroup();
 7         
 8         try {
 9             ServerBootstrap b = new ServerBootstrap();
10             b.group(bossGroup, workerGroup)
11                 .channel(NioServerSocketChannel.class)
12                 .option(ChannelOption.SO_BACKLOG, 1024)
13                 .childHandler(new ChildChannelHandler());
14             
15             // 綁定端口,同步等待成功
16             ChannelFuture f = b.bind(port).sync();
17             // 等待服務端監聽端口關閉
18             f.channel().closeFuture().sync();
19         } finally {
20             // 優雅退出,釋放線程池資源
21             bossGroup.shutdownGracefully();
22             workerGroup.shutdownGracefully();
23         }
24     }
25     
26     private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
27         @Override
28         protected void initChannel(SocketChannel arg0) throws Exception {
29             arg0.pipeline().addLast(new LineBasedFrameDecoder(1024));
30             arg0.pipeline().addLast(new StringDecoder());
31             arg0.pipeline().addLast(new TimeServerHandler());
32         }
33     }
34     
35 }

改造點就在29行、30行兩行,加入了LineBasedFrameDecoder與StringDecoder,同時TimeServerHandler也需要相應改造:

 1 public class TimeServerHandler extends ChannelHandlerAdapter {
 2 
 3     private int counter;
 4     
 5     @Override
 6     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 7         String body = (String)msg;
 8         System.out.println("The time server receive order:" + body + "; the counter is:" + ++counter);
 9         
10         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
11         currentTime = currentTime + System.getProperty("line.separator");
12         
13         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());
14         ctx.writeAndFlush(resp);
15     }
16     
17     @Override
18     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
19         ctx.close();
20     }
21     
22 }

改造點在第7行,由於使用了StringDecoder,因此channelRead的第二個參數msg不再是ByteBuf類型而是String類型,因此這裏只需要做一次String強轉即可。

TimeClient改造類似:

 1 public class TimeClient {
 2 
 3     public void connect(int port, String host) throws Exception {
 4         EventLoopGroup group = new NioEventLoopGroup();
 5         try {
 6             Bootstrap b = new Bootstrap();
 7             
 8             b.group(group)
 9                 .channel(NioSocketChannel.class)
10                 .option(ChannelOption.TCP_NODELAY, true)
11                 .handler(new ChannelInitializer<SocketChannel>() {
12                     protected void initChannel(SocketChannel ch) throws Exception {
13                         ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
14                         ch.pipeline().addLast(new StringDecoder());
15                         ch.pipeline().addLast(new TimeClientHandler());
16                     };
17                 });
18             
19             // 發起異步連接操作
20             ChannelFuture f = b.connect(host, port).sync();
21             // 等待客戶端連接關閉
22             f.channel().closeFuture().sync();
23         } finally {
24             // 優雅退出,釋放NIO線程組
25             group.shutdownGracefully();
26         }
27     }
28     
29 }

第13行、第14行這兩行加入了LineBasedFrameDecoder與StringDecoder,TimeClientHandler相應改造:

 1 public class TimeClientHandler extends ChannelHandlerAdapter {
 2 
 3     private static final Logger LOGGER = LoggerFactory.getLogger(TimeClientHandler.class);
 4     
 5     private int counter;
 6     
 7     private byte[] req;
 8     
 9     public TimeClientHandler() {
10         req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
11     }
12     
13     @Override
14     public void channelActive(ChannelHandlerContext ctx) throws Exception {
15         ByteBuf message = null;
16         for (int i = 0; i < 100; i++) {
17             message = Unpooled.buffer(req.length);
18             message.writeBytes(req);
19             ctx.writeAndFlush(message);
20         }
21     }
22     
23     @Override
24     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
25         String body = (String)msg;
26         System.out.println("Now is:" + body + "; the counter is:" + ++counter);
27     }
28     
29     @Override
30     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
31         LOGGER.warn("Unexcepted exception from downstream:" + cause.getMessage());
32         ctx.close();
33     }
34     
35 }

第25行這裏使用String進行強轉即可。接下來看一下服務端的打印:

The time server receive order:QUERY TIME ORDER; the counter is:1
The time server receive order:QUERY TIME ORDER; the counter is:2
The time server receive order:QUERY TIME ORDER; the counter is:3
The time server receive order:QUERY TIME ORDER; the counter is:4
The time server receive order:QUERY TIME ORDER; the counter is:5
...
The time server receive order:QUERY TIME ORDER; the counter is:98
The time server receive order:QUERY TIME ORDER; the counter is:99
The time server receive order:QUERY TIME ORDER; the counter is:100

看到服務端正常counter從1打印到了100,即收到了100個完整的客戶端請求,客戶端的打印如下:

Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:1
Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:2
Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:3
Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:4
Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:5
...
Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:98
Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:99
Now is:Sat Apr 07 16:00:51 CST 2018; the counter is:100

看到同樣的客戶端也正常counter從1打印到了100,即收到了100個完整的服務端響應,至此,使用LineBasedFrameDecoder與StringDecoder解決了上述粘包問題。

整個LineBasedFrameDecoder的原理也比較簡單:

LineBasedFrameDecoder依次遍歷ByteBuf中的可讀字節,判斷是否有"\n"或者"\r\n",如果有就以此位置為結束位置,從可讀索引到結束位置區間的字節就組成了一行,它是以換行符為結束標誌的解碼器,支持攜帶結束符或者不攜帶結束符兩種解碼方式,同時支持配置單行的最大長度,如果連續讀到最大長度後仍然沒有發現換行符,就會拋出異常,同時忽略掉之前讀到的異常碼流。

StringDecoder的功能非常簡單,就是將接收到的對象轉換為字符串,然後繼續調用後面的Handler

LineBasedFrameDecoder+StringDecoder就是按行切換的文本解碼器,被設計用於支持TCP的粘包和拆包

Netty2:粘包/拆包問題與使用LineBasedFrameDecoder的解決方案