3.Netty的粘包、拆包(二)
Netty提供的TCP資料拆包、粘包解決方案
1.前言
關於TCP的資料拆包、粘包的介紹,我在上一篇文章裡面已經有過介紹。
想要了解一下的,請點選這裡Chick Here!
今天我們要講解的是Netty提供的兩種解決方案:
- DelimiterBasedFrameDecoder
- FixedLengthFrameDecoder
2.關於Decoder
-
先觀察下兩段程式碼的不同
(1)使用StringDecoder之前
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf in = (ByteBuf) msg; String str = in.toString(CharsetUtil.UTF_8); System.out.println("Client:"+str); } finally { ReferenceCountUtil.release(msg); } }
(2)使用StringDecoder之後
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String str = (String) msg; System.out.println("Client:"+str); } finally { ReferenceCountUtil.release(msg); } }
-
關於Decoder
decoder:n. 解碼器
在我看來,Netty資料的解析方式大概為:
傳送過程:Buffer------>資料報------>位元流
接受過程:Buffer<------資料報<------位元流
所以我們接受到的msg是一個ButeBuf
使用了Decoder(這裡使用StringDecoder舉例)之後:
傳送過程:Buffer------>資料報------>位元流
接受過程:String<------Buffer<------資料報<------位元流
相當於ByteBuf按照StringDecoder的解碼規則,把msg翻譯成為了一個字串。
-
如何使用Decoder
(1)實際程式碼演示:
package com.xm.netty.demo02; import java.net.InetSocketAddress; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; public class Server { private final int port; public Server(int port) { this.port = port; } public static void main(String[] args) { int port = 8989; try { new Server(port).start(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void start() throws InterruptedException { EventLoopGroup g1 = new NioEventLoopGroup(); EventLoopGroup g2 = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(g1,g2) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress( port)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture future = bootstrap.bind().sync(); future.channel().closeFuture().sync(); } finally { g1.shutdownGracefully().sync(); g2.shutdownGracefully().sync(); } } }
程式碼改動:
ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ServerHandler());
(2)多個Decoder的使用順序:
從前往後,依次解碼
假設我們有個通過字串變化為時間的TimeDecoder:
ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new TimeDecoder()); ch.pipeline().addLast(new ServerHandler());
解析規則為:
3.DelimiterBasedFrameDecoder
-
關於DelimiterBasedFrameDecoder
其實很簡單,就是在一個緩衝區的末尾新增一個結束字元。
在規定了最大長度的緩衝區裡,遇到一個特殊字元,就擷取一次。
原理類似於String的split()方法。
-
程式碼實現
(1)服務端Server
package com.xm.netty.demo03; import java.net.InetSocketAddress; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Server { private final int port; public Server(int port) { this.port = port; } public static void main(String[] args) { int port = 8989; try { new Server(port).start(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void start() throws InterruptedException { EventLoopGroup g1 = new NioEventLoopGroup(); EventLoopGroup g2 = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(g1,g2) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress( port)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture future = bootstrap.bind().sync(); future.channel().closeFuture().sync(); } finally { g1.shutdownGracefully().sync(); g2.shutdownGracefully().sync(); } } }
(2)服務端ServerHandler
package com.xm.netty.demo03; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String str = (String) msg; System.out.println("Server:"+str); str = "伺服器返回--->"+ str+"$"; ctx.writeAndFlush(Unpooled.copiedBuffer(str.getBytes())); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+"一個客戶端連線上伺服器!"); } }
(3)客戶端Client
package com.xm.netty.demo03; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Client { private final int port; private final String host; public Client(int port, String host) { this.port = port; this.host = host; } public static void main(String[] args) { String host = "127.0.0.1"; int port = 8989; try { new Client(port, host).start(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .remoteAddress(host, port) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect().sync(); for(int i=10;i<20;i++) { String str = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now()) + "---- " +i+"<<<$"; future.channel().write(Unpooled.copiedBuffer(str.getBytes())); } future.channel().flush(); //future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Netty!".getBytes())); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } }
(4)客戶端ClientHandler
package com.xm.netty.demo03; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String str = (String) msg; System.out.println("Client:"+str); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+"已連線伺服器!"); } }
-
執行結果截圖
(1)服務端執行結果:
(2)客戶端執行結果:
4.FixedLengthFrameDecoder
-
關於FixedLengthFrameDecoder
其實很簡單,就是對規定的傳送的資料進行限制長度,
當符合這個長度的情況下,就可以解析。
假設你傳送一個’123456‘,’654321‘
那麼解析的狀況為’12345‘,’66543‘
-
程式碼實現
(1)服務端Server
package com.xm.netty.demo04; import java.net.InetSocketAddress; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Server { private final int port; public Server(int port) { this.port = port; } public static void main(String[] args) { int port = 8989; try { new Server(port).start(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void start() throws InterruptedException { EventLoopGroup g1 = new NioEventLoopGroup(); EventLoopGroup g2 = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(g1,g2) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress( port)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(new FixedLengthFrameDecoder(5)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ServerHandler()); } }); ChannelFuture future = bootstrap.bind().sync(); future.channel().closeFuture().sync(); } finally { g1.shutdownGracefully().sync(); g2.shutdownGracefully().sync(); } } }
(2)服務端ServerHandler
package com.xm.netty.demo04; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String str = (String) msg; System.out.println("Server:"+str); ctx.writeAndFlush(Unpooled.copiedBuffer(str.getBytes())); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+"一個客戶端連線上伺服器!"); } }
(3)客戶端Client
package com.xm.netty.demo04; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.FixedLengthFrameDecoder; import io.netty.handler.codec.string.StringDecoder; public class Client { private final int port; private final String host; public Client(int port, String host) { this.port = port; this.host = host; } public static void main(String[] args) { String host = "127.0.0.1"; int port = 8989; try { new Client(port, host).start(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } private void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap .group(group) .channel(NioSocketChannel.class) .remoteAddress(host, port) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new FixedLengthFrameDecoder(5)); ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new ClientHandler()); } }); ChannelFuture future = bootstrap.connect().sync(); for(int i=123450;i<123460;i++) { String str = ""+i; future.channel().write(Unpooled.copiedBuffer(str.getBytes())); } future.channel().flush(); //future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Netty!".getBytes())); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } }
(4)客戶端ClientHandler
package com.xm.netty.demo04; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String str = (String) msg; System.out.println("Client:"+str); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+"已連線伺服器!"); } }
-
執行結果截圖
(1)服務端執行結果:
(2)客戶端執行結果: