1. 程式人生 > >Spring中手動開啟kafka監聽.md

Spring中手動開啟kafka監聽.md

0. 背景重現

最近搭建一個新專案,基於SpringBoot框架,使用Kafka做訊息中介軟體。
使用@KafkaListener註解來建立一個消費者,實現對Kafka訊息的消費。我計劃的執行順序是這樣的:服務啟動之後,建立Consumer例項,執行loadResourceConfig初始化方法,之後才開始消費Kafka的訊息。
但是出現了一個問題:沒有等loadResourceConfig方式執行完畢,@KafkaListener就開始消費訊息了。
這顯然不是我們期望的,下面是大概的程式碼:

@Component      
public  class Consumer{
    @PostConstruct
    private void loadResourceConfig () {//載入資料
        // 載入資源配置
     }
    /**
     * 接收資料處理
     * @param record
     */
    @KafkaListener(id = "device-data",topics = {"${DataTopic}"})
    public void listen(ConsumerRecord<String, ?> record) {
        Optional kafkaMessage = Optional.ofNullable(record.value());
        Optional<String> kafkaKey = Optional.ofNullable(record.key());
        if (kafkaKey.isPresent()) {
            Object value = kafkaMessage.get();
            String gatewayId = kafkaKey.get();
              //使用 載入的資源資訊對資料進行處理
        }
    }
}

1.原因分析

@KafkaListener這個註解所標註的方法並沒有在IOC容器中註冊為Bean,而是會被註冊在KafkaListenerEndpointRegistry中,KafkaListenerEndpointRegistry在SpringIOC中已經被註冊為Bean,具體可以看一下該類的原始碼,當然不是使用註解方式註冊。
KafkaListenerEndpointRegistry註冊完Kafka中的topic之後,就會自動啟動監聽容器,如此KafkaListener註解的方法就開始消費訊息了。這個過程可能在自定義Bean建立完成之前執行。

知道了問題,以及原因,解決方法就比較簡單了,我們只需要完成2點:
1.禁止KafkaListener自啟動(AutoStartup)
2.手動啟動單個Kafka的topic的監聽

2.解決方法

@Component      
public    class Consumer{
	@Autowired
     KafkaManager  kafkaManager;
     @PostConstruct
     private void loadResourceConfig () {//載入資料
        // 載入資源配置
        kafkaManager.startListener();//開啟topic的監聽
    }

}


@Component
public class KafkaManager {
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    @Autowired
    private ConsumerFactory consumerFactory;

    @Bean
    public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
        ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
        container.setConsumerFactory(consumerFactory);
        //禁止自動啟動
        container.setAutoStartup(false);
        return container;
    }

    /**
     * 開啟kafka監聽
     */
    public void startListener() {
        if (!registry.getListenerContainer("device-data").isRunning()) {
            registry.getListenerContainer("device-data").start();
        }
        registry.getListenerContainer("device-data").resume();
   
}

上面的程式碼做了幾件事:
1.使用ConsumerFactory 構建Kafka監聽容器工廠ConcurrentKafkaListenerContainerFactory
2.Kafka監聽容器工廠註冊為Bean
3.禁止Kafka監聽容器自動啟動
4.在loadResourceConfig方法載入完成資源之後,呼叫startListener方法,手動啟動Kafka容器監聽。注意registry.getListenerContainer(“device-data”)的引數,就是 @KafkaListener註解中的id引數。
5.startListener中我們先判斷容器是否執行(isRunning),如果沒有則呼叫start方法啟動。 resume方法是恢復執行。這樣寫的目的是,即便startListener多次執行,也沒有問題。