1. 程式人生 > >netty原始碼分析 之九 handler

netty原始碼分析 之九 handler

學習完前面的channel,回頭來學習handler 會感覺到很簡單的.

handler 這個包裡面的類實現  ChannelHandlerAdapter 


codec我們最後來看,先看其他

logging

LoggingHandler 為log的輸出類, 定義模板,具體實現為 InternalLogger這個介面,log4j logback之類的可以實現介面


ssl

SslHandler 這個類有點不一樣,繼承自 ByteToMessageDecoder,重點看decode方法

   /**
    * Scans the readable region of {@code in} for complete SSL/TLS records and hands all of them
    * to {@code unwrap(..)} in a single call.
    *
    * <p>The record length is obtained from {@code getEncryptedPacketLength(in, offset)}. When a
    * record is longer than the bytes currently available, its length is remembered in the
    * {@code packetLength} field so the next invocation can skip re-reading it. When the length
    * lookup returns {@code -1}, the data is treated as not being an SSL/TLS record: the remaining
    * bytes are discarded, a {@link NotSslRecordException} is fired through the pipeline and
    * {@code setHandshakeFailure(..)} is called.
    *
    * @param ctx the context used to fire the exception event and passed on to {@code unwrap(..)}
    * @param in  the inbound buffer; its reader index is advanced past every consumed byte
    * @param out the list passed on to {@code unwrap(..)}
    * @throws SSLException declared by this method; presumably raised by {@code unwrap(..)} (not visible here)
    */
   protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws SSLException {
        final int startOffset = in.readerIndex();
        final int endOffset = in.writerIndex();
        int offset = startOffset;

        // If we calculated the length of the current SSL record before, use that information.
        if (packetLength > 0) {
            if (endOffset - startOffset < packetLength) {
                // The previously measured record is still incomplete; wait for more data.
                return;
            } else {
                offset += packetLength;
                packetLength = 0;
            }
        }

        boolean nonSslRecord = false;

        // Walk forward record by record until the data runs out, a record is incomplete,
        // or something that is not an SSL/TLS record is found.
        for (;;) {
            final int readableBytes = endOffset - offset;
            // Fewer than 5 bytes left: not enough to determine a record length
            // (presumably the size of the record header -- confirm against getEncryptedPacketLength).
            if (readableBytes < 5) {
                break;
            }

            // NOTE: this local intentionally shadows the packetLength field; the field is
            // only written through this.packetLength below.
            final int packetLength = getEncryptedPacketLength(in, offset);
            if (packetLength == -1) {
                nonSslRecord = true;
                break;
            }

            assert packetLength > 0;

            if (packetLength > readableBytes) {
                // wait until the whole packet can be read
                this.packetLength = packetLength;
                break;
            }

            offset += packetLength;
        }

        final int length = offset - startOffset;
        if (length > 0) {
            // The buffer contains one or more full SSL records.
            // Slice out the whole packet so unwrap will only be called with complete packets.
            // Also directly reset the packetLength. This is needed as unwrap(..) may trigger
            // decode(...) again via:
            // 1) unwrap(..) is called
            // 2) wrap(...) is called from within unwrap(...)
            // 3) wrap(...) calls unwrapLater(...)
            // 4) unwrapLater(...) calls decode(...)
            //
            // See https://github.com/netty/netty/issues/1534
            //
            // The reader index is advanced BEFORE unwrap(..) runs so that a re-entrant
            // decode(...) does not see the same bytes again.
            in.skipBytes(length);
            ByteBuffer buffer = in.nioBuffer(startOffset, length);
            unwrap(ctx, buffer, out);
        }

        if (nonSslRecord) {
            // Not an SSL/TLS packet
            // The hex dump is taken before the remaining bytes are discarded.
            NotSslRecordException e = new NotSslRecordException(
                    "not an SSL/TLS record: " + ByteBufUtil.hexDump(in));
            in.skipBytes(in.readableBytes());
            ctx.fireExceptionCaught(e);
            setHandshakeFailure(e);
        }
    }


stream

ChunkedWriteHandler chunked一般為分塊的意思,又如http的chunked編碼,指不知道返回的大小。

