1. 程式人生 > >KafkaConsumer一些概念解釋(從官方文件整理而來)

KafkaConsumer一些概念解釋(從官方文件整理而來)

閱讀之前假設您已經對kafka有了一定的瞭解

KafkaConsumer類簡介

public class KafkaConsumer<K,V>extends Object implements Consumer<K,V>
(從官方文件copy而來)

KafkaConsumer是kafka服務的一個java版本的客戶端。它會自動處理kafka叢集中出現的錯誤,自動適應kafka叢集中資料分割槽的遷移。它還可以以消費群組(consumer groups)的方式同伺服器互動,從而實現訊息處理負載均衡( load balance consumption)。
消費者(consumer )跟代理伺服器(broker)之間保持著一個TCP連線來獲取資料。如果在使用完consumer之後沒有呼叫close()方法來關閉的話,就會導致連線洩露。KafkaConsumer不是執行緒安全的,切記。(Producer類是執行緒安全的,通常我們將Producer以單例的模式實現)

偏移量(Offsets )和消費者位置(Consumer Position)

簡短解說,偏移量就相當於資料庫中的ID,是一個唯一識別符號,代表著一條訊息在一個主題(topic)的某個分割槽(partition)中的位置,消費者位置就是某個訂閱該主題的消費者在它所使用的那個主題分割槽中的位置,也就是它目前處理訊息的位置。資料庫遊標知道吧,訊息偏移量就相當於資料庫中的資料ID,而消費者位置就相當於資料庫遊標位置。如果消費者不小心掛掉了,再重啟還是會從當前消費者位置來讀取資料,這就是Consumer Position的作用。

消費者群組(Consumer Groups)和主題訂閱(Topic Subscriptions)

kafka使用消費者群組(Consumer Groups)的概念來分割處理訊息的工作。一個consumer group中可以有多個消費者,這多個消費者可以在同一臺機器上執行,也可以在不同的機器上執行。這樣,就有了一定的擴充套件性和容錯功能。
每個kafka的consumer都可以配置一個群組名(consumer group),並且可以通過subscribe()方法動態設定想要訂閱的topic列表。kafka伺服器將把每個topic中的每條訊息都發送給訂閱它的群組,每個群組中只會有一個consumer來處理這條訊息。具體的實現機制就是kafka伺服器將每個topic的每個partition都分配給訂閱它的群組中的一個consumer,從而實現併發處理和負載均衡。簡短解說,我們將概念抽象出來,把topic抽象成一包糖,partition相當於一個一個的糖豆,一個consumer group抽象成一堆熊孩子,那一個consumer就是這堆熊孩子中的一個熊孩子了。假設現在這包糖有90個,這堆熊孩子有30個,那每個熊孩子能飛到3個糖果。假設這包糖有89個,熊孩子還是30個,你該說分不勻了,這就不是你管的事了,kafka伺服器會去決定哪個熊孩子倒黴,少分一個。這是在一對一的情況下,將這個概念延伸開來,在多對一,一對多,多對多的情況下也是適用的。你只需要將每包糖果看成彼此獨立,每堆熊孩子彼此獨立,每包糖果對每堆熊孩子之間都相互獨立,互不影響應該就能理解。

kafka伺服器檢測consumer的失效(Detecting Consumer Failures)

當一個consumer訂閱一個topic,在該consumer呼叫poll(long)方法後就會自動加入它所屬於的那個群組。poll(long)方法在設計上可以維持該consumer的活性,只要該consumer持續呼叫poll(方法)方法。表象之下,poll(long)方法在每次被呼叫的時候都會向kafka伺服器傳送一個心跳(heartbeat)來告訴kafka伺服器自己依然健在。如果你停止呼叫poll(long)方法(可能是因為異常導致程式掛掉了),那consumer就不會再向kafka伺服器傳送heartbeat,然後過一段時間,伺服器就會認為該consumer掛掉了,然後就會被踢出consumer所屬的群組,然後本來被分配給該consumer的partition就會被重新分配就其他的consumer(就相當於是某個熊孩子不小心把自己玩死了,然後他的糖果就會被拿回去,重新分給其他的熊孩子)。這樣設計是為了防止某些consumer掛掉之後依舊握著partition不鬆手,導致某些訊息無法被其他健在的consumer處理的情況發生。
在單執行緒情況下,這種設計就要求consumer處理接受到的訊息的時間要小於呼叫poll(long)的週期,從而保證heartbeat的正常傳送,從而讓伺服器知道自己依然健在。這裡有一個session timeout的概念,session timeout就是consumer傳送兩次有效heartbeat的最長時間間隔,嚴格來說就是在不超過多長的時間內,你讓伺服器接收到heartbeat,從而確定你的consumer的活性。如果你接收到了訊息,然後用來處理這些訊息的時間過長,從而導致無法呼叫poll(long)而無法傳送heartbeat,那在session timeout之後,伺服器就會認為你的consumer掛掉了。如果伺服器認為你的consumer掛掉了,那你consumer相應的partition就會被kafka伺服器的負載均衡機制給均衡掉,重新分配給其他的consumer。
KafkaConsumer類有兩項配置可以控制這種行為:
1、session.timeout.ms:從名稱就可以看出來,就是heartbeat超時時間,增加該設定值可以給consumer更多處理poll(long)返回的訊息的時間。唯一的缺點是如果你的consumer不小心玩脫掛掉了,伺服器可能不能及時檢測到,這就會導致伺服器不能及時根據你consumer的情況進行負載均衡。如果你呼叫close()方法來告知伺服器你的consumer要退出了,伺服器會及時進行負載均衡,這種情況不受該設定的影響。
2、max.poll.records:意思也顯而易見,就是每次呼叫poll(long)方法的時候,最多返回多少條訊息記錄。訊息處理時間通常跟要處理的訊息記錄的條數是成比例的,所以通常人們希望在每次呼叫poll(long)的返回條數上做限制。預設情況下,該設定的值為無限制(no limit)。

