1. 程式人生 > >KafkaProducer Sender 執行緒詳解(含詳細的執行流程圖)

KafkaProducer Sender 執行緒詳解(含詳細的執行流程圖)

目錄

  • 1、Sender 執行緒詳解
  • 2、RecordAccumulator 核心方法詳解

溫馨提示:本文基於 Kafka 2.2.1 版本。

上文 《原始碼分析 Kafka 訊息傳送流程》 已經詳細介紹了 KafkaProducer send 方法的流程,該方法只是將訊息追加到 KafKaProducer 的快取中,並未真正的向 broker 傳送訊息,本文將來探討 Kafka 的 Sender 執行緒。

@(本節目錄)
在 KafkaProducer 中會啟動一個單獨的執行緒,其名稱為 “kafka-producer-network-thread | clientID”,其中 clientID 為生產者的 id 。

1、Sender 執行緒詳解

1.1 類圖


我們先來看一下其各個屬性的含義:

  • KafkaClient client
    kafka 網路通訊客戶端,主要封裝與 broker 的網路通訊。
  • RecordAccumulator accumulator
    訊息記錄累積器,訊息追加的入口(RecordAccumulator 的 append 方法)。
  • Metadata metadata
    元資料管理器,即 topic 的路由分割槽資訊。
  • boolean guaranteeMessageOrder
    是否需要保證訊息的順序性。
  • int maxRequestSize
    呼叫 send 方法傳送的最大請求大小,包括 key、訊息體序列化後的訊息總大小不能超過該值。通過引數 max.request.size 來設定。
  • short acks
    用來定義訊息“已提交”的條件(標準),就是 Broker 端向客戶端承偌已提交的條件,可選值如下0、-1、1.
  • int retries
    重試次數。
  • Time time
    時間工具類。
  • boolean running
    該執行緒狀態,為 true 表示執行中。
  • boolean forceClose
    是否強制關閉,此時會忽略正在傳送中的訊息。
  • SenderMetrics sensors
    訊息傳送相關的統計指標收集器。
  • int requestTimeoutMs
    請求的超時時間。
  • long retryBackoffMs
    請求失敗之在重試之前等待的時間。
  • ApiVersions apiVersions
    API版本資訊。
  • TransactionManager transactionManager
    事務處理器。
  • Map< TopicPartition, List< ProducerBatch>> inFlightBatches
    正在執行傳送相關的訊息批次。

1.2 run 方法詳解

Sender#run

public void run() {
    log.debug("Starting Kafka producer I/O thread.");
    while (running) {   
        try {
            runOnce();    // @1
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {    // @2
        try {
            runOnce();
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    if (forceClose) {                                                                                                                                     // @3
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        this.client.close();                                                                                                                               // @4
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }
    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}

程式碼@1:Sender 執行緒在執行狀態下主要的業務處理方法,將訊息快取區中的訊息向 broker 傳送。
程式碼@2:如果主動關閉 Sender 執行緒,如果不是強制關閉,則如果快取區還有訊息待發送,再次呼叫 runOnce 方法將剩餘的訊息傳送完畢後再退出。
程式碼@3:如果強制關閉 Sender 執行緒,則拒絕未完成提交的訊息。
程式碼@4:關閉 Kafka Client 即網路通訊物件。

接下來將分別探討其上述方法的實現細節。

1.2.1 runOnce 詳解

Sender#runOnce

void runOnce() {
    // 此處省略與事務訊息相關的邏輯
    long currentTimeMs = time.milliseconds();
    long pollTimeout = sendProducerData(currentTimeMs);   // @1
    client.poll(pollTimeout, currentTimeMs);                            // @2
}

本文不關注事務訊息的實現原理,故省略了該部分的程式碼。
程式碼@1:呼叫 sendProducerData 方法傳送訊息。
程式碼@2:呼叫這個方法的作用?

接下來分別對上述兩個方法進行深入探究。

1.1.2.1 sendProducerData

接下來將詳細分析其實現步驟。
Sender#sendProducerData

Cluster cluster = metadata.fetch();
// get the list of partitions with data ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

Step1:首先根據當前時間,根據快取佇列中的資料判斷哪些 topic 的 哪些分割槽已經達到傳送條件。達到可傳送的條件將在 2.1.1.1 節詳細分析。

Sender#sendProducerData

if (!result.unknownLeaderTopics.isEmpty()) {
    for (String topic : result.unknownLeaderTopics)
        this.metadata.add(topic);
    
    log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
                result.unknownLeaderTopics);
    this.metadata.requestUpdate();
}

Step2:如果在待發送的訊息未找到其路由資訊,則需要首先去 broker 伺服器拉取對應的路由資訊(分割槽的 leader 節點資訊)。

Sender#sendProducerData

long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        iter.remove();
        notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
    }
}

Step3:移除在網路層面沒有準備好的分割槽,並且計算在接下來多久的時間間隔內,該分割槽都將處於未準備狀態。
1、在網路環節沒有準備好的標準如下:

  • 分割槽沒有未完成的更新元素資料請求(metadata)。
  • 當前生產者與對端 broker 已建立連線並完成了 TCP 的三次握手。
  • 如果啟用 SSL、ACL 等機制,相關狀態都已就緒。
  • 該分割槽對應的連線正在處理中的請求數時是否超過設定值,預設為 5,可通過屬性 max.in.flight.requests.per.connection 來設定。

2、client pollDelayMs 預估分割槽在接下來多久的時間間隔內都將處於未轉變好狀態(not ready),其標準如下:

  • 如果已與對端的 TCP 連線已建立好,並處於已連線狀態,此時如果沒有觸發限流,則返回0,如果有觸發限流,則返回限流等待時間。
  • 如果還位於對端建立 TCP 連線,則返回 Long.MAX_VALUE,因為連線建立好後,會喚醒傳送執行緒的。

Sender#sendProducerData

// create produce requests
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);

