1. 程式人生 > >java架構之路-(netty專題)netty的編解碼(出入戰)與粘包拆包

java架構之路-(netty專題)netty的編解碼(出入戰)與粘包拆包

上次迴歸:

  上次部落格我們主要說了netty的基本使用,都是一些固定的模式去寫的,我們只需要關注我們的攔截器怎麼去寫就可以了,然後我們用我們的基礎示例,改造了一個簡單的聊天室程式,可以看到內部加了一個StringEncoder和StringDecoder,這個就是用來編解碼我們字串的,這次我們就來說說這個編解碼。

編碼&解碼:

  上次我們寫的那個簡單的聊天室程式大家還記得吧,內部加了兩個類似攔截器的玩意。

ch.pipeline().addLast(new StringEncoder());
ch.pipeline().addLast(new StringDecoder());

  一個是編碼的,一個是解碼的,也是藉著這個東西,來個大家說一下我們的ChannelPipline,上次只是簡單了說了一下我們的ChannelPipline內部存放了ChannelHandler,而且是雙向連結串列的結構來儲存的,我們這次來細化一下我們這個攔截器是怎麼工作的。

 內部大概是這個樣子的,每次放入的時候有順序的放置,暫時先說有順序,後面我會詳細解釋這個什麼時候順序生效,記住是由由頭開始放置,這裡涉及到兩個概念就是入站和出站。

   就是說我們從客戶端傳送資料到服務端,叫做出站,會經過一系列的ChannelOutboundHandler,可以方便記憶為一系列的出站攔截器,我們想出站,就要經過出站攔截器。

   反之我們的入站就是和出站相對應的,是由服務端傳送過來的資料,經由我們的一系列ChannelInboundHandler,到達我們的客戶端,其實出站入站你站在客戶端的角度來看就很好理解了,我們客戶端想發出去資料,就是出站,想進來資料(接收資料),就是入站,出站會經過out攔截器,入站會經過in攔截器。切記,是雙向的,客戶端和服務端不是共有一個ChannelPipline,而且這個出站和入站都是相對的,可能還是有一點抽象,我們來拿著我們聊天室的例子來看一下。

  我們需要先明確我們的StringEncoder和StringDecoder是ChannelInboundHandler還是ChannelOutboundHandler。

 

  StringEncoder是ChannelOutboundHandler,StringDecoder是ChannelInboundHandler,這回我們按照我們的程式碼畫一下圖。先弄一個客戶端傳送訊息的。

   簡單解釋一下,我們的服務端和客戶端都有自己的ChannelPipline,我們的客戶端要傳送訊息,相當於客戶端是出站操作,我們要傳送,資料外流,顯然是資料要出去,出站操作啊, 出站要經過Encoder然後是我們自己的Clienthandler,走你,進入網路傳輸,對於我們的服務端來說,要接收資料,資料要進來,一定是入站操作啊,經過我們的Decoder,然後經過我們自己的Serverhandler,到達我們的服務端。

   客戶端往外發送訊息,客戶端是出站操作,經過Encoder,然後經過我們ServerHandler,進入網路,我們客戶端是入站操作,經過Decoder,經過我們的ClientHandler,到達我們的服務端。這樣說應該就理解了吧,你可以自己在decode和encode方法上打斷點,自己除錯一下,後面我會說原始碼,自己也是先熟悉一下原始碼。出站一定是從尾到頭,入站一定是從頭到尾,別問我為什麼,我自己寫了測試類,測試一下午了.....

  我們可以看到Handler是按照順序執行的,這個順序只是對於相同型別的Handlery有效果的,像我們的Decoder和Encoder,一個是入站的Handler,一個是出站的Handler,他倆誰在前,誰在後無所謂的。

粘包&拆包:

   粘包和拆包比上面那個出入戰好理解很多很多,我們先來看一段程式碼。

package com.xiaocai.packing;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

