1. 程式人生 > >Kafka程式碼實現--from-beginning,讀取歷史未消費的資料

Kafka程式碼實現--from-beginning,讀取歷史未消費的資料

Kafka實際環境有可能會出現Consumer全部宕機,雖然基於Kafka的高可用特性,消費者群組中的消費者可以實現再均衡,所有Consumer不處理資料的情況很少,但是還是有可能會出現,此時就要求Consumer重啟的時候能夠讀取在宕機期間Producer傳送的資料。基於消費者訂閱模式預設是無法實現的,因為只能訂閱最新發送的資料。

通過消費者命令列可以實現,只要在命令列中加上--from-beginning即可(具體可見文章 Kafka安裝與配置 ),但是通過Java客戶端程式碼如何實現呢?這就要用到訊息偏移量的重定位方法 seek() 或者直接使用 seekToBeginning()

方法,基於再均衡監聽器,在給消費者分配分割槽的時候將訊息偏移量跳轉到起始位置 。

程式碼示例如下:

public class Consumer {
    private static final String server = "192.168.3.22:9092";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", server);
        properties.put("group.id"
, "kafka.group.user"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties, new StringDeserializer(), new StringDeserializer()); consumer.subscribe(Collections.singletonList("kafka.topic.user"), new ConsumerRebalanceListener() { @Override public
void onPartitionsRevoked(Collection<TopicPartition> collection) { } @Override public void onPartitionsAssigned(Collection<TopicPartition> collection) { Map<TopicPartition,Long> beginningOffset = consumer.beginningOffsets(collection); //讀取歷史資料 --from-beginning for(Map.Entry<TopicPartition,Long> entry : beginningOffset.entrySet()){ // 基於seek方法 //TopicPartition tp = entry.getKey(); //long offset = entry.getValue(); //consumer.seek(tp,offset); // 基於seekToBeginning方法 consumer.seekToBeginning(collection); } } }); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.println("partition:" + record.partition() + ",key:" + record.key() + ",value:" + record.value()); consumer.commitAsync(); } } } catch (Exception e) { e.printStackTrace(); } finally { try { consumer.commitSync(); } finally { consumer.close(); } } } }