1. 程式人生 > >初識 Kafka Producer 生產者

初識 Kafka Producer 生產者

目錄

  • 1、KafkaProducer 概述
  • 2、KafkaProducer 類圖
  • 3、KafkaProducer 簡單示例

溫馨提示:整個 Kafka Client 專欄基於 kafka-2.3.0 版本。

@(本節目錄)

1、KafkaProducer 概述

根據 KafkaProducer 類上的註釋上來看 KafkaProducer 具有如下特徵:

  • KafkaProducer 是執行緒安全的,可以被多個執行緒交叉使用。
  • KafkaProducer 內部包含一個快取池,存放待發送訊息,即 ProducerRecord 佇列,與此同時會開啟一個IO執行緒將 ProducerRecord 物件傳送到 Kafka 叢集。
  • KafkaProducer 的訊息傳送 API send 方法是非同步,只負責將待發送訊息 ProducerRecord 傳送到快取區中,立即返回,並返回一個結果憑證 Future。
  • acks
    KafkaProducer 提供了一個核心引數 acks 用來定義訊息“已提交”的條件(標準),就是 Broker 端向客戶端承偌已提交的條件,可選值如下:
    • 0
      表示生產者不關係該條訊息在 broker 端的處理結果,只要呼叫 KafkaProducer 的 send 方法返回後即認為成功,顯然這種方式是最不安全的,因為 Broker 端可能壓根都沒有收到該條訊息或儲存失敗。
    • all 或 -1
      表示訊息不僅需要 Leader 節點已儲存該訊息,並且要求其副本(準確的來說是 ISR 中的節點)全部儲存才認為已提交,才向客戶端返回提交成功。這是最嚴格的持久化保障,當然效能也最低。
    • 1
      表示訊息只需要寫入 Leader 節點後就可以向客戶端返回提交成功。
  • retries
    kafka 在生產端提供的另外一個核心屬性,用來控制訊息在傳送失敗後的重試次數,設定為 0 表示不重試,重試就有可能造成訊息在傳送端的重複。
  • batch.size
    kafka 訊息傳送者為每一個分割槽維護一個未傳送訊息積壓快取區,其記憶體大小由batch.size指定,預設為 16K。
    但如果快取區中不足100條,但傳送執行緒此時空閒,是需要等到快取區中積滿100條才能傳送還是可以立即傳送呢?預設是立即傳送,即 batch.size 的作用其實是客戶端一次傳送到broker的最大訊息數量。
  • linger.ms
    為了提高 kafka 訊息傳送的高吞吐量,即控制在快取區中未積滿 batch.size 時來控制 訊息傳送執行緒的行為,是立即傳送還是等待一定時間,如果linger.ms 設定為 0表示立即傳送,如果設定為大於0,則訊息傳送執行緒會等待這個值後才會向broker傳送。該引數者會增加響應時間,但有利於增加吞吐量。有點類似於 TCP 領域的 Nagle 演算法。
  • buffer.memory
    用於控制訊息傳送者快取的總記憶體大小,如果超過該值,往快取區中新增訊息會被阻塞,具體會在下文的訊息傳送流程中詳細介紹,阻塞的最大時間可通過引數 max.block.ms 設定,阻塞超過該值會丟擲超時異常。
  • key.serializer
    指定 key 的序列化處理器。
  • value.serializer
    指定 訊息體的序列化處理器。
  • enable.idempotence
    從 kafka0.11版本開始,支援訊息傳遞冪等,可以做到訊息只會被傳遞一次,通過 enable.idempotence 為 true 來開啟。如果該值設定為 true,其 retries 將設定為 Integer.MAX_VALUE,acks 將被設定為 all。為了確保訊息傳送冪等性,必須避免應用程式端的任何重試,並且如果訊息傳送API如果返回錯誤,應用端應該記錄最後成功傳送的訊息,避免訊息的重複傳送。

從Kafka 0.11開始,kafka 也支援事務訊息。

2、KafkaProducer 類圖