Step4:根據已準備的分割槽,從快取區中抽取待發送的訊息批次(ProducerBatch),並且按照 nodeId:List

Sender#sendProducerData

addToInflightBatches(batches);
public void addToInflightBatches(Map<Integer, List<ProducerBatch>> batches) {
    for (List<ProducerBatch> batchList : batches.values()) {
        addToInflightBatches(batchList);
    }
}
private void addToInflightBatches(List<ProducerBatch> batches) {
    for (ProducerBatch batch : batches) {
        List<ProducerBatch> inflightBatchList = inFlightBatches.get(batch.topicPartition);
        if (inflightBatchList == null) {
            inflightBatchList = new ArrayList<>();
            inFlightBatches.put(batch.topicPartition, inflightBatchList);
        }
        inflightBatchList.add(batch);
    }
}

Step5:將抽取的 ProducerBatch 加入到 inFlightBatches 資料結構,該屬性的宣告如下:Map<TopicPartition, List< ProducerBatch >> inFlightBatches,即按照 topic-分割槽 為鍵,存放已抽取的 ProducerBatch,這個屬性的含義就是儲存待發送的訊息批次。可以根據該資料結構得知在訊息傳送時以分割槽為維度反饋 Sender 執行緒的“積壓情況”,max.in.flight.requests.per.connection 就是來控制積壓的最大數量,如果積壓達到這個數值,針對該佇列的訊息傳送會限流。

Sender#sendProducerData

accumulator.resetNextBatchExpiryTime();
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
expiredBatches.addAll(expiredInflightBatches);

Step6:從 inflightBatches 與 batches 中查詢已過期的訊息批次(ProducerBatch),判斷是否過期的標準是系統當前時間與 ProducerBatch 建立時間之差是否超過120s,過期時間可以通過引數 delivery.timeout.ms 設定。

Sender#sendProducerData

if (!expiredBatches.isEmpty())
    log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
    String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
                + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
    failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
    if (transactionManager != null && expiredBatch.inRetry()) {
        // This ensures that no new batches are drained until the current in flight batches are fully resolved.
        transactionManager.markSequenceUnresolved(expiredBatch.topicPartition);
    }
}

Step7:處理已超時的訊息批次,通知該批訊息傳送失敗,即通過設定 KafkaProducer#send 方法返回的憑證中的 FutureRecordMetadata 中的 ProduceRequestResult result,使之呼叫其 get 方法不會阻塞。

Sender#sendProducerData

sensors.updateProduceRequestMetrics(batches);

Step8:收集統計指標,本文不打算詳細分析,但後續會專門對 Kafka 的 Metrics 設計進行一個深入的探討與學習。

Sender#sendProducerData

long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
    log.trace("Nodes with data ready to send: {}", result.readyNodes);
    pollTimeout = 0;
}

Step9:設定下一次的傳送延時,待補充詳細分析。

Sender#sendProducerData

sendProduceRequests(batches, now);
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

Step10:該步驟按照 brokerId 分別構建傳送請求,即每一個 broker 會將多個 ProducerBatch 一起封裝成一個請求進行傳送,同一時間,每一個 與 broker 連線只會只能傳送一個請求,注意,這裡只是構建請求,並最終會通過 NetworkClient#send 方法,將該批資料設定到 NetworkClient 的待發送資料中,此時並沒有觸發真正的網路呼叫。

sendProducerData 方法就介紹到這裡了,既然這裡還沒有進行真正的網路請求,那在什麼時候觸發呢?

我們繼續回到 runOnce 方法。

