1. 程式人生 > >Netty中channelRead收不到自定義解碼器的發來的資料

Netty 中 channelRead 收不到自定義解碼器發來的資料

在 Netty 中配置多個 Decoder 時，一定要注意它們之間的相互影響。

   /**
    * Assembles the inbound/outbound handler chain for a newly registered channel.
    *
    * <p>Handlers are appended in this order: the '$'-delimited frame decoder, an HTTP request
    * decoder, the shared {@code DECODER} and {@code ENCODER} instances, an idle-state detector
    * and finally the application handler.
    *
    * <p>NOTE(review): this is the configuration in which the custom decoder's frames were
    * reported not to reach {@code channelRead}; the {@code HttpRequestDecoder} placed directly
    * behind {@code CustomFrameDecoder} is the suspected cause.
    *
    * @param socketChannel the channel whose pipeline is being populated
    */
   protected void initChannel(SocketChannel socketChannel)  {
       ChannelPipeline handlers = socketChannel.pipeline();

       // A single '$' byte marks the start of every custom frame.
       ByteBuf frameMarker = Unpooled.copiedBuffer("$".getBytes());

       handlers.addLast(new CustomFrameDecoder(1000, frameMarker))
               .addLast(new HttpRequestDecoder())
               .addLast(DECODER)
               .addLast(ENCODER)
               // Reader-idle timeout of 5 seconds; writer-idle and all-idle detection disabled (0).
               .addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS))
               .addLast(nettyServerHandler);
   }

這裡本人自定義的 Decoder 與 HttpRequestDecoder 衝突：配置了 HttpRequestDecoder 之後，自定義 Decoder 解出的資料無法傳遞到 channelRead。（HttpRequestDecoder 會把上游傳來的 ByteBuf 當作 HTTP 請求再次解析，因此非 HTTP 的自定義幀不會以原樣到達後面的 handler。）

## 自定義 Decoder：每一幀以 `$` 符開頭，後接 4 位數字作為頭部，頭部的數值即資料體的長度（length） ##

package com.llvision.netty.decoder;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import
org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; /** * @author yd * 2017/04/08 */ public class CustomFrameDecoder extends ByteToMessageDecoder { private final Logger logger= LoggerFactory.getLogger(CustomFrameDecoder.class); private static int HEADER_SIZE = 4; private final ByteBuf[] delimiters; private final int maxFrameLength; private final boolean stripDelimiter=true; private final boolean failFast=true; private boolean discardingTooLongFrame; private static ByteBuf buf = Unpooled.buffer(); private int tooLongFrameLength; public CustomFrameDecoder(int maxFrameLength,ByteBuf delimiter ) { this(maxFrameLength,new ByteBuf[]{delimiter.slice(delimiter.readerIndex(), delimiter.readableBytes())}); } public CustomFrameDecoder(int maxFrameLength, ByteBuf... delimiters ) { if(delimiters == null) { throw new NullPointerException("delimiters"); } else if(delimiters.length == 0) { throw new IllegalArgumentException("empty delimiters"); } else { this.delimiters = new ByteBuf[delimiters.length]; for (int i = 0; i < delimiters.length; ++i) { ByteBuf d = delimiters[i]; validateDelimiter(d); this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes()); } this.maxFrameLength = maxFrameLength; } } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { Object decoded = this.decode(ctx, in); if(decoded != null) { out.add(decoded); } } protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) { try{ int minFrameLength = 2147483647; ByteBuf minDelim = null; ByteBuf[] minDelimLength = this.delimiters; int frame = minDelimLength.length; int tooLongFrameLength; for(tooLongFrameLength = 0; tooLongFrameLength < frame; ++tooLongFrameLength) { ByteBuf delim = minDelimLength[tooLongFrameLength]; int frameLength = indexOf(buffer, delim); if(frameLength >= 0 && frameLength < minFrameLength) { minFrameLength = frameLength; minDelim = delim; } } if(minDelim != null) { int var10 = minDelim.capacity(); 
if(this.discardingTooLongFrame) { this.discardingTooLongFrame = false; buffer.skipBytes(minFrameLength + var10); tooLongFrameLength = this.tooLongFrameLength; this.tooLongFrameLength = 0; if(!this.failFast) { this.fail((long)tooLongFrameLength); } return null; } else if(minFrameLength > this.maxFrameLength) { buffer.skipBytes(minFrameLength + var10); this.fail((long)minFrameLength); return null; } else { //如果buffer的中小於5表達根本不可能有頭的位置,所以跳過 if(buffer.readableBytes()<=5){return null;} int bodyLength=bodyLenght(buffer); ByteBuf var11; if((bodyLength+5)<=buffer.readableBytes()){ if(this.stripDelimiter) { buffer.skipBytes(minFrameLength+5); var11 = buffer.readRetainedSlice(bodyLength); } else { buffer.skipBytes(5); var11 = buffer.readRetainedSlice(bodyLength + var10); buffer.skipBytes(bodyLength); } }else{ return null; } return var11; } } else { if(!this.discardingTooLongFrame) { if(buffer.readableBytes() > this.maxFrameLength) { this.tooLongFrameLength = buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); this.discardingTooLongFrame = true; if(this.failFast) { this.fail((long)this.tooLongFrameLength); } } } else { this.tooLongFrameLength += buffer.readableBytes(); buffer.skipBytes(buffer.readableBytes()); } return null; } }catch (Exception e){ e.printStackTrace(); return null; } } private static int indexOf(ByteBuf haystack, ByteBuf needle) { for(int i = haystack.readerIndex(); i < haystack.writerIndex(); ++i) { int haystackIndex = i; int needleIndex; for(needleIndex = 0; needleIndex < needle.capacity() && haystack.getByte(haystackIndex) == needle.getByte(needleIndex); ++needleIndex) { ++haystackIndex; if(haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) { return -1; } } if(needleIndex == needle.capacity()) { return i - haystack.readerIndex(); } } return -1; } /** * 功能:獲取資訊頭中的長度 * */ private int bodyLenght(ByteBuf buf){ int bodyLength=0; int headerIndex=buf.readerIndex()+1,headerEnd=buf.readerIndex()+4; 
for(;headerIndex<=headerEnd;headerIndex++){ bodyLength*=10; bodyLength+=(int)buf.getByte(headerIndex)-48; } return bodyLength; } private void fail(long frameLength) { if(frameLength > 0L) { throw new TooLongFrameException("frame length exceeds " + this.maxFrameLength + ": " + frameLength + " - discarded"); } else { throw new TooLongFrameException("frame length exceeds " + this.maxFrameLength + " - discarding"); } } private static void validateDelimiter(ByteBuf delimiter) { if(delimiter == null) { throw new NullPointerException("delimiter"); } else if(!delimiter.isReadable()) { throw new IllegalArgumentException("empty delimiter"); } } }