Using an Interceptor on a Kafka Producer to Count Messages
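When a Kafka Producer calls producer.send(), the message is first placed in a local buffer; a Kafka network thread then takes messages from the buffer and sends them to the Kafka broker. The local buffer size is set by buffer.memory and defaults to 32 MB (32 * 1024 * 1024L bytes). If messages go out to the network more slowly than they are submitted to the buffer, the buffer can fill up and stop accepting new messages. At that point block.on.buffer.full decides whether send() blocks or throws an exception (this setting is marked deprecated in favor of max.block.ms in the 0.10 producer configuration). By default producer.send() blocks, for a time bounded by max.block.ms, which defaults to 60 seconds. producer.send() returns a Future<RecordMetadata>, so once the buffer is full each send() call may wait up to 60 seconds before it gets a result (an exception).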
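To make the two settings above concrete, here is a minimal sketch that spells out the defaults as plain string properties. The class name BufferSettings and the broker address are assumptions made for illustration; the keys buffer.memory and max.block.ms are the ones discussed above. The resulting Properties object is what would be handed to a KafkaProducer constructor.

```java
import java.util.Properties;

public class BufferSettings {

    // Builds producer properties with the buffer-related defaults written out explicitly.
    public static Properties build() {
        Properties props = new Properties();
        // Assumption: a broker listening locally, as in the example later in this post.
        props.put("bootstrap.servers", "localhost:9092");
        // 32 MB local buffer, expressed in bytes (the documented default).
        props.put("buffer.memory", String.valueOf(32L * 1024 * 1024));
        // Upper bound, in milliseconds, on how long send() may block when the buffer is full.
        props.put("max.block.ms", "60000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build();
        System.out.println("buffer.memory=" + props.getProperty("buffer.memory"));
        System.out.println("max.block.ms=" + props.getProperty("max.block.ms"));
    }
}
```

Lowering max.block.ms makes a full buffer surface as an error sooner; raising buffer.memory delays the point at which send() starts to block.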
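The relationship here is send() --a--> buffer --b--> Kafka broker, so flow control is naturally needed between a and b: if b is too slow and the buffer fills up, a has to slow down. Throttling a based on how much of the buffer is currently in use would also work, and is left for later investigation. This post offers another approach for reference: configure an Interceptor on the Producer that roughly counts how many messages have been submitted to the buffer and how many have been taken out of it.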
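As a rough illustration of how two such counters could drive flow control, the sketch below tracks submitted and completed records and reports when the difference reaches a limit. It is not part of Kafka; the class InFlightGauge, its method names, and the limit parameter are all hypothetical. The idea is that an interceptor would call onSubmitted() from onSend and onCompleted() from onAcknowledgement.

```java
import java.util.concurrent.atomic.LongAdder;

public class InFlightGauge {
    private final LongAdder submitted = new LongAdder();
    private final LongAdder completed = new LongAdder();
    private final long limit;

    // limit: hypothetical threshold of records allowed between step a and step b.
    public InFlightGauge(long limit) {
        this.limit = limit;
    }

    // Intended to be called when a record is handed to send().
    public void onSubmitted() {
        submitted.increment();
    }

    // Intended to be called when a record is acknowledged or fails.
    public void onCompleted() {
        completed.increment();
    }

    // Approximate number of records still waiting; the two sums are not read atomically.
    public long inFlight() {
        return submitted.sum() - completed.sum();
    }

    // True when the caller should slow down step a.
    public boolean shouldSlowDown() {
        return inFlight() >= limit;
    }

    public static void main(String[] args) {
        InFlightGauge gauge = new InFlightGauge(2);
        gauge.onSubmitted();
        gauge.onSubmitted();
        System.out.println("in flight: " + gauge.inFlight() + ", slow down: " + gauge.shouldSlowDown());
        gauge.onCompleted();
        System.out.println("in flight: " + gauge.inFlight() + ", slow down: " + gauge.shouldSlowDown());
    }
}
```

The count is in records rather than bytes, so it only approximates how full the producer buffer is.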
All Kafka configuration constants are listed at https://kafka.apache.org/0100/javadoc/constant-values.html. The description of interceptor.classes says that a Producer can be configured with one or more Interceptors (each implementing ProducerInterceptor). Consumers have their own Interceptors as well (implementing ConsumerInterceptor).
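Since interceptor.classes is a list setting, several interceptors can be given as a comma-separated string. The sketch below shows the shape of such a value; the class InterceptorSettings and the second interceptor name blog.yanbin.AuditProducerInterceptor are hypothetical and exist only for illustration.

```java
import java.util.Properties;

public class InterceptorSettings {

    // Builds properties that register two interceptors, in the order they should run.
    public static Properties build() {
        Properties props = new Properties();
        props.put("interceptor.classes", String.join(",",
                "blog.yanbin.StatisticsProducerInterceptor",
                "blog.yanbin.AuditProducerInterceptor")); // hypothetical second interceptor
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("interceptor.classes"));
    }
}
```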
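ProducerInterceptor has four interface methods (configure is inherited from Configurable):
- void close(): called when the Interceptor is closed, which happens as the Producer is closing
- ProducerRecord<K,V> onSend(ProducerRecord<K, V> record): called by KafkaProducer.send(ProducerRecord) and KafkaProducer.send(ProducerRecord, Callback), before the key and value are serialized and before a partition is assigned (if none was specified), that is, before the message is placed in the buffer. The method may modify the record and return the modified version.
- void onAcknowledgement(RecordMetadata metadata, Exception exception): called once the message has been taken from the buffer and acknowledged by the broker, and also when sending fails
- void configure(Map<String, ?> configs): called with the producer's configuration when the Interceptor is instantiated during KafkaProducer construction, giving it a chance to read its settings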
The code below demonstrates how to count the records submitted to the buffer, sent successfully, and failed.
Producer code
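package blog.yanbin; // assumed: same package as StatisticsProducerInterceptor

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);

    public static void main(String[] args) {
        String topic = "test_topic";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Register the interceptor that counts records
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "blog.yanbin.StatisticsProducerInterceptor");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 5; i++) {
            producer.send(new ProducerRecord<>(topic, String.valueOf(i), String.valueOf(i)));
        }
        // close() waits for buffered records to be sent before returning
        producer.close();

        logger.info(StatisticsProducerInterceptor.getRecordStatistics());
    }
}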
The code above uses ProducerConfig.INTERCEPTOR_CLASSES_CONFIG to specify an Interceptor implementation class, StatisticsProducerInterceptor, whose code is as follows
package blog.yanbin;

import java.util.Map;
import java.util.concurrent.atomic.LongAdder;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StatisticsProducerInterceptor implements ProducerInterceptor<String, String> {
    private static final Logger logger = LoggerFactory.getLogger(StatisticsProducerInterceptor.class);

    // Static counters, shared by the calling thread and the producer network thread
    private static final LongAdder submittedRecords = new LongAdder();
    private static final LongAdder deliveredRecords = new LongAdder();
    private static final LongAdder failedRecords = new LongAdder();

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        // Demonstrates modifying a message: values that do not sort before "3" get a "+U" suffix
        ProducerRecord<String, String> updatedRecord = record.value().compareTo("3") < 0
            ? record
            : new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(),
                record.key(), record.value() + "+U");
        logger.info("record: {} to be sent, updated value from {} to {}",
            updatedRecord, record.value(), updatedRecord.value());
        // If the message ultimately cannot be serialized, it is not placed in the buffer,
        // and onAcknowledgement() is triggered with an exception
        submittedRecords.increment();
        return updatedRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if (exception == null) {
            deliveredRecords.increment();
            logger.info("sent message: topic: {}, partition: {}, offset: {}, timestamp: {}, checksum: {}",
                metadata.topic(), metadata.partition(), metadata.offset(),
                metadata.timestamp(), metadata.checksum());
        } else {
            failedRecords.increment();
            logger.error("failed to send message: {}", metadata, exception);
        }
        logger.info(getRecordStatistics());
    }

    @Override
    public void close() {
        logger.info("producer closed");
    }

    @Override
    public void configure(Map<String, ?> configs) {
        logger.info("configuration: {}", configs);
    }

    public static String getRecordStatistics() {
        return String.format("record statistics, submitted: %s, delivered: %s, failed: %s",
            submittedRecords.longValue(), deliveredRecords.longValue(), failedRecords.longValue());
    }
}
Running it produces output roughly like this
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=0, value=0, timestamp=null) to be sent, updated value from 0 to 0
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1, value=1, timestamp=null) to be sent, updated value from 1 to 1
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=2, value=2, timestamp=null) to be sent, updated value from 2 to 2
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=3, value=3+U, timestamp=null) to be sent, updated value from 3 to 3+U
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=4, value=4+U, timestamp=null) to be sent, updated value from 4 to 4+U
2018-11-01 00:33:23 [main] INFO KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6351, timestamp: 1541050403463, checksum: 1478612472
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 1, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6352, timestamp: 1541050403475, checksum: 4199907714
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 2, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6353, timestamp: 1541050403475, checksum: 3855131286
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 3, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6354, timestamp: 1541050403475, checksum: 1502822821
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 4, failed: 0
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 6355, timestamp: 1541050403475, checksum: 3673351358
2018-11-01 00:33:23 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 5, delivered: 5, failed: 0
2018-11-01 00:33:23 [main] INFO StatisticsProducerInterceptor - producer closed
2018-11-01 00:33:23 [main] INFO Main - record statistics, submitted: 5, delivered: 5, failed: 0
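The log shows that 5 messages were submitted, 5 were sent successfully, and 0 failed, and that a message can be modified in onSend(..). It also looks as though sending only starts after all messages have been placed in the buffer; changing the loop count in main to 10 gives much the same picture, with real network sends starting only after onSend(..) has been called 10 times. Note, however, that onSend(..) and onAcknowledgement(..) are called from different threads, so no such ordering between them is guaranteed.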
If you are not convinced, send 2000 messages in one go: change the loop count in main to 2000, run it again, and check the log. Here are some fragments
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=0, value=0, timestamp=null) to be sent, updated value from 0 to 0
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1, value=1, timestamp=null) to be sent, updated value from 1 to 1
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=821, value=821+U, timestamp=null) to be sent, updated value from 820 to 820+U
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 821, delivered: 1, failed: 0
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 19357, timestamp: 1541052100757, checksum: 791494235
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=855, value=855+U, timestamp=null) to be sent, updated value from 855 to 855+U
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 855, delivered: 2, failed: 0
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1612, value=1612, timestamp=null) to be sent, updated value from 1612 to 1612
2018-11-01 01:01:40 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 1611, delivered: 242, failed: 0
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1614, value=1614, timestamp=null) to be sent, updated value from 1614 to 1614
2018-11-01 01:01:40 [main] INFO StatisticsProducerInterceptor - record: ProducerRecord(topic=test_topic, partition=null, key=1999, value=1999, timestamp=null) to be sent, updated value from 1999 to 1999
2018-11-01 01:01:40 [main] INFO KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-11-01 01:01:41 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 2000, delivered: 1999, failed: 0
2018-11-01 01:01:41 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - sent message: topic: test_topic, partition: 0, offset: 21355, timestamp: 1541052100856, checksum: 2489747570
2018-11-01 01:01:41 [kafka-producer-network-thread | producer-1] INFO StatisticsProducerInterceptor - record statistics, submitted: 2000, delivered: 2000, failed: 0
2018-11-01 01:01:41 [main] INFO StatisticsProducerInterceptor - producer closed
2018-11-01 01:01:41 [main] INFO Main - record statistics, submitted: 2000, delivered: 2000, failed: 0
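The log shows that taking messages out of the buffer and sending them to the network does not wait until every message has been placed in the buffer; the two activities run on different threads. In the end, though, every message queued for sending was successfully delivered to the Kafka broker.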
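The same interleaving can be reproduced without a broker. The following is a simplified simulation using only the JDK, not Kafka's actual buffer implementation: a bounded queue stands in for the producer buffer, the calling thread plays the role of send(), and a second thread plays the network thread. The names BufferSimulation, run, and simulated-network-thread are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.LongAdder;

public class BufferSimulation {

    // Pushes `total` records through a bounded queue and returns {submitted, delivered}.
    public static long[] run(int total, int capacity) {
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        LongAdder submitted = new LongAdder();
        LongAdder delivered = new LongAdder();

        // Stand-in for the network thread: drains the buffer concurrently.
        Thread sender = new Thread(() -> {
            try {
                for (int i = 0; i < total; i++) {
                    buffer.take();
                    delivered.increment(); // comparable to counting in onAcknowledgement
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "simulated-network-thread");
        sender.start();

        try {
            for (int i = 0; i < total; i++) {
                submitted.increment(); // comparable to counting in onSend
                buffer.put(i);         // blocks while the buffer is full
            }
            sender.join();             // comparable to producer.close() waiting for delivery
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while simulating", e);
        }
        return new long[] {submitted.sum(), delivered.sum()};
    }

    public static void main(String[] args) {
        long[] counts = run(2000, 100);
        System.out.printf("submitted: %d, delivered: %d%n", counts[0], counts[1]);
    }
}
```

While the loop is running the two counters drift apart by up to the queue capacity, and after join() they agree, which mirrors the pattern in the log fragments.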
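A ProducerInterceptor can therefore count fairly accurately the messages waiting to be sent and the records successfully sent to the network. If a message cannot be serialized, onAcknowledgement(..) is triggered directly with an exception and the message is counted as a failed send, which is exactly the result we want.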