1. 程式人生 > >Kafka學習筆記(7)----Kafka使用Cosumer接收訊息

Kafka學習筆記(7)----Kafka使用Cosumer接收訊息

1. 什麼是KafkaConsumer?

  應用程式使用KafkaConsul'le 「向Kafka 訂閱主題,並從訂閱的主題上接收訊息。Kafka的訊息讀取不同於從其他訊息系統讀取資料,它涉及了一些獨特的概念和想法。

  1.1 消費者和消費者群組

  單個的消費者就跟前面的訊息系統的消費者一樣,建立一個消費者物件,然後訂閱一個主題並開始接受訊息,然後做自己的業務邏輯,但是Kafka天生就是支援體量很大的資料消費,如果只是使用單個的消費者消費訊息,當生產者寫入訊息的速度遠遠大於了消費者的速度,大量訊息堆積在消費者上可能會導致效能反而降低或撐爆消費者,所以橫向伸縮是很有必要的,就想多個生產者可以向相同的主題寫訊息一樣,我們也可以使用多個消費者從同一個主題讀取訊息,對訊息進行分流,這多個消費者就從屬於一個消費者群組。一個群組裡的消費者訂閱的是同一個主題,每個消費者接收主題一部分分割槽的訊息。

  假設主題T1有四個分割槽,我們建立了消費者群組G1,建立了一個消費者C1從屬於G1,它是G1裡的唯一的消費者,此時訂閱主題情況為,C1將會接收到主題中四個分割槽中的訊息,如圖:

  此時我們在消費者群組中新增一個消費者C2,那麼每個消費者將分別從兩個分割槽接受訊息,如圖:

  如果我們有四個消費者時,將會每個消費者都分到一個分割槽。

  如果群組中的消費者超過了主題的分割槽數,那麼有一部分消費者就會被閒置,不會接收任何訊息。如圖:

 

  往群組裡增加消費者是橫向伸縮消費能力的主要方式。

  對於多個群組來說,每個群組都會從Kafka中接收到所有的訊息,並且各個群組之間是互不干擾的。所以橫向伸縮Kafka消費者和消費者群組並不會對效能造成負面影響。簡而言之就是,為每一個需要獲取一個或多個主題全部訊息的應用程式建立一個消費者群組,然後往群組裡新增消費者來伸縮讀取能力和處理能力,群組裡的每個消費者只處理一部分訊息。如圖:

  1.2  消費者群組和分割槽再均衡

  一個新的消費者加入群組時,它讀取的是原本由其他消費者讀取的訊息。當一個消費者被關閉或發生奔潰時,它就離開群組,原本由它讀取的分割槽將由群組裡的其他消費者來讀取。在主題發生變化時, 比如管理員添加了新的分割槽,會發生分割槽重分配。分割槽的所有權從一個消費者變成了裡另一個消費者,這樣的行為被稱為再均衡。再均衡非常重要, 它為消費者群組帶來了高可用性和伸縮性(我們可以放心地新增或移除消費者),不過在正常情況下,我們並不希望發生這樣的行為。在再均衡期間,消費者無法讀取訊息,造成整個群組一小段時間的不可用。另外,當分割槽被重新分配給另一個消費者時,消費者當前的讀取狀態會丟失,它有可能還需要去重新整理快取,在它重新恢復狀態之前會拖慢應用程式。

  消費者通過向被指派為群組協調器的broker (不同的群組可以有不同的協調器)傳送心跳來維持它們和群組的從屬關係以及它們對分割槽的所有權關係。只要消費者以正常的時間間隔傳送心跳,就被認為是活躍的,說明它還在讀取分割槽裡的訊息。消費者會在輪詢訊息(為了獲取訊息)或提交偏移量時傳送心跳。如果消費者停止傳送心跳的時間足夠長,會話就會過期,群組協調器認為它已經死亡,就會觸發一次再均衡。如果一個消費者發生崩潰,井停止讀取訊息,群組協調器會等待幾秒鐘,確認它死亡了才會觸發再均衡。在這幾秒鐘時間裡,死掉的消費者不會讀取分割槽裡的訊息。在清理消費者時,消費者會通知協調器它將要離開群組,協調器會立即觸發一次再均衡,儘量降低處理停頓。