1.2.1.2 NetworkClient 的 poll 方法
 public List<ClientResponse> poll(long timeout, long now) {
    ensureActive();

    if (!abortedSends.isEmpty()) {
        // If there are aborted sends because of unsupported version exceptions or disconnects,
        // handle them immediately without waiting for Selector#poll.
        List<ClientResponse> responses = new ArrayList<>();
        handleAbortedSends(responses);
        completeResponses(responses);
        return responses;
    }

    long metadataTimeout = metadataUpdater.maybeUpdate(now);   // @1
    try {
        this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));    // @2
    } catch (IOException e) {
        log.error("Unexpected error during I/O", e);
    }

    // process completed actions
    long updatedNow = this.time.milliseconds();
    List<ClientResponse> responses = new ArrayList<>();            // @3
    handleCompletedSends(responses, updatedNow);
    handleCompletedReceives(responses, updatedNow);
    handleDisconnections(responses, updatedNow);
    handleConnections();
    handleInitiateApiVersionRequests(updatedNow);
    handleTimedOutRequests(responses, updatedNow);
    completeResponses(responses);                                               // @4
    return responses;
}

本文並不會詳細深入探討其網路實現部分,Kafka 的 網路通訊後續我會專門詳細的介紹,在這裡先點出其關鍵點。
程式碼@1:嘗試更新雲資料。
程式碼@2:觸發真正的網路通訊,該方法中會通過收到呼叫 NIO 中的 Selector#select() 方法,對通道的讀寫就緒事件進行處理,當寫事件就緒後,就會將通道中的訊息傳送到遠端的 broker。
程式碼@3:然後會訊息傳送,訊息接收、斷開連線、API版本,超時等結果進行收集。
程式碼@4:並依次對結果進行喚醒,此時會將響應結果設定到 KafkaProducer#send 方法返回的憑證中,從而喚醒傳送客戶端,完成一次完整的訊息傳送流程。

Sender 傳送執行緒的流程就介紹到這裡了,接下來首先給出一張流程圖,然後對上述流程中一些關鍵的方法再補充深入探討一下。

1.2.2 run 方法流程圖


根據上面的原始碼分析得出上述流程圖,圖中對重點步驟也詳細標註了其關鍵點。下面我們對上述流程圖中 Sender 執行緒依賴的相關類的核心方法進行解讀,以便加深 Sender 執行緒的理解。

由於在講解 Sender 傳送流程中,大部分都是呼叫 RecordAccumulator 方法來實現其特定邏輯,故接下來重點對上述涉及到RecordAccumulator 的方法進行一個詳細剖析,加強對 Sender 流程的理解。

2、RecordAccumulator 核心方法詳解

2.1 RecordAccumulator 的 ready 方法詳解

該方法主要就是根據快取區中的訊息,判斷哪些分割槽已經達到傳送條件。

RecordAccumulator#ready

public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();

    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {   // @1
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();

        Node leader = cluster.leaderFor(part);   // @2
        synchronized (deque) {
            if (leader == null && !deque.isEmpty()) {   // @3
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());
            } else if (!readyNodes.contains(leader) && !isMuted(part, nowMs)) {    // @4
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    boolean full = deque.size() > 1 || batch.isFull();
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {   // @5
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);   
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

程式碼@1:對生產者快取區 ConcurrentHashMap<TopicPartition, Deque< ProducerBatch>> batches 遍歷,從中挑選已準備好的訊息批次。
程式碼@2:從生產者元資料快取中嘗試查詢分割槽(TopicPartition) 的 leader 資訊,如果不存在,當將該 topic 新增到 unknownLeaderTopics (程式碼@3),稍後會發送元資料更新請求去 broker 端查詢分割槽的路由資訊。
程式碼@4:如果不在 readyNodes 中就需要判斷是否滿足條件,isMuted 與順序訊息有關,本文暫時不關注,在後面的順序訊息部分會重點探討。
程式碼@5:這裡就是判斷是否準備好的條件,先一個一個來解讀區域性變數的含義。

  • long waitedTimeMs
    該 ProducerBatch 已等待的時長,等於當前時間戳 與 ProducerBatch 的 lastAttemptMs 之差,在 ProducerBatch 建立時或需要重試時會將當前的時間賦值給lastAttemptMs。
  • retryBackoffMs
    當發生異常時發起重試之前的等待時間,預設為 100ms,可通過屬性 retry.backoff.ms 配置。
  • batch.attempts()
    該批次當前已重試的次數。
  • backingOff
    後臺傳送是否關閉,即如果需要重試並且等待時間小於 retryBackoffMs ,則 backingOff = true,也意味著該批次未準備好。
  • timeToWaitMs
    send 執行緒傳送訊息需要的等待時間,如果 backingOff 為 true,表示該批次是在重試,並且等待時間小於系統設定的需要等待時間,這種情況下 timeToWaitMs = retryBackoffMs 。否則需要等待的時間為 lingerMs。
  • boolean full
    該批次是否已滿,如果兩個條件中的任意一個滿足即為 true。
    • Deque< ProducerBatch> 該佇列的個數大於1,表示肯定有一個 ProducerBatch 已寫滿。
    • ProducerBatch 已寫滿。
  • boolean expired
    是否過期,等於已經等待的時間是否大於需要等待的時間,如果把傳送看成定時傳送的話,expired 為 true 表示定時器已到達觸發點,即需要執行。
  • boolean exhausted
    當前生產者快取已不夠,建立新的 ProducerBatch 時阻塞在申請快取空間的執行緒大於0,此時應立即將快取區中的訊息立即傳送到伺服器。
  • boolean sendable
    是否可傳送。其滿足下面的任意一個條件即可:
    • 該批次已寫滿。(full = true)。
    • 已等待系統規定的時長。(expired = true)
    • 傳送者內部快取區已耗盡並且有新的執行緒需要申請(exhausted = true)。
    • 該傳送者的 close 方法被呼叫(close = true)。
    • 該傳送者的 flush 方法被呼叫。

2.2 RecordAccumulator 的 drain方法詳解

RecordAccumulator#drain

public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) { // @1
    if (nodes.isEmpty())
        return Collections.emptyMap();

    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {                                                                                                                              
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);                      // @2
        batches.put(node.id(), ready);
    }
    return batches;
}

