1. 程式人生 > >Netty原始碼分析 (十二)----- 心跳服務之 IdleStateHandler 原始碼分析

Netty原始碼分析 (十二)----- 心跳服務之 IdleStateHandler 原始碼分析

什麼是心跳機制?

心跳說的是在客戶端和服務端在互相建立ESTABLISH狀態的時候,如何通過傳送一個最簡單的包來保持連線的存活,還有監控另一邊服務的可用性等。

心跳包的作用

  • 保活
    Q:為什麼說心跳機制能保持連線的存活,它是叢集中或長連線中最為有效避免網路中斷的一個重要的保障措施?
    A:之所以說是“避免網路中斷的一個重要保障措施”,原因是:我們得知公網IP是一個寶貴的資源,一旦某一連線長時間的佔用並且不發資料,這怎能對得起網路給此連線分配公網IP,這簡直是對網路資源最大的浪費,所以基本上所有的NAT路由器都會定時的清除那些長時間沒有資料傳輸的對映表項。一是回收IP資源,二是釋放NAT路由器本身記憶體的資源,這樣問題就來了,連線被從中間斷開了,雙發還都不曉得對方已經連通不了了,還會繼續發資料,這樣會有兩個結果:a) 發方會收到NAT路由器的RST包,導致發方知道連線已中斷;b) 發方沒有收到任何NAT的回執,NAT只是簡單的drop相應的資料包

    通常我們測試得出的是第二種情況會多些,就是客戶端是不知道自己應經連線斷開了,所以這時候心跳就可以和NAT建立關聯了,只要我們在NAT認為合理連線的時間內傳送心跳資料包,這樣NAT會繼續keep連線的IP對映表項不被移除,達到了連線不會被中斷的目的。

  • 檢測另一端服務是否可用
    TCP的斷開可能有時候是不能瞬時探知的,甚至是不能探知的,也可能有很長時間的延遲,如果前端沒有正常的斷開TCP連線,四次握手沒有發起,服務端無從得知客戶端的掉線,這個時候我們就需要心跳包來檢測另一端服務是否還存活可用。

基於TCP的keepalive機制實現

基於TCP的keepalive機制,由具體的TCP協議棧來實現長連線的維持。如在netty中可以在建立channel的時候,指定SO_KEEPALIVE引數來實現:

存在的問題:Netty只能控制SO_KEEPALIVE這個引數,其他引數,則需要從系統的sysctl中讀取,其中比較關鍵的是tcp_keepalive_time,傳送心跳包檢測的時間間隔,預設為7200s,即空閒後,每2小時檢測一次。如果客戶端在這2小時內斷開了,那麼服務端也要維護這個連線2小時,浪費服務端資源;另外就是對於需要實時傳輸資料的場景,客戶端斷開了,服務端也要2小時後才能發現。服務端傳送心跳檢測,具體可能出現的情況如下:
(1)連線正常:客戶端仍然存在,網路連線狀況良好。此時客戶端會返回一個 ACK 。 服務端接收到ACK後重置計時器,在2小時後再發送探測。如果2小時內連線上有資料傳輸,那麼在該時間基礎上向後推延2個小時;

(2)連線斷開:客戶端異常關閉,或是網路斷開。在這兩種情況下,客戶端都不會響應。伺服器沒有收到對其發出探測的響應,並且在一定時間(系統預設為 1000 ms )後重復發送 keep-alive packet ,並且重複傳送一定次數。
(3)客戶端曾經崩潰,但已經重啟:這種情況下,伺服器將會收到對其存活探測的響應,但該響應是一個復位,從而引起伺服器對連線的終止。

基於Netty的IdleStateHandler實現

什麼是 IdleStateHandler

當連線的空閒時間(讀或者寫)太長時,將會觸發一個 IdleStateEvent 事件。然後,你可以通過你的 ChannelInboundHandler 中重寫 userEventTrigged 方法來處理該事件。

如何使用?