2. 建立Kafka消費者並讀取訊息

  在建立KafkaConsumer之前,需要將消費者想要的屬性存放到Properties中,然後再將properties傳給KafkaConsumer。

  Consuer也有三個必須的屬性。bootstrap.servers,這裡跟Producer一樣,另外兩個key.deserializer和value.deserializer也與Producer類似,不過一個是序列化,一個是反序列化而已。

  還有一個group.id不是必須的,但是我們通常都會指定改消費者屬於哪個群組,所以也可以認為是必須的。

  設定Properties的程式碼片段如下: 

 Properties kafkaPropertie = new Properties();
        //配置broker地址,配置多個容錯
        kafkaPropertie.put("bootstrap.servers", "node2:9092,node1:9092,node1:9093");
        //配置key-value允許使用引數化型別,反序列化
        kafkaPropertie.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        kafkaPropertie.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        //指定消費者所屬的群組
        kafkaPropertie.put("group.id","one");

  接下來建立消費者,將Properties物件傳入到消費者,然後訂閱主題,如下:

  KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertie);
        /*訂閱主題,這裡使用的是最簡單的訂閱testTopic主題,這裡也可以出入正則表示式,來區分想要訂閱的多個指定的主題,如:
         *Pattern pattern = new Pattern.compile("testTopic");
         * consumer.subscribe(pattern);
         */

        consumer.subscribe(Collections.singletonList("testTopic"));

  接下來輪詢訊息,如下:

 //輪詢訊息
        while (true) {
            //獲取ConsumerRecords,一秒鐘輪訓一次
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            //消費訊息,遍歷records
            for (ConsumerRecord<String, String> r : records) {
                LOGGER.error("partition:", r.partition());
                LOGGER.error("topic:", r.topic());
                LOGGER.error("offset:", r.offset());
                System.out.println(r.key() + ":" + r.value());
            }
            Thread.sleep(1000);
        }

  生產者傳送訊息,然後檢視消費者列印情況:

 KafkaConsuerDemo - partition:
 KafkaConsuerDemo - topic:
 KafkaConsuerDemo - offset:
 key1:hello world0
KafkaConsuerDemo - partition:
 KafkaConsuerDemo - topic:
 KafkaConsuerDemo - offset:
 key1:hello world1
KafkaConsuerDemo - partition:
 KafkaConsuerDemo - topic:
 KafkaConsuerDemo - offset:
 key1:hello world2
KafkaConsuerDemo - partition:
 KafkaConsuerDemo - topic:
 KafkaConsuerDemo - offset:
 key1:hello world3
KafkaConsuerDemo - partition:
 KafkaConsuerDemo - topic:
 KafkaConsuerDemo - offset:
 key1:hello world4
KafkaConsuerDemo - partition:
 KafkaConsuerDemo - topic:
 KafkaConsuerDemo - offset:
 key1:hello world5
KafkaConsuerDemo - partition:
 KafkaConsuerDemo - topic:
 KafkaConsuerDemo - offset:
 key1:hello world6
KafkaConsuerDemo - partition:
 KafkaConsuerDemo - topic:
 KafkaConsuerDemo - offset:
 key1:hello world7
KafkaConsuerDemo - partition:
 KafkaConsuerDemo - topic:
 KafkaConsuerDemo - offset:
 key1:hello world8
KafkaConsuerDemo - partition:
 KafkaConsuerDemo - topic:
 KafkaConsuerDemo - offset:
 key1:hello world9

  只存在一個組群和一個消費者時:

  當我們啟動兩個消費者,同一個組群,並在Topic上建立兩個Partition(分割槽),傳送訊息

   final ProducerRecord<String, String> record = new ProducerRecord<String, String>("one",i %  2,"key3","hello world" + i); 
