1. 程式人生 > >Kafka消費者——從 Kafka讀取資料

Kafka消費者——從 Kafka讀取資料

應用程式使用 KafkaConsumer向 Kafka 訂閱主題,並從訂閱的主題上接收訊息 。 從 Kafka 讀取資料不同於從其他悄息系統讀取資料,它涉及一些獨特的概念和想法。如果不先理解 這些概念,就難以理解如何使用消費者 API。所以我們接下來先解釋這些重要的概念,然 後再舉幾個例子,橫示如何使用消費者 API 實現不同的應用程式。

消費者和消費者群組

假設我們有一個應用程式需要從-個 Kafka主題讀取訊息井驗證這些訊息,然後再把它們 儲存起來。應用程式需要建立一個消費者物件,訂閱主題並開始接收訊息,然後驗證訊息 井儲存結果。過了 一陣子,生產者往主題寫入訊息的速度超過了應用程式驗證資料的速 度,這個時候該怎麼辦?如果只使用單個消費者處理訊息,應用程式會遠跟不上訊息生成 的速度。顯然,此時很有必要對消費者進行橫向伸縮。就像多個生產者可以向相同的 主題 寫入訊息一樣,我們也可以使用多個消費者從同一個主題讀取訊息,對訊息進行分流。

Kafka 消費者從屬於消費者群組。一個群組裡的消費者訂閱的是同一個主題,每個消費者 接收主題一部分分割槽的訊息。

假設主題 T1 有 4 個分割槽,我們建立了消費者 C1 ,它是群組 G1 裡唯 一 的消費者,我們用 它訂閱主題 T1。消費者 Cl1將收到主題 T1全部 4個分割槽的訊息,如圖 4-1 所示。

如果在群組 G1 裡新增一個消費者 C2,那麼每個消費者將分別從兩個分割槽接收訊息。我 假設消費者 C1接收分割槽 0 和分割槽 2 的訊息,消費者 C2 接收分割槽 1 和分割槽 3 的訊息,如圖 4-2 所示。

如果群組 G1 有 4 個消費者,那麼每個消費者可以分配到 一個分割槽,如圖 4-3 所示。

如果我們往群組裡新增更多的消費者,超過主題的分割槽數量,那麼多出的消費者就會被閒置,不會接收到任何訊息。

往群組裡增加消費者是橫向伸縮消費能力的主要方式。 Kafka 消費者經常會做一些高延遲的操作,比如把資料寫到資料庫或 HDFS,或者使用資料進行比較耗時的計算。在這些情況下,單個消費者無法跟上資料生成的速度,所以可以增加更多的消費者,讓它們分擔負載,每個消費者只處理部分分割槽的訊息,這就是橫向伸縮的主要手段。我們有必要為主題建立大量的分割槽,在負載增長時可以加入更多的消費者。不過要性意,不要讓消費者的數量超過主題分割槽的數量,多餘的消費者只會被閒置。

除了通過增加消費者來橫向伸縮單個應用程式外,還經常出現多個應用程式從同一個主題讀取資料的情況。實際上, Kafka 設計的主要目標之一 ,就是要讓 Kafka 主題裡的資料能夠滿足企業各種應用場景的需求。在這些場景裡,每個應用程式可以獲取到所有的訊息, 而不只是其中的 一部分。只要保證每個應用程式有自己的消費者群組,就可以讓它們獲取到主題所有的訊息。不同於傳統的訊息系統,橫向伸縮 Kafka消費者和消費者群組並不會對效能造成負面影響。

在上面的例子裡,如果新增一個只包含一個消費者的群組 G2,那麼這個消費者將從主題 T1 上接收所有的訊息,與群組 G1 之間互不影響。群組 G2 可以增加更多的消費者,每個消費者可以消費若干個分割槽,就像群組 G1 那樣,如圖 4-5 所示。總的來說,群組 G2 還是會接收到所有訊息,不管有沒有其他群組存在。

簡而言之,為每一個需要獲取一個或多個主題全部訊息的應用程式建立一個消費者群組, 然後往群組裡新增消費者來伸縮讀取能力和處理能力,群組裡的每個消費者只處理一部分訊息。

消費者群組和分割槽再均衡

我們已經從上一個小節瞭解到,群組裡的消費者共同讀取主題的分割槽。一個新的消費者加 入群組時,它讀取的是原本由其他消費者讀取的訊息。當一個消費者被關閉或發生崩潰時,它就離開群組,原本由它讀取的分割槽將由群組裡的其他消費者來讀取。在主題發生變化時 , 比如管理員添加了新的分割槽,會發生分割槽重分配。

分割槽的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡。再均衡非常重要, 它為消費者群組帶來了高可用性和伸縮性(我們可以放心地新增或移除消費者), 不過在正常情況下,我們並不希望發生這樣的行為。在再均衡期間,消費者無法讀取訊息,造成整個群組一小段時間的不可用。另外,當分割槽被重新分配給另 一個消費者時,消費者當前的讀取狀態會丟失,它有可能還需要去重新整理快取 ,在它重新恢復狀態之前會拖慢應用程式。我們將在本章討論如何進行安全的再均衡,以及如何避免不必要的再均衡。

消費者通過向被指派為 群組協調器的 broker (不同的群組可以有不同的協調器)傳送 心跳 來維持它們和群組的從屬關係以及它們對分割槽的所有權關係。只要消費者以正常的時間間隔傳送心跳,就被認為是活躍的,說明它還在讀取分割槽裡的訊息。消費者會在輪詢訊息 (為了獲取訊息)或提交偏移量時傳送心跳。如果消費者停止傳送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。

如果一個消費者發生崩潰,井停止讀取訊息,群組協調器(broker)會等待幾秒鐘,確認它死亡了才會觸發再均衡。在這幾秒鐘時間裡,死掉的消費者不會讀取分割槽裡的訊息。在清理消費者時,消費者會通知協調器它將要離開群組,協調器會立即觸發一次再均衡,儘量降低處理停頓。在本章的後續部分,我們將討論一些用於控制傳送心跳頻率和會話過期時間的配置引數,以及如何根據實際需要來配置這些引數 。

分配分割槽是怎樣的一個過程

當消費者要加入群組時,它會向群組協調器傳送 一 個 JoinGroup 請求。第 一 個加入群組的消費者將成為“群主”。群主從協調器那裡獲得群組的成員列 表(列表中包含了所有最近傳送過心跳的消費者,它們被認為是活躍的), 並負責給每一個消費者分配分割槽。它使用 一個實現了 PartitionAssignor介面的類來決定哪些分 區應該被分配給哪個消費者 。

Kafka 內建了兩種分配策略,在後面的配置引數小節我們將深入討論。分配完畢之後,群主把分配情況列表傳送給群組協調器,協調器再把這些資訊傳送給所有消費者。每個消費者只能看到自己的分配資訊,只有群 主知道群組 裡所有消費者的分配資訊。這個過程會在每次再均衡時重複發生。

建立 Kafka消費者

在讀取訊息之前,需要先建立 一個 KafkaConsumer物件 。 建立 KafkaConsumer 物件與建立 KafkaProducer物件非常相似——把想要傳給消費者的屬性放在 Properties 物件裡。本章 後續部分會深入討論所有的屬性。在這裡,我們只需要使用 3個必要的屬性: bootstrap.servers、 key.deserializer、 value.deserializer。

下面程式碼演示瞭如何建立一個KafkaConsumer物件:

Properties props = new Properties();

props.put("bootstrap.servers", "broker1:9092, broker2:9092");

props.put("group.id", "CountryCounter");

props.put("key.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");

