1. 程式人生 > >netty原始碼深入研究(從客戶端入手)第四篇(讀寫超時詳解)

netty原始碼深入研究(從客戶端入手)第四篇(讀寫超時詳解)

怎麼設定讀寫超時的監聽函式呢,首先從文件開始,或者看看官方有沒有例子,一般任何平臺的官方都會或多或少的提供例子。

官方文件有這樣一個類new IdleStateHandler(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds),我這裡就用中式英語翻譯,大體的意思就是空閒狀態的處理者,第一個引數設定讀超時時間,第二個引數設定寫超時時間,第三個設定總的超時時間。

怎麼用呢,咱們先看看它的父類是什麼public class IdleStateHandler extends ChannelDuplexHandler,那麼ChannelDuplexHandler

又是什麼鬼,public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter,繼承與ChannelInboundHandlerAdapter,前幾篇介紹過ChannelInboundHandlerAdapter這個類標記了這個管道是用來連線讀管道的,那麼疑問來了,寫超時它是怎麼計算的。通過以下入口點,進入空閒狀態的處理者內部。

ch.pipeline().addLast("IdleState",
new IdleStateHandler(5, 5, 10));

 public IdleStateHandler(boolean observeOutput,
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        this.observeOutput = observeOutput;

        if (readerIdleTime <= 0) {
            readerIdleTimeNanos = 0;
        } else {
            readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (writerIdleTime <= 0) {
            writerIdleTimeNanos = 0;
        } else {
            writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
        }
        if (allIdleTime <= 0) {
            allIdleTimeNanos = 0;
        } else {
            allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
        }
    }
構造方法裡,判斷設定的時間值,如果小於等於0,設定時間為0,總時間取預設值和設定值的最大值。直接點類進入後我們只能跟蹤到構造方法,其他方法基本上都是被回撥的方法,看這個
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
            // channelActive() event has been fired already, which means this.channelActive() will
            // not be invoked. We have to initialize here instead.
            initialize(ctx);
        } else {
            // channelActive() event has not been fired yet.  this.channelActive() will be invoked
            // and initialization will occur there.
        }
    }

handlerAdded()方法在什麼時候被執行呢,第一篇講過連線之前,要進行註冊,那麼跟進去看看是不是那個地方執行的,該方法在DefaultChannelPipeline的callHandlerAdded0方法裡呼叫
 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
        	//檢查handler是否合法
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);

            // If the registered is false it means that the channel was not registered on an eventloop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                    	//被呼叫
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        //被呼叫
        callHandlerAdded0(newCtx);
        return this;
    }
看程式碼中文註釋部分,此方法原來是在新增的時候被呼叫,和註冊沒關係

如果此時連線已經建立,那麼就初始化這個管道,0k,開始執行initialize方法,

 private void initialize(ChannelHandlerContext ctx) {
    	//第一次進入的時候state為預設值,肯定不是1和2,跳過
        switch (state) {
        case 1:
        case 2:
            return;
        }

        state = 1;
        //設定是否監聽寫的資訊,預設不監聽
        initOutputChanged(ctx);
        //當前時間
        lastReadTime = lastWriteTime = ticksInNanos();
        //假如設定的超時時間有效則啟動監聽任務
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }

啟動任務最後還是加入到NioEventLoop的訊息佇列中,並延時執行,任務執行每次都是在讀寫以後。接下來看一下這幾個任務都幹了什麼