將訊息分發到0和1兩個partition

  此時兩個消費者消費的訊息總和等於傳送的訊息的總和,使用不同的群組的不同的訂閱同一個topic,每個消費者群組都能收到所有的訊息。

  輪詢不只是獲取資料那麼簡單。在第一次呼叫新消費者的poll ()方法時,它會負責查詢GroupCoordinator , 然後加入群組,接受分配的分割槽。如果發生了再均衡,整個過程也是在輪詢期間進行的。當然,心跳也是從輪詢裡傳送出去的。所以,我們要確保在輪詢期間所做的任何處理工作都應該儘快完成。

   消費者完整程式碼如下:

package com.wangx.kafka.client;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsuerDemo {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsuerDemo.class);
    public static void main(String[] args) throws InterruptedException {
        Properties kafkaPropertie = new Properties();
        //配置broker地址,配置多個容錯
        kafkaPropertie.put("bootstrap.servers", "node2:9092,node1:9092,node1:9093");
        //配置key-value允許使用引數化型別,反序列化
        kafkaPropertie.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        kafkaPropertie.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        //指定消費者所屬的群組
        kafkaPropertie.put("group.id","1");
        //建立KafkaConsumer,將kafkaPropertie傳入。
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaPropertie);
        /*訂閱主題,這裡使用的是最簡單的訂閱testTopic主題,這裡也可以出入正則表示式,來區分想要訂閱的多個指定的主題,如:
         *Pattern pattern = new Pattern.compile("testTopic");
         * consumer.subscribe(pattern);
         */

        consumer.subscribe(Collections.singletonList("one"));
        //輪詢訊息
        while (true) {
            //獲取ConsumerRecords,一秒鐘輪訓一次
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            //消費訊息,遍歷records
            for (ConsumerRecord<String, String> r : records) {
                LOGGER.error("partition:", r.partition());
                LOGGER.error("topic:", r.topic());
                LOGGER.error("offset:", r.offset());
                System.out.println(r.key() + ":" + r.value());
            }
            Thread.sleep(1000);
        }
    }
}

3. 消費者的配置

  1. fetch.min.bytes: 該屬性指定了消費者從伺服器獲取記錄的最小位元組數。

  2. fetch.max.wait.ms:我們通過 fetch.min.byte告訴Kafka ,等到有足夠的資料時才把它返回給消費者。

而 fetch.max.wait.ms則用於指定broker 的等待時間

  3. max.partition.fetch.bytes:預設值是1MB,該屬性指定了伺服器從每個分割槽裡返回給消費者的最大位元組數.

  4. session.timeout.ms: 預設3s,該屬性指定了消費者在被認為死亡之前可以與伺服器斷開連線的時

  5. auto.offset.reset:該屬性指定了消費者在讀取一個沒有偏移量的分割槽或者偏移量無效的情況下(因消費者長時間失效,包含偏移量的記錄已經過時井被刪除)該作何處

  6. enable.auto.commit:該屬性指定了消費者是否自動提交偏移量,預設值是true。

  7. partition.assignment.strategy: 分割槽分配給消費者群組的分配策略,有如下兩種策略:

  Range:該策略會把主題的若干個連續的分割槽分配給消費者.

  RoundRobin:該策略把主題的所有分割槽逐個分配給消費.

  8. client.id:該屬性可以是任意字串, broker 用它來標識從客戶端傳送過來的訊息,通常被用在日誌、度量指標和配額裡。

  9. max.poll.records: 該屬性用於控制單次呼叫call () 方法能夠返回的記錄數量,可以幫你控制在輪詢裡需要處理的資料量。

  10.  receive.buffer.bytes 和send.buffer.bytes: socket 在讀寫資料時用到的TCP 緩衝區也可以設定大小。如果它們被設為-1,就使用作業系統的預設值。如果生產者或消費者與broker處於不同的資料中心內,可以適當增大這些值,因為跨資料中心的網路一般都有比較高的延遲和比較低的頻寬。