1. 程式人生 > >Netty 心跳服務之 IdleStateHandler 源碼分析

Netty 心跳服務之 IdleStateHandler 源碼分析

如何使用 讀寫 輸出 源碼 狀態 short 段落 今天 sys

技術分享圖片

前言:Netty 提供的心跳介紹

Netty 作為一個網絡框架,提供了諸多功能,比如我們之前說的編解碼,Netty 準備很多現成的編解碼器,同時,Netty 還為我們準備了網絡中,非常重要的一個服務-----心跳機制。通過心跳檢查對方是否有效,這在 RPC 框架中是必不可少的功能。

Netty 提供了 IdleStateHandler ,ReadTimeoutHandler,WriteTimeoutHandler 檢測連接的有效性。當然,你也可以自己寫個任務。但我們今天不準備使用自定義任務,而是使用 Netty 內部的。

說以下這三個 handler 的作用。

序 號 名稱 作用
1 IdleStateHandler 當連接的空閑時間(讀或者寫)太長時,將會觸發一個 IdleStateEvent 事件。然後,你可以通過你的 ChannelInboundHandler 中重寫 userEventTrigged 方法來處理該事件。
2 ReadTimeoutHandler 如果在指定的事件沒有發生讀事件,就會拋出這個異常,並自動關閉這個連接。你可以在 exceptionCaught 方法中處理這個異常。
3 WriteTimeoutHandler 當一個寫操作不能在一定的時間內完成時,拋出此異常,並關閉連接。你同樣可以在 exceptionCaught 方法中處理這個異常。

註意:
其中,關於 WriteTimeoutHandler 的描述,著名的 《Netty 實戰》和 他的英文原版的描述都過時了,原文描述:

如果在指定的時間間隔內沒有任何出站數據寫入,則拋出一個 WriteTimeoutException.

此書出版的時候,Netty 的文檔確實是這樣的,但在 2015 年 12 月 28 號的時候,被一個同學修改了邏輯,見下方 git 日誌:

技術分享圖片

貌似還是個國人妹子。。。。而現在的文檔描述是:

Raises a {@link WriteTimeoutException} when a write operation cannot finish in a certain period of time.

當一個寫操作不能在一定的時間內完成時,就會產生一個 WriteTimeoutException。

ReadTimeout 事件和 WriteTimeout 事件都會自動關閉連接,而且,屬於異常處理,所以,這裏只是介紹以下,我們重點看 IdleStateHandler。

1. 什麽是 IdleStateHandler

  • 回顧一下 IdleStateHandler :

當連接的空閑時間(讀或者寫)太長時,將會觸發一個 IdleStateEvent 事件。然後,你可以通過你的 ChannelInboundHandler 中重寫 userEventTrigged 方法來處理該事件。

  • 如何使用呢?

IdleStateHandler 既是出站處理器也是入站處理器,繼承了 ChannelDuplexHandler 。通常在 initChannel 方法中將 IdleStateHandler 添加到 pipeline 中。然後在自己的 handler 中重寫 userEventTriggered 方法,當發生空閑事件(讀或者寫),就會觸發這個方法,並傳入具體事件。
這時,你可以通過 Context 對象嘗試向目標 Socekt 寫入數據,並設置一個 監聽器,如果發送失敗就關閉 Socket (Netty 準備了一個 ChannelFutureListener.CLOSE_ON_FAILURE 監聽器用來實現關閉 Socket 邏輯)。
這樣,就實現了一個簡單的心跳服務。


2. 源碼分析

  • 1.構造方法,該類有 3 個構造方法,主要對一下 4 個屬性賦值:
private final boolean observeOutput;// 是否考慮出站時較慢的情況。默認值是false(不考慮)。
private final long readerIdleTimeNanos; // 讀事件空閑時間,0 則禁用事件
private final long writerIdleTimeNanos;// 寫事件空閑時間,0 則禁用事件
private final long allIdleTimeNanos; //讀或寫空閑時間,0 則禁用事件
  • 2. handlerAdded 方法

當該 handler 被添加到 pipeline 中時,則調用 initialize 方法:

