1. 程式人生 > >Kafka: 0.10 Producer 新增timestamp 以及使用配置

Kafka: 0.10 Producer 新增timestamp 以及使用配置

本文目錄結構:

  1. Producer API入門

  2. 非同步傳送流程

  3. Producer設計說明

  4. Producer Configuration

1. Producer API入門:

         KafkaProducer是一個傳送record到Kafka Cluster的客戶端API。這個類執行緒安全的。在應用程式中,通常的作法是:所有發往一個Kafka Cluster的執行緒使用同一個Producer物件.。如果你的程式要給多個Cluster傳送訊息,則需要使用多個Producer。

ProducerRecord說明
      從上面程式碼裡可以看出,代表要傳送的訊息記錄類是ProducerRecord:

一條record通常包括5個欄位:
·topic:指定該record發往哪個topic下。[Required]
·partition:指定該record發到哪個partition中。[Optional]
·key:一個key。[Optional]
·value:記錄人內容。[Required]
·timestamp:時間戳。[Optional]

預設情況下:
       如果使用者指定了partition,那麼就發往使用者指定的partition。如果使用者沒有指定partition,那麼就會根據key來決定放到哪個partition,如果key也沒有指定,則由producer隨機選取一個partition。
在Producer端,如果使用者指定了timestamp,則record使用使用者指定的時間,如果使用者沒有指定,則會使用producer端的當前時間。在broker端,如果配置了時間戳採用createtime方式,則使用producer傳給Broker的record中的timestramp時間,如果指定為logappendtime,則在broker寫入到Log檔案時會重寫該時間。

2. 非同步傳送流程

2.1、使用者執行緒呼叫send方法將record放到BufferPool中

       可能在之前的kafka-client版本中,還支援同步方式傳送訊息記錄。不過在我看的版本(0.10.0.0)中,已經不再支援同步方式傳送了。當用戶使用KafkaProducer#send()傳送record時,執行流程是:

1、由interceptor chain對ProducerRecord做傳送前的處理
        攔截器介面是:ProducerInterceport,使用者可以自定義自己的攔截器實現。

       該攔截器鏈,在Producer物件初始化時初始化,之後不會再變了。所以呢,攔截器鏈中的攔截器都是公用的,如果要自定義攔截器的話,這個是需要注意的。
        ProducerInterceptor有兩個方法:
·onSend: KafkaProducer#send 呼叫時就會執行此方法。
·onAcknowledgement:傳送失敗,或者傳送成功(broker 通知producer代表傳送成功)時都會呼叫該方法。

此階段執行的就是onSend方法。

2、阻塞方式獲取到broker cluster 上broker cluster的資訊
    採用RPC方式獲取到的broker資訊,由一個MetaData類封裝。它包括了broker cluster的必要資訊,譬如有:所有的broker資訊(idhostport等)、所有的topic名稱、每一個topic對於的partition情況(id、leader node、replica nodes、ISR nodes等)。
雖然該過程是阻塞的,但並不是每傳送一個record都會通過RPC方式來獲取的。Metadata會在Producer端快取,只有在record中指定的topic不存在時、或者MetaData輪詢週期到時才會執行。

3、對record中key、value進行序列化
      這個沒有什麼可說的。內建了基於String、Integer、Long、Double、Bytes、ByteBuffer、ByteArray的序列化工具。

4、為record設定partition屬性
       前面說過,建立ProducerRecord時,partition是Optional的。所以如果使用者建立record時,沒有指定partition屬性。則由partition計算工具(Partitioner 介面)來計算出partition。這個計算方式可以自定義。Kafka Producer 提供了內建的實現:
·如果提供了Key值,會根據key序列化後的位元組陣列的hashcode進行取模運算。
·如果沒有提供key,則採用迭代方式(其實取到的值並非完美的迭代,而是類似於隨機數)。

5、校驗record的長度是否超出閾值
MAX_REQUEST_SIZE_CONFIG=”max.request.size”
BUFFER_MEMORY_CONFIG=”buffer.memory”
超出任何一項就會丟擲異常。

