1. 程式人生 > >kafka生產者網絡層總結

kafka生產者網絡層總結

pre sre ring tsp 檢查 remove complete ava article

1 層次結構

負責進行網絡IO請求的是NetworkClient,主要層次結構如下
技術分享圖片

ClusterConnectionStates報存了每個節點的狀態,以node為key,以node的狀態為value;inFlightRequets中保存了每個節點已經發送的請求,但是還沒有返回的請求,以node為key,以List<ClientRequest>為value。inFlightRequets從名字也可以看出,表示“正在空中飛”的請求。

2 如何保證每次只發送一個請求

sender線程啟動後,如果RecordBatch中有消息,會將消息按照所在節點重新排列,每個節點會創建一個ClientRequest,用來發送,每個節點每次只能發送一個ClientRequest,如下
KafkaChannel#setSend(..)

public void setSend(Send send) {
        if (this.send != null) // 如果已經有send,會拋出異常
            throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress.");
        this.send = send;
        this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
 }

那麽kafka是如何保證避免setSend的時候KafkaChannel中已經有send了呢,這個關鍵就是在sender線程中會調用NetworkClient#ready(..),會將沒有ready的節點去除掉,從而不會在該節點上setSend:

while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) { // 關鍵
                iter.remove();
                notReadyTimeout = Math.min
(notReadyTimeout, this.client.connectionDelay(node, now)); } }

3 NetworkClient#ready(..)

NetworkClient#ready(..)檢查節點是否準備好,從而決定是否可以將消息封裝成ClientRequest放到KafkaChannel上。

public boolean ready(Node node, long now) {
        if (node.isEmpty())
            throw new IllegalArgumentException("Cannot connect to empty node " + node);

        if (isReady(node, now)) // 關鍵
            return true;

        if (connectionStates.canConnect(node.idString(), now))
            initiateConnect(node, now);

        return false;
    }

我們來分析下isReady

public boolean isReady(Node node, long now) {
        return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString());
    }

isReady主要兩個條件,一個是判斷metadata是否到了更新的時候了,如果metadata需要更新,那麽就不發送本次請求,也就是metadata更新優先級高。第二個是判斷這個節點是否canSendRequest。

private boolean canSendRequest(String node) {
        return connectionStates.isConnected(node) && selector.isChannelReady(node) 
        && inFlightRequests.canSendMore(node); // 重點
    }

inFlightRequests報保存的是“正在空中飛”的請求

public boolean canSendMore(String node) {
        Deque<ClientRequest> queue = requests.get(node);
        return queue == null || queue.isEmpty() ||
               (queue.peekFirst().request().completed() && queue.size() < this.maxInFlightRequestsPerConnection);
    }

滿足以下幾個條件,表示可以繼續send

  1. queue是空,即該節點沒有“正在空中飛”的request
  2. queue不為空。queue中排在最開頭的request已經completed 並且queue的大小小於允許的最大值。如何理解呢?queue是一個雙端隊列,每次set的時候都會在queue的頭部插入,所以queue中第一個就是正在發送的,或者說是KafkaChannel中的send。只要當send發送到網絡中的時候才可以繼續發送。這就保證了前面說的“如何保證每次只發送一個請求”。

4 參考

  1. https://blog.csdn.net/chunlongyu/article/details/52651960

kafka生產者網絡層總結