1. 程式人生 > >【Netty入門】解決TCP粘包/分包的例項

【Netty入門】解決TCP粘包/分包的例項

回顧TCP粘包/分包問題的解決方法

1.訊息定長

2.在包尾都增加特殊字元進行分割

3.將訊息分為訊息頭和訊息體

針對這三種方法,下面我會分別舉例驗證

FixedLengthFrameDecoder類

對應第一種解決方法:訊息定長

(1)例1:服務端程式碼:

public class Server4 {
    public static void main(String[] args) throws SigarException {

        //boss執行緒監聽埠,worker執行緒負責資料讀寫
        EventLoopGroup boss = new
NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try{ //輔助啟動類 ServerBootstrap bootstrap = new ServerBootstrap(); //設定執行緒池 bootstrap.group(boss,worker); //設定socket工廠 bootstrap.channel(NioServerSocketChannel.class); //設定管道工廠
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //獲取管道 ChannelPipeline pipeline = socketChannel.pipeline(); //定長解碼類
pipeline.addLast(new FixedLengthFrameDecoder(19)); //字串解碼類 pipeline.addLast(new StringDecoder()); //處理類 pipeline.addLast(new ServerHandler4()); } }); //繫結埠 ChannelFuture future = bootstrap.bind(8866).sync(); System.out.println("server start ...... "); //等待服務端監聽埠關閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); }finally { //優雅退出,釋放執行緒池資源 boss.shutdownGracefully(); worker.shutdownGracefully(); } } } class ServerHandler4 extends SimpleChannelInboundHandler <String>{ //用於記錄次數 private int count = 0; //讀取客戶端傳送的資料 @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println("RESPONSE--------"+msg+";"+" @ "+ ++count); } //新客戶端接入 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelActive"); } //客戶端斷開 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("channelInactive"); } //異常 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { //關閉通道 ctx.channel().close(); //列印異常 cause.printStackTrace(); } }

(2)例1:客戶端程式碼:

public class Client4 {

    public static void main(String[] args) {

        //worker負責讀寫資料
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            //輔助啟動類
            Bootstrap bootstrap = new Bootstrap();

            //設定執行緒池
            bootstrap.group(worker);

            //設定socket工廠
            bootstrap.channel(NioSocketChannel.class);

            //設定管道
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //獲取管道
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    //定長解碼類
                    pipeline.addLast(new FixedLengthFrameDecoder(19));
                    //字串編碼類
                    pipeline.addLast(new StringEncoder());
                    //處理類
                    pipeline.addLast(new ClientHandler4());
                }
            });

            //發起非同步連線操作
            ChannelFuture futrue = bootstrap.connect(new InetSocketAddress("127.0.0.1",8866)).sync();

            //等待客戶端鏈路關閉
            futrue.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //優雅的退出,釋放NIO執行緒組
            worker.shutdownGracefully();
        }
    }

}

class ClientHandler4 extends SimpleChannelInboundHandler<String> {

    //接受服務端發來的訊息
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("server response : "+msg);
    }

    //與伺服器建立連線
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //給伺服器發訊息
        String s = System.getProperty("line.separator");
        //傳送50次訊息
        for (int i = 0; i < 50; i++) {
            ctx.channel().writeAndFlush("  I am client    "+s);
        }
    }

    //與伺服器斷開連線
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }

    //異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //關閉管道
        ctx.channel().close();
        //列印異常資訊
        cause.printStackTrace();
    }

}

例1服務端執行結果:

這裡寫圖片描述

…………………….此處省略多行………………….
這裡寫圖片描述

分析:從執行結果可以看出,符合我們的預期,並沒有TCP粘包問題,這是因為使用的定長解碼器的原因,我在此解釋一下例1client/server程式碼中新增加了幾個“陌生”的類,若之後再次出現,則不作解釋!

  • FixedLengthFrameDecoder類:用於固定長度訊息的粘包分包處理,可以攜帶引數,我在程式碼中指定的引數為19,因為我要傳送的字元長度為19。
  • StringDecoder類 :用於字串的解碼。
  • StringEncoder類 :用於字串的編碼。

LineBasedFrameDecoder類

對應第二種解決方法:在包尾都增加特殊字元(行分隔符)進行分割