看其doFlush方法
    /**
     * Drains the pending-write queue into the channel for as long as the channel stays writable.
     *
     * <p>Each queued entry is either a {@code ChunkedInput}, which is written one chunk per loop
     * iteration via {@code readChunk(ctx)}, or any other message, which is written as-is with the
     * entry's own promise. After every write the context is flushed. If the channel is found
     * inactive, the remaining writes are dropped through {@code discard(..)}.
     *
     * @param ctx the context used for reading chunks, writing and flushing
     * @throws Exception declared by this method; failures from {@code readChunk} are caught and
     *                   reported through {@code currentWrite.fail(..)} instead of being rethrown
     */
    private void doFlush(final ChannelHandlerContext ctx) throws Exception {
        final Channel channel = ctx.channel();
        if (!channel.isActive()) {
            discard(null);
            return;
        }
        // NOTE(review): needsFlush is assigned true on every iteration before it is read,
        // so the check near the bottom of the loop always passes.
        boolean needsFlush;
        while (channel.isWritable()) {
            // Pick up the next queued write if none is in progress.
            if (currentWrite == null) {
                currentWrite = queue.poll();
            }

            if (currentWrite == null) {
                // Queue is empty: nothing left to do.
                break;
            }
            needsFlush = true;
            // Local snapshot: the field may be cleared below while listeners still need the entry.
            final PendingWrite currentWrite = this.currentWrite;
            final Object pendingMessage = currentWrite.msg;

            if (pendingMessage instanceof ChunkedInput) {
                final ChunkedInput<?> chunks = (ChunkedInput<?>) pendingMessage;
                boolean endOfInput;
                boolean suspend;
                Object message = null;
                try {
                    message = chunks.readChunk(ctx);
                    endOfInput = chunks.isEndOfInput();

                    if (message == null) {
                        // No need to suspend when reached at the end.
                        suspend = !endOfInput;
                    } else {
                        suspend = false;
                    }
                } catch (final Throwable t) {
                    // Reading failed: drop this write, release any chunk already obtained,
                    // fail the pending write and close the input.
                    this.currentWrite = null;

                    if (message != null) {
                        ReferenceCountUtil.release(message);
                    }

                    currentWrite.fail(t);
                    closeInput(chunks);
                    break;
                }

                if (suspend) {
                    // ChunkedInput.readChunk() returned null and it has
                    // not reached at the end of input. Let's wait until
                    // more chunks arrive. Nothing to write or notify.
                    break;
                }

                if (message == null) {
                    // If message is null write an empty ByteBuf.
                    // See https://github.com/netty/netty/issues/1671
                    message = Unpooled.EMPTY_BUFFER;
                }

                final int amount = amount(message);
                ChannelFuture f = ctx.write(message);
                if (endOfInput) {
                    // Last chunk: this entry is finished, so the next iteration polls the queue.
                    this.currentWrite = null;

                    // Register a listener which will close the input once the write is complete.
                    // This is needed because the Chunk may have some resource bound that can not
                    // be closed before its not written.
                    //
                    // See https://github.com/netty/netty/issues/303
                    //
                    // NOTE(review): unlike the two listeners below, this one does not inspect
                    // future.isSuccess() before calling success() -- confirm that is intended.
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            currentWrite.progress(amount);
                            currentWrite.success();
                            closeInput(chunks);
                        }
                    });
                } else if (channel.isWritable()) {
                    // More chunks remain and the channel can take them: the loop itself will
                    // continue, so the listener only reports progress or failure.
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                closeInput((ChunkedInput<?>) pendingMessage);
                                currentWrite.fail(future.cause());
                            } else {
                                currentWrite.progress(amount);
                            }
                        }
                    });
                } else {
                    // More chunks remain but the channel is no longer writable: the loop will
                    // exit, so the listener resumes the transfer once the write has succeeded
                    // and the channel is writable again.
                    f.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                closeInput((ChunkedInput<?>) pendingMessage);
                                currentWrite.fail(future.cause());
                            } else {
                                currentWrite.progress(amount);
                                if (channel.isWritable()) {
                                    resumeTransfer();
                                }
                            }
                        }
                    });
                }
            } else {
                // Plain message: write it with its own promise and move on to the next entry.
                ctx.write(pendingMessage, currentWrite.promise);
                this.currentWrite = null;
            }

            if (needsFlush) {
                ctx.flush();
            }
            // The channel may have gone inactive during the write/flush; drop what is left.
            if (!channel.isActive()) {
                discard(new ClosedChannelException());
                return;
            }
        }
    }

timeout

超時handler。

IdleStateHandler 用來計算連線的空閒時間,空閒超過設定值時透過 ctx.fireUserEventTriggered(evt) 觸發空閒事件,讓使用者自己來編寫處理邏輯

        // Treat "now" as both the last read and the last write time.
        lastReadTime = lastWriteTime = System.currentTimeMillis();
        // Each idle check is scheduled on the loop only when its configured time is positive;
        // the returned handle is kept in the matching *IdleTimeout field.
        if (readerIdleTimeMillis > 0) {
            readerIdleTimeout = loop.schedule(
                    new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeMillis, TimeUnit.MILLISECONDS);
        }
        if (writerIdleTimeMillis > 0) {
            writerIdleTimeout = loop.schedule(
                    new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeMillis, TimeUnit.MILLISECONDS);
        }
        if (allIdleTimeMillis > 0) {
            allIdleTimeout = loop.schedule(
                    new AllIdleTimeoutTask(ctx),
                    allIdleTimeMillis, TimeUnit.MILLISECONDS);
        }

ReadTimeoutHandler 讀取超時,報異常

// Fire ReadTimeoutException.INSTANCE through ctx as an exceptionCaught event.
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);

WriteTimeoutHandler 寫超時,報異常

