1. 程式人生 > >網際網路技術21——netty拆包粘包

網際網路技術21——netty拆包粘包

Netty拆包粘包

在基於流的傳輸裡比如TCP/IP,接收到的資料會先被儲存到一個socket接收緩衝裡。不幸的是,基於流的傳輸並不是一個數據包佇列,而是一個位元組佇列。即使你傳送了2個獨立的資料包,作業系統也不會作為2個訊息處理而僅僅是作為一連串的位元組而言。因此這是不能保證你遠端寫入的資料就會準確地讀取。 
參考資料:http://ifeve.com/netty5-user-guide

常用的拆包粘包主要有3種方式:

  • 1、訊息定長,例如每個報文的大小固定為200個位元組,如果不夠,空位補空格。
  • 2、在包尾部增加特殊字串進行分割,例如加回車等
  • 3、 將訊息分為訊息頭和訊息體,在訊息頭中包含表示訊息總長度的欄位,然後進行業務邏輯的處理

(1) 在包尾部增加特殊字串進行分割在上一年部落格中同string型別傳輸一起做了程式碼演示

https://blog.csdn.net/qq_28240551/article/details/82393565

關鍵點為

bootstrap.group(boss, worker)
        //指定channel型別
        .channel(NioServerSocketChannel.class)
        //handler會在初始化時就執行,而childHandler會在客戶端成功connect後才執行
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                ByteBuf byteBuf = Unpooled.copiedBuffer("$_".getBytes());
                socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,byteBuf));
                socketChannel.pipeline().addLast(new StringDecoder());
                socketChannel.pipeline().addLast(new StringEncoder());
                socketChannel.pipeline().addLast(new ServerHandler());
            }
        })
        //設定tcp緩衝區大小
        .option(ChannelOption.SO_BACKLOG, 128)
        //設定傳送緩衝區大小
        .option(ChannelOption.SO_SNDBUF, 1024 * 32)
        //設定接收緩衝區大小
        .option(ChannelOption.SO_RCVBUF, 1024 * 32)
        //設定是否儲存長連線
        .childOption(ChannelOption.SO_KEEPALIVE, true);

(2)關於訊息頭和訊息體的方法,這裡不做演示了,工作中用到可以搜一下使用例子

(3)這裡再演示一下報文定長的方式進行資料傳輸,(總體看使用頻率,分割>定長>報文)

關鍵點為

Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
        .channel(NioSocketChannel.class)
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5));
                socketChannel.pipeline().addLast(new StringDecoder());
                socketChannel.pipeline().addLast(new StringEncoder());
                socketChannel.pipeline().addLast(new ClientHandler());
            }
        })
        .option(ChannelOption.SO_KEEPALIVE,true);

 

server

package com.nettyFixed;


import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Created by BaiTianShi on 2018/9/5.
 */
public class Server {

    private int port;

    public Server(int port) {
        this.port = port;
    }

    public void run() {
        //用來接收連線事件組
        EventLoopGroup boss = new NioEventLoopGroup();
        //用來處理接收到的連線事件處理組
        EventLoopGroup worker = new NioEventLoopGroup();
        //server配置輔助類
        ServerBootstrap bootstrap = new ServerBootstrap();
        try {

            //將連線接收組與事件處理組連線,當server的boss接收到連線收就會交給worker處理
            bootstrap.group(boss, worker)
                    //指定channel型別
                    .channel(NioServerSocketChannel.class)
                    //handler會在初始化時就執行,而childHandler會在客戶端成功connect後才執行
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new StringEncoder());
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    })
                    //設定tcp緩衝區大小
                    .option(ChannelOption.SO_BACKLOG, 128)
                    //設定傳送緩衝區大小
                    .option(ChannelOption.SO_SNDBUF, 1024 * 32)
                    //設定接收緩衝區大小
                    .option(ChannelOption.SO_RCVBUF, 1024 * 32)
                    //設定是否儲存長連線
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            //注意。此處option()是提供給NioServerSocketChannel用來接收進來的連線,也就是boss執行緒
            //childOption是提供給有福管道serverChannel接收到的連線,也就是worker執行緒,在這個例子中也就是NioServerSocketChannel


            //非同步繫結埠,可以繫結多個埠
            ChannelFuture fu1 = bootstrap.bind(port).sync();
            ChannelFuture fu2 = bootstrap.bind(8766).sync();

            //非同步檢查是否關閉
            fu1.channel().closeFuture().sync();
            fu2.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }

    }

    public static void main(String[] args) {
        Server server = new Server(8765);
        server.run();
    }
}

client

package com.nettyFixed;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.FixedLengthFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * Created by BaiTianShi on 2018/9/5.
 */
public class Client {

    private String ip;
    private int port;

    public Client(String ip, int port) {
        this.ip = ip;
        this.port = port;
    }


    public void run(){
        //客戶端用來連線服務端的連線組
        EventLoopGroup worker= new NioEventLoopGroup();
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(worker)
                .channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new FixedLengthFrameDecoder(5));
                        socketChannel.pipeline().addLast(new StringDecoder());
                        socketChannel.pipeline().addLast(new StringEncoder());
                        socketChannel.pipeline().addLast(new ClientHandler());
                    }
                })
                .option(ChannelOption.SO_KEEPALIVE,true);


        try {
            //可以進多個埠同時連線
            ChannelFuture fu1 = bootstrap.connect(ip,port).sync();
            ChannelFuture fu2 = bootstrap.connect(ip,8766).sync();


            fu1.channel().writeAndFlush("aaaaaaa");
            fu2.channel().writeAndFlush("1111222");

            fu1.channel().closeFuture().sync();
            fu2.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            worker.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        Client cl = new Client("127.0.0.1",8765);
        cl.run();
    }
}

server端控制檯

client控制檯