程式碼@1:我們首先來介紹該方法的引數:

  • Cluster cluster
    叢集資訊。
  • Set< Node> nodes
    已準備好的節點集合。
  • int maxSize
    一次請求最大的位元組數。
  • long now
    當前時間。

程式碼@2:遍歷所有節點,呼叫 drainBatchesForOneNode 方法抽取資料,組裝成 Map<Integer /** brokerId */, List< ProducerBatch>> batches。

接下來重點來看一下 drainBatchesForOneNode。
RecordAccumulator#drainBatchesForOneNode

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
    int size = 0;
    List<PartitionInfo> parts = cluster.partitionsForNode(node.id());   // @1
    List<ProducerBatch> ready = new ArrayList<>();
    int start = drainIndex = drainIndex % parts.size();                        // @2
    do {                                                                                                // @3 
        PartitionInfo part = parts.get(drainIndex);
        TopicPartition tp = new TopicPartition(part.topic(), part.partition()); 
        this.drainIndex = (this.drainIndex + 1) % parts.size();                     
            
        if (isMuted(tp, now))
            continue;

        Deque<ProducerBatch> deque = getDeque(tp);                              // @4
        if (deque == null)
            continue;

        synchronized (deque) {
            // invariant: !isMuted(tp,now) && deque != null
            ProducerBatch first = deque.peekFirst();                                         // @5
            if (first == null)
                continue;

            // first != null
            boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;   // @6
            // Only drain the batch if it is not during backoff period.
            if (backoff)                                                                                     
                continue;

            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {     // @7
                break;
            } else {
                if (shouldStopDrainBatchesForPartition(first, tp))                                  
                    break;

                // 這裡省略與事務訊息相關的程式碼,後續會重點學習。
                batch.close();                                                                                            // @8
                size += batch.records().sizeInBytes();
                ready.add(batch);                                                                            

                batch.drained(now);                                                                             
            }
        }
    } while (start != drainIndex);
    return ready;
}

程式碼@1:根據 brokerId 獲取該 broker 上的所有主分割槽。
程式碼@2:初始化 start。這裡首先來闡述一下 start 與 drainIndex 。

  • start 當前開始遍歷的分割槽序號。
  • drainIndex 上次抽取的佇列索引後,這裡主要是為了每個佇列都是從零號分割槽開始抽取。

程式碼@3:迴圈從快取區抽取對應分割槽中累積的資料。
程式碼@4:根據 topic + 分割槽號從生產者傳送快取區中獲取已累積的雙端Queue。
程式碼@5:從雙端佇列的頭部獲取一個元素。(訊息追加時是追加到佇列尾部)。
程式碼@6:如果當前批次是重試,並且還未到阻塞時間,則跳過該分割槽。
程式碼@7:如果當前已抽取的訊息總大小 加上新的訊息已超過 maxRequestSize,則結束抽取。
程式碼@8:將當前批次加入到已準備集合中,並關閉該批次,即不在允許向該批次中追加訊息。

關於訊息傳送就介紹到這裡,NetworkClient 的 poll 方法內部會呼叫 Selector 執行就緒事件的選擇,並將抽取的訊息通過網路傳送到 Broker 伺服器,關於網路後面的具體實現,將在後續文章中單獨介紹。


作者介紹:
丁威,《RocketMQ技術內幕》作者,RocketMQ 社群佈道師,公眾號:中介軟體興趣圈 維護者,目前已陸續發表原始碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等原始碼專欄。歡迎加入我的知識星球,構建一個高質量的技術交流社群。