IdleStateHandler 既是出站處理器也是入站處理器,繼承了 ChannelDuplexHandler 。通常在 initChannel 方法中將 IdleStateHandler 新增到 pipeline 中。然後在自己的 handler 中重寫 userEventTriggered 方法,當發生空閒事件(讀或者寫),就會觸發這個方法,並傳入具體事件。
這時,你可以通過 Context 物件嘗試向目標 Socekt 寫入資料,並設定一個 監聽器,如果傳送失敗就關閉 Socket (Netty 準備了一個 ChannelFutureListener.CLOSE_ON_FAILURE 監聽器用來實現關閉 Socket 邏輯)。
這樣,就實現了一個簡單的心跳服務。

原始碼分析

構造方法

該類有 3 個構造方法,主要對一下 4 個屬性賦值:

private final boolean observeOutput;// 是否考慮出站時較慢的情況。預設值是false(不考慮)。
private final long readerIdleTimeNanos; // 讀事件空閒時間,0 則禁用事件
private final long writerIdleTimeNanos;// 寫事件空閒時間,0 則禁用事件
private final long allIdleTimeNanos; //讀或寫空閒時間,0 則禁用事件

可以分別控制讀,寫,讀寫超時的時間,單位為秒,如果是0表示不檢測,所以如果全是0,則相當於沒新增這個IdleStateHandler,連線是個普通的短連線。

handlerAdded 方法

IdleStateHandler是在建立IdleStateHandler例項並新增到ChannelPipeline時新增定時任務來進行定時檢測的,具體在initialize(ctx)方法實現;同時在從ChannelPipeline移除或Channel關閉時,移除這個定時檢測,具體在destroy()實現

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
        this.initialize(ctx);
    }

}

public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    this.destroy();
}

initialize

private void initialize(ChannelHandlerContext ctx) {
    switch (state) {
    case 1:
    case 2:
        return;
    }
    state = 1;
    initOutputChanged(ctx);

    lastReadTime = lastWriteTime = ticksInNanos();
    if (readerIdleTimeNanos > 0) {
      // 這裡的 schedule 方法會呼叫 eventLoop 的 schedule 方法,將定時任務新增進佇列中
        readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (writerIdleTimeNanos > 0) {
        writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                writerIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
    if (allIdleTimeNanos > 0) {
        allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                allIdleTimeNanos, TimeUnit.NANOSECONDS);
    }
}

只要給定的引數大於0,就建立一個定時任務,每個事件都建立。同時,將 state 狀態設定為 1,防止重複初始化。呼叫 initOutputChanged 方法,初始化 “監控出站資料屬性”,程式碼如下:

private void initOutputChanged(ChannelHandlerContext ctx) {
    if (observeOutput) {
        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        // 記錄了出站緩衝區相關的資料,buf 物件的 hash 碼,和 buf 的剩餘緩衝位元組數
        if (buf != null) {
            lastMessageHashCode = System.identityHashCode(buf.current());
            lastPendingWriteBytes = buf.totalPendingWriteBytes();
        }
    }
}

讀事件的 run 方法

程式碼如下:

protected void run(ChannelHandlerContext ctx) {
    long nextDelay = readerIdleTimeNanos;
    if (!reading) {
        nextDelay -= ticksInNanos() - lastReadTime;
    }

    if (nextDelay <= 0) {
        // Reader is idle - set a new timeout and notify the callback.
        // 用於取消任務 promise
        readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);

        boolean first = firstReaderIdleEvent;
        firstReaderIdleEvent = false;

        try {
            // 再次提交任務
            IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
            // 觸發使用者 handler use
            channelIdle(ctx, event);
        } catch (Throwable t) {
            ctx.fireExceptionCaught(t);
        }
    } else {
        // Read occurred before the timeout - set a new timeout with shorter delay.
        readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
    }
}

nextDelay的初始化值為超時秒數readerIdleTimeNanos,如果檢測的時候沒有正在讀,且計算多久沒讀了:nextDelay -= 當前時間 - 上次讀取時間,如果小於0,說明左邊的readerIdleTimeNanos小於空閒時間(當前時間 - 上次讀取時間)了,則超時了
則建立IdleStateEvent事件,IdleState列舉值為READER_IDLE,然後呼叫channelIdle方法分發給下一個ChannelInboundHandler,通常由使用者自定義一個ChannelInboundHandler來捕獲並處理

