1. 程式人生 > >netty自定義協議解碼

netty自定義協議解碼

繼承ByteToMessageDecoder類複寫decode方法,專案中的一段解碼規則如下:

1、服務端接收報文分為兩種型別,單幀包:head為20位元組,多幀包:head為24位元組。位元組位置:5

2、表示報文體長度欄位為2個位元組,位元組開始位置:18

3、先讀取一次buffer(快取區),檢視長度是否大於20,小於20指標回滾不作處理,等待下次讀取,如果

      大於20,取出幀型別欄位判斷是否為多幀包,再取出報文體長度,如果為多幀包,則包體長度加4,最後

      擷取包頭加包體的長度報文,返回完整的一個包;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;

public class MsgPackDecode extends ByteToMessageDecoder{
    private static int FRAMETYPE_OFFSET = 5;//幀型別位元組位置
    private static int HEAD_LENGHT = 20;//單幀包頭部長度 如果為多幀包為24
    private static int bodyStartPos = 18;//包體長度標識起始位置
    private static int LENGHT_OFFSET = 2;//包體長度標識長度
    private static ByteBuf buf = Unpooled.buffer();//建立一個ByteBuf快取區
    private static java.util.concurrent.atomic.AtomicInteger  c = new AtomicInteger(1);
     @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in,
            List<Object> out) throws Exception {
        Object o = decode(ctx, in);
        if (o != null) {
            out.add(o);
            System.err.println(c.getAndIncrement());
        }
    }
    
     //自定義協議解碼
     private Object decode(ChannelHandlerContext ctx, ByteBuf in) {
             //標記讀指標位置,以便可以回滾指標
            in.markReaderIndex();
            
            //如果讀取的包長度不夠head_length,回滾指標不做處理,等待下個包再解析
            if(in.readableBytes() < HEAD_LENGHT){
                in.resetReaderIndex();
                return null;
            } else {
                //讀取包頭中幀型別(0:單幀包1:多幀包)資訊
                in.readBytes(buf, HEAD_LENGHT);
                byte[] req = new byte[buf.readableBytes()];
                buf.readBytes(req);
                //判斷幀型別(0:單幀包1:多幀包)
                int frameType = (req[FRAMETYPE_OFFSET] & 0x04) == 0x04 ? 1 : 0;//獲取幀型別
                int bodylenght = byteToint(subBytes(req,bodyStartPos,LENGHT_OFFSET));//獲取體長度
                
                if(frameType==0){
                    //單幀包
                    // 如發現剩餘位元組不夠包體長度,回滾指標,等待下次解碼
                    if (in.readableBytes() < bodylenght) {
                        in.resetReaderIndex();
                        return null;
                    }
                    in.readBytes(buf, bodylenght);
                    ByteBuf frame = ctx.alloc().buffer(HEAD_LENGHT + bodylenght);
                    frame.writeBytes(buf, 0, HEAD_LENGHT + bodylenght);
                    buf.clear();
                    return frame;
                }else {
                    //多幀包
                    // 如發現剩餘位元組不夠包體長度,回滾指標,等待下次解碼
                    if (in.readableBytes() < bodylenght+4) {
                        in.resetReaderIndex();
                        return null;
                    }
                    in.readBytes(buf, bodylenght+4);
                    ByteBuf frame = ctx.alloc().buffer(HEAD_LENGHT + bodylenght+4);
                    frame.writeBytes(buf, 0, HEAD_LENGHT + bodylenght+4);
                    buf.clear();
                    return frame;
                }
            }
        }

     /**
         * 從一個byte[]陣列中擷取一部分
         * @param src
         * @param begin
         * @param count
         * @return
         */
        public static byte[] subBytes(byte[] req, int begin, int count) {
            byte[] bs = new byte[count];
            for (int i=begin; i<begin+count; i++) bs[i-begin] = req[i];
            return bs;
        }
        
        /**
         * 位元組轉int
         * @param src
         * @param begin
         * @param count
         * @return
         */
        public static int byteToint(byte[] res) {   
            // 一個byte資料左移24位變成0x??000000,再右移8位變成0x00??0000   
            int targets = (res[0] << 8 & 0xFF00) | (res[1] & 0xFF);
            return targets;   
        }
}

這種方式是為了解決自定義協議tcp粘包的問題。