public class NettyClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();// 開啟工作執行緒組
        try {
            Bootstrap bootstrap = new Bootstrap(); //建立一個和服務端相對應的server
            bootstrap.group(group) //設定執行緒組
                    .channel(NioSocketChannel.class) //使用NioSocketChannel作為客戶端的通道實現
                    .handler(new ChannelInitializer<SocketChannel>() {//設定回撥函式
                        @Override
                        protected void initChannel(SocketChannel ch) {

                        }
                    });

            ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();//啟動客戶端去連線伺服器端
            //對通道關閉進行監聽
            System.out.println("netty client start。。準備開始傳送資料");
            for (int i = 0; i < 2000; i++) {
                ByteBuf buf = Unpooled.copiedBuffer("hello,xiaocaiJAVA!".getBytes(CharsetUtil.UTF_8));
                cf.channel().writeAndFlush(buf);
            }
            System.out.println("傳送資料完畢");
            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();//關閉執行緒組
        }
    }
}

  就是什麼意思呢?我們建立一個客戶端連線,然後我們多次向我們的服務端傳送訊息,理論上我們每次收到的訊息都應該是hello,xiaocaiJAVA!,我們來看一下結論。

   我們可以看到,有部分是正常的,有一部分是hello,xiaocaiJAVA!hello,xiaocaiJAVA!有的還是o,什麼什麼,這個明顯錯誤的,也就是我們的粘包拆包,為什麼會出現這個呢?netty收到我們的訊息不是馬上傳送出去,大概會等待一個瞬間,然後再發送我們的訊息,在等待的瞬間再次進來的訊息,他會一次性的傳送出去,但是netty自身並不知道我們的訊息該從何位置截斷,所以就出現了我們看到的粘包拆包問題,我們來看一下解決方法。

   我們每次可以把資料發過去,而且把資料的長度帶過去就OK了,然後客戶端每次優先判斷一下資料的長度就可以了,看一下我這的解決方案。

package com.xiaocai.packing;

/**
 * 自定義協議包
 */
public class MyMessageProtocol {

    //定義一次傳送包體長度
    private int len;
    //一次傳送包體內容
    private byte[] content;

    public int getLen() {
        return len;
    }

    public void setLen(int len) {
        this.len = len;
    }

    public byte[] getContent() {
        return content;
    }

    public void setContent(byte[] content) {
        this.content = content;
    }
}
package com.xiaocai.packing;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;

public class MyMessageEncoder extends MessageToByteEncoder<MyMessageProtocol> {
    @Override
    protected void encode(ChannelHandlerContext ctx, MyMessageProtocol msg, ByteBuf out) throws Exception {
        out.writeInt(msg.getLen());
        out.writeBytes(msg.getContent());
    }
}
package com.xiaocai.packing;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

public class MyMessageDecoder extends ByteToMessageDecoder {

    int length = 0;

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        if(in.readableBytes() >= 4) {
            if (length == 0){
                length = in.readInt();
            }
            if (in.readableBytes() < length) {
                System.out.println("當前可讀資料不夠,繼續等待。。");
                return;
            }
            byte[] content = new byte[length];
            if (in.readableBytes() >= length){
                in.readBytes(content);

                //封裝成MyMessageProtocol物件,傳遞到下一個handler業務處理
                MyMessageProtocol messageProtocol = new MyMessageProtocol();
                messageProtocol.setLen(length);
                messageProtocol.setContent(content);
                out.add(messageProtocol);
            }
            length = 0;
        }
    }
}
package com.xiaocai.packing;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

public class PackingServerHandler extends SimpleChannelInboundHandler<MyMessageProtocol> {

    private int count;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, MyMessageProtocol msg) throws Exception {
        System.out.println("====服務端接收到訊息如下====");
        System.out.println("長度=" + msg.getLen());
        System.out.println("內容=" + new String(msg.getContent(), CharsetUtil.UTF_8));
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
package com.xiaocai.packing;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;

public class PackingClient {
    public static void main(String[] args) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new MyMessageEncoder());
                        }
                    });

            ChannelFuture cf = bootstrap.connect("127.0.0.1", 9000).sync();

            for(int i = 0; i< 200; i++) {
                String msg = "你好,小菜JAVA!";
                //建立協議包物件
                MyMessageProtocol messageProtocol = new MyMessageProtocol();
                messageProtocol.setLen(msg.getBytes(CharsetUtil.UTF_8).length);
                messageProtocol.setContent(msg.getBytes(CharsetUtil.UTF_8));

                cf.channel().writeAndFlush(messageProtocol);
            }

            cf.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

  簡單來說就是我們封裝一個協議包,包含我們的內容和長度,我們要客戶端傳訊息之前,資訊進行處理,給予長度,然後我客戶端接收資料時優先判斷長度,長度不夠繼續等待,這樣就解決了我們的拆包粘包問題,有的人還會提出用什麼特殊符號的方法,也是可行的,但是你的資料中一定不要包含那個特殊符號,而且每次來一個新的開發人員,都要了解你們的特殊符號協議,還是比較麻煩的。

總結:

  這次我們主要說了ChannelPipline內部的結構和addList時的放置順序,netty的入戰出戰,是相對的,出站走out攔截器,入站走in攔截器,入站一定是從頭到尾的,出站一定是從尾到頭的,切記~!!!

 

最進弄了一個公眾號,小菜技術,歡迎大家的加入

&n