
Kafka Source Code Walkthrough: The Code Throws No Error, Yet the Message Fails to Send!

A few words about recent times. At the end of 2019, everyone was full of hope for 2020, especially with the palindromic date 20200202 ahead. But the months-long Australian bushfires, the discovery of the novel coronavirus (nCoV), the death of Kobe Bryant, and more have left people feeling helpless: life is so fragile, and tomorrow so unknowable. Still, we should live in the present, face the epidemic bravely, and win this smokeless war together with each other and the government!
As a programmer, I certainly cannot stop working or learning. I am working from home now, fully cooperating with the isolation measures being advocated, responsible to myself and to society. Below I will share a note I wrote a while ago and discuss a Kafka question with you: why does a Kafka message send fail?

I. The Problem: The Message Fails to Send

Closing a resource once you are done with it is standard practice, yet we often forget the close() call, especially when writing a quick demo. What is more, omitting close() in file-IO examples never used to raise an error or cause any visible problem, so why does omitting it cause trouble with Kafka?

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// placeholder values for the demo
String brokerList = "localhost:9092";
String topic = "topic-demo";

Properties properties = new Properties();
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
               StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
               StringSerializer.class.getName());
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
KafkaProducer<String, String> producer =
    new KafkaProducer<>(properties);
ProducerRecord<String, String> record =
    new ProducerRecord<>(topic, "hello, Kafka!");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}
// omitting this call is what makes the message send fail
//producer.close();
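The most direct fix is to guarantee that close() always runs. KafkaProducer implements Closeable, so a try-with-resources block (a minimal sketch, reusing the properties and topic above) does this even when send() throws:

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    producer.send(new ProducerRecord<>(topic, "hello, Kafka!"));
}   // close() runs here and blocks until all buffered records have been sent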

II. A Hypothesis

1. First, let's look at the close() method.

The Javadoc says: this method blocks until all previously sent requests complete.

    /**
     * Close this producer. This method blocks until all previously sent requests complete.
     * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
     * <p>
     * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
     * will be called instead. We do this because the sender thread would otherwise try to join itself and
     * block forever.</strong>
     * <p>
     *
     * @throws InterruptException If the thread is interrupted while blocked
     */
    @Override
    public void close() {
        close(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }
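If blocking indefinitely at shutdown is not acceptable, there is a timed variant. In the client version quoted here the signature is close(long, TimeUnit); newer clients replace it with close(Duration):

// wait at most 10 seconds for buffered records to go out;
// anything still unsent after the timeout is aborted
producer.close(10, TimeUnit.SECONDS);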

2. Next, the class-level Javadoc of KafkaProducer

The producer consists of a pool of buffer space holding records that have not yet been transmitted to the server, plus a background I/O thread that turns those records into requests and transmits them to the cluster. Failing to close the producer after use leaks these resources.

/*
* <p>
* The producer consists of a pool of buffer space that holds records that haven't yet been transmitted to the server
* as well as a background I/O thread that is responsible for turning these records into requests and transmitting them
* to the cluster. Failure to close the producer after use will leak these resources.
* <p>
*/

3. Summing Up the Hypothesis

At this point we can draw a preliminary conclusion: KafkaProducer does not send a message to Kafka immediately; it first stores the record in a buffer, from which a background thread continuously reads records and sends them on to Kafka. That also explains the effect of omitting close(): main() finishes as soon as send() returns, and at that moment the record may only just have reached the buffer, before the background thread has had a chance to read and transmit it. See the sketch below for a quick way to test this.
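If this hypothesis holds, then anything that forces the buffered record out before main() returns should also make the send succeed, even without close(). Two quick ways to check it (a sketch for experimenting, not a recommended production pattern):

// Option 1: block until this particular record is acknowledged
try {
    producer.send(record).get();   // Future.get() waits for the send to complete
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

// Option 2: flush() blocks until every record already in the buffer has been sent
producer.send(record);
producer.flush();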

III. Verification

Below we will read the KafkaProducer source code to verify the conclusion above.

1. Constructing the KafkaProducer

The key things to notice: the constructor creates the buffer that holds outgoing records, and it starts a background thread whose job is to take records from that buffer and send them to Kafka.

KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer,
              Serializer<V> valueSerializer,
              Metadata metadata,
              KafkaClient kafkaClient) {
    try {
        // ..... many lines omitted here, mostly producer configuration: clientId, serializers, interceptors, and so on
        
        // This is the buffer where outgoing records are stored.
        this.accumulator = new RecordAccumulator(logContext,
                                                 config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
                                                 this.totalMemorySize,
                                                 this.compressionType,
                                                 config.getLong(ProducerConfig.LINGER_MS_CONFIG),
                                                 retryBackoffMs,
                                                 metrics,
                                                 time,
                                                 apiVersions,
                                                 transactionManager);
        // .... more unrelated configuration omitted
        
        // Sender implements Runnable; it is the background thread that turns records into produce requests and sends them to the Kafka cluster
        this.sender = new Sender(logContext,
                                 client,
                                 this.metadata,
                                 this.accumulator,
                                 maxInflightRequests == 1,
                                 config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
                                 acks,
                                 retries,
                                 metricsRegistry.senderMetrics,
                                 Time.SYSTEM,
                                 this.requestTimeoutMs,
                                 config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
                                 this.transactionManager,
                                 apiVersions);
        String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        // start the I/O thread
        this.ioThread.start();
        this.errors = this.metrics.sensor("errors");
        config.logUnused();
        AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics);
        log.debug("Kafka producer started");
    } catch (Throwable t) {
        // call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
        close(0, TimeUnit.MILLISECONDS, true);
        // now propagate the exception
        throw new KafkaException("Failed to construct kafka producer", t);
    }
}
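One detail in this constructor deserves special attention: the third argument to KafkaThread is true, which makes the sender a daemon thread. KafkaThread is just a thin wrapper around Thread, roughly like this (a simplified sketch of the real class, which also installs an uncaught-exception handler):

public class KafkaThread extends Thread {

    public KafkaThread(final String name, Runnable runnable, boolean daemon) {
        super(runnable, name);
        setDaemon(daemon); // the producer passes true: the JVM will not wait for this thread
    }
}

Because the JVM exits as soon as the last non-daemon thread (our main()) finishes, the sender can be killed while records are still sitting in the buffer. Keep this in mind for the conclusion.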

2. How KafkaProducer Sends a Message

Sending a message does not transmit it to Kafka directly; the record is stored in the accumulator mentioned above.

/**
 * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
 * See {@link #send(ProducerRecord, Callback)} for details.
 */
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return send(record, null);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    // intercept the record, which can be potentially modified; this method does not throw exceptions
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

/**
 * Implementation of asynchronously send a record to a topic.
 */
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        // ... some code omitted: serialization, partitioning, transactions, callback setup, and so on

        // Append the record to the accumulator that was created in the KafkaProducer constructor.
        // Inside RecordAccumulator, the records are held in: private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                                                                         serializedValue, headers, interceptCallback, remainingWaitMs);
        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
    
    // exception handling below
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    }
    // .... remaining exception handlers omitted
}
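Note the wake-up condition above: the sender is only nudged when a batch is full or a new batch had to be created. How long a lone record may sit in the accumulator is governed by two producer settings, batch.size and linger.ms (the values below are illustrative):

// a batch is eligible to send once it reaches batch.size bytes ...
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
// ... or once it has waited linger.ms milliseconds, whichever comes first
// (the default linger.ms of 0 means "send as soon as the sender thread gets to it")
properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);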

3. How the Sender Thread Sends Messages

Next, let's look at how the background Sender thread takes records out of the accumulator and sends them to Kafka.

/**
 * The main run loop for the sender thread
 */
public void run() {
    log.debug("Starting Kafka producer I/O thread.");

    // main loop, runs until close is called
    while (running) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }

    log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

    // okay we stopped accepting requests but there may still be
    // requests in the accumulator or waiting for acknowledgment,
    // wait until these are completed.
    // We have stopped accepting requests (close() was called), but the accumulator may still hold records, or requests may be awaiting acknowledgment, so keep looping until they complete
    while (!forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0)) {
        try {
            run(time.milliseconds());
        } catch (Exception e) {
            log.error("Uncaught error in kafka producer I/O thread: ", e);
        }
    }
    // if the wait time runs out and a forced close is required, fail all incomplete batches
    if (forceClose) {
        // We need to fail all the incomplete batches and wake up the threads waiting on
        // the futures.
        log.debug("Aborting incomplete batches due to forced shutdown");
        this.accumulator.abortIncompleteBatches();
    }
    try {
        // close the network client
        this.client.close();
    } catch (Exception e) {
        log.error("Failed to close network client", e);
    }

    log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
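This run loop also shows the two shutdown paths: a normal close() flips running to false, lets the first loop exit, and then the second loop drains whatever is left, while close(0, ...) additionally sets forceClose so the draining is skipped. In the Sender, the two entry points look roughly like this (a simplified sketch of the real methods):

// graceful close: stop accepting new appends, then let the run loop drain the rest
void initiateClose() {
    this.accumulator.close(); // no more records can be appended
    this.running = false;
    this.wakeup();
}

// forced close: additionally abandon whatever is still buffered or in flight
void forceClose() {
    this.forceClose = true;
    initiateClose();
}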

4. The sendProducerData Method

The preparation that happens before the data is actually sent:

private long sendProducerData(long now) {
        Cluster cluster = metadata.fetch();

        // get the list of partitions with data ready to send
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);

        // if there are any partitions whose leaders are not known yet, force metadata update
        if (!result.unknownLeaderTopics.isEmpty()) {
            // The set of topics with unknown leader contains topics with leader election pending as well as
            // topics which may have expired. Add the topic again to metadata to ensure it is included
            // and request metadata update, since there are messages to send to the topic.
            for (String topic : result.unknownLeaderTopics)
                this.metadata.add(topic);

            log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);

            this.metadata.requestUpdate();
        }

        // remove any nodes we aren't ready to send to
        Iterator<Node> iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;
        while (iter.hasNext()) {
            Node node = iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        // drain the batches that are ready to send, grouped by destination node; these will become the produce requests
        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
                this.maxRequestSize, now);
        if (guaranteeMessageOrder) {
            // Mute all the partitions drained
            for (List<ProducerBatch> batchList : batches.values()) {
                for (ProducerBatch batch : batchList)
                    this.accumulator.mutePartition(batch.topicPartition);
            }
        }

        // ..... other processing omitted
        
        // the method that actually sends the produce requests
        sendProduceRequests(batches, now);

        return pollTimeout;
    }
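One thing worth noticing here: the accumulator stores batches per partition, but drain() regroups them per broker node (the key of the returned Map is the node id), because one produce request is built per broker, not per partition.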

5. The sendProduceRequest Method

In the end the data is sent to Kafka through NetworkClient.

    /**
     * Create a produce request from the given record batches
     */
    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (batches.isEmpty())
            return;

        Map<TopicPartition, MemoryRecords> produceRecordsByPartition = new HashMap<>(batches.size());
        final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());

        // find the minimum magic version used when creating the record sets
        byte minUsedMagic = apiVersions.maxUsableProduceMagic();
        for (ProducerBatch batch : batches) {
            if (batch.magic() < minUsedMagic)
                minUsedMagic = batch.magic();
        }

        for (ProducerBatch batch : batches) {
            TopicPartition tp = batch.topicPartition;
            MemoryRecords records = batch.records();

            // down convert if necessary to the minimum magic used. In general, there can be a delay between the time
            // that the producer starts building the batch and the time that we send the request, and we may have
            // chosen the message format based on out-dated metadata. In the worst case, we optimistically chose to use
            // the new message format, but found that the broker didn't support it, so we need to down-convert on the
            // client before sending. This is intended to handle edge cases around cluster upgrades where brokers may
            // not all support the same message format version. For example, if a partition migrates from a broker
            // which is supporting the new magic version to one which doesn't, then we will need to convert.
            if (!records.hasMatchingMagic(minUsedMagic))
                records = batch.records().downConvert(minUsedMagic, 0, time).records();
            produceRecordsByPartition.put(tp, records);
            recordsByPartition.put(tp, batch);
        }

        String transactionalId = null;
        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionalId = transactionManager.transactionalId();
        }
        ProduceRequest.Builder requestBuilder = ProduceRequest.Builder.forMagic(minUsedMagic, acks, timeout,
                produceRecordsByPartition, transactionalId);
        RequestCompletionHandler callback = new RequestCompletionHandler() {
            public void onComplete(ClientResponse response) {
                handleProduceResponse(response, recordsByPartition, time.milliseconds());
            }
        };

        String nodeId = Integer.toString(destination);
        ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
                requestTimeoutMs, callback);
        // ultimately the request is sent through NetworkClient
        client.send(clientRequest, now);
        log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
    }
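A final detail: the acks != 0 argument passed to newClientRequest decides whether a response is expected at all. With acks=0 the producer is fire-and-forget even at the network layer, which is yet another way a send can fail without any visible error. For stronger delivery guarantees, set acks explicitly (illustrative value):

// "all" waits for the full in-sync replica set to acknowledge; "1" waits for the leader only;
// "0" expects no response at all, so failures are invisible to the client
properties.put(ProducerConfig.ACKS_CONFIG, "all");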

IV. Conclusion

To wrap up: if you never call producer.close(), messages can indeed fail to send, and the Javadoc explicitly warns that the producer must be closed to avoid leaking resources. The root cause is the asynchronous design of KafkaProducer: a record is first appended to the RecordAccumulator, and the background KafkaThread continuously pulls records that are ready to send out of the RecordAccumulator and transmits them to Kafka. Without producer.close() there is no blocking wait for outstanding sends, so once main() returns the JVM exits and the daemon KafkaThread dies with it, possibly before it ever picked the record up from the RecordAccumulator. That is why the message is never sent, even though the code reports no error.
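Putting everything together, a pattern that both drains the buffer before shutdown and makes failures visible might look like this (a sketch; the callback only logs, real code would add retries or alerting):

try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
    producer.send(record, (metadata, exception) -> {
        if (exception != null) {
            // the send failed after the client exhausted its retries: surface it
            exception.printStackTrace();
        } else {
            System.out.printf("sent to %s-%d@%d%n",
                    metadata.topic(), metadata.partition(), metadata.offset());
        }
    });
}   // close() drains the accumulator before returning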