(1)例2:服務端程式碼:

public class Server4 {
    public static void main(String[] args) throws SigarException {

        //boss執行緒監聽埠,worker執行緒負責資料讀寫
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();

        try{
            //輔助啟動類
            ServerBootstrap bootstrap = new ServerBootstrap();
            //設定執行緒池
            bootstrap.group(boss,worker);

            //設定socket工廠
            bootstrap.channel(NioServerSocketChannel.class);

            //設定管道工廠
            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //獲取管道
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    //行分隔符解碼類
                    pipeline.addLast(new LineBasedFrameDecoder(1024));
                    //字串解碼類
                    pipeline.addLast(new StringDecoder());
                    //處理類
                    pipeline.addLast(new ServerHandler4());
                }
            });

            //繫結埠
            ChannelFuture future = bootstrap.bind(8866).sync();
            System.out.println("server start ...... ");

            //等待服務端監聽埠關閉
            future.channel().closeFuture().sync();

        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //優雅退出,釋放執行緒池資源
            boss.shutdownGracefully();
            worker.shutdownGracefully();
        }
    }

}

class ServerHandler4 extends SimpleChannelInboundHandler <String>{

    //用於記錄次數
    private int count = 0;
    //讀取客戶端傳送的資料
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("RESPONSE--------"+msg+";"+"   @ "+ ++count);

    }

    //新客戶端接入
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelActive");
    }

    //客戶端斷開
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }

    //異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //關閉通道
        ctx.channel().close();
        //列印異常
        cause.printStackTrace();
    }
}

(2)例2:客戶端程式碼:

public class Client4 {

    public static void main(String[] args) {

        //worker負責讀寫資料
        EventLoopGroup worker = new NioEventLoopGroup();

        try {
            //輔助啟動類
            Bootstrap bootstrap = new Bootstrap();

            //設定執行緒池
            bootstrap.group(worker);

            //設定socket工廠
            bootstrap.channel(NioSocketChannel.class);

            //設定管道
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    //獲取管道
                    ChannelPipeline pipeline = socketChannel.pipeline();
                    //行分隔符解碼類
                    pipeline.addLast(new LineBasedFrameDecoder(1024));
                    //字串編碼類
                    pipeline.addLast(new StringEncoder());
                    //處理類
                    pipeline.addLast(new ClientHandler4());
                }
            });

            //發起非同步連線操作
            ChannelFuture futrue = bootstrap.connect(new InetSocketAddress("127.0.0.1",8866)).sync();

            //等待客戶端鏈路關閉
            futrue.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //優雅的退出,釋放NIO執行緒組
            worker.shutdownGracefully();
        }
    }

}

class ClientHandler4 extends SimpleChannelInboundHandler<String> {

    //接受服務端發來的訊息
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        System.out.println("server response : "+msg);
    }

    //與伺服器建立連線
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //給伺服器發訊息
        String s = System.getProperty("line.separator");
        //傳送50次訊息
        for (int i = 0; i < 50; i++) {
            ctx.channel().writeAndFlush("  I am client    "+s);
        }
    }

    //與伺服器斷開連線
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelInactive");
    }

    //異常
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        //關閉管道
        ctx.channel().close();
        //列印異常資訊
        cause.printStackTrace();
    }

}

例2服務端執行結果:

這裡寫圖片描述

…………………….此處省略多行………………….
這裡寫圖片描述

分析:從執行結果可以看出沒有TCP粘包問題了,細心的你或許已經發現程式碼中新出現了一個LineBasedFrameDecoder類,它可以攜帶引數,我指定的引數為1024,含義為在每1024個位元組中尋找換行符,若有,就傳送訊息,否則繼續尋找。

DelimiterBasedFrameDecoder類

對應第二種解決方法:在包尾都增加特殊字元(#)進行分割

例3:服務端程式碼,和例2服務端程式碼類似,由於篇幅有限,我就僅僅指出它們不一樣的地方了!

將例2服務端 第23行 和第24行 程式碼修改為

//自定義分隔符解碼類
pipeline.addLast(newDelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("#".getBytes())));

例3:客戶端程式碼:(和例2客戶端程式碼的不同之處)

