Kafka——Kafka消費者(從Kafka讀取資料)
KafkaConsumer概念
消費者和消費者群組
假設我們有一個應用程式需要從一個Kafka 主題讀取訊息並驗證這些訊息,然後再把它們儲存起來。應用程式需要建立一個消費者物件,訂閱主題並開始接收訊息,然後驗證訊息井儲存結果。過了一陣子,生產者往主題寫入訊息的速度超過了應用程式驗證資料的速度,這個時候該怎麼辦?如果只使用單個消費者處理訊息,應用程式會遠跟不上訊息生成
的速度。顯然,此時很有必要對消費者進行橫向伸縮。就像多個生產者可以向相同的主題寫入訊息一樣,我們也可以使用多個消費者從同一個主題讀取訊息,對訊息進行分流。
Kafka 消費者從屬於消費者群組。一個群組裡的消費者訂閱的是同一個主題,每個消費者接收主題一部分分割槽的訊息。
1個消費者,4個分割槽:
2個消費者,4個分割槽:
4個消費者,4個分割槽:
5個消費者,4個分割槽:
我們可以知道, 消費者數目大於分割槽數目的時候,有消費者是閒置的,所以最好不要讓消費者數目超過分割槽的數目。
往群組裡增加消費者是橫向伸縮消費能力的主要方式。Kafka 消費者經常會做一些高延遲的操作,比如把資料寫到資料庫或HDFS ,或者使用資料進行比較耗時的計算。在這些情況下,單個消費者無法跟上資料生成的速度,所以可以增加更多的消費者,讓它們分擔負,每個消費者只處理部分分割槽的訊息,這就是橫向伸縮的主要手段。
除了通過增加消費者來橫向伸縮單個應用程式外,還經常出現多個應用程式從同一個主題讀取資料的情況。為每一個需要獲取一個或多個主題全部訊息的應用程式建立一個消費者群組,然後往群組裡新增消費者來伸縮讀取能力和處理能力,群組裡的每個消費者只處理一部分訊息。
兩個消費者群組對應一個主題:
消費者群組和分割槽再均衡
我們已經從上一個小節瞭解到,群組裡的消費者共同讀取主題的分割槽。一個新的悄費者加入群組時,它讀取的是原本由其他消費者讀取的訊息。當一個消費者被關閉或發生崩潰時,它就離開群組,原本由它讀取的分割槽將由群組裡的其他消費者來讀取。在主題發生變化時, 比如管理員添加了新的分割槽,會發生分割槽重分配。
分割槽的所有權從一個消費者轉移到另一個消費者,這樣的行為被稱為再均衡。不過在正常情況下,我們並不希望發生這樣的行為。在再均衡期間,消費者無法讀取訊息,造成整個群組一小段時間的不可用。另外,當分割槽被重新分配給另一個消費者時,消費者當前的讀取狀態會丟失,它有可能還需要去重新整理快取,在它重新恢復狀態之前會拖慢應用程式。我們將在本章討論如何進行安全的再均衡,以及如何避免不必要的再均衡。
如何觸發均衡呢?
消費者通過向被指派為群組協調器的broker (不同的群組可以有不同的協調器)傳送心跳來維持它們和群組的從屬關係以及它們對分割槽的所有權關係。只要消費者以正常的時間間隔傳送心跳,就被認為是活躍的,說明它還在讀取分割槽裡的訊息。消費者會在輪詢訊息(為了獲取訊息)或提交偏移量時傳送心跳。如果消費者停止傳送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。
建立Kafka消費者
在讀取訊息之前,需要先建立一個KafkaConsumer物件。建立KafkaConsumer物件與建立KafkaProducer物件非常相似一一把想要傳給消費者的屬性放在Properties物件裡。
在這裡我們需要使用對應的3個必要屬性:
bootstrap.servers、key.deserializer、value.deserializer。
不必要的屬性是group.id,它指定了KafkaConsumer屬於哪個消費群組。
下面演示瞭如何建立一個KafkaConsumer物件:
Properties properties=new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("group.id","CountryCounter");
properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String,String> consumer=new KafkaConsumer<String, String>(properties);
訂閱主題
subscribe()方法接受一個主題列表作為引數,使用起來很簡單。
consumer.subscribe(Collections.singletonList("customerContries"));
如果要訂閱所有與test相關的主題,可以如下操作:
consumer.subscribe("test.*");
當然也可以自定義訂閱:
List topics = new ArrayList();
topics.add(BaseTopicService.QUERY_BASE_TOPIC);
this.consumer.subscribe(topics);
輪詢
訊息輪詢是消費者API 的核心,通過一個簡單的輪詢向伺服器請求資料。一旦消費者訂閱了主題,輪詢就會處理所有的細節,包括群組協調、分割槽再均衡、傳送心跳和獲取資料,開發者只需要使用一組簡單的API 來處理從分割槽返回的資料。消費者程式碼的主要部分如下所示:
try {
while (true){
ConsumerRecords<String,String> records=consumer.poll(100);
for (ConsumerRecord<String,String> record: records) {
log.debug("topic=%s,partition=%s,offset=%d,customer=%s,country=%s\n",
record.topic(),record.partition(),record.offset(),
record.key(),record.value());
int updateCount=1;
if (custCountryMap.countainsValue(record.value()))
updateCount=custCountryMap.get(record.value())+1;
custCountryMap.put(record.value(),updateCount);
JSONObject json=new JSONObject(custCountryMap);
System.out.println(json.toString(4));
}
}
}finally {
consumer.close();
}
這是一個無限迴圈。消費者實際上是一個長期執行的應用程式,它通過持續輪詢向Kafka 請求資料。稍後我們會介紹如何退出迴圈,井關閉消費者。
poll()方法引數是一個超時時間,用於控制poll()方法的阻塞(在消費者的緩衝區裡沒有可用資料時會發生阻塞),。如果該引數被設為0, poll()會立即返回,否則它會在指定的毫秒數內一直等待broker 返回資料。poll()返回一個記錄列表。
我們這裡用JSON格式列印結果,實際場景可能是被儲存在資料庫裡。
在退出應用程式之前用close()關閉消費者。
消費者的配置
- fetch. min.bytes
該屬性指定了消費者從伺服器獲取記錄的最小位元組數(很明顯降低負載的作用)。 - fetch.max.wait.ms
這個是最長等待時間,和上面可以同時使用。要麼是位元組數達到最小值,要麼是時間超時。 - session.timeout.ms
該屬性指定了消費者在被認為死亡之前可以與伺服器斷開連線的時間,預設是3s。 - 其他配置以後瞭解
提交和偏移量
每次呼叫poll()方法,它總是返回由生產者寫入Kafka 但還沒有被消費者讀取過的記錄,我們因此可以追蹤到哪些記錄是被群組裡的哪個消費者讀取的。
我們把更新分割槽當前位置的操作叫作提交。
那麼消費者是如何提交偏移量的呢?消費者往一個叫作_consumer_offset 的特殊主題傳送訊息,訊息裡包含每個分割槽的偏移量。如果消費者一直處於執行狀態,那麼偏移量就沒有什麼用處。不過,如果悄費者發生崩潰或者有新的消費者加入群組,就會觸發再均衡,完成再均衡之後,每個消費者可能分配到新的分割槽,而不是之前處理的那個。為了能夠繼續之前的工作,消費者需要讀取每個分割槽最後一次提交的偏移量,然後從偏移量指定的地方繼續處理。
對於偏移量。
如果提交的偏移量小於客戶端處理的最後一個訊息的偏移量,那麼處於兩個偏移量之間的訊息就會被重複處理,如圖所示。
如果提交的偏移量大於客戶端處理的最後一個訊息的偏移量,那麼處於兩個偏移量之間的訊息將會丟失,如圖所示。
所以,處理偏移量的方式對客戶端會有很大的影響。KafkaConsumer AP I 提供了很多種方式來提交偏移量。
自動提交
最簡單的提交方式是讓悄費者自動提交偏移量。如果enable.auto.commit
被設為true,那麼每過auto.commit.interval.ms
=5s,消費者會自動把從poll()方法接收到的最大偏移量提交上去。
不過,在使用這種簡便的方式之前,需要知道它將會帶來怎樣的結果。
在最近一次提交之後的3s 發生了再均衡,再均衡之後,消費者從最後一次提交的偏移量位置開始讀取訊息。所以在這3s 內到達的訊息會被重複處理。這種情況是無法避免的。
自動提交雖然方便, 不過並沒有為開發者留有餘地來避免重複處理訊息。
提交當前偏移量
大部分開發者通過控制偏移量提交時間來消除丟失訊息的可能性,井在發生再均衡時減少重複訊息的數量。消費者API 提供了另一種提交偏移量的方式, 開發者可以在必要的時候提交當前偏移量,而不是基於時間間隔。
首先把enable.auto.commit
設為false,使用commitSync()即可。
while (true){
ConsumerRecords<String,String> records=consumer.poll(100);
for (ConsumerRecord<String,String> record: records) {
log.debug("topic=%s,partition=%s,offset=%d,customer=%s,country=%s\n",
record.topic(),record.partition(),record.offset(),
record.key(),record.value());
}
try {
consumer.commitSync();
}catch (CommitFailedException e){
log.error("commit failed",e);
}
}
只要沒有發生不可恢復的錯誤,commitSync()會一直嘗試直至提交成功。如果提交失敗, 我們也只能把異常記錄到錯誤日誌裡。
非同步提交
把commitSync改為commitAsync()即可。它也支援回撥,在broker 作出響應時會執行回撥。回撥經常被用於記錄提交錯誤或生成度量指標, 不過如果你要用它來進行重試, 一定要注意提交的順序。
while (true){
ConsumerRecords<String,String> records=consumer.poll(100);
for (ConsumerRecord<String,String> record: records) {
log.debug("topic=%s,partition=%s,offset=%d,customer=%s,country=%s\n",
record.topic(),record.partition(),record.offset(),
record.key(),record.value());
}
try {
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
if (e!=null){
log.error("commit failed for offsets {}",offsets,e);
}
}
});
}
}
同步非同步組合提交
try {
while (true){
ConsumerRecords<String,String> records=consumer.poll(100);
for (ConsumerRecord<String,String> record: records) {
log.debug("topic=%s,partition=%s,offset=%d,customer=%s,country=%s\n",
record.topic(),record.partition(),record.offset(),
record.key(),record.value());
}
try {
consumer.commitAsync();
}catch (Exception e){
log.error("Unexcept Error",e);
}
}
}finally {
try {
consumer.commitSync();
}finally {
consumer.close();
}
}
當然還可以提交特定的偏移量,我們這裡不作介紹了
再均衡監聽器
rebalanceListener略。
從特定偏移量處開始處理記錄
略。
如何退出
在之前討論輪詢時就說過,不需要擔心消費者會在一個無限迴圈裡輪詢訊息,我們會告訴消費者如何優雅地退出迴圈。
如果確定要退出迴圈,需要通過另一個執行緒呼叫consumer.wakeup()方法。
反序列化器
暫略