1. 程式人生 > >netty自定義簡單解碼器處理粘包、拆包

netty自定義簡單解碼器處理粘包、拆包

.so ack 繼承 記錄 line sync ann write 工程

tcp連接的粘包、拆包發生在長連接中,先了解一下長、短連接的概念

短連接:請求/響應之後,關閉已經建立的tcp連接,下次請求再建立新的連接

長連接:請求/響應之後,不關閉已經建立的tcp連接,多次請求,復用同一個連接

技術分享圖片

粘包:Nagle算法,客戶端累積一定量或者緩沖一段時間再傳輸。服務端緩沖區堆積,導致多個請求粘在一起

拆包:發送的請求大於發送緩沖區,進行分片傳輸。服務端緩沖區堆積,導致服務端讀取的請求數據不完成

可以模擬粘包場景,新建一個socket工程如下所示

import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket; public class Client { public static void main(String[] args) throws IOException, InterruptedException { Socket socket = new Socket(); socket.connect(new InetSocketAddress(10000)); OutputStream outputStream = socket.getOutputStream(); byte[] request = new
byte[200]; byte[] message = "測試測試測試測試測試測試測試測試測試測試測試測試測試測試測試測試測試".getBytes(); System.arraycopy(message, 0, request, 0, message.length); //開十個線程向服務端發送消息 for (int i = 0; i < 10; i++) { new Thread(() -> { try { outputStream.write(request); }
catch (IOException e) { e.printStackTrace(); } }).start(); } } }

新建一個netty server如下所示

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class NettyServer {

    public static void main(String[] args) {

        ServerBootstrap serverBootstrap = new ServerBootstrap();

        EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            serverBootstrap.group(eventLoopGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(10000)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new Handler());
                        }
                    });
            ChannelFuture future = serverBootstrap.bind().syncUninterruptibly();
            future.channel().closeFuture().syncUninterruptibly();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            eventLoopGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

新建處理客戶端消息的Handler,這裏只是簡單的將消息打印出來

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class Handler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if (msg instanceof ByteBuf) {
            byte[] bytes = new byte[((ByteBuf) msg).readableBytes()];
            ((ByteBuf) msg).readBytes(bytes);
            System.out.println(new String(bytes));
        } else {
            System.out.println(new String((byte[]) msg));
        }

        ctx.fireChannelRead(msg);
    }
}

運行程序後,服務端打印的數據如下,只打印了兩條消息,這顯然不是我們想要的結果

技術分享圖片

對於這種情況,如果我們需要自己處理的話,可以繼承netty提供的ByteToMessageDecoder類並實現其decode方法,這其實就是所謂的編碼解碼過程。

代碼如下所示

import java.util.List;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class Decoder extends ByteToMessageDecoder {

    //為了簡單處理,假設協議為每次固定傳輸200字節
    private static final int POCKET_SIZE = 200;

    //記錄上次未讀完的字節
    private ByteBuf tempMessage = Unpooled.buffer();


    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int inSize = byteBuf.readableBytes();

        System.out.println("=========收到" + inSize + "字節========");
        ByteBuf inMessage;

        //加上上次未讀取完成的字節
        if (tempMessage.readableBytes() == 0) {
            inMessage = byteBuf;
        } else {
            inMessage = Unpooled.buffer();
            inMessage.writeBytes(tempMessage);
            inMessage.writeBytes(byteBuf);
        }

        int counter = inMessage.readableBytes() / POCKET_SIZE;

        for (int i = 0; i < counter; i++) {
            byte[] bytes = new byte[POCKET_SIZE];
            inMessage.readBytes(bytes);
            //將處理的好的消息放入list中向下傳遞
            list.add(bytes);
        }

        tempMessage.clear();
        if (inMessage.readableBytes() != 0) {
            inMessage.readBytes(tempMessage, inMessage.readableBytes());
        }

    }
}

解碼器編寫完成之後,還需要再server啟動時加上,代碼如下所示:

serverBootstrap.group(eventLoopGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(10000)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new Decoder());
                            pipeline.addLast(new Handler());
                        }
                    });

netty自定義簡單解碼器處理粘包、拆包