將例2客戶端 第24行 和第25行 程式碼修改為

//自定義分隔符解碼類
pipeline.addLast(newDelimiterBasedFrameDecoder(1024,Unpooled.copiedBuffer("#".getBytes())));

再將例2客戶端 第63行 程式碼修改為

ctx.channel().writeAndFlush("  I am  client    "+"#");

例3服務端執行結果:

這裡寫圖片描述

…………………….此處省略多行………………….
這裡寫圖片描述

分析:從執行結果可以看出TCP粘包問題解決了!程式碼中新出現了一個DelimiterBasedFrameDecoder類,它可以攜帶引數,我指定的引數為(1024,Unpooled.copiedBuffer(“#”.getBytes()))),含義為在每1024個位元組中尋找#,若有,就傳送訊息,否則繼續尋找。

MyRequestDecoder自定義類

對應第三種方法:將訊息分為訊息頭和訊息體

對於訊息頭和訊息體,有三種情況分別如下:

  • 有頭部的拆包與粘包:

    lengthFieldOffset = 2 長度欄位偏移量 ( = 外部頭部Header 1的長度)
    lengthFieldLength = 3 長度欄位佔用位元組數
    lengthAdjustment = 0
    initialBytesToStrip = 0

這裡寫圖片描述

  • 長度欄位在前且有頭部的拆包與粘包:

    lengthFieldOffset = 0 長度欄位偏移量
    lengthFieldLength = 3 長度欄位佔用位元組數
    lengthAdjustment = 2 ( Header 1 的長度)
    initialBytesToStrip = 0

這裡寫圖片描述

  • 多擴充套件頭部的拆包與粘包:
    lengthFieldOffset = 1 長度欄位偏移量(=頭HDR1的長度)
    lengthFieldLength = 2 長度欄位佔用位元組數
    lengthAdjustment = 1 調整長度(= 頭HDR2的長度)
    initialBytesToStrip = 3 排除的偏移量(= the length of HDR1 + LEN)

這裡寫圖片描述

舉一個簡單的例子

例4:

import netty.EnDecode.Request;

/**
 * 請求解碼器
 * <pre>
 * 資料包格式
 * +——----——+——-----——+——----——+——----——+——-----——+
 * | 包頭   | 模組號    | 命令號 |  長度   |   資料   |
 * +——----——+——-----——+——----——+——----——+——-----——+
 * </pre>
 * 包頭4位元組
 * 模組號2位元組short
 * 命令號2位元組short
 * 長度4位元組(描述資料部分位元組長度)
 */


public class MyRequestDecoder extends FrameDecoder{

    //資料包基本長度
    public static final int BASE_LENTH = 4 + 2 + 2 + 4;

    @Override
    protected Object decode(ChannelHandlerContext arg0, Channel arg1, ChannelBuffer buffer) throws Exception {

        //可讀長度必須大於基本長度
        if(buffer.readableBytes() >= BASE_LENTH){
            //防止socket位元組流攻擊
            if(buffer.readableBytes() > 2048){
                buffer.skipBytes(buffer.readableBytes());
            }

            //記錄包頭開始的index
            int beginReader;

            while(true){
                beginReader = buffer.readerIndex();
                buffer.markReaderIndex();
                if(buffer.readInt() == -32523523){
                    break;
                }

                //未讀到包頭,略過一個位元組
                buffer.resetReaderIndex();
                buffer.readByte();

                //長度又變得不滿足
                if(buffer.readableBytes() < BASE_LENTH){
                    return null;
                }
            }

            //模組號
            short module = buffer.readShort();
            //命令號
            short cmd = buffer.readShort();
            //長度
            int length = buffer.readInt();

            //判斷請求資料包資料是否到齊
            if(buffer.readableBytes() < length){
                //還原讀指標
                buffer.readerIndex(beginReader);
                return null;
            }

            //讀取data資料
            byte[] data = new byte[length];
            buffer.readBytes(data);

            Request request = new Request();
            request.setModule(module);
            request.setCmd(cmd);
            request.setData(data);

            //繼續往下傳遞 
            return request;

        }
        //資料包不完整,需要等待後面的包來
        return null;
    }

}



本人才疏學淺,如有錯誤,請指出
謝謝!