Netty中channelRead收不到自定義解碼器發來的資料
阿新 • • 發佈:2019-02-08
在Netty中配置多個Decoder時,一定要注意它們之間的相互影響:
/**
 * Installs the handler chain on a newly created channel.
 *
 * <p>Handlers are appended in this order: the custom frame decoder, an HTTP request decoder,
 * the shared {@code DECODER} and {@code ENCODER}, an idle-state handler, and finally the
 * business handler.
 *
 * <p>NOTE(review): according to the surrounding article, with {@code HttpRequestDecoder}
 * installed after {@code CustomFrameDecoder} the frames produced by the custom decoder do not
 * reach {@code channelRead}. This snippet reproduces that configuration on purpose.
 *
 * @param socketChannel the channel whose pipeline is populated
 */
protected void initChannel(SocketChannel socketChannel) {
    // The '$' marker is handed to CustomFrameDecoder as its delimiter; 1000 is its maxFrameLength.
    ByteBuf frameMarker = Unpooled.copiedBuffer("$".getBytes());

    socketChannel.pipeline()
            .addLast(new CustomFrameDecoder(1000, frameMarker))
            .addLast(new HttpRequestDecoder())
            .addLast(DECODER)
            .addLast(ENCODER)
            .addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS))
            .addLast(nettyServerHandler);
}
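這裡本人自定義的Decoder和HttpRequestDecoder發生了衝突:當pipeline中同時配置了HttpRequestDecoder時,自定義Decoder解出的資料並不會傳到channelRead。原因是自定義Decoder輸出的仍然是ByteBuf,而排在它後面的HttpRequestDecoder會把這些ByteBuf當作HTTP請求再次解析,原始資料因此無法原樣傳給後面的handler。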
## 自定義Decoder:每條訊息以$符開頭,後面緊跟4位數字組成的訊息頭;資料體的長度(length)即為訊息頭中這4位數字所表示的值 ##
package com.llvision.netty.decoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.TooLongFrameException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
 * Decodes frames of the form {@code <delimiter><4 ASCII digits><body>}, where the four digits
 * are the body length in bytes written as a decimal number (for example {@code $0005hello}).
 *
 * <p>The delimiter marks the <em>start</em> of a frame. Bytes that precede the first delimiter
 * in the buffer are dropped. {@code maxFrameLength} bounds only such undelimited/leading bytes;
 * the body length itself is limited solely by the four-digit header.
 *
 * <p>Decoding failures (including {@link TooLongFrameException}) are logged and swallowed by
 * {@link #decode(ChannelHandlerContext, ByteBuf)}; they are not propagated into the pipeline.
 *
 * @author yd
 * 2017/04/08
 */
public class CustomFrameDecoder extends ByteToMessageDecoder {
    private static final Logger LOGGER = LoggerFactory.getLogger(CustomFrameDecoder.class);

    /** Number of ASCII decimal digits that follow the delimiter and encode the body length. */
    private static final int HEADER_SIZE = 4;

    /** When true, emitted frames hold the body only; otherwise delimiter and header are kept. */
    private static final boolean STRIP_DELIMITER = true;

    /** When true, an over-long undelimited run is reported as soon as it is detected. */
    private static final boolean FAIL_FAST = true;

    /** Private slices of the caller-supplied delimiters; each slice's capacity is its length. */
    private final ByteBuf[] delimiters;
    private final int maxFrameLength;

    /** True while an over-long undelimited run is being thrown away. */
    private boolean discardingTooLongFrame;

    /** Number of bytes thrown away so far in the current over-long run. */
    private int tooLongFrameLength;

    /**
     * Creates a decoder with a single delimiter.
     *
     * @param maxFrameLength maximum number of bytes tolerated without/before a delimiter
     * @param delimiter      the frame-start marker; must be non-null and readable
     * @throws NullPointerException     if {@code delimiter} is null
     * @throws IllegalArgumentException if {@code delimiter} has no readable bytes
     */
    public CustomFrameDecoder(int maxFrameLength, ByteBuf delimiter) {
        // Validation and slicing are done by the varargs constructor.
        this(maxFrameLength, new ByteBuf[] {delimiter});
    }

    /**
     * Creates a decoder that accepts any of the given delimiters as a frame-start marker.
     *
     * @param maxFrameLength maximum number of bytes tolerated without/before a delimiter
     * @param delimiters     one or more non-null, readable delimiters
     * @throws NullPointerException     if the array or one of its elements is null
     * @throws IllegalArgumentException if the array is empty or a delimiter is not readable
     */
    public CustomFrameDecoder(int maxFrameLength, ByteBuf... delimiters) {
        if (delimiters == null) {
            throw new NullPointerException("delimiters");
        }
        if (delimiters.length == 0) {
            throw new IllegalArgumentException("empty delimiters");
        }
        this.delimiters = new ByteBuf[delimiters.length];
        for (int i = 0; i < delimiters.length; ++i) {
            ByteBuf d = delimiters[i];
            validateDelimiter(d);
            // Slice so later index changes on the caller's buffer do not affect matching.
            this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
        }
        this.maxFrameLength = maxFrameLength;
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = this.decode(ctx, in);
        if (decoded != null) {
            out.add(decoded);
        }
    }

    /**
     * Tries to extract one frame from {@code buffer}.
     *
     * @param ctx    the handler context (not used by this implementation)
     * @param buffer the cumulated inbound bytes
     * @return a retained slice holding the frame, or {@code null} if no complete frame is
     *         available yet or decoding failed (failures are logged, not rethrown)
     */
    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) {
        try {
            return decodeFrame(buffer);
        } catch (RuntimeException e) {
            // Best-effort: report the problem through the logger and keep the channel going.
            LOGGER.warn("Failed to decode frame", e);
            return null;
        }
    }

    /** Locates the earliest delimiter, drops what precedes it and reads one frame if complete. */
    private Object decodeFrame(ByteBuf buffer) {
        int leadingBytes = Integer.MAX_VALUE;
        ByteBuf firstDelim = null;
        for (ByteBuf delim : this.delimiters) {
            int offset = indexOf(buffer, delim);
            if (offset >= 0 && offset < leadingBytes) {
                leadingBytes = offset;
                firstDelim = delim;
            }
        }

        if (firstDelim == null) {
            discardUndelimited(buffer);
            return null;
        }

        // Drop everything in front of the delimiter but keep the delimiter itself: it is the
        // start of the next frame, so skipping it would lose that frame.
        if (this.discardingTooLongFrame) {
            buffer.skipBytes(leadingBytes);
            int discarded = this.tooLongFrameLength;
            this.discardingTooLongFrame = false;
            this.tooLongFrameLength = 0;
            if (!FAIL_FAST) {
                this.fail((long) discarded);
            }
        } else if (leadingBytes > this.maxFrameLength) {
            buffer.skipBytes(leadingBytes);
            this.fail((long) leadingBytes);
            return null;
        } else {
            buffer.skipBytes(leadingBytes);
        }

        // From here on the delimiter sits at readerIndex.
        int delimLength = firstDelim.capacity();
        int prefixLength = delimLength + HEADER_SIZE;
        if (buffer.readableBytes() < prefixLength) {
            // Fewer bytes than delimiter + header: the length cannot be read yet.
            return null;
        }

        int bodyLength = readBodyLength(buffer, buffer.readerIndex() + delimLength);
        if (bodyLength < 0) {
            // Not a real frame start; step over this delimiter so scanning can resynchronise.
            buffer.skipBytes(delimLength);
            LOGGER.warn("Discarding delimiter that is not followed by {} decimal digits", HEADER_SIZE);
            return null;
        }
        if (buffer.readableBytes() - prefixLength < bodyLength) {
            // Body not fully received yet; leave the buffer untouched.
            return null;
        }

        if (STRIP_DELIMITER) {
            buffer.skipBytes(prefixLength);
            return buffer.readRetainedSlice(bodyLength);
        }
        return buffer.readRetainedSlice(prefixLength + bodyLength);
    }

    /**
     * Handles a buffer that contains no delimiter: once it exceeds {@code maxFrameLength} the
     * bytes are thrown away and counted until a delimiter shows up again.
     */
    private void discardUndelimited(ByteBuf buffer) {
        int readable = buffer.readableBytes();
        if (this.discardingTooLongFrame) {
            this.tooLongFrameLength += readable;
            buffer.skipBytes(readable);
        } else if (readable > this.maxFrameLength) {
            this.tooLongFrameLength = readable;
            buffer.skipBytes(readable);
            this.discardingTooLongFrame = true;
            if (FAIL_FAST) {
                this.fail((long) this.tooLongFrameLength);
            }
        }
    }

    /**
     * Returns the offset of the first occurrence of {@code needle} in the readable bytes of
     * {@code haystack}, relative to {@code haystack.readerIndex()}, or {@code -1} if absent
     * (a partial match that runs into the writer index also yields {@code -1}).
     */
    private static int indexOf(ByteBuf haystack, ByteBuf needle) {
        for (int i = haystack.readerIndex(); i < haystack.writerIndex(); ++i) {
            int haystackIndex = i;
            int needleIndex;
            for (needleIndex = 0;
                 needleIndex < needle.capacity()
                         && haystack.getByte(haystackIndex) == needle.getByte(needleIndex);
                 ++needleIndex) {
                ++haystackIndex;
                if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) {
                    return -1;
                }
            }
            if (needleIndex == needle.capacity()) {
                return i - haystack.readerIndex();
            }
        }
        return -1;
    }

    /**
     * Parses the {@link #HEADER_SIZE} ASCII digits starting at absolute index
     * {@code headerStart} without moving the reader index.
     *
     * @return the decoded body length, or {@code -1} if any header byte is not {@code '0'..'9'}
     */
    private static int readBodyLength(ByteBuf buf, int headerStart) {
        int bodyLength = 0;
        for (int i = headerStart; i < headerStart + HEADER_SIZE; i++) {
            int digit = buf.getByte(i) - '0';
            if (digit < 0 || digit > 9) {
                return -1;
            }
            bodyLength = bodyLength * 10 + digit;
        }
        return bodyLength;
    }

    /**
     * Always throws a {@link TooLongFrameException}; the message includes {@code frameLength}
     * when it is positive.
     */
    private void fail(long frameLength) {
        if (frameLength > 0L) {
            throw new TooLongFrameException("frame length exceeds " + this.maxFrameLength + ": " + frameLength + " - discarded");
        } else {
            throw new TooLongFrameException("frame length exceeds " + this.maxFrameLength + " - discarding");
        }
    }

    /** Rejects null delimiters and delimiters without readable bytes. */
    private static void validateDelimiter(ByteBuf delimiter) {
        if (delimiter == null) {
            throw new NullPointerException("delimiter");
        } else if (!delimiter.isReadable()) {
            throw new IllegalArgumentException("empty delimiter");
        }
    }
}