netty原始碼分析 之九 handler
阿新 • • 發佈:2019-02-16
學習完前面的channel,回頭來學習handler 會感覺到很簡單的.
handler 這個包裡面的類實現 ChannelHandlerAdapter
codec我們最後來看,先看其他
logging
LoggingHandler 為log的輸出類, 定義模板,具體實現為 InternalLogger這個介面,log4j logback之類的可以實現介面
ssl
SslHandler 這個類有點不一樣,繼承自 ByteToMessageDecoder,重點看decoder方法
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException { final int startOffset = in.readerIndex(); final int endOffset = in.writerIndex(); int offset = startOffset; // If we calculated the length of the current SSL record before, use that information. if (packetLength > 0) { if (endOffset - startOffset < packetLength) { return; } else { offset += packetLength; packetLength = 0; } } boolean nonSslRecord = false; for (;;) { final int readableBytes = endOffset - offset; if (readableBytes < 5) { break; } final int packetLength = getEncryptedPacketLength(in, offset); if (packetLength == -1) { nonSslRecord = true; break; } assert packetLength > 0; if (packetLength > readableBytes) { // wait until the whole packet can be read this.packetLength = packetLength; break; } offset += packetLength; } final int length = offset - startOffset; if (length > 0) { // The buffer contains one or more full SSL records. // Slice out the whole packet so unwrap will only be called with complete packets. // Also directly reset the packetLength. This is needed as unwrap(..) may trigger // decode(...) again via: // 1) unwrap(..) is called // 2) wrap(...) is called from within unwrap(...) // 3) wrap(...) calls unwrapLater(...) // 4) unwrapLater(...) calls decode(...) // // See https://github.com/netty/netty/issues/1534 in.skipBytes(length); ByteBuffer buffer = in.nioBuffer(startOffset, length); unwrap(ctx, buffer, out); } if (nonSslRecord) { // Not an SSL/LS packet NotSslRecordException e = new NotSslRecordException( "not an SSL/TLS record: " + ByteBufUtil.hexDump(in)); in.skipBytes(in.readableBytes()); ctx.fireExceptionCaught(e); setHandshakeFailure(e); } }
stream
ChunkedWriteHandler chunked一般為分塊的意思,又如http的chunked編碼,指不知道返回的大小。
看其duflush方法private void doFlush(final ChannelHandlerContext ctx) throws Exception { final Channel channel = ctx.channel(); if (!channel.isActive()) { discard(null); return; } boolean needsFlush; while (channel.isWritable()) { if (currentWrite == null) { currentWrite = queue.poll(); } if (currentWrite == null) { break; } needsFlush = true; final PendingWrite currentWrite = this.currentWrite; final Object pendingMessage = currentWrite.msg; if (pendingMessage instanceof ChunkedInput) { final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage; boolean endOfInput; boolean suspend; Object message = null; try { message = chunks.readChunk(ctx); endOfInput = chunks.isEndOfInput(); if (message == null) { // No need to suspend when reached at the end. suspend = !endOfInput; } else { suspend = false; } } catch (final Throwable t) { this.currentWrite = null; if (message != null) { ReferenceCountUtil.release(message); } currentWrite.fail(t); closeInput(chunks); break; } if (suspend) { // ChunkedInput.nextChunk() returned null and it has // not reached at the end of input. Let's wait until // more chunks arrive. Nothing to write or notify. break; } if (message == null) { // If message is null write an empty ByteBuf. // See https://github.com/netty/netty/issues/1671 message = Unpooled.EMPTY_BUFFER; } final int amount = amount(message); ChannelFuture f = ctx.write(message); if (endOfInput) { this.currentWrite = null; // Register a listener which will close the input once the write is complete. // This is needed because the Chunk may have some resource bound that can not // be closed before its not written. // // See https://github.com/netty/netty/issues/303 f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { currentWrite.progress(amount); currentWrite.success(); closeInput(chunks); } }); } else if (channel.isWritable()) { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { closeInput((ChunkedInput<?>) pendingMessage); currentWrite.fail(future.cause()); } else { currentWrite.progress(amount); } } }); } else { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { closeInput((ChunkedInput<?>) pendingMessage); currentWrite.fail(future.cause()); } else { currentWrite.progress(amount); if (channel.isWritable()) { resumeTransfer(); } } } }); } } else { ctx.write(pendingMessage, currentWrite.promise); this.currentWrite = null; } if (needsFlush) { ctx.flush(); } if (!channel.isActive()) { discard(new ClosedChannelException()); return; } } }
timeout
超時handle.
IdleStateHandler 用來計算每次使用的空閒時間,來觸發ctx.fireUserEventTriggered(evt); 空閒時間,讓使用者自己來編寫處理邏輯
lastReadTime = lastWriteTime = System.currentTimeMillis(); if (readerIdleTimeMillis > 0) { readerIdleTimeout = loop.schedule( new ReaderIdleTimeoutTask(ctx), readerIdleTimeMillis, TimeUnit.MILLISECONDS); } if (writerIdleTimeMillis > 0) { writerIdleTimeout = loop.schedule( new WriterIdleTimeoutTask(ctx), writerIdleTimeMillis, TimeUnit.MILLISECONDS); } if (allIdleTimeMillis > 0) { allIdleTimeout = loop.schedule( new AllIdleTimeoutTask(ctx), allIdleTimeMillis, TimeUnit.MILLISECONDS); }
ReadTimeoutHandler 讀取超時,報異常
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
WriteTimeoutHandler 寫超時,報異常
ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
traffic
單詞的含義是交通燈的意思,看你程式碼實現主要是為了控制頻寬,及讀入與寫出的速度
如下圖程式碼中 getTimeToWait方法
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
long curtime = System.currentTimeMillis();
long size = calculateSize(msg);
if (size > -1 && trafficCounter != null) {
trafficCounter.bytesWriteFlowControl(size);
if (writeLimit == 0) {
ctx.write(msg, promise);
return;
}
// compute the number of ms to wait before continue with the
// channel
long wait = getTimeToWait(writeLimit,
trafficCounter.currentWrittenBytes(),
trafficCounter.lastTime(), curtime);
if (wait >= MINIMAL_WAIT) {
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
ctx.write(msg, promise);
}
}, wait, TimeUnit.MILLISECONDS);
return;
}
}
ctx.write(msg, promise);
}
ipfilter
主要用來限制ip,防止誤用。其實也很簡單註冊的時候,呼叫handleNewChannel ,裡面有accpet,如果符合ip規則,則通過,否則直接關閉
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
handleNewChannel(ctx);
ctx.fireChannelRegistered();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
if (!handleNewChannel(ctx)) {
throw new IllegalStateException("cannot determine to accept or reject a channel: " + ctx.channel());
} else {
ctx.fireChannelActive();
}
}
private boolean handleNewChannel(ChannelHandlerContext ctx) throws Exception {
@SuppressWarnings("unchecked")
T remoteAddress = (T) ctx.channel().remoteAddress();
// If the remote address is not available yet, defer the decision.
if (remoteAddress == null) {
return false;
}
// No need to keep this handler in the pipeline anymore because the decision is going to be made now.
// Also, this will prevent the subsequent events from being handled by this handler.
ctx.pipeline().remove(this);
if (accept(ctx, remoteAddress)) {
channelAccepted(ctx, remoteAddress);
} else {
ChannelFuture rejectedFuture = channelRejected(ctx, remoteAddress);
if (rejectedFuture != null) {
rejectedFuture.addListener(ChannelFutureListener.CLOSE);
} else {
ctx.close();
}
}
return true;
}
/**
* This method is called immediately after a {@link io.netty.channel.Channel} gets registered.
*
* @return Return true if connections from this IP address and port should be accepted. False otherwise.
*/
protected abstract boolean accept(ChannelHandlerContext ctx, T remoteAddress) throws Exception;