6、為record設定timestamp
      如果使用者建立ProducerRecord時沒有指定timestamp,此處為止設定為producer的當前時間。
其實在java client中,設計了一個Time介面,專門用於設定這個時間的。內建了一個實現SystemTime,這裡將record timestamp設定為當前時間,就是由SystemTime來完成的。所以如果希望在kafka producer java client中使用其它的時間,可以自定義Time的實現。

7、將該record壓縮後放到BufferPool中
       這一步是由RecordAccumulator來完成的。RecordAccumulator中為每一個topic維護了一個雙端佇列Deque<RecordBatch>,佇列中的元素是RecordBatch(RecordBatch則由多個record壓縮而成)。RecordAccumulator要做的就是將record壓縮後放到與之topic關聯的那個Deque的最後面。

關於record的壓縮方式,kafka producer在支援了幾種方式:
·NONE:就是不壓縮。
·GZIP:壓縮率為50%
·SNAPPY:壓縮率為50%
·LZ4:壓縮率為50%

       在將record放到Deque中最後一個RecordBatch中的過程如下:如果最後一個recordbatch可以放的下就放,放不下就新建一個RecordBatch。
       RecordBatch實際上是儲存於BufferPool中,所以這個過程實際上是把record放在BufferPool中。在建立BufferPool之初,會指定BufferPool的總大小,BufferPool中每一個RecordBatch的大小等等配置。