總的來說,每次讀取操作都會記錄一個時間,定時任務時間到了,會計算當前時間和最後一次讀的時間的間隔,如果間隔超過了設定的時間,就觸發 UserEventTriggered 方法。就是這麼簡單。

寫事件的 run 方法

寫任務的邏輯基本和讀任務的邏輯一樣,唯一不同的就是有一個針對 出站較慢資料的判斷。

if (hasOutputChanged(ctx, first)) {
   return;
}

如果這個方法返回 true,就不執行觸發事件操作了,即使時間到了。看看該方法實現:

private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
    if (observeOutput) {
        // 如果最後一次寫的時間和上一次記錄的時間不一樣,說明寫操作進行過了,則更新此值
        if (lastChangeCheckTimeStamp != lastWriteTime) {
            lastChangeCheckTimeStamp = lastWriteTime;
            // 但如果,在這個方法的呼叫間隙修改的,就仍然不觸發事件
            if (!first) { // #firstWriterIdleEvent or #firstAllIdleEvent
                return true;
            }
        }
        Channel channel = ctx.channel();
        Unsafe unsafe = channel.unsafe();
        ChannelOutboundBuffer buf = unsafe.outboundBuffer();
        // 如果出站區有資料
        if (buf != null) {
            // 拿到出站緩衝區的 物件 hashcode
            int messageHashCode = System.identityHashCode(buf.current());
            // 拿到這個 緩衝區的 所有位元組
            long pendingWriteBytes = buf.totalPendingWriteBytes();
            // 如果和之前的不相等,或者位元組數不同,說明,輸出有變化,將 "最後一個緩衝區引用" 和 “剩餘位元組數” 重新整理
            if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
                lastMessageHashCode = messageHashCode;
                lastPendingWriteBytes = pendingWriteBytes;
                // 如果寫操作沒有進行過,則任務寫的慢,不觸發空閒事件
                if (!first) {
                    return true;
                }
            }
        }
    }
    return false;
}
  1. 如果使用者沒有設定了需要觀察出站情況。就返回 false,繼續執行事件。
  2. 反之,繼續向下, 如果最後一次寫的時間和上一次記錄的時間不一樣,說明寫操作剛剛做過了,則更新此值,但仍然需要判斷這個 first 的值,如果這個值還是 false,說明在這個寫事件是在兩個方法呼叫間隙完成的 / 或者是第一次訪問這個方法,就仍然不觸發事件。
  3. 如果不滿足上面的條件,就取出緩衝區物件,如果緩衝區沒物件了,說明沒有發生寫的很慢的事件,就觸發空閒事件。反之,記錄當前緩衝區物件的 hashcode 和 剩餘位元組數,再和之前的比較,如果任意一個不相等,說明資料在變化,或者說資料在慢慢的寫出去。那麼就更新這兩個值,留在下一次判斷。
  4. 繼續判斷 first ,如果是 fasle,說明這是第二次呼叫,就不用觸發空閒事件了。

所有事件的 run 方法

這個類叫做 AllIdleTimeoutTask ,表示這個監控著所有的事件。當讀寫事件發生時,都會記錄。程式碼邏輯和寫事件的的基本一致,除了這裡:

long nextDelay = allIdleTimeNanos;
if (!reading) {
   // 當前時間減去 最後一次寫或讀 的時間 ,若大於0,說明超時了
   nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}

這裡的時間計算是取讀寫事件中的最大值來的。然後像寫事件一樣,判斷是否發生了寫的慢的情況。最後呼叫 ctx.fireUserEventTriggered(evt) 方法。

通常這個使用的是最多的。構造方法一般是:

pipeline.addLast(new IdleStateHandler(0, 0, 30, TimeUnit.SECONDS));

讀寫都是 0 表示禁用,30 表示 30 秒內沒有任務讀寫事件發生,就觸發事件。注意,當不是 0 的時候,這三個任務會重疊。

&n