1. 程式人生 > >netty原始碼解解析(4.0)-17 ChannelHandler: IdleStateHandler實現

netty原始碼解解析(4.0)-17 ChannelHandler: IdleStateHandler實現

   io.netty.handler.timeout.IdleStateHandler功能是監測Channel上read, write或者這兩者的空閒狀態。當Channel超過了指定的空閒時間時,這個Handler會觸發一個IdleStateEvent事件。

  在第一次檢測到Channel變成active狀態時向EventExecutor中提交三個延遲任務:

    ReaderIdleTimeoutTask: 檢測read空閒超時。

    WriterIdleTimeoutTask: 檢測write空閒超時。

    AllIdleTimeoutTask: 檢測所有的空閒超時。

  任何一個延遲任務檢測到空閒超時是會觸發一個IdleStateEvent。無論如何,延遲任務都會再次把自己提交到EventExecutor中,等待下次執行。

  三個延遲任務對應於三個超時時間,都是可以獨立設定的:

 1 public IdleStateHandler(boolean observeOutput,
 2             long readerIdleTime, long writerIdleTime, long allIdleTime,
 3             TimeUnit unit) {
 4         if (unit == null) {
 5             throw new NullPointerException("unit");
 6         }
 7 
 8         this.observeOutput = observeOutput;
 9 
10         if (readerIdleTime <= 0) {
11             readerIdleTimeNanos = 0;
12         } else {
13             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
14         }
15         if (writerIdleTime <= 0) {
16             writerIdleTimeNanos = 0;
17         } else {
18             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
19         }
20         if (allIdleTime <= 0) {
21             allIdleTimeNanos = 0;
22         } else {
23             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
24         }
25     }

  這個類繼承自io.netty.channel.ChannelDuplexHandler, 它是一個有狀態的ChannelHandler, 定義了三個狀態:

  private byte state; // 0 - none, 1 - initialized, 2 - destroyed

  state屬性儲存了它的狀態。0:初始狀態,1:已經初始化, 2: 已經銷燬。

  這個ChannelHandler被加入到Channel的pipeline中之後,在Channel已經被register到EventLoop中,且處於Active狀態時,會執行一次初始化操作,向EventExecutor提交前面提到的三個延遲任務。這初始化操作在initialize方法中實現。

 1     private void initialize(ChannelHandlerContext ctx) {
 2         // Avoid the case where destroy() is called before scheduling timeouts.
 3         // See: https://github.com/netty/netty/issues/143
 4         switch (state) {
 5         case 1:
 6         case 2:
 7             return;
 8         }
 9 
10         state = 1;
11         initOutputChanged(ctx);
12 
13         lastReadTime = lastWriteTime = ticksInNanos();
14         if (readerIdleTimeNanos > 0) {
15             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
16                     readerIdleTimeNanos, TimeUnit.NANOSECONDS);
17         }
18         if (writerIdleTimeNanos > 0) {
19             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
20                     writerIdleTimeNanos, TimeUnit.NANOSECONDS);
21         }
22         if (allIdleTimeNanos > 0) {
23             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
24                     allIdleTimeNanos, TimeUnit.NANOSECONDS);
25         }
26     }

 

  第4-10行,只有處於初始狀態時才執行後面的操作,避免多次提交定時任務。

  第11行, 初始化對對Channel的outboundBuffer變化的監視,只有當observeOutput屬性設定為true時才開啟這個監視。

  第13-25行,分別提交三個延遲任務。

 

  initialize方法可能在三個地方被呼叫:

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
            // channelActive() event has been fired already, which means this.channelActive() will
            // not be invoked. We have to initialize here instead.
            initialize(ctx);
        } else {
            // channelActive() event has not been fired yet.  this.channelActive() will be invoked
            // and initialization will occur there.
        }
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        // Initialize early if channel is active already.
        if (ctx.channel().isActive()) {
            initialize(ctx);
        }
        super.channelRegistered(ctx);
    }


    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // This method will be invoked only if this handler was added
        // before channelActive() event is fired.  If a user adds this handler
        // after the channelActive() event, initialize() will be called by beforeAdd().
        initialize(ctx);
        super.channelActive(ctx);
    }

 

  如果在Channel初始化的時候把這個Handler新增到pipeline中,那麼這個Handler的channelActive方法一定會被呼叫,只需要在channleActive中呼叫initialize就可以打了。但是Handler可以在任何時候被加入到pipleline中。當ChannelHandler被新增到pipeline中時,Channel可能已經被register到EventLoop中,且已經處於Active狀態,這種情況下,channelRegistered和channelActive方法都不會被呼叫,所以必須在handlerAdded中呼叫initialize。如果此時,Channnel已經處於Active狀態,但還沒被註冊到EventLoop,只能在channelRegisted中呼叫initialize。

  

  初始化完成之後,延遲任務到期執行時會把自己再次提交到EventExecutor中,等待下次執行。同時會檢查是否滿足觸發事件的條件,如果是就觸發一條自定義的事件。

  