// Fire WriteTimeoutException.INSTANCE through ctx as an exceptionCaught event.
ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);

traffic

traffic 這個單詞的含義是交通、流量,看其程式碼實現,主要是為了控制頻寬,即讀入與寫出的速度

如下圖程式碼中 getTimeToWait方法

/**
 * Writes {@code msg}, delaying the write when the configured write limit requires it.
 *
 * <p>When the message size is known ({@code calculateSize(msg) > -1}) and a traffic counter is
 * present, the size is reported to the counter. If a non-zero {@code writeLimit} is set, the
 * delay returned by {@code getTimeToWait(..)} is compared with {@code MINIMAL_WAIT}; a delay at
 * or above that threshold postpones the write on the context's executor. In every other case
 * the message is written immediately.
 *
 * @param ctx     the context to write through
 * @param msg     the outbound message
 * @param promise the promise passed along with the write
 * @throws Exception declared by this method
 */
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
        throws Exception {
    // Take the timestamp before sizing the message, as the wait computation is based on it.
    final long now = System.currentTimeMillis();
    final long messageSize = calculateSize(msg);

    final boolean accountable = messageSize > -1 && trafficCounter != null;
    if (accountable) {
        trafficCounter.bytesWriteFlowControl(messageSize);

        // A write limit of 0 means no shaping: fall through to the immediate write.
        if (writeLimit != 0) {
            // Number of milliseconds to hold the message back before it may go to the channel.
            final long delayMillis = getTimeToWait(
                    writeLimit,
                    trafficCounter.currentWrittenBytes(),
                    trafficCounter.lastTime(),
                    now);

            if (delayMillis >= MINIMAL_WAIT) {
                final Runnable deferredWrite = new Runnable() {
                    @Override
                    public void run() {
                        ctx.write(msg, promise);
                    }
                };
                ctx.executor().schedule(deferredWrite, delayMillis, TimeUnit.MILLISECONDS);
                return;
            }
        }
    }

    ctx.write(msg, promise);
}

ipfilter

主要用來限制ip,防止誤用。其實也很簡單

註冊的時候,呼叫handleNewChannel,裡面有accept,如果符合ip規則,則通過,否則直接關閉

    /**
     * Tries to make the accept/reject decision as soon as the channel is registered, then
     * forwards the registration event.
     *
     * <p>The result of {@code handleNewChannel(ctx)} is ignored here: when it could not decide
     * yet (it returns {@code false} if the remote address is {@code null}), the decision is
     * attempted again in {@code channelActive}.
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        handleNewChannel(ctx);
        ctx.fireChannelRegistered();
    }

    /**
     * Makes the accept/reject decision when the channel becomes active and forwards the event
     * once a decision has been made.
     *
     * @throws IllegalStateException if {@code handleNewChannel(ctx)} still cannot decide
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final boolean decided = handleNewChannel(ctx);
        if (decided) {
            ctx.fireChannelActive();
            return;
        }
        throw new IllegalStateException("cannot determine to accept or reject a channel: " + ctx.channel());
    }

    /**
     * Decides whether the channel behind {@code ctx} is accepted or rejected.
     *
     * <p>Once the remote address is available this handler removes itself from the pipeline and
     * asks {@code accept(..)}. An accepted channel is reported through {@code channelAccepted(..)}.
     * A rejected channel is reported through {@code channelRejected(..)} and then closed: via the
     * returned future when there is one, otherwise immediately.
     *
     * @param ctx the context of the channel being checked
     * @return {@code false} if the remote address is not known yet and the decision was deferred,
     *         {@code true} once a decision has been made
     * @throws Exception declared by this method
     */
    private boolean handleNewChannel(ChannelHandlerContext ctx) throws Exception {
        @SuppressWarnings("unchecked")
        final T peer = (T) ctx.channel().remoteAddress();

        if (peer == null) {
            // Remote address not available yet: defer the decision to a later event.
            return false;
        }

        // The decision is made right now, so this handler is no longer needed in the pipeline;
        // removing it first also keeps later events away from it.
        ctx.pipeline().remove(this);

        if (!accept(ctx, peer)) {
            final ChannelFuture rejection = channelRejected(ctx, peer);
            if (rejection == null) {
                ctx.close();
            } else {
                rejection.addListener(ChannelFutureListener.CLOSE);
            }
            return true;
        }

        channelAccepted(ctx, peer);
        return true;
    }

    /**
     * Decides whether a connection from the given remote address is allowed.
     *
     * <p>Called from {@code handleNewChannel(..)} once the channel's remote address is known,
     * which is attempted when the {@link io.netty.channel.Channel} gets registered and again
     * when it becomes active.
     *
     * @param ctx           the context of the channel being checked
     * @param remoteAddress the channel's remote address; never {@code null} when called from
     *                      {@code handleNewChannel(..)}
     * @return Return true if connections from this IP address and port should be accepted. False otherwise.
     */
    protected abstract boolean accept(ChannelHandlerContext ctx, T remoteAddress) throws Exception;