kafka叢集Producer基本資料結構及工作流程深入剖析-kafka 商業環境實戰
本套技術專欄是作者(秦凱新)平時工作的總結和昇華,通過從真實商業環境抽取案例進行總結和分享,並給出商業應用的調優建議和叢集環境容量規劃等內容,請持續關注本套部落格。期待加入IOT時代最具戰鬥力的團隊。QQ郵箱地址:[email protected],如有任何學術交流,可隨時聯絡。
1 Producer端基本資料結構
-
ProducerRecord: 一個ProducerRecord表示一條待發送的訊息記錄,主要由5個欄位構成:
topic所屬topic partition所屬分割槽 key鍵值 value訊息體 timestamp時間戳 複製程式碼
-
RecordMetadata: Kafka伺服器端返回給客戶端的訊息的元資料資訊,前3項相對比較重要,Producer端可以使用這些訊息做一些訊息傳送成功之後的處理。
offset該條訊息的位移 timestamp訊息時間戳 topic + partition所屬topic的分割槽 checksum訊息CRC32碼 serializedKeySize序列化後的訊息鍵位元組數 serializedValueSize序列化後的訊息體位元組數 複製程式碼
2 Producer端訊息傳送流程

-
在send()的傳送訊息動作觸發之前,通過props屬性中指定的servers連線到broker叢集,從Zookeeper收集叢集Metedata資訊,從而瞭解哪些broker掌管哪一個Topic的哪一個partition,以及brokers的健康狀態。
-
下面就是流水線操作,ProducerRecord物件攜帶者topic,partition,message等資訊,在Serializer這個“車間”被序列化。
-
序列化過後的ProducerRecord物件進入Partitioner“車間”,按照上文所述的Partitioning 策略決定這個訊息將被分配到哪個Partition中。
-
確定partition的ProducerRecord進入一個緩衝區,通過減少IO來提升效能,在這個“車間”,訊息被按照TopicPartition資訊進行歸類整理,相同Topic且相同parition的ProducerRecord被放在同一個RecordBatch中,等待被髮送。什麼時候傳送?都在Producer的props中被指定了,有預設值,顯然我們可以自己指定。
(1) batch.size:設定每個RecordBatch可以快取的最大位元組數 (2) buffer.memory:設定所有RecordBatch的總共最大位元組數 (3) linger.ms設定每個RecordBatch的最長延遲傳送時間 (4) max.block.ms 設定每個RecordBatch的最長阻塞時間 複製程式碼
-
一旦,當單個RecordBatch的linger.ms延遲到達或者batch.size達到上限,這個 RecordBatch會被立即傳送。另外,如果所有RecordBatch作為一個整體,達到了buffer.memroy或者max.block.ms上限,所有的RecordBatch都會被髮送。
-
ProducerRecord訊息按照分配好的Partition傳送到具體的broker中,broker接收儲存訊息,更新Metadata資訊,同步給Zookeeper。
-
Producer端其他優化點:
(5) acks:Producer的資料確認阻塞設定,0表示不管任何響應,只管發,發完了立即執行下個任務,這種方式最快,但是很不保險。1表示只確保leader成功響應,接收到資料。2表示確保leader及其所有follwer成功接收儲存訊息,也可以用”all”。 (6) retries:訊息傳送失敗重試的次數。 (7) retry.backoff.ms:失敗補償時間,每次失敗重試的時間間隔,不可設定太短,避免第一條訊息的響應還沒返回,第二條訊息又發出去了,造成邏輯錯誤。 (8) max.in.flight.request.per.connection:同一時間,每個Producer能夠傳送的訊息上限。 (9) compression.typeproducer所使用的壓縮器,目前支援gzip, snappy和lz4。壓縮是在使用者主執行緒完成的,通常都需要花費大量的CPU時間,但對於減少網路IO來說確實利器。生產環境中可以結合壓力測試進行適當配置 複製程式碼