read空閒超時檢查

 1 private final class ReaderIdleTimeoutTask extends AbstractIdleTask {
 2         @Override
 3         protected void run(ChannelHandlerContext ctx) {
 4             long nextDelay = readerIdleTimeNanos;
 5             if (!reading) {
 6                 nextDelay -= ticksInNanos() - lastReadTime;
 7             }
 8 
 9             if (nextDelay <= 0) {
10                 // Reader is idle - set a new timeout and notify the callback.
11                 readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
12 
13                 boolean first = firstReaderIdleEvent;
14                 firstReaderIdleEvent = false;
15 
16                 try {
17                     IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
18                     channelIdle(ctx, event);
19                 } catch (Throwable t) {
20                     ctx.fireExceptionCaught(t);
21                 }
22             } else {
23                 // Read occurred before the timeout - set a new timeout with shorter delay.
24                 readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
25             }
26         }
27     }

  4-9行,判斷是否read空閒超時。

  11-21行,read空閒超時,重新把自己提交成延遲任務。

  24行,read沒有空閒超時,重新把自己提交成延遲任務。

  這裡的關鍵是判斷read空閒超時。lastReadTime是最近一次執行read的時間,readerIdleTimeNanos是初始化時設定的空閒超時時間,因此如果readerIdleTimeNanos - (ticksInNanos() - lastReadtime)  <= 0,表示已經read空閒超時了。令人困惑的是第5行,只有在reading==false才檢查進行空閒超時的計算。筆者在<<netty原始碼解解析(4.0)-14 Channel NIO實現:讀取資料>>一章中分析過Channel read的實現。一次read操作或觸發多個read和一個readComplete事件,read操作由多個步驟組成。這reading屬性用來表示正在read的狀態。

 1     @Override
 2     public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 3         if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
 4             reading = true;
 5             firstReaderIdleEvent = firstAllIdleEvent = true;
 6         }
 7         ctx.fireChannelRead(msg);
 8     }
 9 
10     @Override
11     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
12         if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
13             lastReadTime = ticksInNanos();
14             reading = false;
15         }
16         ctx.fireChannelReadComplete();
17     }

  3-4行,在設定了讀空閒超時或所有空閒超時的情況下,會吧reading設定成true,表示當前正處於正在read的狀態。

  12-14行,在設定了讀空閒超時或所有空閒超時的情況下, 如果當前正處於read狀態,把reading設定成false,同時更新最近一次執行read的時間。

 

