
Learning Netty Together (6): TCP Sticky-Packet and Split-Packet Scenarios

Underlying every TCP program are the sticky-packet and split-packet effects. In a client/server model, when data is carried over TCP, the bytes on the network are like river water and TCP is a porter that ferries the water from one end to the other. Two situations can then arise:

1) If the client produces a lot of water at a time, that is, the packet it hands over is large, the TCP porter will carry it across in several trips.

2) If the client produces only a little water each time, TCP may wait for the client to produce several batches and then carry all of the water across in a single trip.

In the first case, the receiving end has to stick the data back together: it must glue the contents of several reads into one message it can understand. In the second case, the receiving end has to unpack what it reads, because a single read may contain several packets, sent separately by the remote peer, that TCP has stuck together.
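
To make the metaphor concrete, here is a minimal sketch of a receiver written with plain java.net sockets (the class RawTcpReceiver is only an illustration and is not part of the example project). TCP guarantees a byte stream and nothing more, so the number of bytes returned by one read() call has no fixed relationship to how many bytes the peer passed to each write():

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class RawTcpReceiver {
    public static void main(String[] args) throws Exception {
        try (ServerSocket server = new ServerSocket(9999);
             Socket socket = server.accept();
             InputStream in = socket.getInputStream()) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                // n may cover part of one send, or several sends glued together
                System.out.println("one read() returned " + n + " bytes");
            }
        }
    }
}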

Let's look at a concrete scenario for each of the two cases:

1) A single send that is too large: the split-packet phenomenon.

First, the client's Bootstrap:

package com.lyncc.netty.stickpackage.myself;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class BaseClient {
    
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8080"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));

    public static void main(String[] args) throws Exception {

        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .option(ChannelOption.TCP_NODELAY,true)
             .handler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ChannelPipeline p = ch.pipeline();
//                     p.addLast(new LineBasedFrameDecoder(1024));
                     p.addLast(new StringDecoder());
                     p.addLast(new BaseClientHandler());
                 }
             });

            ChannelFuture future = b.connect(HOST, PORT).sync();
            future.channel().writeAndFlush("Hello Netty Server ,I am a common client");
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

}
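
One detail worth pointing out before moving on: the pipeline above only contains a StringDecoder (inbound) plus our handler, and in Netty 4 a NioSocketChannel only accepts ByteBuf or FileRegion for outbound writes, so the writeAndFlush of the plain String "Hello Netty Server ,I am a common client" fails its future and never reaches the server. The data the server actually receives is the ByteBuf written in BaseClientHandler.channelActive(). If you did want to write Strings directly, a minimal variation (my own sketch, not part of the article's code) would register a StringEncoder:

package com.lyncc.netty.stickpackage.myself;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class StringAwareClientInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        p.addLast(new StringEncoder());   // outbound: String -> ByteBuf
        p.addLast(new StringDecoder());   // inbound:  ByteBuf -> String
        p.addLast(new BaseClientHandler());
    }
}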
The client's handler:
package com.lyncc.netty.stickpackage.myself;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BaseClientHandler extends ChannelInboundHandlerAdapter{
    
    private byte[] req;
    
    private int counter;
    
    public BaseClientHandler() {
//        req = ("BazingaLyncc is learner" + System.getProperty("line.separator"))
//            .getBytes();
        req = ("In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. His book w"
                + "ill give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the process "
                + "of configuring and connecting all of Netty’s components to bring your learned about threading models in ge"
                + "neral and Netty’s threading model in particular, whose performance and consistency advantages we discuss"
                + "ed in detail In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. Hi"
                + "s book will give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the"
                + " process of configuring and connecting all of Netty’s components to bring your learned about threading "
                + "models in general and Netty’s threading model in particular, whose performance and consistency advantag"
                + "es we discussed in detailIn this chapter you general, we recommend Java Concurrency in Practice by Bri"
                + "an Goetz. His book will give We’ve reached an exciting point—in the next chapter;the counter is: 1 2222"
                + "sdsa ddasd asdsadas dsadasdas").getBytes();
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
//        for (int i = 0; i < 100; i++) {
//            message = Unpooled.buffer(req.length);
//            message.writeBytes(req);
//            ctx.writeAndFlush(message);
//        }
        // write the same large request twice in a row; how many reads the server
        // needs to consume it is entirely up to TCP
        message = Unpooled.buffer(req.length);
        message.writeBytes(req);
        ctx.writeAndFlush(message);
        message = Unpooled.buffer(req.length);
        message.writeBytes(req);
        ctx.writeAndFlush(message);
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
        throws Exception {
        String buf = (String) msg;
        System.out.println("Now is : " + buf + " ; the counter is : "+ ++counter);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }
    
    

}


The server's ServerBootstrap:
package com.lyncc.netty.stickpackage.myself;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;

import java.net.InetSocketAddress;

public class BaseServer {

    private int port;
    
    public BaseServer(int port) {
        this.port = port;
    }
    
    public void start(){
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sbs = new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
//                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new BaseServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
             // bind the port and start accepting incoming connections
             ChannelFuture future = sbs.bind(port).sync();  
             
             System.out.println("Server start listen at " + port );
             future.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // always release the event loop groups, not only on failure
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new BaseServer(port).start();
    }
}
The server's handler:
package com.lyncc.netty.stickpackage.myself;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BaseServerHandler extends ChannelInboundHandlerAdapter{
    
    
    private int counter;
    
    
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        
        String body = (String)msg;
        System.out.println("server receive order : " + body + ";the counter is: " + ++counter);
    }
    
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

}

As usual, we start the server first:

Then we start the client. Once it is up, look at the server's console output:

We can see that the server received, in three separate reads, the long message that the client sent twice: the two large writes were split up and regrouped by TCP on the way over.

2) Many small sends in a row: the sticky-packet phenomenon.

The client and server bootstraps stay the same; we only change the channelActive code with which the client sends its data:

package com.lyncc.netty.stickpackage.myself;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class BaseClientHandler extends ChannelInboundHandlerAdapter{
    
    private byte[] req;
    
    private int counter;
    
    public BaseClientHandler() {
        req = ("BazingaLyncc is learner").getBytes();
//        req = ("In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. His book w"
//                + "ill give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the process "
//                + "of configuring and connecting all of Netty’s components to bring your learned about threading models in ge"
//                + "neral and Netty’s threading model in particular, whose performance and consistency advantages we discuss"
//                + "ed in detail In this chapter you general, we recommend Java Concurrency in Practice by Brian Goetz. Hi"
//                + "s book will give We’ve reached an exciting point—in the next chapter we’ll discuss bootstrapping, the"
//                + " process of configuring and connecting all of Netty’s components to bring your learned about threading "
//                + "models in general and Netty’s threading model in particular, whose performance and consistency advantag"
//                + "es we discussed in detailIn this chapter you general, we recommend Java Concurrency in Practice by Bri"
//                + "an Goetz. His book will give We’ve reached an exciting point—in the next chapter;the counter is: 1 2222"
//                + "sdsa ddasd asdsadas dsadasdas").getBytes();
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf message = null;
        // write the small request 100 times in a row; TCP may coalesce many of
        // these writes, so the server will see far fewer than 100 reads
        for (int i = 0; i < 100; i++) {
            message = Unpooled.buffer(req.length);
            message.writeBytes(req);
            ctx.writeAndFlush(message);
        }
//        message = Unpooled.buffer(req.length);
//        message.writeBytes(req);
//        ctx.writeAndFlush(message);
//        message = Unpooled.buffer(req.length);
//        message.writeBytes(req);
//        ctx.writeAndFlush(message);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        ctx.close();
    }

}
We start the server again:


After starting the client, look at the server's console once more:

As you can see, the message the client sent 100 times was received by the server in just three reads: this is the sticky-packet phenomenon.

These are the typical sticky-packet and split-packet scenarios.
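
The commented-out lines in the code above (the LineBasedFrameDecoder in the pipeline and the req built with System.getProperty("line.separator")) already hint at one common cure: put an explicit frame boundary into the data and let a frame decoder reassemble and split the stream for you. As a rough sketch only (assuming the same BaseServerHandler as above, and a client that terminates every request with a line separator), the server-side initializer would look like this:

package com.lyncc.netty.stickpackage.myself;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;

public class FramedServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // split the inbound byte stream on line separators, at most 1024 bytes per frame
        ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
        ch.pipeline().addLast(new StringDecoder());
        ch.pipeline().addLast(new BaseServerHandler());
    }
}

With a frame decoder like this in place, BaseServerHandler receives exactly one message per line the client writes, no matter how TCP splits or merges the bytes in transit.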