1. 程式人生 > >Kafka學習筆記(四)—— API使用

Kafka學習筆記(四)—— API使用

1、Producer API

1.1 訊息傳送流程

Kafka的Producer傳送訊息採用的是非同步傳送的方式。在訊息傳送的過程中,涉及到了兩個執行緒——main執行緒和Sender執行緒,以及一個執行緒共享變數——RecordAccumulator。main執行緒將訊息傳送給RecordAccumulator,Sender執行緒不斷從RecordAccumulator中拉取訊息傳送到Kafka broker。

來一個動圖品品:

注意圖中的三個元件:

  • interceptor:攔截器,後邊寫程式碼會自定義攔截器
  • Serializer:序列化器
  • Partitioner:分割槽器

關於這三個小元件到後邊程式碼中,都會有所體現~

1.2 非同步傳送訊息

中國有句古話:talk is cheap,show me the code ~

1.2.1 簡單的程式碼示例:

1)匯入依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2)Producer程式碼:

public class MyProducer {
    public static void main(String[] args) {

        //1.建立Kafka生產者的配置資訊
        Properties properties = new Properties();

        //2.指定Kafka連線的叢集
        properties.put("bootstrap.servers", "hadoop102:9092");

        //3.指定ACK應答級別
        properties.put("acks", "all");

        //4.批次大小,16KB
        properties.put("batch.size", 16384);

        //5.等待時間(即使資料量沒有到達16KB,也會在這之後傳送資料,防止等待時間過長)
        properties.put("linger.ms", 1);

        //6.重試次數
        properties.put("retries", 3);

        //7. RecordAccumulator 緩衝區大小 32MB
        properties.put("buffer.memory", 33554432);

        //8. key value序列化的類
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //9.建立生產者物件
        KafkaProducer producer = new KafkaProducer<String, String>(properties);

        //10.傳送資料
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("first", "simon-1024"+Integer.toString(i)));
        }
        //11.注意要關閉資源,原因在於:整個程式執行下來不到1毫秒,資料不會被髮送出去。
        producer.close();
    }
}

3)啟動消費者開始消費 (事先建立了topic為first,有2個分割槽,2副本)

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first

4)檢視消費結果:

注意:消費者並不是按照 01234...這樣的順序消費訊息的,這是為什麼呢?體會分割槽的意義!!

1.2.2 帶有回撥函式的send方法

補充:其實send()方法是有過載的,注意看下面這種寫法:

public class CallbackProducer {
    public static void main(String[] args) {

        //1.建立kafka生產者配置資訊
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        //2. 建立生產者物件
        KafkaProducer kafkaProducer = new KafkaProducer<String,String>(properties);

        //3. 傳送資料
        for (int i = 0; i <10 ; i++) {
         kafkaProducer.send(new ProducerRecord("second", "simon-1024--" + i), new Callback() {// send的過載方法,可以有回撥函式
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    //打印出本條資料傳送到哪個分割槽了,偏移量是多少
    System.out.println(recordMetadata.partition()+"   "+recordMetadata.offset());
                }
            });
        }
        kafkaProducer.close();
    }
}

消費結果與上邊一致,下圖是我的執行結果:

直接證明了之前講過的offset並不是全域性唯一的,只保證區內有序。

回撥函式會在 producer 收到 ack 時呼叫,為非同步呼叫,該方法有兩個引數,分別是RecordMetadata 和 Exception,如果 Exception 為 null,說明訊息傳送成功,如果Exception 不為 null,說明訊息傳送失敗。
注意:訊息傳送失敗會自動重試,不需要我們在回撥函式中手動重試。

ProducerRecord的構造方法還有好多個過載,不再一一舉例,如下:

1.2.3 自定義分割槽

如果我們在傳送訊息的時候沒有指定分割槽,那麼Kafka會使用預設的分割槽器,看一下原始碼,分割槽器都幹了些什麼(原始碼分析在註釋中給出).

查閱了一下官方文件,預設的分割槽器為:org.apache.kafka.clients.producer.internals.DefaultPartitioner,直接檢視它計算分割槽的方法:

