1. 程式人生 > >Kafka設計解析(十四)Kafka producer介紹

Kafka設計解析(十四)Kafka producer介紹

解析 共享 發送 丟失 ash 整體 ket flight memory

轉載自 huxihx,原文鏈接 Kafka producer介紹

Kafka 0.9版本正式使用Java版本的producer替換了原Scala版本的producer。本文著重討論新版本producer的設計原理以及基本的使用方法。

目錄

一、基本數據結構

1. ProducerRecord

2. RecordMetadata

二、工作流程

1. 基本設計特點

2. 關鍵參數

3. 內部原理

3.1 Step 1: 序列化+計算目標分區

3.2 Step 2: 追加寫入消息緩沖區(accumulator)

3.3 Step 3: Sender線程預處理及消息發送

3.4 Step 4: Sender線程處理response

三、基本使用

新版本Producer

首先明確一下,新版本producer指的是o.a.k.clients.producer.KafkaProducer,而不是kafka.producer.Producer。如果你依然在使用後者,我們強烈建議你趕快升級到Kafka0.9以後的版本。

一、基本數據結構

新版本客戶端(包含新版本producer和新版本consumer)重寫了之前服務器端代碼提供的很多數據結構以擺脫對服務器端代碼的依賴。其中有一些是你理解新版本producer所必需的,它們包括(但不限於):

1. ProducerRecord

一個ProducerRecord表示一條待發送的消息記錄,主要由5個字段構成:

  • topic   所屬topic
  • partition 所屬分區
  • key 鍵值
  • value 消息體
  • timestamp 時間戳

ProducerRecord允許用戶在創建消息對象的時候就直接指定要發送的分區,這樣producer後續發送該消息時可以直接發送到指定分區,而不用先通過Partitioner計算目標分區了。另外,我們還可以直接指定消息的時間戳——但一定要慎重使用這個功能,因為它有可能會令時間戳索引機制失效(筆者曾經直接指定時間戳故意打亂發送順序進行測試,比如先發送消息的時間戳大於後發送消息的時間戳,最後發現通過時間戳定位消息時會發生混亂。為此我還特意開了一個jira issue,不過被認為是"當前不被支持的用法"

)

2. RecordMetadata

該類表示Kafka服務器端返回給客戶端的消息的元數據信息,包含以下內容:

  • offset 該條消息的位移
  • timestamp 消息時間戳
  • topic + partition 所屬topic的分區
  • checksum 消息CRC32碼
  • serializedKeySize 序列化後的消息鍵字節數
  • serializedValueSize 序列化後的消息體字節數

上面的元數據信息前3項信息是比較重要的,producer端可以使用這些信息做一些消息發送成功之後的處理,比如寫入日誌等。

二、工作流程

如果把Producer統一看成一個盒子,那麽整個producer端的工作原理便如下圖所示:

技術分享圖片

大體上來說,用戶首先構建待發送的消息對象ProducerRecord,然後調用KafkaProducer#send方法進行發送。KafkaProducer接收到消息後首先對其進行序列化,然後結合本地緩存的元數據信息一起發送給partitioner去確定目標分區,最後追加寫入到內存中的消息緩沖池(accumulator)。此時KafkaProducer#send方法成功返回。

KafkaProducer中還有一個專門的Sender IO線程負責將緩沖池中的消息分批次發送給對應的broker,完成真正的消息發送邏輯。

1. 基本設計特點

結合源代碼,筆者認為新版本的producer從設計上來說具有以下幾個特點(或者說是優勢):

  1. 總共創建兩個線程:執行KafkaPrducer#send邏輯的線程——我們稱之為“用戶主線程”;執行發送邏輯的IO線程——我們稱之為“Sender線程”
  2. 不同於Scala老版本的producer,新版本producer完全異步發送消息,並提供了回調機制(callback)供用戶判斷消息是否成功發送
  3. batching機制——“分批發送“機制。每個批次(batch)中包含了若幹個PRODUCE請求,因此具有更高的吞吐量
  4. 更加合理的默認分區策略:對於無key消息而言,Scala版本分區策略是一段時間內(默認是10分鐘)將消息發往固定的目標分區,這容易造成消息分布的不均勻,而新版本的producer采用輪詢的方式均勻地將消息分發到不同的分區
  5. 底層統一使用基於Selector的網絡客戶端實現,結合Java提供的Future實現完整地提供了更加健壯和優雅的生命周期管理。

其實,新版本producer的設計優勢還有很多,諸如監控指標更加完善等這樣的就不一一細說了。總之,新版本producer更加地健壯,性能更好~

2. 關鍵參數

新版本producer的參數有幾十個之多,我們重點了解其中的6個就夠了,它們是:

  • batch.size 我把它列在了首位,因為該參數對於調優producer至關重要。之前提到過新版producer采用分批發送機制,該參數即控制一個batch的大小。默認是16KB
  • acks 關乎到消息持久性(durability)的一個參數。高吞吐量和高持久性很多時候是相矛盾的,需要先明確我們的目標是什麽? 高吞吐量?高持久性?亦或是中等?因此該參數也有對應的三個取值:0, -1和1
  • linger.ms 減少網絡IO,節省帶寬之用。原理就是把原本需要多次發送的小batch,通過引入延時的方式合並成大batch發送,減少了網絡傳輸的壓力,從而提升吞吐量。當然,也會引入延時
  • compression.type producer所使用的壓縮器,目前支持gzip, snappy和lz4。壓縮是在用戶主線程完成的,通常都需要花費大量的CPU時間,但對於減少網絡IO來說確實利器。生產環境中可以結合壓力測試進行適當配置
  • max.in.flight.requests.per.connection 關乎消息亂序的一個配置參數。它指定了Sender線程在單個Socket連接上能夠發送未應答PRODUCE請求的最大請求數。適當增加此值通常會增大吞吐量,從而整體上提升producer的性能。不過筆者始終覺得其效果不如調節batch.size來得明顯,所以請謹慎使用。另外如果開啟了重試機制,配置該參數大於1可能造成消息發送的亂序(先發送A,然後發送B,但B卻先行被broker接收)
  • retries 重試機制,對於瞬時失敗的消息發送,開啟重試後KafkaProducer會嘗試再次發送消息。對於有強烈無消息丟失需求的用戶來說,開啟重試機制是必選項。

3. 內部原理

上面的那張圖中其實並沒有深入展開producer的工作原理。這裏筆者打算詳細說說Producer內部到底是如何工作的,也就是梳理一下當用戶調用KafkaProducer.send(ProducerRecord, Callback)時Kafka內部都發生了什麽事情。

3.1 Step 1: 序列化+計算目標分區

這是KafkaProducer#send邏輯的第一步,即為待發送消息進行序列化並計算目標分區,如下圖所示:

技術分享圖片

如上圖所示,一條所屬topic是"test",消息體是"message"的消息被序列化之後結合KafkaProducer緩存的元數據(比如該topic分區數信息等)共同傳給後面的Partitioner實現類進行目標分區的計算。

3.2 Step 2: 追加寫入消息緩沖區(accumulator)

producer創建時會創建一個默認32MB(由buffer.memory參數指定)的accumulator緩沖區,專門保存待發送的消息。除了之前在“關鍵參數”段落中提到的linger.ms和batch.size等參數之外,該數據結構中還包含了一個特別重要的集合信息:消息批次信息(batches)。該集合本質上是一個HashMap,裏面分別保存了每個topic分區下的batch隊列,即前面說的批次是按照topic分區進行分組的。這樣發往不同分區的消息保存在對應分區下的batch隊列中。舉個簡單的例子,假設消息M1, M2被發送到test的0分區但屬於不同的batch,M3分送到test的1分區,那麽batches中包含的信息就是:{"test-0" -> [batch1, batch2], "test-1" -> [batch3]}

單個topic分區下的batch隊列中保存的是若幹個消息批次。每個batch中最重要的3個組件包括:

  • compressor: 負責執行追加寫入操作
  • batch緩沖區:由batch.size參數控制,消息被真正追加寫入到的地方
  • thunks:保存消息回調邏輯的集合

這一步的目的就是將待發送的消息寫入消息緩沖池中,具體流程如下圖所示:

技術分享圖片

okay!這一步執行完畢之後理論上講KafkaProducer.send方法就執行完畢了,用戶主線程所做的事情就是等待Sender線程發送消息並執行返回結果了。

3.3 Step 3: Sender線程預處理及消息發送

此時,該Sender線程登場了。嚴格來說,Sender線程自KafkaProducer創建後就一直都在運行著 。它的工作流程基本上是這樣的:

  1. 不斷輪詢緩沖區尋找已做好發送準備的分區
  2. 將輪詢獲得的各個batch按照目標分區所在的leader broker進行分組
  3. 將分組後的batch通過底層創建的Socket連接發送給各個broker
  4. 等待服務器端發送response回來

為了說明上的方便,我還是基於圖的方式來解釋Sender線程的工作原理:

技術分享圖片

3.4 Step 4: Sender線程處理response

上圖中Sender線程會發送PRODUCE請求給對應的broker,broker處理完畢之後發送對應的PRODUCE response。一旦Sender線程接收到response將依次(按照消息發送順序)調用batch中的回調方法,如下圖所示:

技術分享圖片

做完這一步,producer發送消息就可以算作是100%完成了。通過這4步我們可以看到新版本producer發送事件完全是異步過程。因此在調優producer前我們就需要搞清楚性能瓶頸到底是在用戶主線程還是在Sender線程。具體的性能測試方法以及調優方法以後有機會的話我再寫一篇出來和大家討論。

三、基本使用

由於KafkaProducer是線程安全的,因此在使用上有兩種基本的使用方法:

說明 優勢 劣勢

單KafkaProducer實例

所有線程共享一個KafkaProducer實例

實現簡單,性能好

1. 所有線程共用一個內存緩存池,可能需要較多的內存空間
2. 一旦崩潰所有用戶線程都無法工作

多KafkaProducer實例 每個線程維護一個KafkaProducer實例

1. 每個用戶線程擁有專屬的KafkaProducer實例、緩沖區空間以及一組配置參數,支持細粒度化調優
2. 單個KafkaProducer崩潰不會影響其他producer工作

較大的對象分配開銷

總 結

最後簡單總結一下,本文主要討論了新版本producer的一些設計特點及基本的使用方法。再次強調一下,新版本的producer使用完全異步化的多線程處理方式,同時結合分批處理機制,極大地提升了整體的性能。由於目前Kafka社區早已不維護Scala版producer了,所以還在使用0.8.2.x版本的用戶有條件的話盡量還是升級到最新的Kafka版本吧。

Kafka設計解析(十四)Kafka producer介紹