1. 程式人生 > >記一次解決netty半包問題的經歷

記一次解決netty半包問題的經歷

最近學習了netty,想寫一個簡單的rpc,結果發現傳送訊息時遇到難題了,網上搜了一下,這種情況是半包問題和粘包問題,主要是出現在併發高一些的時候。

talk is cheap 

客戶端編碼:

    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
            encode0(channelHandlerContext,o,byteBuf);
    }
    private void
encode0(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception { if(o instanceof UserInfo){ byte[] data = Serializition.serialize((UserInfo) o,UserInfo.class); byteBuf.writeBytes(data); } }

服務端解碼:

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
myDecode(channelHandlerContext,byteBuf,list);
}
public void myDecode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list){
int len = byteBuf.readableBytes();
byte[] data = new byte[len];
byteBuf.readBytes(data);
UserInfo userInfo = Serializition.deSerialize(data,UserInfo.class);
list.add(userInfo);
}

這是最初版本的,一開始以為只要讀出來反序列化成物件就ok了,進行了簡單的測試發現沒問題,但客戶端傳送頻繁一些服務端就開始報錯:

警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.DecoderException: java.lang.RuntimeException: Reading from a 
byte array threw an IOException (should never happen).

分析一下發現對於來自同一個遠端連線來說,服務端只會分配一個bytebuf來接收訊息(這裡使用的是UnpooledDirectByteBuf),這個bytebuf容量是動態擴增的,如果當前的長度不夠用來儲存新的訊息就會自動擴充套件。當客戶端傳送不頻繁時,服務端有足夠的時間來做準備接收和處理訊息,不會出現問題。但客戶端頻繁傳送時就會出現問題了,如上,服務端的可讀的位元組超過了一個物件,讀取後下一個物件反序列化就會出現問題。

解決思路:

  1.每次傳送定長的訊息,不夠就補全,服務端設定對應的長度(但這樣有問題:如果這樣做客戶端會發送很多無用資訊,浪費效能,而且不知道設定多大的長度合適)

  2.使用netty自帶的編碼和解碼器,如使用/r/n標誌符解碼,這就要繼承MessageDecoder了,也就是字元解碼,即先將訊息在位元組--字串--物件將轉換(有點浪費效率,而且萬一內容中有對應的分隔符就會出問題)

  3.每次傳送訊息前先獲取物件位元組陣列的長度(我最開始使用的方法,後來在網上也找到別人一樣的思路)

  客戶端:

    protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
            encode1(channelHandlerContext,o,byteBuf);
    }
    private void encode1(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {
        if(o instanceof UserInfo){
            byte[] data = Serializition.serialize((UserInfo) o,UserInfo.class);
            byteBuf.writeInt(data.length);
            byteBuf.writeBytes(data);
        }
    }

服務端:

    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        myDecode1(channelHandlerContext,byteBuf,list);
    }
    public void myDecode1(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list){
        if(byteBuf.readableBytes()>4){
            int len = byteBuf.readInt();
            byte[] data = new byte[len];
            byteBuf.readBytes(data);
            UserInfo userInfo = Serializition.deSerialize(data,UserInfo.class);
            list.add(userInfo);
        }
    }

這就看起來簡單了  資料流是 |int|bytes|int|bytes,但實際情況還是發生了問題,還是出現了一樣的問題。異常原因是服務端例項化陣列長度後可讀位元組不夠,原因是傳送時客戶端是分包傳送的。

因此我在這個方法的基礎上增加了一個條件:如果可讀位元組數不夠就儲存已建立好的位元組陣列,等下一次位元組數夠時使用

    private volatile int len=0;
    protected void decode5(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        int length =len>0?len:(byteBuf.readableBytes()>=4?byteBuf.readInt():0);
        if(byteBuf.readableBytes()>=length&&length>0) {
            byte[] data = new byte[length];
            byteBuf.readBytes(data);
            UserInfo userInfo = Serializition.deSerialize(data, UserInfo.class);
            list.add(userInfo);
            //bytes.put(length, data);
            len=0;
        }else {
            len = length;
        }
    }

經過測試,問題得到解決。