props.put("value.deserializer", "org.apache.kafka.common.serializaiton.StrignDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

deserializer使用指定的類(反序列化器)把位元組陣列轉成 Java物件。

group.id指定了KafkaConsumer 屬於哪一個消費者群組。
group.id不是必需的,不過我們現在姑且認為它是必需的。它指定了 KafkaConsumer 屬於哪一個消費者群組。建立不屬於任何一個群組的消費者也是可以的,只是這樣做不太常見。

訂閱主題

建立好消費者之後,下一步可以開始訂閱主題了。subscribe()方法接受一個主題列表作為引數

consumer.subscribe(Collections.singletonList("customerCountries"));

在這裡我們建立了一個包含單個元素的列表,主題的名字叫作“customerCountries”,我們也可以在呼叫subscribe()方法時傳入一個正則表示式,正則表示式可以匹配多個主題如果有人建立了新的主題,並且主題名與正則表示式匹配,那麼會立即觸發一次再均衡,消費者就可以讀取新新增的主題。如果應用程式需要讀取多個主題,並且可以處理不同型別的資料,那麼這種訂閱方式就很管用。在Kafka和其他系統之間複製資料時,使用正則表示式的方式訂閱多個主題時很常見的做法。

要訂閱所有test相關的主題,可以這樣做:consumer.subscribe("test.*");

輪詢

訊息輪詢是消費者 API 的核心,通過一個簡單的輪詢向伺服器請求資料。一旦消費者訂閱了主題 ,輪詢就會處理所有的細節,包括群組協調、分割槽再均衡、傳送心跳和獲取資料, 開發者只需要使用一組簡單的 API 來處理從分割槽返回的資料。消費者程式碼的主要部分如下所示 :

輪詢不只是獲取資料那麼簡單。在第一次呼叫新消費者的 poll() 方法時,它會負責查詢 GroupCoordinator, 然後加入群組,接受分配的分割槽。 如果發生了再均衡,整個過程也是在輪詢期間進行的。當然 ,心跳也是從輪詢裡發迭出去的。所以,我們要確保在輪詢期間所做的任何處理工作都應該儘快完成。

執行緒安全

在同一個群組中,我們無法讓一個執行緒執行多個消費者,也無法讓多個執行緒安全地共享一個消費者。按照規則,一個消費者使用一個執行緒。如果要在同一個消費者群組裡執行多個消費者,需要讓每個消費者執行在自己的執行緒裡。最好是把消費者的邏輯封裝在自己的物件裡,然後使用Java的ExecutorService啟動多個執行緒,使每個消費者執行在自己的執行緒上。Confluent的部落格(https://www.confluent.io/blog/)上有一個教程介紹如何處理這種情況。

消費者的配置

到目前為止,我們學習瞭如何使用消費者 API,不過只介紹了幾個配置屬’性一一如bootstrap.servers、 key.deserializer、 value.deserializer、group.id。 Kafka的文件列出了所有與消費者相關的配置說明。大部分引數都有合理的預設值,一般不需要修改它們,不過有一些引數與消費 者的效能和可用性有很大關係。接下來介紹這些重要的屬性。

1. fetch.min.bytes

該屬性指定了消費者從伺服器獲取記錄的最小位元組數。 broker 在收到消費者的資料請求時, 如果可用的資料量小於 fetch.min.bytes指定的大小,那麼它會等到有足夠的可用資料時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題不是很活躍的時候(或者一天裡的低谷時段)就不需要來來回回地處理訊息。如果沒有很多可用資料,但消費者的 CPU 使用率卻很高,那麼就需要把該屬性的值設得比預設值大。如果消費者的數量比較多,把該屬性的值設定得大一點可以降低 broker 的工作負載。

2. fetch.max.wait.ms

我們通過 fetch.min.bytes 告訴 Kafka,等到有足夠的資料時才把它返回給消費者。而 fetch.max.wait.ms則用於指定 broker的等待時間,預設是 500ms。如果沒有足夠的資料流入 Kafka,消費者獲取最小資料量的要求就得不到滿足,最終導致500ms的延遲。 如果要降低潛在的延遲(為了滿足 SLA),可以把該引數值設定得小一些。如果 fetch.max.wait.ms被設 為 100ms,並且 fetch.min.bytes 被設為 1MB,那麼 Kafka在收到消費者的請求後,要麼返 回 1MB 資料,要麼在 100ms 後返回所有可用的資料 , 就看哪個條件先得到滿足。

3. max.parition.fetch.bytes

該屬性指定了伺服器從每個分割槽裡返回給消費者的最大位元組數。它的預設值是 1MB,也 就是說, KafkaConsumer.poll() 方法從每個分割槽裡返回的記錄最多不超過 max.parition.fetch.bytes 指定的位元組。如果一個主題有 20個分割槽和 5 個消費者,那麼每個消費者需要至少 4MB 的可用記憶體來接收記錄。在為消費者分配記憶體時,可以給它們多分配一些,因 為如果群組裡有消費者發生崩潰,剩下的消費者需要處理更多的分割槽。 max.parition.fetch.bytes 的值必須比 broker能夠接收的最大訊息的位元組數(通過 max.message.size屬 性配置 )大, 否則消費者可能無法讀取這些訊息,導致消費者一直掛起重試。在設定該屬性時,另一個需要考慮的因素是消費者處理資料的時間。 消費者需要頻繁呼叫 poll() 方法來避免會話過期和發生分割槽再均衡,如果單次呼叫 poll() 返回的資料太多,消費者需要更多的時間來處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現這種情況, 可以把 max.parition.fetch.bytes 值改小 ,或者延長會話過期時間。

4. session.timeout.ms

該屬性指定了消費者在被認為死亡之前可以與伺服器斷開連線的時間,預設是 3s。如果消費者沒有在 session.timeout.ms 指定的時間內傳送心跳給群組協調器,就被認為已經死亡,協調器就會觸發再均衡,把它的分割槽分配給群組裡的其他消費者 。該屬性與 heartbeat.interval.ms緊密相關。heartbeat.interval.ms 指定了poll()方法向協調器 傳送心跳的頻 率, session.timeout.ms 則指定了消費者可以多久不傳送心跳。所以, 一般需要同時修改這兩個屬性, heartbeat.interval.ms 必須比 session.timeout.ms 小, 一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那麼 heartbeat.interval.ms 應該是 ls。 把 session.timeout.ms 值設 得比預設值小,可以更快地檢測和恢 復崩潰的節點,不過長時間的輪詢或垃圾收集可能導致非預期的再均衡。把該屬性的值設定得大一些,可以減少意外的再均衡 ,不過檢測節點崩潰需要更長的時間。

5. auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分割槽或者偏移量無效的情況下(因消費者長時間失效,包含偏移量的記錄已經過時井被刪除)該作何處理。它的預設值是latest, 意 思是說,在偏移量無效的情況下,消費者將從最新的記錄開始讀取資料(在消費者 啟動之 後生成的記錄)。另一個值是 earliest,意思是說,在偏移量無效的情況下,消費者將從 起始位置讀取分割槽的記錄。

6. enable.auto.commit

我們稍後將介紹 幾種 不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,預設值是 true。為了儘量避免出現重複資料和資料丟失,可以把它設為 false,由自己控制何時提交偏移量。如果把它設為 true,還可以通過配置 auto.commit.interval.mls 屬性來控制提交的頻率。

7. partition.assignment.strategy

我們知道,分割槽會被分配給群組裡的消費者。 PartitionAssignor 根據給定的消費者和主題,決定哪些分割槽應該被分配給哪個消費者。 Kafka 有兩個預設的分配策略 。

  • Range

該策略會把主題的若干個連續的分割槽分配給消費者。假設悄費者 C1 和消費者 C2 同時 訂閱了主題 T1 和主題 T2,井且每個主題有 3 個分割槽。那麼消費者 C1 有可能分配到這 兩個主題的分割槽 0 和 分割槽 1,而消費者 C2 分配到這兩個主題 的分割槽 2。因為每個主題 擁有奇數個分割槽,而分配是在主題內獨立完成的,第一個消費者最後分配到比第二個消費者更多的分割槽。只要使用了 Range策略,而且分割槽數量無法被消費者數量整除,就會出現這種情況。

  • RoundRobin

該策略把主題的所有分割槽逐個分配給消費者。如果使用 RoundRobin 策略來給消費者 C1 和消費者 C2分配分割槽,那麼消費者 C1 將分到主題 T1 的分割槽 0和分割槽 2以及主題 T2 的分割槽 1,消費者 C2 將分配到主題 T1 的分割槽 l 以及主題T2 的分割槽 0和分割槽 2。一般 來說,如果所有消費者都訂閱相同的主題(這種情況很常見), RoundRobin策略會給所 有消費者分配相同數量 的分割槽(或最多就差一個分割槽)。

可以通過設定 partition.assignment.strategy 來選擇分割槽策略。預設使用的是 org. apache.kafka.clients.consumer.RangeAssignor, 這個類實現了 Range策略,不過也可以 把它改成 org.apache.kafka.clients.consumer.RoundRobinAssignor。我們還可以使用自定 義策略,在這種情況下 , partition.assignment.strategy 屬性的值就是自定義類的名字。

8. client.id

該屬性可以是任意字串 , broker用它來標識從客戶端傳送過來的訊息,通常被用在日誌、度量指標和配額裡。

9. max.poll.records

該屬性用於控制單次呼叫 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢裡需要處理的資料量。

10. receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫資料時用到的 TCP 緩衝區也可以設定大小。如果它們被設為-1,就使用作業系統的預設值。如果生產者或消費者與 broker處於不同的資料中心內,可以適當增大這些值,因為跨資料中心的網路一般都有 比較高的延遲和比較低的頻寬 。