3. Netty: Sticky Packets and Packet Splitting (Part 2)

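Netty's solutions for TCP packet splitting and sticky packets

1. Preface

I covered TCP packet splitting and sticky packets in the previous article.

If you would like a refresher, read that article first.

Today we look at two of the solutions Netty provides: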

  1. DelimiterBasedFrameDecoder
  2. FixedLengthFrameDecoder

2. About decoders

  1. First, compare the two snippets below

    (1) Before adding StringDecoder

    @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
    
         try {
    
             ByteBuf in = (ByteBuf) msg;
             String str = in.toString(CharsetUtil.UTF_8);
             System.out.println("Client:"+str);
    
         } finally {
             ReferenceCountUtil.release(msg);
         }
     }

    (2) After adding StringDecoder

    @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
    
         try {
             String str = (String) msg;
             System.out.println("Client:"+str);
    
         } finally {
             ReferenceCountUtil.release(msg);
         }
     }
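  2. About decoders

    decoder: n. a component that decodes data

    As I understand it, Netty handles data roughly as follows:

    Sending:   Buffer ------> packets ------> bit stream

    Receiving: Buffer <------ packets <------ bit stream

    That is why the msg we receive is a ByteBuf.

    Once a decoder is added (StringDecoder in this example):

    Sending:   Buffer ------> packets ------> bit stream

    Receiving: String <------ Buffer <------ packets <------ bit stream

    In other words, StringDecoder applies its decoding rule to the ByteBuf and hands msg to the next handler as a String.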
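
The conversion step described above can be sketched with the plain JDK. This is not Netty code: the class and method names are illustrative assumptions, and the byte array stands in for the contents of a received ByteBuf. It only shows the idea of turning raw bytes into a String before the business handler sees them.

```java
import java.nio.charset.StandardCharsets;

final class StringDecodeSketch {

    // Hypothetical helper: converts the raw bytes of one message into a String,
    // which is conceptually the job a string decoder does for the next handler.
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the bytes that arrived from the network.
        byte[] wire = "Hello Netty!".getBytes(StandardCharsets.UTF_8);
        System.out.println("Client:" + decode(wire)); // prints "Client:Hello Netty!"
    }
}
```

Both sides have to agree on the charset; the sketch fixes UTF-8 on both ends for that reason.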

  3. How to use a decoder

    (1) Working code:

    package com.xm.netty.demo02;
    
    import java.net.InetSocketAddress;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server {
    
     private final int port;
    
     public Server(int port) {
         this.port = port;
     }
    
    
    
     public static void main(String[] args) {
    
         int port = 8989;
         try {
             new Server(port).start();
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
    
     }
    
    
    
     private void start() throws InterruptedException {
         EventLoopGroup g1 = new NioEventLoopGroup();
         EventLoopGroup g2 = new NioEventLoopGroup();
         try {
             ServerBootstrap bootstrap = new ServerBootstrap();
              bootstrap
                     .group(g1,g2)
                     .channel(NioServerSocketChannel.class)
                     .localAddress(new InetSocketAddress( port))
                     .childHandler(new ChannelInitializer() {
                         @Override
                         protected void initChannel(Channel ch) throws Exception {
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new ServerHandler());
                         }
                     });
              ChannelFuture future = bootstrap.bind().sync();
              future.channel().closeFuture().sync();
         } finally {
             g1.shutdownGracefully().sync();
             g2.shutdownGracefully().sync();
         }
     }
    
    }

    What changed:

    ch.pipeline().addLast(new StringDecoder());

    ch.pipeline().addLast(new ServerHandler());

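    (2) Order of multiple decoders:

    Decoders run from first to last, in the order they were added.

    Suppose we have a TimeDecoder that converts a string into a time value:

    ch.pipeline().addLast(new StringDecoder());

    ch.pipeline().addLast(new TimeDecoder());

    ch.pipeline().addLast(new ServerHandler());

    The decoding flow is:

    ByteBuf ------> StringDecoder ------> String ------> TimeDecoder ------> time value ------> ServerHandler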
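
To make the ordering concrete, here is a small plain-JDK sketch of two decoding stages applied one after the other. It does not use Netty; `TO_STRING` and `TO_TIME` are hypothetical stand-ins for StringDecoder and the imagined TimeDecoder, and the ISO-8601 timestamp format is an assumption made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.function.Function;

final class DecoderChainSketch {

    // Stage 1 (stands in for StringDecoder): bytes -> String.
    static final Function<byte[], String> TO_STRING =
            raw -> new String(raw, StandardCharsets.UTF_8);

    // Stage 2 (stands in for the hypothetical TimeDecoder): String -> LocalDateTime.
    static final Function<String, LocalDateTime> TO_TIME = LocalDateTime::parse;

    // Stages run in the order they are chained, like successive addLast() calls.
    static LocalDateTime decode(byte[] raw) {
        return TO_STRING.andThen(TO_TIME).apply(raw);
    }

    public static void main(String[] args) {
        byte[] wire = "2018-11-05T10:15:30".getBytes(StandardCharsets.UTF_8);
        System.out.println(decode(wire).getYear()); // prints 2018
    }
}
```

Swapping the two stages would not even compile here, which mirrors why the order of decoders in a pipeline matters: each stage consumes the output type of the previous one.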

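3. DelimiterBasedFrameDecoder

  1. About DelimiterBasedFrameDecoder

    The idea is simple: the sender appends a terminator character to the end of each message.

    The receiver buffers incoming bytes up to a configured maximum length and cuts off one frame every time it meets that special character.

    The principle is similar to String's split() method.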
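
The splitting rule can be sketched without Netty. The class below is an illustrative assumption, not the library's implementation: it buffers incoming bytes, emits a frame each time it sees the delimiter, and rejects a frame that grows past the maximum length. Like DelimiterBasedFrameDecoder with its default settings, it drops the delimiter from the emitted frame.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class DelimiterFramerSketch {

    private final byte delimiter;
    private final int maxFrameLength;
    // Bytes received so far that do not yet form a complete frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    DelimiterFramerSketch(byte delimiter, int maxFrameLength) {
        this.delimiter = delimiter;
        this.maxFrameLength = maxFrameLength;
    }

    // Feed one chunk as it arrived from the socket; returns every frame it completed.
    List<String> feed(byte[] chunk) {
        List<String> frames = new ArrayList<>();
        for (byte b : chunk) {
            if (b == delimiter) {
                frames.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                pending.reset();
            } else {
                pending.write(b);
                if (pending.size() > maxFrameLength) {
                    throw new IllegalStateException("frame exceeds " + maxFrameLength + " bytes");
                }
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        DelimiterFramerSketch framer = new DelimiterFramerSketch((byte) '$', 1024);
        // The second message is split across two reads, as TCP is free to do.
        System.out.println(framer.feed("10<<<$11<".getBytes(StandardCharsets.UTF_8))); // [10<<<]
        System.out.println(framer.feed("<<$".getBytes(StandardCharsets.UTF_8)));       // [11<<<]
    }
}
```

Unlike split(), the framer keeps the incomplete tail between calls, which is what lets it reassemble a message that arrives in several pieces.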

  2. Implementation

    (1) Server

    package com.xm.netty.demo03;
    
    import java.net.InetSocketAddress;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server {
    
     private final int port;
    
     public Server(int port) {
         this.port = port;
     }
    
    
    
     public static void main(String[] args) {
    
         int port = 8989;
         try {
             new Server(port).start();
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
    
     }
    
    
    
     private void start() throws InterruptedException {
         EventLoopGroup g1 = new NioEventLoopGroup();
         EventLoopGroup g2 = new NioEventLoopGroup();
         try {
             ServerBootstrap bootstrap = new ServerBootstrap();
              bootstrap
                     .group(g1,g2)
                     .channel(NioServerSocketChannel.class)
                     .localAddress(new InetSocketAddress( port))
                     .childHandler(new ChannelInitializer() {
                         @Override
                         protected void initChannel(Channel ch) throws Exception {
                             ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
                             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new ServerHandler());
                         }
                     });
              ChannelFuture future = bootstrap.bind().sync();
              future.channel().closeFuture().sync();
         } finally {
             g1.shutdownGracefully().sync();
             g2.shutdownGracefully().sync();
         }
     }
    
    }

    (2) ServerHandler (server side)

    package com.xm.netty.demo03;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;
    
    public class ServerHandler extends ChannelHandlerAdapter {
    
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
         String str = (String) msg;
         System.out.println("Server:"+str);
         str = "Server reply--->"+ str+"$";
         ctx.writeAndFlush(Unpooled.copiedBuffer(str.getBytes()));
     }
    
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();
         ctx.close();
     }
    
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+" a client has connected to the server!");
     }
    
    
    }
    

    (3) Client

    package com.xm.netty.demo03;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client {
    
     private final int port;
     private final String host;
    
    
    
     public Client(int port, String host) {
         this.port = port;
         this.host = host;
     }
    
     public static void main(String[] args) {
         String host = "127.0.0.1";
         int port = 8989;
         try {
             new Client(port, host).start();
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
     }
    
     private void start() throws InterruptedException {
    
         EventLoopGroup group = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap
                     .group(group)
                     .channel(NioSocketChannel.class)
                     .remoteAddress(host, port)
                     .handler(new ChannelInitializer<SocketChannel>() {
    
                         @Override
                         protected void initChannel(SocketChannel ch) throws Exception {
                             ByteBuf buf = Unpooled.copiedBuffer("$".getBytes());
                             ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new ClientHandler());
                         }
    
                     });
    
             ChannelFuture future = bootstrap.connect().sync();
    
             for(int i=10;i<20;i++) {
                 String str = DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now()) + "---- " +i+"<<<$";
                 future.channel().write(Unpooled.copiedBuffer(str.getBytes()));
             }
    
             future.channel().flush();
    
    
             //future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Netty!".getBytes()));
    
    
    
             future.channel().closeFuture().sync();
         } finally {
             group.shutdownGracefully().sync();
         }
    
     }
    
    }
    

    (4) ClientHandler (client side)

    package com.xm.netty.demo03;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter {
    
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
    
         try {
             String str = (String) msg;
             System.out.println("Client:"+str);
    
         } finally {
             ReferenceCountUtil.release(msg);
         }
     }
    
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();
         ctx.close();
     }
    
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+" connected to the server!");
     }
    
    
    }
    
  3. Run results

    (1) Server output:

    [screenshot]

    (2) Client output:

    [screenshot]

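4. FixedLengthFrameDecoder

  1. About FixedLengthFrameDecoder

    This one is simple too: every frame has a fixed, agreed length.

    The decoder emits a frame only once exactly that many bytes have accumulated.

    Suppose you send '123456' and '654321' with a frame length of 5.

    The decoder then produces '12345' and '66543'; the trailing '21' stays buffered until three more bytes arrive.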
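
The example above can be reproduced with a small plain-JDK sketch. The class is an illustrative assumption rather than Netty's implementation: it collects bytes and emits a frame every time the buffer reaches the fixed length, ignoring where the sender's message boundaries were.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class FixedLengthFramerSketch {

    private final int frameLength;
    // Bytes received so far that do not yet fill a whole frame.
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    FixedLengthFramerSketch(int frameLength) {
        if (frameLength <= 0) {
            throw new IllegalArgumentException("frameLength must be positive");
        }
        this.frameLength = frameLength;
    }

    // Feed one chunk as it arrived; returns every frame that became complete.
    List<String> feed(byte[] chunk) {
        List<String> frames = new ArrayList<>();
        for (byte b : chunk) {
            pending.write(b);
            if (pending.size() == frameLength) {
                frames.add(new String(pending.toByteArray(), StandardCharsets.US_ASCII));
                pending.reset();
            }
        }
        return frames;
    }

    public static void main(String[] args) {
        FixedLengthFramerSketch framer = new FixedLengthFramerSketch(5);
        System.out.println(framer.feed("123456".getBytes(StandardCharsets.US_ASCII))); // [12345]
        System.out.println(framer.feed("654321".getBytes(StandardCharsets.US_ASCII))); // [66543]
        // "21" is still buffered and would only be emitted after three more bytes.
    }
}
```

The output shows why a fixed length only works when sender and receiver agree on it: six-byte messages read with a five-byte frame come out misaligned.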

  2. Implementation

    (1) Server

    package com.xm.netty.demo04;
    
    import java.net.InetSocketAddress;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Server {
    
     private final int port;
    
     public Server(int port) {
         this.port = port;
     }
    
    
    
     public static void main(String[] args) {
    
         int port = 8989;
         try {
             new Server(port).start();
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
    
     }
    
    
    
     private void start() throws InterruptedException {
         EventLoopGroup g1 = new NioEventLoopGroup();
         EventLoopGroup g2 = new NioEventLoopGroup();
         try {
             ServerBootstrap bootstrap = new ServerBootstrap();
              bootstrap
                     .group(g1,g2)
                     .channel(NioServerSocketChannel.class)
                     .localAddress(new InetSocketAddress( port))
                     .childHandler(new ChannelInitializer() {
                         @Override
                         protected void initChannel(Channel ch) throws Exception {
                             ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new ServerHandler());
                         }
                     });
              ChannelFuture future = bootstrap.bind().sync();
              future.channel().closeFuture().sync();
         } finally {
             g1.shutdownGracefully().sync();
             g2.shutdownGracefully().sync();
         }
     }
    
    }

    (2) ServerHandler (server side)

    package com.xm.netty.demo04;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;
    
    public class ServerHandler extends ChannelHandlerAdapter {
    
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
         String str = (String) msg;
         System.out.println("Server:"+str);
         ctx.writeAndFlush(Unpooled.copiedBuffer(str.getBytes()));
     }
    
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();
         ctx.close();
     }
    
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+" a client has connected to the server!");
     }
    
    
    }
    

    (3) Client

    package com.xm.netty.demo04;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.FixedLengthFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client {
    
     private final int port;
     private final String host;
    
    
    
     public Client(int port, String host) {
         this.port = port;
         this.host = host;
     }
    
     public static void main(String[] args) {
         String host = "127.0.0.1";
         int port = 8989;
         try {
             new Client(port, host).start();
         } catch (InterruptedException e) {
             // TODO Auto-generated catch block
             e.printStackTrace();
         }
     }
    
     private void start() throws InterruptedException {
    
         EventLoopGroup group = new NioEventLoopGroup();
         try {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap
                     .group(group)
                     .channel(NioSocketChannel.class)
                     .remoteAddress(host, port)
                     .handler(new ChannelInitializer<SocketChannel>() {
    
                         @Override
                         protected void initChannel(SocketChannel ch) throws Exception {
                             ch.pipeline().addLast(new FixedLengthFrameDecoder(5));
                             ch.pipeline().addLast(new StringDecoder());
                             ch.pipeline().addLast(new ClientHandler());
                         }
    
                     });
    
             ChannelFuture future = bootstrap.connect().sync();
    
             for(int i=123450;i<123460;i++) {
                 String str = ""+i;
                 future.channel().write(Unpooled.copiedBuffer(str.getBytes()));
             }
             future.channel().flush();
    
             //future.channel().writeAndFlush(Unpooled.copiedBuffer("Hello Netty!".getBytes()));
    
    
    
             future.channel().closeFuture().sync();
         } finally {
             group.shutdownGracefully().sync();
         }
    
     }
    
    }
    

    (4) ClientHandler (client side)

    package com.xm.netty.demo04;
    
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.CharsetUtil;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter {
    
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
    
         try {
             String str = (String) msg;
             System.out.println("Client:"+str);
    
         } finally {
             ReferenceCountUtil.release(msg);
         }
     }
    
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
         cause.printStackTrace();
         ctx.close();
     }
    
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         System.out.println(DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(LocalDateTime.now())+" connected to the server!");
     }
    
    
    }
    
  3. Run results

    (1) Server output:

    [screenshot]

    (2) Client output:

    [screenshot]