private void initialize(ChannelHandlerContext ctx) {
    switch (state) {
    case 1:
    case 2:
        return;
    }
    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
      // 這裏的 schedule 方法會調用 eventLoop 的 schedule 方法,將定時任務添加進隊列中
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

只要給定的參數大於0,就創建一個定時任務,每個事件都創建。同時,將 state 狀態設置為 1,防止重復初始化。調用 initOutputChanged 方法,初始化 “監控出站數據屬性”,代碼如下:

private void initOutputChanged(ChannelHandlerContext ctx) {
    if (observeOutput) {
        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        // 記錄了出站緩沖區相關的數據,buf 對象的 hash 碼,和 buf 的剩余緩沖字節數
        if (buf != null) {
            lastMessageHashCode = System.identityHashCode(buf.current());
            lastPendingWriteBytes = buf.totalPendingWriteBytes();
        }
    }
}

首先說說這個 observeOutput “監控出站數據屬性” 的作用。因為 github 上有人提了 issue ,issue 地址,本來是沒有這個參數的。為什麽需要呢?

假設:當你的客戶端應用每次接收數據是30秒,而你的寫空閑時間是 25 秒,那麽,當你數據還沒有寫出的時候,寫空閑時間觸發了。實際上是不合乎邏輯的。因為你的應用根本不空閑。

怎麽解決呢?

Netty 的解決方案是:記錄最後一次輸出消息的相關信息,並使用一個值 firstXXXXIdleEvent 表示是否再次活動過,每次讀寫活動都會將對應的 first 值更新為 true,如果是 false,說明這段時間沒有發生過讀寫事件。同時如果第一次記錄出站的相關數據和第二次得到的出站相關數據不同,則說明數據在緩慢的出站,就不用觸發空閑事件。

總的來說,這個字段就是用來對付 “客戶端接收數據奇慢無比,慢到比空閑時間還多” 的極端情況。所以,Netty 默認是關閉這個字段的。

  • 3. 該類內部的 3 個定時任務類

如下圖:

技術分享圖片

這 3 個定時任務分別對應 讀,寫,讀或者寫 事件。共有一個父類。這個父類提供了一個模板方法:

技術分享圖片

當通道關閉了,就不執行任務了。反之,執行子類的 run 方法。

1. 讀事件的 run 方法

代碼如下:

protected void run(ChannelHandlerContext ctx) {
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - lastReadTime;
    }

    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        // 用於取消任務 promise
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstReaderIdleEvent;
        firstReaderIdleEvent = false;

        try {
            // 再次提交任務
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            // 觸發用戶 handler use
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

該方法很簡單:

  1. 得到用戶設置的超時時間。
  2. 如果讀取操作結束了(執行了 channelReadComplete 方法設置) ,就用當前時間減去給定時間和最後一次讀操作的時間(執行了 channelReadComplete 方法設置),如果小於0,就觸發事件。反之,繼續放入隊列。間隔時間是新的計算時間。
  3. 觸發的邏輯是:首先將任務再次放到隊列,時間是剛開始設置的時間,返回一個 promise 對象,用於做取消操作。然後,設置 first 屬性為 false ,表示,下一次讀取不再是第一次了,這個屬性在 channelRead 方法會被改成 true。
  4. 創建一個 IdleStateEvent 類型的寫事件對象,將此對象傳遞給用戶的 UserEventTriggered 方法。完成觸發事件的操作。

總的來說,每次讀取操作都會記錄一個時間,定時任務時間到了,會計算當前時間和最後一次讀的時間的間隔,如果間隔超過了設置的時間,就觸發 UserEventTriggered 方法。就是這麽簡單。

再看看寫事件任務。

2. 寫事件的 run 方法

寫任務的邏輯基本和讀任務的邏輯一樣,唯一不同的就是有一個針對 出站較慢數據的判斷。

 if (hasOutputChanged(ctx, first)) {
     return;
}

如果這個方法返回 true,就不執行觸發事件操作了,即使時間到了。看看該方法實現:

private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
    if (observeOutput) {
        // 如果最後一次寫的時間和上一次記錄的時間不一樣,說明寫操作進行過了,則更新此值
        if (lastChangeCheckTimeStamp != lastWriteTime) {
            lastChangeCheckTimeStamp = lastWriteTime;
            // 但如果,在這個方法的調用間隙修改的,就仍然不觸發事件
            if (!first) { // #firstWriterIdleEvent or #firstAllIdleEvent
                return true;
            }
        }
        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        // 如果出站區有數據
        if (buf != null) {
            // 拿到出站緩沖區的 對象 hashcode
            int messageHashCode = System.identityHashCode(buf.current());
            // 拿到這個 緩沖區的 所有字節
            long pendingWriteBytes = buf.totalPendingWriteBytes();
            // 如果和之前的不相等,或者字節數不同,說明,輸出有變化,將 "最後一個緩沖區引用" 和 “剩余字節數” 刷新
            if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                lastMessageHashCode = messageHashCode;
                lastPendingWriteBytes = pendingWriteBytes;
                // 如果寫操作沒有進行過,則任務寫的慢,不觸發空閑事件
                if (!first) {
                    return true;
                }
            }
        }
    }
    return false;
}

寫了一些註釋,還是再梳理一下吧:

  1. 如果用戶沒有設置了需要觀察出站情況。就返回 false,繼續執行事件。
  2. 反之,繼續向下, 如果最後一次寫的時間和上一次記錄的時間不一樣,說明寫操作剛剛做過了,則更新此值,但仍然需要判斷這個 first 的值,如果這個值還是 false,說明在這個寫事件是在兩個方法調用間隙完成的 / 或者是第一次訪問這個方法,就仍然不觸發事件。
  3. 如果不滿足上面的條件,就取出緩沖區對象,如果緩沖區沒對象了,說明沒有發生寫的很慢的事件,就觸發空閑事件。反之,記錄當前緩沖區對象的 hashcode 和 剩余字節數,再和之前的比較,如果任意一個不相等,說明數據在變化,或者說數據在慢慢的寫出去。那麽就更新這兩個值,留在下一次判斷。
  4. 繼續判斷 first ,如果是 fasle,說明這是第二次調用,就不用觸發空閑事件了。

