
Kafka Source Code Analysis and Illustrated Principles: The Producer Side

Preface

  Every message queue, however much the details vary, boils down to the same three parts: message producers (Producer), message consumers (Consumer), and the carrier that serves them (called the Broker in Kafka). This article focuses on the Producer side, with diagrams where they help explain the underlying principles.

 

1. Developing an Application

  First, the application side: how to construct and use a KafkaProducer, plus a brief tour of the important parameters.

1.1 An Example

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.RetriableException;

/**
 * Kafka Producer demo class.
 *
 * @author GrimMjx
 */
public class ProducerDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties prop = new Properties();
        prop.put("client.id", "DemoProducer");

        // The following three parameters are required.
        // Used to establish connections to the Kafka brokers; for a cluster, separate with commas
        prop.put("bootstrap.servers", "localhost:9092");
        // Serializer for the message key
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Serializer for the message value
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // The following parameters are optional
        prop.put("acks", "-1");
        prop.put("retries", "3");
        prop.put("batch.size", "323840");
        prop.put("linger.ms", "10");
        prop.put("buffer.memory", "33554432");
        prop.put("max.block.ms", "3000");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
        try {
            // Asynchronous send: keep sending without waiting.
            // When a result comes back, the callback is invoked.
            producer.send(new ProducerRecord<String, String>("test", "key1", "value1"),
                    new Callback() {
                        // RecordMetadata tells us which partition and which offset the record went to
                        public void onCompletion(RecordMetadata metadata, Exception e) {
                            // Send succeeded
                            if (e == null) {
                                System.out.println("The offset of the record we just sent is: " + metadata.offset());

                            // Send failed
                            } else {
                                if (e instanceof RetriableException) {
                                    // Handle retriable exceptions, e.g. the partition leader replica is unavailable.
                                    // Usually handled via the retries parameter; there is not much else to do here
                                    // beyond resending the same message.
                                } else {
                                    // Handle non-retriable exceptions
                                }
                            }
                        }
                    }
            );

            // Synchronous send: send() returns a Future, then get() blocks until a result returns
            producer.send(new ProducerRecord<String, String>("test", "key1", "value1")).get();

        } finally {
            // The producer holds extra system resources while running; always close it at the end
            producer.close();
        }
    }
}

  The comments are already quite detailed; the parameters are covered below, so here we only discuss asynchronous versus synchronous sending. Look at KafkaProducer.send: it returns a Future. So how do we get both synchronous blocking and asynchronous non-blocking behavior out of it?

  • Synchronous blocking: send() returns a Future; call get() on it, which blocks until a result comes back, waiting indefinitely.
  • Asynchronous non-blocking: pass a callback to send(); after the call you can keep sending without waiting, and the callback is invoked when the result arrives.
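  In practice a bare get() can hang indefinitely, so it is common to bound the wait. A minimal sketch, my addition rather than part of the original demo; it reuses the demo's producer and assumes imports of java.util.concurrent.TimeUnit and java.util.concurrent.TimeoutException:

// Synchronous send with an upper bound on the wait instead of blocking forever.
// The 5-second timeout is an arbitrary illustration; tune it to your latency budget.
try {
    RecordMetadata metadata = producer
            .send(new ProducerRecord<String, String>("test", "key1", "value1"))
            .get(5, TimeUnit.SECONDS);
    System.out.println("Sent to partition " + metadata.partition()
            + " at offset " + metadata.offset());
} catch (TimeoutException e) {
    // No response within the bound; the record may or may not have been written.
}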

1.2 Important Parameters

  Here we go over the important producer-side parameters; the first three are required. Kafka's documentation is genuinely excellent; see the class org.apache.kafka.clients.producer.ProducerConfig, where every parameter is explained in detail.

  • bootstrap.servers (required): the list of broker servers. With a large cluster you need not list every machine; the producer can discover the rest of the brokers in the cluster.
  • key.serializer/value.serializer (required): serialization for the key and value. Both must be fully qualified class names, and you can plug in your own implementations.
  • acks: three values, 0, 1 and all (-1)
    • 0: the producer does not wait for any result from the broker; highest throughput
    • 1: the producer sends to the leader broker, which responds after writing to its local log; a middle-ground option
    • all (-1): used together with min.insync.replicas to control how many ISR replicas must be written before a send counts as successful
      • Food for thought: what happens if the ISR shrinks below min.insync.replicas — can consumers still consume normally? Stack Overflow: https://stackoverflow.com/questions/57231185/does-min-insync-replicas-property-effects-consumers-in-kafka
  • buffer.memory: at startup the producer creates a memory buffer holding messages waiting to be sent; this parameter controls its size
  • compression.type: choice of compression codec, currently GZIP, Snappy or LZ4. LZ4 currently performs best with Kafka
  • retries: number of retries; before 0.11.0.0 retries could cause duplicate messages
  • batch.size: multiple messages for the same partition are grouped into a batch; once a batch fills up it is sent to the broker
  • linger.ms: so an unfilled batch is never sent? Of course not: an unfilled batch is sent once it has waited linger.ms. A deliberate latency trade-off
  • max.request.size: caps the size of a send request
  • request.timeout.ms: if exceeded, a TimeoutException is raised in the callback
  • partitioner.class: the partitioning strategy, which can be customized. The default partitioner hashes the key with murmur2 and takes it modulo the partition count; keyless records are distributed round-robin (see the sketch after this list)
  • enable.idempotence: introduced in Apache Kafka 0.11.0.0 as the key tool for achieving EOS (exactly-once semantics)
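  To make partitioner.class concrete, here is a minimal sketch of a custom partitioner. It mirrors the default behavior only in spirit: the class is hypothetical, it uses a plain array hash where the real default partitioner uses murmur2, and it round-robins keyless records with a simple counter:

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class DemoPartitioner implements Partitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null)
            // No key: spread records round-robin across partitions
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        // Keyed record: hash modulo partition count (the real default uses murmur2)
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

  It would be wired in with prop.put("partitioner.class", "com.example.DemoPartitioner") — package name hypothetical.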

2. Source Code Analysis and Illustrated Principles

2.1 RecordAccumulator

  Among the parameters introduced above, buffer.memory is the size of the buffer, and RecordAccumulator is the class that plays the buffer's role. The default is 32MB.

  The batch.size parameter above also introduced the notion of a batch. In the Kafka producer, messages are not sent to the broker one by one; instead, multiple messages are grouped into a ProducerBatch, which the Sender then ships in one go. Note that batch.size is not a message count (send once N messages have accumulated) but a size in bytes. The default is 16KB, and it can be tuned for your workload.

  The most central field in RecordAccumulator is:

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

  It is a ConcurrentMap whose key, TopicPartition, identifies one partition of one topic, and whose value is a double-ended queue of ProducerBatch instances waiting for the Sender thread to ship them to the broker. In diagram form: each TopicPartition maps to its own deque of ProducerBatch entries.
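  To make that structure concrete, here is a toy model of the buffer's shape — my own simplification, not the real class; ProducerBatchStub stands in for ProducerBatch:

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Toy model: one deque of batches per topic partition.
class AccumulatorModel {
    static class ProducerBatchStub { /* records, byte size, creation time ... */ }

    private final ConcurrentMap<String, Deque<ProducerBatchStub>> batches =
            new ConcurrentHashMap<>(); // the String key stands in for TopicPartition

    void append(String topicPartition, ProducerBatchStub batch) {
        // the real getOrCreateDeque is essentially this putIfAbsent-style lookup
        Deque<ProducerBatchStub> dq =
                batches.computeIfAbsent(topicPartition, k -> new ArrayDeque<>());
        synchronized (dq) {     // the map is thread-safe, each deque is not
            dq.addLast(batch);  // new batches go to the tail; the Sender drains from the head
        }
    }
}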

  Now let's look at the source to see how records are appended to this buffer queue, chiefly org.apache.kafka.clients.producer.internals.RecordAccumulator#append:

  The inline comments are quite thorough. One point worth thinking about: why is the memory-allocation code not inside a synchronized block? It looks wasteful, since the second synchronized block then has to tryAppend again — by that time another thread may already have created the RecordBatch, making the allocation redundant. But consider what would happen if the allocation were moved inside the synchronized block.

  A thread that cannot get memory waits until some is freed. Doing that inside the synchronized block would hold the deque's lock the whole time, so no other thread could perform any thread-safe operation on that deque. That would be far worse.

/**
 * Add a record to the accumulator, return the append result
 * <p>
 * The append result will contain the future metadata, and flag for whether the appended batch is full or a new batch is created
 * <p>
 *
 * @param tp The topic/partition to which this record is being sent
 * @param timestamp The timestamp of the record
 * @param key The key for the record
 * @param value The value for the record
 * @param headers the Headers for the record
 * @param callback The user-supplied callback to execute when the request is complete
 * @param maxTimeToBlock The maximum time in milliseconds to block for buffer memory to be available
 */
public RecordAppendResult append(TopicPartition tp,
                                 long timestamp,
                                 byte[] key,
                                 byte[] value,
                                 Header[] headers,
                                 Callback callback,
                                 long maxTimeToBlock) throws InterruptedException {
    // We keep track of the number of appending thread to make sure we do not miss batches in
    // abortIncompleteBatches().
    appendsInProgress.incrementAndGet();
    ByteBuffer buffer = null;
    if (headers == null) headers = Record.EMPTY_HEADERS;
    try {
        // check if we have an in-progress batch
        // essentially a putIfAbsent-style method; not analyzed further here
        Deque<ProducerBatch> dq = getOrCreateDeque(tp);
        // batches is thread-safe, but the Deque itself is not;
        // there may already be an in-progress batch
        synchronized (dq) {
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null)
                return appendResult;
        }

        // we don't have an in-progress record batch try to allocate a new batch
        // create a new ProducerBatch
        byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
        // compute the buffer size to allocate
        int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
        log.trace("Allocating a new {} byte message buffer for topic {} partition {}", size, tp.topic(), tp.partition());
        // blocks here if memory is not currently available
        buffer = free.allocate(size, maxTimeToBlock);
        synchronized (dq) {
            // Need to check if producer is closed again after grabbing the dequeue lock.
            if (closed)
                throw new IllegalStateException("Cannot send after the producer is closed.");

            // try appending again, because the allocation above was not inside the synchronized block;
            // another thread may have created a RecordBatch by now, in which case finally returns the buffer
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq);
            if (appendResult != null) {
                // as the author notes, hopefully this is rare: several threads allocate memory only to give it back
                // Somebody else found us a batch, return the one we waited for! Hopefully this doesn't happen often...
                return appendResult;
            }

            // create the ProducerBatch
            MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
            ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
            FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));

            dq.addLast(batch);
            // incomplete is a Set holding batches that are not yet complete
            incomplete.add(batch);

            // Don't deallocate this buffer in the finally block as it's being used in the record batch
            buffer = null;

            // return the append-result class
            return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true);
        }
    } finally {
        // give back the buffer if it needs returning
        if (buffer != null)
            free.deallocate(buffer);
        appendsInProgress.decrementAndGet();
    }
}

  The tryAppend() method is attached below; the code comments cover it:

/**
 *  Try to append to a ProducerBatch.
 *
 *  If it is full, we return null and a new batch is created. We also close the batch for record appends to free up
 *  resources like compression buffers. The batch will be fully closed (ie. the record batch headers will be written
 *  and memory records built) in one of the following cases (whichever comes first): right before send,
 *  if it is expired, or when the producer is closed.
 */
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers, Callback callback, Deque<ProducerBatch> deque) {
    // take the most recently added ProducerBatch
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            // the append result carries the future, a batch-is-full flag, and a new-batch-created flag
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    // return null if this deque has no ProducerBatch, or the last batch is too full to take this record
    return null;
}

   The flow above, illustrated: tryAppend on the tail batch first; if that fails, allocate a buffer outside the lock, re-check under the lock, and create a new ProducerBatch at the tail.

 

2.2 Sender

  The most important method in Sender is run(), and its core is org.apache.kafka.clients.producer.internals.Sender#sendProducerData.

  The pollTimeout it returns deserves a careful read of the comments: it is the longest time to block waiting for at least one registered channel to become ready for its registered events. A return value of 0 means: go, send right now.
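  For context, here is roughly how that return value is consumed by the Sender's event loop — a simplified sketch of the loop body in this era of Kafka, not a verbatim copy of run():

// Simplified shape of one iteration of the Sender loop:
// stage the data to send, then let the network client do I/O,
// blocking for at most pollTimeout milliseconds.
void runOnce(long now) {
    long pollTimeout = sendProducerData(now); // 0 when some node is ready: poll returns immediately
    client.poll(pollTimeout, now);            // actual network send/receive and response handling
}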

private long sendProducerData(long now) {
    // fetch the current cluster metadata
    Cluster cluster = metadata.fetch();
    // get the list of partitions with data ready to send
    // @return the three fields of ReadyCheckResult:
    // 1. Set<Node> readyNodes            nodes ready to be sent to
    // 2. long nextReadyCheckDelayMs      delay before the next readiness check
    // 3. Set<String> unknownLeaderTopics topics whose leader is unknown
    RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
    // if there are any partitions whose leaders are not known yet, force metadata update
    // some topics have no known leader, so request a metadata update
    if (!result.unknownLeaderTopics.isEmpty()) {
        // The set of topics with unknown leader contains topics with leader election pending as well as
        // topics which may have expired. Add the topic again to metadata to ensure it is included
        // and request metadata update, since there are messages to send to the topic.
        for (String topic : result.unknownLeaderTopics)
            this.metadata.add(topic);
        this.metadata.requestUpdate();
    }

    // drop nodes that we cannot send to yet
    // remove any nodes we aren't ready to send to
    Iterator<Node> iter = result.readyNodes.iterator();
    long notReadyTimeout = Long.MAX_VALUE;
    while (iter.hasNext()) {
        Node node = iter.next();
        if (!this.client.ready(node, now)) {
            iter.remove();
            notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now));
        }
    }

    // collect the messages to be sent
    // create produce requests
    Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
            this.maxRequestSize, now);

    // preserve message ordering
    if (guaranteeMessageOrder) {
        // Mute all the partitions drained
        for (List<ProducerBatch> batchList : batches.values()) {
            for (ProducerBatch batch : batchList)
                this.accumulator.mutePartition(batch.topicPartition);
        }
    }

    // expired batches
    List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
    boolean needsTransactionStateReset = false;
    // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
    // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
    // we need to reset the producer id here.
    if (!expiredBatches.isEmpty())
        log.trace("Expired {} batches in accumulator", expiredBatches.size());
    for (ProducerBatch expiredBatch : expiredBatches) {
        failBatch(expiredBatch, -1, NO_TIMESTAMP, expiredBatch.timeoutException());
        if (transactionManager != null && expiredBatch.inRetry()) {
            needsTransactionStateReset = true;
        }
        this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
    }
    if (needsTransactionStateReset) {
        transactionManager.resetProducerId();
        return 0;
    }
    sensors.updateProduceRequestMetrics(batches);
    // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately
    // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data
    // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes
    // with sendable data that aren't ready to send since they would cause busy looping.
    // what exactly is the returned pollTimeout? The English comments explain it best:
    // 1. The amount of time to block if there is nothing to do
    // 2. waiting for a channel to become ready; if zero, block indefinitely;
    long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
    if (!result.readyNodes.isEmpty()) {
        log.trace("Nodes with data ready to send: {}", result.readyNodes);
        // if some partitions are already ready to be sent, the select time would be 0;
        // otherwise if some partition already has some data accumulated but not ready yet,
        // the select time will be the time difference between now and its linger expiry time;
        // otherwise the select time will be the time difference between now and the metadata expiry time;
        pollTimeout = 0;
    }

    // send the messages;
    // ultimately calls client.send()
    sendProduceRequests(batches, now);
    return pollTimeout;
}

  Also worth understanding is org.apache.kafka.clients.producer.internals.RecordAccumulator#ready. The three key fields of the returned class are explained in the comments below; where my explanation falls short, read the original English comments — they say it best:

/**
 * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable
 * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated
 * partition batches.
 * <p>
 * A destination node is ready to send data if:
 * <ol>
 * <li>There is at least one partition that is not backing off its send
 * <li><b>and</b> those partitions are not muted (to prevent reordering if
 *   {@value org.apache.kafka.clients.producer.ProducerConfig#MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION}
 *   is set to one)</li>
 * <li><b>and <i>any</i></b> of the following are true</li>
 * <ul>
 *     <li>The record set is full</li>
 *     <li>The record set has sat in the accumulator for at least lingerMs milliseconds</li>
 *     <li>The accumulator is out of memory and threads are blocking waiting for data (in this case all partitions
 *     are immediately considered ready).</li>
 *     <li>The accumulator has been closed</li>
 * </ul>
 * </ol>
 */
/**
 * @return the three fields of ReadyCheckResult:
 * 1. Set<Node> readyNodes            nodes ready to be sent to
 * 2. long nextReadyCheckDelayMs      delay before the next readiness check
 * 3. Set<String> unknownLeaderTopics topics whose leader is unknown
 *
 * A node is ready to send when any of the following holds:
 * 1. a batch is full
 * 2. a batch is not full, but has already waited lingerMs
 * 3. the accumulator is out of memory
 * 4. the accumulator has been closed
 */
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    long nextReadyCheckDelayMs = Long.MAX_VALUE;
    Set<String> unknownLeaderTopics = new HashSet<>();
    boolean exhausted = this.free.queued() > 0;
    for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
        TopicPartition part = entry.getKey();
        Deque<ProducerBatch> deque = entry.getValue();
        Node leader = cluster.leaderFor(part);
        synchronized (deque) {
            // no leader and a non-empty queue: record it in unknownLeaderTopics
            if (leader == null && !deque.isEmpty()) {
                // This is a partition for which leader is not known, but messages are available to send.
                // Note that entries are currently not removed from batches when deque is empty.
                unknownLeaderTopics.add(part.topic());

                // proceed only if readyNodes does not already contain the leader and the partition is not muted.
                // muting relates to the producer config max.in.flight.requests.per.connection=1,
                // which prevents reordering within a topic partition by limiting the number of
                // unacknowledged requests the producer may have on a single broker connection;
                // if set to 1, the producer cannot send another PRODUCE request for that topic
                // to the broker until the previous response arrives
            } else if (!readyNodes.contains(leader) && !muted.contains(part)) {
                ProducerBatch batch = deque.peekFirst();
                if (batch != null) {
                    long waitedTimeMs = batch.waitedTimeMs(nowMs);
                    boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
                    // how long to wait
                    long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
                    // the batch is full
                    boolean full = deque.size() > 1 || batch.isFull();
                    // the batch has waited long enough
                    boolean expired = waitedTimeMs >= timeToWaitMs;
                    boolean sendable = full || expired || exhausted || closed || flushInProgress();
                    if (sendable && !backingOff) {
                        readyNodes.add(leader);
                    } else {
                        long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
                        // Note that this results in a conservative estimate since an un-sendable partition may have
                        // a leader that will later be found to have sendable data. However, this is good enough
                        // since we'll just wake up and then sleep again for the remaining time.
                        // not sendable yet; check again next time
                        nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
                    }
                }
            }
        }
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

  One more method to know: org.apache.kafka.clients.producer.internals.RecordAccumulator#drain, which pulls the data to be sent out of the accumulator buffer, at most max.request.size bytes per request (see the configuration parameters at the top):

/**
 * Drain all the data for the given nodes and collate them into a list of batches that will fit within the specified
 * size on a per-node basis. This method attempts to avoid choosing the same topic-node over and over.
 *
 * @param cluster The current cluster metadata
 * @param nodes The list of node to drain
 * @param maxSize The maximum number of bytes to drain
 * maxSize is governed by the producer config max.request.size: the most that can be sent at once
 * @param now The current unix time in milliseconds
 * @return A list of {@link ProducerBatch} for each node specified with total size less than the requested maxSize.
 */
public Map<Integer, List<ProducerBatch>> drain(Cluster cluster, Set<Node> nodes, int maxSize, long now) {
    if (nodes.isEmpty())
        return Collections.emptyMap();
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        // collect the batches to send for each node
        List<ProducerBatch> ready = drainBatchesForOneNode(cluster, node, maxSize, now);
        batches.put(node.id(), ready);
    }
    return batches;
}

private List<ProducerBatch> drainBatchesForOneNode(Cluster cluster, Node node, int maxSize, long now) {
    int size = 0;
    // the partitions hosted on this node
    List<PartitionInfo> parts = cluster.partitionsForNode(node.id());
    List<ProducerBatch> ready = new ArrayList<>();
    /* to make starvation less likely this loop doesn't start at 0 */
    // avoid always draining the same partition first; spread the attention around
    int start = drainIndex = drainIndex % parts.size();
    do {
        PartitionInfo part = parts.get(drainIndex);
        TopicPartition tp = new TopicPartition(part.topic(), part.partition());
        this.drainIndex = (this.drainIndex + 1) % parts.size();

        // Only proceed if the partition has no in-flight batches.
        if (isMuted(tp, now))
            continue;

        Deque<ProducerBatch> deque = getDeque(tp);
        if (deque == null)
            continue;

        // lock the deque — by now this needs no explanation
        synchronized (deque) {
            // invariant: !isMuted(tp,now) && deque != null
            ProducerBatch first = deque.peekFirst();
            if (first == null)
                continue;

            // first != null
            // check whether we are within the retry backoff period
            boolean backoff = first.attempts() > 0 && first.waitedTimeMs(now) < retryBackoffMs;
            // Only drain the batch if it is not during backoff period.
            if (backoff)
                continue;

            // stop once maxSize would be exceeded and there is already something in ready
            if (size + first.estimatedSizeInBytes() > maxSize && !ready.isEmpty()) {
                // there is a rare case that a single batch size is larger than the request size due to
                // compression; in this case we will still eventually send this batch in a single request
                // rare case: a single batch exceeds maxSize while ready is still empty, i.e. one batch
                // alone is bigger than the per-request limit; it will still eventually be sent on its own
                break;
            } else {
                if (shouldStopDrainBatchesForPartition(first, tp))
                    break;

                // this configuration is discussed below (idempotence/transactions)
                boolean isTransactional = transactionManager != null ? transactionManager.isTransactional() : false;
                ProducerIdAndEpoch producerIdAndEpoch =
                    transactionManager != null ? transactionManager.producerIdAndEpoch() : null;
                ProducerBatch batch = deque.pollFirst();
                if (producerIdAndEpoch != null && !batch.hasSequence()) {
                    // If the batch already has an assigned sequence, then we should not change the producer id and
                    // sequence number, since this may introduce duplicates. In particular, the previous attempt
                    // may actually have been accepted, and if we change the producer id and sequence here, this
                    // attempt will also be accepted, causing a duplicate.
                    //
                    // Additionally, we update the next sequence number bound for the partition, and also have
                    // the transaction manager track the batch so as to ensure that sequence ordering is maintained
                    // even if we receive out of order responses.
                    batch.setProducerState(producerIdAndEpoch, transactionManager.sequenceNumber(batch.topicPartition), isTransactional);
                    transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                    log.debug("Assigned producerId {} and producerEpoch {} to batch with base sequence " +
                            "{} being sent to partition {}", producerIdAndEpoch.producerId,
                        producerIdAndEpoch.epoch, batch.baseSequence(), tp);

                    transactionManager.addInFlightBatch(batch);
                }
                // close the batch and add it to the ready list
                batch.close();
                size += batch.records().sizeInBytes();
                ready.add(batch);

                batch.drained(now);
            }
        }
    } while (start != drainIndex);
    return ready;
}

3. The Idempotent Producer

  One parameter mentioned above remains: enable.idempotence. The idempotent producer, introduced in 0.11.0.0, makes the send operation idempotent — errors can no longer result in duplicate messages. (For example, a transient send failure can trigger a producer-side retry, which could otherwise deliver the same message more than once.)
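  Turning it on is a one-line config change. A minimal sketch (property values are illustrative; enabling idempotence also constrains related settings — acks is forced to all and retries must be greater than 0, otherwise construction fails with a ConfigException):

Properties prop = new Properties();
prop.put("bootstrap.servers", "localhost:9092");
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// Kafka 0.11.0.0+: make the producer idempotent.
prop.put("enable.idempotence", "true");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);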

  Each batch the producer sends to the broker carries a sequence number (used for de-duplication). Kafka stores this sequence number in the underlying log; it takes only a few bytes, so the overhead is tiny. The producer is also assigned a PID, and the relationship between PID, partition and sequence number can be pictured as a hash map: the key is (PID, partition) and the value is the sequence number. Say the first send is ((PID=1, partition=1), seq=2); if a later send for the same key carries a sequence number less than or equal to 2, the broker rejects the PRODUCE request, achieving de-duplication.
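  A toy sketch of that broker-side table — entirely illustrative, since the real broker logic is Scala and tracks state per batch, but the shape of the check is the same:

import java.util.HashMap;
import java.util.Map;

// Toy model of the broker's (PID, partition) -> last sequence number table.
class SequenceDeduper {
    private final Map<String, Integer> lastSequence = new HashMap<>();

    /** Returns true if the batch is new, false if it is a duplicate to reject. */
    boolean accept(long producerId, int partition, int sequence) {
        String key = producerId + "-" + partition;  // stands in for the (PID, partition) pair
        Integer last = lastSequence.get(key);
        if (last != null && sequence <= last)
            return false;                           // not newer than the last seen: reject the PRODUCE request
        lastSequence.put(key, sequence);
        return true;
    }
}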

  Note that this only guarantees EOS for a single producer instance, within a single session.