1. 程式人生 > >Netty中處理TCP粘包和拆包

Netty中處理TCP粘包和拆包

什麼是粘包和拆包

TCP是個”流”協議,流其實就是沒有界限的一串資料。
TCP底層中並不瞭解上層業務資料的具體含義,它會根據TCP緩衝區的實際情況進行包劃分,所以在TCP中就有可能一個完整地包會被TCP拆分成多個包,也有可能吧多個小的包封裝成一個大的資料包傳送,這就是拆包和粘包的問題。

問題例項:假設客戶端分別傳送了兩個資料包D1和D2給服務端,由於服務端一次讀取到的位元組數是不確定的,所以存在幾種情況:
1.不存在粘包和分包的時候,服務端分兩次讀取到了兩個獨立的資料包。
2.存在粘包,一次讀取了D1和D2。
3.存在拆包,第一次讀取的是完整的D1包和部分D2包,第二次讀取到了D2包的剩餘內容。
4.第一次讀取到了D1包的部分內容,第二次讀取了D1剩下的內容和D2獨立的包。

為什麼會出現呢?

1.應用程式write寫入的位元組大小大於套介面傳送緩衝區大小(出現拆包).
2,進行MSS大小的TCP分段
3.大於MTU進行分片

這裡一般來說
MSS <=MTU-40(IPV4) MSS<= MTU -60 (IPV6)

解決策略

下面的策略都可以對應於我們Netty中的某個handler解決
1.訊息定長.
2.在包尾增加回車換行符進行分割,例如FTP協議;
3將訊息分為訊息頭和訊息體,訊息頭中包含表示訊息總長度。
4.複雜的應用協議。

粘包導致的問題

我們可以設計一個類似Echo伺服器的功能,客戶端發出一百條,echo回顯肯定也是一百條。這裡有:

服務端程式碼

package time2;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import
io.netty.channel.socket.nio.NioServerSocketChannel; import time2.TimeServerHandler; /** * Created by lz on 2016/8/9. */ public class TimeServer { public void bind(int port) throws Exception{ //配置服務端的NIO執行緒組,一個用於接收連線,一個用於處理連線 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try{ ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,1024) .childHandler(new ChildChannerHanler()); //繫結埠,同步等待 ChannelFuture f = b.bind(port).sync(); //等待服務端監聽埠關閉 f.channel().closeFuture().sync(); }finally { //優雅退出,釋放執行緒資源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } private class ChildChannerHanler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port = 8080; new TimeServer().bind(port); } }

服務端handler

package time2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Date;

/**
 * Created by lz on 2016/8/10.
 */
public class TimeServerHandler extends ChannelHandlerAdapter{
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req,"UTF-8").substring(0,req.length-System.getProperty("line.separator").length());
        System.out.println("Time Server得到了資料:" + body+";counter是:"+ ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date().toString():"BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf  resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

    public static void main(String[] args) {
        System.out.println("1"+System.getProperty("line.separator")+"1");
    }
}

客戶端程式碼

package time2;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;


/**
 * Created by lz on 2016/8/9.
 */
public class TimeClient {
    public void connect(int port,String host) throws Exception{
        //配置客戶端NIO執行緒組
        EventLoopGroup group =new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)
            .handler(new ChannelInitializer<SocketChannel>(){

                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new TimeClientHandler());
                }
            });
            //發起非同步連線操作
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        }finally {
            //優雅退出,釋放NIO執行緒組
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeClient().connect(port,"127.0.0.1");
    }
}

客戶端Handler程式碼