在 Kafka 中,生產者通過介面 Producer 定義,通過該介面的方法,我們基本可以得知 KafkaProducer 將具備如下基本能力:

  • void initTransactions()
    初始化事務,如果需要使用事務方法,該方法必須首先被呼叫。
  • void beginTransaction()
    開啟事務。
  • void sendOffsetsToTransaction(Map< TopicPartition, OffsetAndMetadata> offsets,String consumerGroupId)
    向消費組提交當前事務中的訊息偏移量,將在介紹 Kafka 事務相關文章中詳細介紹。
  • void commitTransaction()
    提交事務。
  • void abortTransaction()
    回滾事務。
  • Future< RecordMetadata> send(ProducerRecord<K, V> record)
    訊息傳送,該方法預設為非同步傳送,如果要實現同步傳送的效果,對返回結果呼叫 get 方法即可,該方法將在下篇文章中詳細介紹。
  • Future< RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
    訊息傳送,支援回撥。
  • void flush()
    忽略 linger.ms 的值,直接喚醒傳送執行緒,將緩衝區中的訊息全部發送到 broker。
  • List< PartitionInfo> partitionsFor(String topic)
    獲取 topic 的路由資訊(分割槽資訊)。
  • Map< MetricName, ? extends Metric> metrics()
    獲取由生產者收集的統計資訊。
  • void close()
    關閉傳送者。
  • void close(Duration timeout)
    定時關閉訊息傳送者。

上面的方法我們會根據需要在後續文章中進行詳細的介紹。接下來我們看一下 KafkaProducer 的核心屬性的含義。

  • String clientId
    客戶端ID。在建立 KafkaProducer 時可通過 client.id 定義 clientId,如果未指定,則預設 producer- seq,seq 在程序內遞增,強烈建議客戶端顯示指定 clientId。
  • Metrics metrics
    度量的相關儲存容器,例如訊息體大小、傳送耗時等與監控相關的指標。
  • Partitioner partitioner
    分割槽負載均衡演算法,通過引數 partitioner.class 指定。
  • int maxRequestSize
    呼叫 send 方法傳送的最大請求大小,包括 key、訊息體序列化後的訊息總大小不能超過該值。通過引數 max.request.size 來設定。
  • long totalMemorySize
    生產者快取所佔記憶體的總大小,通過引數 buffer.memory 設定。
  • Metadata metadata
    元資料資訊,例如 topic 的路由資訊,由 KafkaProducer 自動更新。
  • RecordAccumulator accumulator
    訊息記錄累積器,將在訊息傳送部分詳細介紹。
  • Sender sender
    用於封裝訊息傳送的邏輯,即向 broker 傳送訊息的處理邏輯。
  • Thread ioThread
    用於訊息傳送的後臺執行緒,一個獨立的執行緒,內部使用 Sender 來向 broker 傳送訊息。
  • CompressionType compressionType
    壓縮型別,預設不啟用壓縮,可通過引數 compression.type 配置。可選值:none、gzip、snappy、lz4、zstd。
  • Sensor errors
    錯誤資訊收集器,當成一個 metrics,用來做監控的。
  • Time time
    用於獲取系統時間或執行緒睡眠等。
  • Serializer< K> keySerializer
    用於對訊息的 key 進行序列化。
  • Serializer< V> valueSerializer
    對訊息體進行序列化。
  • ProducerConfig producerConfig
    生產者的配置資訊。
  • long maxBlockTimeMs
    最大阻塞時間,當生產者使用的快取已經達到規定值後,此時訊息傳送會阻塞,通過引數 max.block.ms 來設定最多等待多久。
  • ProducerInterceptors<K, V> interceptors
    生產者端的攔截器,在訊息傳送之前進行一些定製化處理。
  • ApiVersions apiVersions
    維護 api 版本的相關元資訊,該類只能在 kafka 內部使用。
  • TransactionManager transactionManager
    kafka 訊息事務管理器。
  • TransactionalRequestResult initTransactionsResult
    kafka 生產者事務上下文環境初始結果。

經過上面的梳理,詳細讀者朋友對 KafkaProducer 訊息生產者有了一個大概的認識,下一篇會重點介紹訊息傳送流程。接下來我們以一個簡單的示例結束本文的學習。

3、KafkaProducer 簡單示例

package persistent.prestige.demo.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
import java.util.concurrent.Future;
public class KafkaProducerTest {
    public static void main(String[] args){
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092,localhost:9082,localhost:9072,");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        try {
            for (int i = 0; i < 100; i++) {
                Future<RecordMetadata>  future = producer.send(new ProducerRecord<String, String>("TOPIC_ORDER", Integer.toString(i), Integer.toString(i)));
                RecordMetadata recordMetadata = future.get();
                System.out.printf("offset:" + recordMetadata.offset());
            }
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

本文就介紹到這裡,其主要的目的是瞭解Kafka 的 Producer,引出後續需要學習的內容,下一篇將重點講述 Kafka 訊息的傳送流程,敬請關注。

如果本文對大家有所幫助的話,麻煩幫忙點個贊,謝謝。


作者介紹:
丁威,《RocketMQ技術內幕》作者,RocketMQ 社群佈道師,公眾號:中介軟體興趣圈 維護者,目前已陸續發表原始碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等原始碼專欄。歡迎加入我的知識星球,構建一個高質量的技術交流社群。