讀任務的程式碼:

 protected void run(ChannelHandlerContext ctx) {
            long nextDelay = readerIdleTimeNanos;
            //是否正在讀,每次呼叫管道讀的時候reading會變成true,而讀完後reading變成false
            if (!reading) {
            	//沒有讀則計算時間
                nextDelay -= ticksInNanos() - lastReadTime;
            }
             //如果計算值為負數,則超時
            if (nextDelay <= 0) {
                // Reader is idle - set a new timeout and notify the callback.
            	//進行下一次統計任務
                readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstReaderIdleEvent;
                firstReaderIdleEvent = false;

                try {
                	//通知超時
                    IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Read occurred before the timeout - set a new timeout with shorter delay.
            	//進行下一次統計任務
                readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }

通過上面程式碼可以發現,每次執行到任務時獲取當前的時間減去上一次讀取的時間,如果大於最大讀取時間的話,那麼回撥超時介面。那麼回撥的是什麼呢?跟進看一下
 protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

最後又通過管道包裝類把回撥分發出去
    @Override
    public ChannelHandlerContext fireUserEventTriggered(final Object event) {
        invokeUserEventTriggered(findContextInbound(), event);
        return this;
    }

最後又執行到迴圈呼叫佇列方法的地方
 private void invokeUserEventTriggered(Object event) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).userEventTriggered(this, event);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireUserEventTriggered(event);
        }
    }

因為將管道新增進去的時候,invokeHander已然返回true,所以必然執行((ChannelInboundHandler) handler()).userEventTriggered(this, event);

也就是每一個讀管道的

public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
			throws Exception {
		// TODO Auto-generated method stub
		if (evt instanceof IdleStateEvent) {
			IdleStateEvent event = (IdleStateEvent) evt;
			if (event.state().equals(IdleState.ALL_IDLE)) {
				// 未進行寫操作傳送心跳包
				String heratBeat = "{\"event\":\"HEARTBEAT\"}";
				ctx.write(Unpooled.copiedBuffer(heratBeat.getBytes()));
				ctx.flush();
			}
		}
		super.userEventTriggered(ctx, evt);
	}
這個方法,我們可以在這個裡面做一些事情,比如一定時間沒有和伺服器發信息了,那麼傳送心跳,看super.userEventTriggered(ctx, evt),它的父類方法都做了什麼
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

奧,這不是又把事件分發給佇列的下一個管道處理類了嗎,預設的解析器我們不會重寫userEventTriggered方法,所以他會直接分發給下一個管道類處理。


那麼繼續,從上面的讀超時任務,知道有一個最後讀取資料的時間變數lastReadTime,那麼它是怎麼計算的呢?

 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
            lastReadTime = ticksInNanos();
            reading = false;
        }
        ctx.fireChannelReadComplete();
    }

這個值是讀資料完成的回撥方法裡執行的,第二篇有詳細介紹
 public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    //讀管道處理操作回撥
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());
                //讀取完成的操作回撥
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }
    }
看程式碼中文註釋,現在讀超時的思路已經清晰。接著看寫任務做了哪些
 protected void run(ChannelHandlerContext ctx) {

            long lastWriteTime = IdleStateHandler.this.lastWriteTime;
            long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
            if (nextDelay <= 0) {
                // Writer is idle - set a new timeout and notify the callback.
                writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);

                boolean first = firstWriterIdleEvent;
                firstWriterIdleEvent = false;

                try {
                    if (hasOutputChanged(ctx, first)) {
                        return;
                    }

                    IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
                    channelIdle(ctx, event);
                } catch (Throwable t) {
                    ctx.fireExceptionCaught(t);
                }
            } else {
                // Write occurred before the timeout - set a new timeout with shorter delay.
                writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
            }
        }
    }
執行邏輯和讀差不多,其他跳過直接看lastWriteTime這個引數哪來的

 private final ChannelFutureListener writeListener = new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            lastWriteTime = ticksInNanos();
            firstWriterIdleEvent = firstAllIdleEvent = true;
        }
    };

哎要,它是在回撥函式中設定的值

 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        // Allow writing with void promise if handler is only configured for read timeout events.
        if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
            ChannelPromise unvoid = promise.unvoid();
            unvoid.addListener(writeListener);
            ctx.write(msg, unvoid);
        } else {
            ctx.write(msg, promise);
        }
    }

是在write方法中加入的回撥值,額什麼時候執行write呢
   private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }
只要實現了此介面就能監聽的管道的寫訊息,所以IdleStateHandler實現了implements ChannelOutboundHandler介面,寫方法當然會執行,就和第二篇寫訊息流程一樣,不再具體分析。

第五篇的時候,我會來一個netty的總結,把類關係圖弄一下,那樣感覺更直觀。