write空閒超時檢查

 1     private final class WriterIdleTimeoutTask extends AbstractIdleTask {
 2 
 3         @Override
 4         protected void run(ChannelHandlerContext ctx) {
 5 
 6             long lastWriteTime = IdleStateHandler.this.lastWriteTime;
 7             long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
 8             if (nextDelay <= 0) {
 9                 // Writer is idle - set a new timeout and notify the callback.
10                 writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS);
11 
12                 boolean first = firstWriterIdleEvent;
13                 firstWriterIdleEvent = false;
14 
15                 try {
16                     if (hasOutputChanged(ctx, first)) {
17                         return;
18                     }
19 
20                     IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
21                     channelIdle(ctx, event);
22                 } catch (Throwable t) {
23                     ctx.fireExceptionCaught(t);
24                 }
25             } else {
26                 // Write occurred before the timeout - set a new timeout with shorter delay.
27                 writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
28             }
29         }
30     }

 

  6-8行,檢查write空閒超時,和檢查read空閒超時類似。

  12-21行,如果write空閒超時,且outboundBuffer中的資料沒有變化, 觸發write空閒超時事件。

  這裡呼叫了hasOutputChanged方法檢查outboundBuffer中的資料是否有變化。筆者在<<netty原始碼解解析(4.0)-15 Channel NIO實現:寫資料>>中分write實現時,已經講過,每個Channel都以一個outboundBuffer, write的資料會先序列化成Byte流追加到outboundBuffer中,然後再從outboundBuffer中順序讀出Byte流執行真正的write操作。在Handler的write方法沒有被呼叫的情況下,如果outboundBuffer中有資料,且資料傳送了變化,表示正在執行真正的write操作,反之則意味著Channel處於不可寫的狀態,無法執行真正的write操作。write空閒超時事件只會在write空閒超時且沒有執行真正write操作的時候才會觸發。另外,這個檢查有個開關屬性,只有observeOutput==true時才會檢查。

  

  AllIdleTimeoutTask的實現和WriterIdleTimeoutTask類似,只不過檢查超時的條件有些差別:read和write任何一個空閒超時都算超時。

 

ReadTimeoutHandler實現

  ReadTimeoutHandler繼承了IdleStateHandler類,它的功能是在觸發read空閒超時事件時觸發一個ReadTimeoutException異常,同時關閉Channel。 

    @Override
    protected final void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        assert evt.state() == IdleState.READER_IDLE;
        readTimedOut(ctx);
    }

    /**
     * Is called when a read timeout was detected.
     */
    protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!closed) {
            ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
            ctx.close();
            closed = true;
        }
    }

 

 

WriteTimeoutHandler實現

  WriteTimeoutHandler繼承了ChannelOutboundHandlerAdapter,它的功能是在觸發監視Channel的write呼叫超時,如果超時則關閉掉這個Channel。和ReadTimeoutHandler不同,它監控的不是空閒超時,而是Channel的write方法返回的Promise超時。

  首先在write時候,為每個Promise新增一個監控超時的延遲任務:

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        scheduleTimeout(ctx, promise);
        ctx.write(msg, promise);
    }
    private void scheduleTimeout(final ChannelHandlerContext ctx, final ChannelPromise promise) {
        // Schedule a timeout.
        final WriteTimeoutTask task = new WriteTimeoutTask(ctx, promise);
        task.scheduledFuture = ctx.executor().schedule(task, timeoutNanos, TimeUnit.NANOSECONDS);

        if (!task.scheduledFuture.isDone()) {
            addWriteTimeoutTask(task);

            // Cancel the scheduled timeout if the flush promise is complete.
            promise.addListener(task);
        }
    }

   然後,如果延遲任務執行的時候檢查到Promise超時,就觸發一個WriteTimeoutException異常,然後關閉掉這個Channel。

    protected void writeTimedOut(ChannelHandlerContext ctx) throws Exception {
        if (!closed) {
            ctx.fireExceptionCaught(WriteTimeoutException.INSTANCE);
            ctx.close();
            closed = true;
        }
    }

   WriteTimeoutTask類同時實現了Runnable和ChannelFutureListener介面,超時後會呼叫run方法。

 1         @Override
 2         public void run() {
 3             // Was not written yet so issue a write timeout
 4             // The promise itself will be failed with a ClosedChannelException once the close() was issued
 5             // See https://github.com/netty/netty/issues/2159
 6             if (!promise.isDone()) {
 7                 try {
 8                     writeTimedOut(ctx);
 9                 } catch (Throwable t) {
10                     ctx.fireExceptionCaught(t);
11                 }
12             }
13             removeWriteTimeoutTask(this);
14         }

  7-10行,promise沒有完成,觸發WriteTimeoutException或其他異常。

      13行,write已經完成,刪除當前的WriteTimeoutTask物件。

    如果promise已經完成, 會呼叫operationComplete方法, 清理掉當前的WriteTimeoutTask物件。

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            // scheduledFuture has already be set when reaching here
            scheduledFuture.cancel(false);
            removeWriteTimeoutTask(this);
        }

   

 &n