8、喚醒傳送模組
       執行到上一步時,KafkaProducer#sender的處理基本算是完畢。這個一步的目的就是喚醒NIO Selector。

       此外,在上述步驟2~8,不論哪一步出現問題,都會丟擲異常。而丟擲異常時,就會被KafkaProducer捕獲到,然後交由Sensor(感測器)進行處理。而Sensor通常會呼叫第1步中提到的interceptor chain 執行onAcknowledgement告知使用者。

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {

        // intercept the record, which can be potentially modified; this method does not throw exceptions

        ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? record : this.interceptors.onSend(record);

        return doSend(interceptedRecord, callback);

    }

 

    /**

     * Implementation of asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.

     * See {@link #send(ProducerRecord, Callback)} for details.

     */

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {

        TopicPartition tp = null;

        try {

            // first make sure the metadata for the topic is available

            long waitedOnMetadataMs = waitOnMetadata(record.topic(), this.maxBlockTimeMs);

            long remainingWaitMs = Math.max(0, this.maxBlockTimeMs - waitedOnMetadataMs);

            byte[] serializedKey;

            try {

                serializedKey = keySerializer.serialize(record.topic(), record.key());

            } catch (ClassCastException cce) {

                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +

                        " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +

                        " specified in key.serializer");

            }

            byte[] serializedValue;

            try {

                serializedValue = valueSerializer.serialize(record.topic(), record.value());

            } catch (ClassCastException cce) {

                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +

                        " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +

                        " specified in value.serializer");

            }

            int partition = partition(record, serializedKey, serializedValue, metadata.fetch());

            int serializedSize = Records.LOG_OVERHEAD + Record.recordSize(serializedKey, serializedValue);

            ensureValidRecordSize(serializedSize);

            tp = new TopicPartition(record.topic(), partition);

            long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();

            log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);

            // producer callback will make sure to call both 'callback' and interceptor callback

            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);

            RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs);

            if (result.batchIsFull || result.newBatchCreated) {

                log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);

                this.sender.wakeup();

            }

            return result.future;

            // handling exceptions and record the errors;

            // for API exceptions return them in the future,

            // for other exceptions throw directly

        } catch (ApiException e) {

            log.debug("Exception occurred during message send:", e);

            if (callback != null)

                callback.onCompletion(null, e);

            this.errors.record();

            if (this.interceptors != nullthis.interceptors.onSendError(record, tp, e);

            return new FutureFailure(e);

        } catch (InterruptedException e) {

            this.errors.record();

            if (this.interceptors != nullthis.interceptors.onSendError(record, tp, e);

            throw new InterruptException(e);

        } catch (BufferExhaustedException e) {

            this.errors.record();

            this.metrics.sensor("buffer-exhausted-records").record();

            if (this.interceptors != nullthis.interceptors.onSendError(record, tp, e);

            throw e;

        } catch (KafkaException e) {

            this.errors.record();

            if (this.interceptors != nullthis.interceptors.onSendError(record, tp, e);

            throw e;

        } catch (Exception e) {

            // we notify interceptor about all exceptions, since onSend is called before anything else in this method

            if (this.interceptors != nullthis.interceptors.onSendError(record, tp, e);

            throw e;

        }

    }

2.2、傳送排程

      KafkaProducer#sender只是將record放到BufferPool中,並沒有將record發出去,而傳送排程,則是由另外一個執行緒(Sender)來完成的。
Sender的執行過程如下:
1、 取出就緒的record
      這一步是檢查要傳送的record是否就緒:根據KafkaProducer維護的Metadata檢查要每一個record要發往的Leader node是否存在。如果有不存在的,就設定為需要更新,並且這樣的record認為還未就緒。以保證可以發到相關partition的leader node。

2、 取出RecordBatch,並過濾掉過期的RecordBatch
       對於過期的RecordBatch,會通過Sensor通知Interceptor傳送失敗。

3、為要傳送的RecordBatch建立請求
       一個RecordBatch一個ClientRequest。

4、保留請求併發送
       把請求物件保留到一個inFlightRequest 集合中。這個集合中存放的是正在傳送的請求,是一個topic到Deque的Map。當傳送成功,或者失敗都會移除。

5、處理髮送結果
      如果傳送失敗,會嘗試retry。並由Sensor排程Interceptor。
      如果傳送成功,會由Sensor排程Interceptor。

3. Producer實現說明

       從上述處理流程中,可以看到在java client中的一些設計:
1、Interceptor Chain:可以做為用於自定義外掛的介面。
2、MetaData:producer 不按需以及定期的傳送請求獲取最新的Cluster狀態資訊。Producer根據這個資訊可以直接將record batch傳送到相關partition的Leader中。也就是在客戶端完成Load balance。
3、Partitioner:分割槽選擇工具,選擇傳送到哪些分割槽,結合Metadata,完成Load balance。
4、RecordBatch:在客戶端對record壓縮排RecordBatch,然後一個RecordBatch發一次。這樣可以減少IO操作的次數,提高效能。
5、非同步方式傳送:提高使用者應用效能。

4. Producer Configuration

       在文章開始的地方說明了,使用Kafka Producer Java Client時,只需要建立一個KafkaProducer就可以了。而它在執行過程中,會使用到很多配置項,這些配置項都是在KafkaProducer初始化時完成的。
下面就來看看java client中要求的配置項:

·bootstrap.servers
       用於配置cluster中borker的host/port對。可以配置一項或者多項,不需要將cluster中所有例項都配置上。因為它後自動發現所有的broker。
        如果要配置多項,格式是:host1:port1,host2:port2,host3:port3….

·key.serializer、value.serializer
配置序列化類名。指定 的這些類都要實現Serializer介面。

·acks
       為了確保message record被broker成功接收。Kafka Producer會要求Borker確認請求(傳送RecordBatch的請求)完成情況。
       對於message接收情況的確認,Kafka Broker支援了三種情形:
1、不需要確認;
2)leader接收到就確認;
3)等所有可用的follower複製完畢進行確認。
        可以看出,這三種情況代表不同的確認粒度。在Java Producer Client中,對三種情形都做了支援,上述三種情形分別對應了三個配置項:0、1、-1。其實還有一個值是all,它其實就是-1。

      Kafka Producer Java Client 是如何支援這三種確認呢?
      1、 在為RecordBatch建立請求時,acks的值會被封裝為請求頭的一部分。
      2、 傳送請求後(接收到Broker響應前),立即判斷是否需要確認該請求是否完成(即該RecordBatch是否被Broker成功接收),判斷依據是acks的值是否是0。如果是0,即不需要進行確認。那麼就認定該請求成功完成。既然認定是成功,那麼就不會進行retry了。
       如果值不是0,就要等待Broker的響應了。根據響應情況,來判斷請求是否成功完成。

該配置項預設值是1,即leader接收後就響應。

·buffer.memory
       BufferPool Size,也就是等待發送的Record的空間大小。預設值是:33554432,即32MB。
       配置項的單位是byte,範圍是:[0,….]

·compression.type
       Kafka提供了多種壓縮型別,可選值有4個: none, gzip, snappy, lz4。預設值是none。

·retries
      當一個RecordBatch傳送失敗時,就會重新改善以確保資料完成交付。該配置設定了重試次數,值範圍[0, Integer.Max]。如果是0,即便失敗,也不會進行重發。
      如果允許重試(即retries>0),但max.in.flight.requests.per.connection 沒有設定成1。這種情況下,就可能會出現records的順序改變的現象。例如:一個prodcuder client的sender執行緒在一次輪詢中,如果有兩個recordbatch都要傳送到同步一個partition中,此時它們肯定是發往同一個broker的,並且是用的同一個TCP connection。如果出現RecordBatch1先發,但是傳送失敗,RecordBatch2緊接著RecordBatch1傳送,它是傳送成功的。然後RecordBatch1會進行重發。這樣一來,就出現了broker接收到的順序是RecordBatch2先於RecordBatch1的情況。

·ssl.key.password
       Keystore 檔案中私鑰的密碼。可選的。

·ssl.keystore.location
        Keystore檔案的位置。可選的。

·ssl.keystore.password
        Keystore 檔案的密碼。可選的。

·ssl.truststore.location
        Trust store 檔案的位置。可選的。

·ssl.truststore.password
        Trust store檔案的密碼。可選的。

·batch.size
        RecordBatch的最大容量。預設值是16384(16KB)。

·client.id
        邏輯名,client給broker發請求是會用到。預設值是:””。

·connections.max.idle.ms
       Connection的最大空閒時間。預設值是540000 (9 min)

·linger.ms
        Socket :solinger。延遲。預設值:0,即不延遲。

·max.block.ms
       當需要的metadata未到達之前(例如要傳送的record的topic,在Client中還沒有相關記錄時),執行KafkaProducer#send時,內部處理會等待MetaData的到達。這是個阻塞的操作。為了防止無限等待,設定這個阻塞時間是必要的。範圍:[0, Long.MAX]

·max.request.size
        最大請求長度,在將record壓縮到RecordBatch之前會進行校驗。超過這個大小會丟擲異常。

·partitioner.class
        用於自定義partitioner演算法。預設值是:
org.apache.kafka.clients.producer.internals.DefaultPartitioner

·receive.buffer.byte
       TCP receiver buffer的大小。取值範圍:[-1, …]。這個配置項的預設值是32768(即 32KB)。
如果設定為-1,則會採用作業系統的預設值。

·request.timeout.ms
       最大請求時長。因為發起請求後,會等待broker的響應,如果超過這個時間就認為請求失敗。

·timeout.ms
        這個時間配置的是follower到leader的ack超時時間。這個時間和 producer傳送的請求的網路無關。

·block.on.buffer.full
       當bufferPool用完後,如果client還在使用KafkaProducer傳送record,要麼是BufferPool拒絕接收,要麼是丟擲異常。
        這個配置是預設值是flase,也就是當bufferpool滿時,不會丟擲BufferExhaustException,而是根據max.block.ms進行阻塞,如果超時丟擲TimeoutExcpetion。

       如果這個屬性值是true,則會把max.block.ms值設定為Long.MAX。另外該配置為true時,metadata.fetch.time.ms將不會生效了。

·interceptor.class
       自定義攔截器類。預設情況下沒有指定任何的interceptor。

·max.in.flight.requests.per.connection
       每個連線中處於傳送狀態的請求數的最大值。預設值是5。範圍是[1, Integer.MAX]

·metric.reporters
       MetricReporter的實現類。預設情況下,會自動的註冊JmxReporter。

·metrics.num.samples
     計算metric時的取樣數。預設值是2。範圍:[1,Integer.MAX]

·metrics.sample.window.ms
     取樣的時間視窗。預設值是30000(30s)。範圍:[0, Long.MAX]