使用示例

自動提交位置

     /*從官方文件copy而來*/
     Properties props = new Properties();
     /*配置broker*/
     props.put("bootstrap.servers", "localhost:9092");
     /*配置group id*/
     props.put("group.id", "test");
     /*配置自動提交位置*/
     props.put("enable.auto.commit", "true");
     /*配置自動提交的時間,以毫秒為單位*/
     props.put("auto.commit.interval.ms", "1000");
     /*配置session timeout時間,以毫秒為單位*/
     props.put("session.timeout.ms", "30000");
     /*這兩個deserializer一般不要動,直接拿來用就行了*/
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     /*建立consumer*/
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     /*配置consumer訂閱的主題,這裡用 foo 和 bar 做為例子*/
     consumer.subscribe(Arrays.asList("foo", "bar"));
     /*一般我們在一個死迴圈裡呼叫poll(long)和處理訊息*/
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
     }

手動提交位置

     /*從官方文件copy而來*/
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     /*這裡關掉自動提交*/
     props.put("enable.auto.commit", "false");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     /*配置一個限制,當訊息數量達到這個限制,我們處理訊息*/
     final int minBatchSize = 200;
     /*訊息快取連結串列*/
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         /*呼叫poll(long)來獲取訊息資料*/
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             /*達到限制,開始處理訊息*/
             insertIntoDb(buffer);
             /*處理訊息後,用同步方法提交consumer position,也就是消費者位置*/
             consumer.commitSync();
             buffer.clear();
         }
     }

上面的例子是在所有的訊息都成功處理完之後一次性提交所有consumer所關聯的分割槽位置,我們還可以更進一步,更細化的控制位置提交的時機,比如我們可以一個分割槽一個分割槽的來處理訊息,然後每處理完一個分割槽的訊息,我們就提交一下consumer在當前分割槽的位置。程式碼如下:

/*從官方文件copy而來*/
     try {
         while(running) {
         /*獲取訊息資料*/
             ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
             /*遍歷訊息資料中有關的分割槽*/
             for (TopicPartition partition : records.partitions()) {
             /*取出當前訊息分割槽中的訊息*/
                 List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                 /*處理訊息*/
                 for (ConsumerRecord<String, String> record : partitionRecords) {
                     System.out.println(record.offset() + ": " + record.value());
                 }
                 /*計算當前位置*/
                 long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                 /*用同步方法提交當前位置*/
                 consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
             }
         }
     } finally {
     /*最後別忘了關閉consumer*/
       consumer.close();
     }

手動控制分割槽分配

在之前的例子中,我們只是訂閱了我們感興趣的topic,然後kafka伺服器會自動為我們的consumer分配topic下的partition。然而某些情況下我們可能需要手動配置我們的consumer所要使用的partition,例如我們想要重新獲取以前已經使用過的訊息,我們就需要手動來配置partition。
如果想要手動配置分割槽,就不能再呼叫subscribe()方法,需要呼叫assign(Collection)來配置,Collection表示所有想要配置的分割槽的集合。示例程式碼如下:

     /*程式碼是從官方文件copy而來*/
     /*想要訂閱的topic*/
     String topic = "foo";
     /*想要配置的分割槽們*/
     TopicPartition partition0 = new TopicPartition(topic, 0);
     TopicPartition partition1 = new TopicPartition(topic, 1);
     /*配置分割槽*/
     consumer.assign(Arrays.asList(partition0, partition1));

配置完分割槽之後,具體的使用就跟前面介紹過的一樣了。如果想要更換分割槽,只需重新呼叫assign()方法就行了。手動配置的分割槽是沒有consumer group的自動負載均衡功能的,所以如果你的consumer掛掉了,並不會引起群組的負載均衡,也就沒有其他的consumer自動接管你的consumer的作用,那麼訊息就不能被該群組處理了。同時,如果一個群組中有多個consumer分配了同一個topic下的同一個分割槽,那麼可能會導致consumer position的commit問題,可能一個consumer提交了一個靠前的位置,而兩一個consumer隨後提交了一個靠後的位置,從而導致訊息重複。為了避免這種衝突,你應該確保使用手動分配partition的群組只有一個consumer,同時這個consumer要分配它所訂閱的topic下的所有partition來接受所有的訊息。這樣,consumer就可以安全的讀取任意partition的任意位置的訊息了。

注意:

kafka不支援將手動分割槽分配和自動動態分割槽分配混合使用,也就是說如果你的群組中有一個consumer是手動分配,則其他的都會成為手動分配,所以建議手動分配的consumer group只配置一個consumer。

控制consumer分配的partition

假設你使用了手動配置分割槽,且你的consumer group中只有一個consumer,這時你就可以呼叫seek(TopicPartition,long)方法來讀取任意位置的訊息了。

整理不易,煩請支援原創,轉載請註明出處。