整個邏輯如下:

技術分享圖片

這裏有個問題,為什麽第一次的時候一定要觸發事件呢?假設,客戶端開始變得很慢,這個時候,定時任務監聽發現時間到了,就進入這裏判斷,當上次記錄的緩沖區相關數據已經不同,這個時候難道觸發事件嗎?

實際上,這裏是 Netty 的一個考慮:假設真的發生了很寫出速度很慢的問題,很可能引發 OOM,相比叫連接空閑,這要嚴重多了。為什麽第一次一定要觸發事件呢?如果不觸發,用戶根本不知道發送了什麽,當一次寫空閑事件觸發,隨後出現了 OOM,用戶可以感知到:可能是寫的太慢,後面的數據根本寫不進去,所以發生了OOM。所以,這裏的一次警告還是必要的。

當然,這是我的一個猜測。有必要的話,可以去 Netty 那裏提個 issue。

好,關於客戶端寫的慢的特殊處理告一段落。再看看另一個任務的邏輯。

3. 所有事件的 run 方法

這個類叫做 AllIdleTimeoutTask ,表示這個監控著所有的事件。當讀寫事件發生時,都會記錄。代碼邏輯和寫事件的的基本一致,除了這裏:

long nextDelay = allIdleTimeNanos;
if (!reading) {
   // 當前時間減去 最後一次寫或讀 的時間 ,若大於0,說明超時了
   nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}

這裏的時間計算是取讀寫事件中的最大值來的。然後像寫事件一樣,判斷是否發生了寫的慢的情況。最後調用 ctx.fireUserEventTriggered(evt) 方法。

通常這個使用的是最多的。構造方法一般是:

pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));

讀寫都是 0 表示禁用,30 表示 30 秒內沒有任務讀寫事件發生,就觸發事件。註意,當不是 0 的時候,這三個任務會重疊。

總結

IdleStateHandler 可以實現心跳功能,當服務器和客戶端沒有任何讀寫交互時,並超過了給定的時間,則會觸發用戶 handler 的 userEventTriggered 方法。用戶可以在這個方法中嘗試向對方發送信息,如果發送失敗,則關閉連接。

IdleStateHandler 的實現基於 EventLoop 的定時任務,每次讀寫都會記錄一個值,在定時任務運行的時候,通過計算當前時間和設置時間和上次事件發生時間的結果,來判斷是否空閑。

內部有 3 個定時任務,分別對應讀事件,寫事件,讀寫事件。通常用戶監聽讀寫事件就足夠了。

同時,IdleStateHandler 內部也考慮了一些極端情況:客戶端接收緩慢,一次接收數據的速度超過了設置的空閑時間。Netty 通過構造方法中的 observeOutput 屬性來決定是否對出站緩沖區的情況進行判斷。

如果出站緩慢,Netty 不認為這是空閑,也就不觸發空閑事件。但第一次無論如何也是要觸發的。因為第一次無法判斷是出站緩慢還是空閑。當然,出站緩慢的話,OOM 比空閑的問題更大。

所以,當你的應用出現了內存溢出,OOM之類,並且寫空閑極少發生(使用了 observeOutput 為 true),那麽就需要註意是不是數據出站速度過慢。

默認 observeOutput 是 false,意思是,即使你的應用出站緩慢,Netty 認為是寫空閑。

可見這個 observeOutput 的作用好像不是那麽重要,如果真的發生了出站緩慢,判斷是否空閑根本就不重要了,重要的是 OOM。所以 Netty 選擇了默認 false。

還有一個註意的地方:剛開始我們說的 ReadTimeoutHandler ,就是繼承自 IdleStateHandler,當觸發讀空閑事件的時候,就觸發 ctx.fireExceptionCaught 方法,並傳入一個 ReadTimeoutException,然後關閉 Socket。

而 WriteTimeoutHandler 的實現不是基於 IdleStateHandler 的,他的原理是,當調用 write 方法的時候,會創建一個定時任務,任務內容是根據傳入的 promise 的完成情況來判斷是否超出了寫的時間。當定時任務根據指定時間開始運行,發現 promise 的 isDone 方法返回 false,表明還沒有寫完,說明超時了,則拋出異常。當 write 方法完成後,會打斷定時任務。

好了,關於 Netty 自帶的心跳相關的類就介紹到這裏。這些功能對於開發穩定的高性能 RPC 至關重要。

good luck!!!

Netty 心跳服務之 IdleStateHandler 源碼分析