package time2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Created by lz on 2016/8/10.
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
    private int counter;
    private byte[] req;
    public TimeClientHandler(){
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf = (ByteBuf) msg;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String( req,"UTF-8");
        System.out.println("Now is:"+body+";the counter is :" + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

進行測試分別執行客戶端和伺服器端,發現出現了粘包現象,客戶端的counter只加了一次,這就出現了粘包問題,在我們初學的時候可以不考慮這個問題但是當壓力上來後,傳送大報文,就會出現粘包/拆包。
我們就需要我們的半包解碼器。如下:

LineBasedFrameDecoder

這是一個一換行符為界限的解碼器,如果有換行我們就把他標記為結束位置,這樣就組成一個獨立的包。我們可以設定最大長度。如果連續讀取到最大長度後沒有發現換行符,就會丟擲異常,同時忽略掉之前讀到的異常碼流。程式碼如下:

Server端程式碼:

package time3;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;


/**
 * Created by lz on 2016/8/10.
 */
public class TimeServer {
    public void bind(int port) throws Exception{
        //配置服務端的NIO執行緒組,一個用於接收連線,一個用於處理連線
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024)
                    .childHandler(new ChildChannelHandler());
            //繫結埠,同步等待
            ChannelFuture f = b.bind(port).sync();
            //等待服務端監聽埠關閉
            f.channel().closeFuture().sync();
        }finally {
            //優雅退出,釋放執行緒資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel>{

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            System.out.println("已經繫結");
            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
            socketChannel.pipeline().addLast(new StringDecoder());
            socketChannel.pipeline().addLast(new TimeServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeServer().bind(port);
    }
}

ServerHandler端程式碼:

package time3;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

import java.util.Date;

/**
 * Created by lz on 2016/8/10.
 */
public class TimeServerHandler extends ChannelHandlerAdapter{
    private int counter;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        String body = (String) msg;
        System.out.println("Time Server得到了資料:" + body+";counter是:"+ ++counter);
        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date().toString():"BAD ORDER";
        currentTime = currentTime + System.getProperty("line.separator");
        ByteBuf  resp = Unpooled.copiedBuffer(currentTime.getBytes());
        ctx.writeAndFlush(resp);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }

    public static void main(String[] args) {
        System.out.println("1"+System.getProperty("line.separator")+"1");
    }
}

Client端程式碼:

package time3;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;


/**
 * Created by lz on 2016/8/10.
 */
public class TimeClient {
    public void connect(int port,String host) throws Exception{
        //配置客戶端NIO執行緒組
        EventLoopGroup group =new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>(){

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new TimeClientHandler());
                        }
                    });
            //發起非同步連線操作
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        }finally {
            //優雅退出,釋放NIO執行緒組
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new TimeClient().connect(port,"127.0.0.1");
    }
}

Client端Handler程式碼

package time3;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Created by lz on 2016/8/10.
 */
public class TimeClientHandler extends ChannelHandlerAdapter {
    private int counter;
    private byte[] req;
    public TimeClientHandler(){
        System.out.println("初始化");
        req = ("QUERY TIME ORDER" + System.getProperty("line.separator")).getBytes();
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("傳送資料");
        ByteBuf message = null;
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("Now is:"+body+";the counter is :" + ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println(cause.getMessage());
        ctx.close();
    }
}

通過我們的解碼器我們的程式碼就簡單了很多,部分地方有註釋就不祥解了。

DelimiterBasedFrameDecoder

DelimiterBasedFrameDecoder允許我們設定特定的分隔符用來分隔,用法和上面類似,

 ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); //使用$_為分隔符
            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));

上面就是以$_為分隔符,這裡我們用echo伺服器做測試:

EchoServer端程式碼

package echo1;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import time3.TimeServerHandler;

/**
 * Created by lz on 2016/8/10.
 */
public class EchoServer {
    public void bind(int port) throws Exception{
        //配置服務端的NIO執行緒組,一個用於接收連線,一個用於處理連線
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,1024) //設定accept的最大 也就是三次握手成功的佇列長度
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChildChannelHandler());
            //繫結埠,同步等待
            ChannelFuture f = b.bind(port).sync();
            //等待服務端監聽埠關閉
            f.channel().closeFuture().sync();
        }finally {
            //優雅退出,釋放執行緒資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            System.out.println("已經繫結");
            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); //使用$_為分隔符
            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
            socketChannel.pipeline().addLast(new StringDecoder());
            socketChannel.pipeline().addLast(new EchoServerHandler());
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoServer().bind(port);
    }
}

EchoServer Handler程式碼

package echo1;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Created by lz on 2016/8/10.
 */
public class EchoServerHandler extends ChannelHandlerAdapter {
    int counter = 0;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        String body = (String) msg;
        System.out.println("第"+ ++counter +"次接收"+body);
        body +="$_";
        ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
        ctx.writeAndFlush(echo);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();//發生異常,關閉鏈路
    }
}

Echo Client端程式碼

package echo1;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import time3.TimeClientHandler;

/**
 * Created by lz on 2016/8/10.
 */
public class EchoClient {
    public void connect(int port,String host) throws Exception{
        //配置客戶端NIO執行緒組
        EventLoopGroup group =new NioEventLoopGroup();
        try{
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>(){

                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ByteBuf delimiter = Unpooled.copiedBuffer("$_".getBytes()); //使用$_為分隔符
                            socketChannel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,delimiter));
                            socketChannel.pipeline().addLast(new StringDecoder());
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            //發起非同步連線操作
            ChannelFuture f = b.connect(host,port).sync();
            f.channel().closeFuture().sync();
        }finally {
            //優雅退出,釋放NIO執行緒組
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoClient().connect(port,"127.0.0.1");
    }
}

EchoClient Handler程式碼

package echo1;

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Created by lz on 2016/8/10.
 */
public class EchoClientHandler extends ChannelHandlerAdapter {
    private int counter;
    static final String ECHO_REQ = "Hi,lizhao$_";

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        for (int i = 0; i < 10; i++) {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
        }
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("這是第"+ ++counter + "次接收" + msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

執行上面的程式碼可以展示出正確的結果,我們當然不止這幾個我們還有FixedLengthFrameDecoder固定長度的解碼器,以及其他解碼器,更多檢視官方手冊。