/**
     * Compute the partition for the given record.
     * 給指定的訊息計算分割槽
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes serialized key to partition on (or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        
        //1. 獲得叢集中的該topic資訊
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        //2. 共有幾個分割槽
        int numPartitions = partitions.size();
        
        //3. 如果待發送的訊息沒有指定key
        if (keyBytes == null) {
            //3.1 做累加操作
            //【為什麼累加呢?比如第一次nextVlue = a,那麼下一次為a+1,實現了輪詢策略】
            int nextValue = nextValue(topic);
            //3.2 獲取所有可用的分割槽(分割槽所在的機器沒掛掉)
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            //3.2.1 如果有可用的分割槽
            if (availablePartitions.size() > 0) {
                // 負數轉正後做摸運算
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                // 返回相應的分割槽數
                return availablePartitions.get(part).partition();
            } 
            //3.2.2 如果沒有可用的分割槽
            else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } 
        
        //4. 如果待發送的訊息指定了key
        else {
            // hash the keyBytes to choose a partition
            //4.1 根據key的雜湊值和分割槽數相與運算,得到分割槽號
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

ok,可以看到整個業務邏輯流程還是很清楚的。那麼我們自己嘗試寫一個分割槽器:

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //所有的訊息發往0號分割槽
        return 0;
    }

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 使得所有的訊息發往0號分割槽
        return 0;
    }

    @Override
    public void close() {

    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

    public void close() {

    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
}

執行程式,觀察回撥函式的執行效果:

1.3 同步傳送訊息

上面說到的都是訊息都是通過非同步的方式傳送的,使用到了main執行緒和sender執行緒。但是如果sender執行緒在工作的時候,我們阻塞住main執行緒,那兩個執行緒實現了序列工作的效果,也就相當於同步傳送了。注意這裡同步的意思是:一條訊息發出去之後,會阻塞當前執行緒,直到返回ack。

由於 send 方法返回的是一個 Future 物件,根據 Futrue 物件的特點,我們也可以實現同步傳送的效果,只需在呼叫 Future 物件的 get 方發即可。瞭解即可,不去深究。對上邊的程式碼進行簡單的改造:

        //傳送資料的程式碼片段
        for (int i = 0; i < 10; i++) {
            
            //send方法返回一個Future物件
            Future future = producer.send(new ProducerRecord<String, String>("sencond", "simon-1024", "hello world " + Integer.toString(i)));
            try {
                //由future物件獲得返回值,並且阻塞住執行緒
                future.get();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }

之前說過Kafka保證的是分割槽有序,而不是全域性有序。如果要保證全域性有序,那麼最直接的方案就是隻用一個分割槽,並且使用同步傳送的方式,保證資料不丟失。

2、Consumer API

Consumer 消費資料時的可靠性是很容易保證的,因為資料在 Kafka 中是持久化的,故不用擔心資料丟失問題。
由於 consumer 在消費過程中可能會出現斷電宕機等故障,consumer 恢復後,需要從故障前的位置繼續消費,所以 consumer 需要實時記錄自己消費到了哪個 offset,以便故障恢復後繼續消費。所以 offset 的維護是 Consumer 消費資料是必須考慮的問題。

下面是兩個例子,分別是自動提交offset和手動提交offset

2.1 自動提交offset

1)匯入依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

2)程式碼示例

先解釋一下用到的類:

KafkaConsumer:需要建立一個消費者物件,用來消費資料ConsumerConfig:獲取所需的一系列配置引數
ConsuemrRecord:每條資料都要封裝成一個 ConsumerRecord 物件

為了使我們能夠專注於自己的業務邏輯,Kafka 提供了自動提交offset 的功能。自動提交 offset 的相關引數:
enable.auto.commit:是否開啟自動提交 offset 功能
auto.commit.interval.ms:自動提交 offset 的時間間隔

public class MyConsumer {
    public static void main(String[] args) {

        //1. 建立消費者的配置物件
        Properties properties = new Properties();

        //2. 消費者連線的叢集資訊
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");

        //3. 反序列化訊息的key和value
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");

        //4. 允許自動提交:拉取到訊息就自動提交offset下標,可能造成資料丟失
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);

        //5. 自動提交的間隔為1毫秒
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1");

        //6. 設定消費者groupID
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"simon-0");

        //7. 建立消費者物件
        KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer(properties);

        //8. 訂閱的主題,引數是個集合,可以訂閱多個主題
        kafkaConsumer.subscribe(Arrays.asList("sencond"));

        //Tip:迴圈拉取訊息
        while (true){
            //9. 拉取訊息,並且10毫秒拉取一次
            ConsumerRecords<String,String> records = kafkaConsumer.poll(10);

            //10. 解析訊息
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset   =   %d, key =   %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
   }
}

先啟動消費者,然後隨便起一個生產者,我就以我之前建立的生產者為例,消費結果如下:

關於ConsumerConfig的屬性AUTO_OFFSET_RESET_CONFIG的補充:

假如有一個消費者,消費到offset = 10訊息,然後關機了。7天之後機器重啟,現在的訊息的offset為1000。現在按道理來說應該從11開始消費,但是Kafka的訊息預設儲存訊息7天,所以現在消費者持有的offset是無效的。

這時AUTO_OFFSET_RESET_CONFIG有兩個值可以選擇:earliest 和 latest,看一眼官方Doc:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

Talk is cheap ,show me the code:

//1. 增加一條配置
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
//2. 修改消費者分組,手動使得offset失效
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"simon-1");

先執行一次producer生成一組資料,然後再啟動consumer,可以消費到之前傳送的所有資料!!

2.2 手動提交offset

前面的程式碼演示了自動提交offset,每次消費完成消費者都會提交offset,下次消費從offset+1開始。但是如果關閉自動提交,那麼消費完成也不會提交offset,也就是說重新開啟消費者還會從頭開始消費。

如果消費者採用自動提交,拿到資料之後就提交offset。如果處理資料的時候出現了問題,那麼這個資料就丟失了。

所以,Kafka提供了兩種手動提交 offset 的方法: commitSync(同步提交)和 commitAsync(非同步提交)。兩者相同是,都會將本次 poll 的一批資料最高的偏移量提交;不同是:commitSync阻塞當前執行緒,一直到提交成功,並且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而 commitAsync 則沒有失敗重試機制,故有可能提交失敗。

2.2.1 同步提交

同步提交的方法為:consumer.commitSync();
同步提交只要不發生不可恢復的錯誤,會一直嘗試至提交成功,因此,會將降低程式的讀取、處理速度。

//關閉自動提交 offset
properties.put("enable.auto.commit", "false");

//加在消費完成程式碼之後,消費者同步提交,當前執行緒會阻塞直到 offset 提交成功
consumer.commitSync();

2.2.2 非同步提交offset

雖然同步提交offset更加安全可靠一點,但是它會造成執行緒的阻塞,直到提交成功。因此吞吐量會受到很大的影響。在更多的情況下,選用非同步提交方式。

非同步提交的方法為:consumer.commitAsync();
非同步提交不會等待broker 的響應,而是隻管傳送,不管是否成功。提高了應用程式吞吐量,但下次讀取訊息的遺失或重複可能性大大提升。

//關閉自動提交 offset
properties.put("enable.auto.commit", "false");

//非同步提交
consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition,
                           OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            System.err.println("Commit failed for" +
                               offsets);
        }
    }
});

無論是同步提交還是非同步提交 offset,都有可能會造成資料的漏消費或者重複消費。先提交 offset 後消費,有可能造成資料的漏消費;而先消費後提交 offset,有可能會造成資料的重複消費。

2.3自定義儲存offset

Kafka 0.9 版本之前,offset 儲存在 zookeeper,0.9 版本及之後,預設將 offset 儲存在 Kafka

的一個內建的 topic 中。除此之外,Kafka 還可以選擇自定義儲存 offset。

offset 的維護非常繁瑣,因為需要考慮到消費者的 Rebalance。

當有新的消費者加入消費者組、已有的消費者退出消費者組或者所訂閱的主題的分割槽發生變化,就會觸發到分割槽的重新分配,重新分配的過程叫做 Rebalance。

消費者發生 Rebalance 之後,每個消費者消費的分割槽就會發生變化。因此消費者要首先獲取到自己被重新分配到的分割槽,並且定位到每個分割槽最近提交的 offset 位置繼續消費。

要實現自定義儲存 offset,需要藉助 ConsumerRebalanceListener,以下為示例程式碼,其

中提交和獲取 offset 的方法,需要根據所選的 offset 儲存系統自行實現。

        //消費者訂閱主題
consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {

        //該方法會在 Rebalance 之前呼叫
        @Override
        public void
            onPartitionsRevoked(Collection<TopicPartition> partitions) {
            commitOffset(currentOffset);
        }
        //該方法會在 Rebalance 之後呼叫
        @Override
        public void
            onPartitionsAssigned(Collection<TopicPartition> partitions) {
            currentOffset.clear();
            for (TopicPartition partition : partitions) {
                //定位到最近提交的 offset 位置繼續消費
                consumer.seek(partition, getOffset(partition));
            }
        }
    });

    while (true) {
        //消費者拉取資料
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s%n",        record.offset(), record.key(), record.value());
            currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
        }
        //非同步提交
        commitOffset(currentOffset);
    }
}
    //獲取某分割槽的最新 offset,比如可以mysql資料庫中獲取
    private static long getOffset(TopicPartition partition) {
        return 0;
    }
    //提交該消費者所有分割槽的 offset,可以將其存入到MySQL中一份
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
        
    }

3、自定義攔截器

3.1 攔截器原理

Producer攔截器(interceptor)是在Kafka 0.10版本被引入的,主要用於實現clients端的定製化控制邏輯。

對於producer而言,interceptor使得使用者在訊息傳送前以及producer回撥邏輯前有機會對訊息做一些定製化需求,比如修改訊息等。同時,producer允許使用者指定多個interceptor按序作用於同一條訊息從而形成一個攔截鏈(interceptor chain)。Intercetpor的實現介面是org.apache.kafka.clients.producer.ProducerInterceptor

看一下結構,總共就三個方法,另外還有一個方法繼承自父類

詳細解讀一下各個方法:

  • configure(Map<String, ?> configs) :

    獲取配置資訊和初始化資料時呼叫。

  • onSend(ProducerRecord<K, V> record):

    該方法封裝進KafkaProducer.send方法中,即它執行在使用者主執行緒中。Producer確保在訊息被序列化以及計算分割槽前呼叫該方法。使用者可以在該方法中對訊息做任何操作,但最好保證不要修改訊息所屬的topic和分割槽,否則會影響目標分割槽的計算。

  • onAcknowledgement(RecordMetadata, Exception):

    該方法會在訊息從RecordAccumulator成功傳送到Kafka Broker之後,或者在傳送過程中失敗時呼叫。並且通常都是在producer回撥邏輯觸發之前。onAcknowledgement執行在producer的IO執行緒中,因此不要在該方法中放入很重的邏輯,否則會拖慢producer的訊息傳送效率。

  • close:

    關閉interceptor,主要用於執行一些資源清理工作

3.2 攔截器案例

需求如下:實現一個簡單的雙interceptor組成的攔截鏈。第一個interceptor會在訊息傳送前將時間戳資訊加到訊息value的最前部;第二個interceptor會在訊息傳送後更新成功傳送訊息數或失敗傳送訊息數。

程式碼如下~~:

TimeInterceptor.java

public class TimeInterceptor implements ProducerInterceptor<String,String> {

    /**
     * 在待發送的訊息之前加入時間戳
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(record.topic(), record.partition(),
                record.key(), System.currentTimeMillis() + record.value());
        return producerRecord;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}

CounterInterceptor.java

public class CounterInterceptor implements ProducerInterceptor<String,String> {

    int success ;
    int error ;

    /**
     * 不改變訊息的內容,直接返回
     * @param record
     * @return
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record;
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

        if (metadata!=null){
            success++;
        }else {
            error++;
        }
    }

    /**
     * 打印發送成功和失敗的訊息條數
     */
    @Override
    public void close() {
        System.out.println("success :"+success);
        System.out.println("error :"+error);
 }

    @Override
    public void configure(Map<String, ?> configs) {}
}

開啟消費者,消費資料:

看控制檯輸出 ,列印了傳送成功和失敗訊息的條數: