1. 程式人生 > >Spring Boot、kafka、spring-kafka 生產者消費者實踐(從搭建kafka叢集開始)

Spring Boot、kafka、spring-kafka 生產者消費者實踐(從搭建kafka叢集開始)

一、搭建kafka叢集

參考文件:http://kafka.apache.org/quickstart 官方文件講的很詳細,而且沒坑,照著做很快就可以搭好

注意點 or 建議:

1、在Linux下,啟動的kafka叢集經常無故退出,看日誌也沒有報錯,就是啟動了關閉流程,正常關閉。

      解決方案:用守護程序啟動,參考:https://blog.csdn.net/xiaoyu_bd/article/details/52268659

bin/kafka-server-start.sh  -daemon  config/server.properties > k0.log &

2、kafka各項配置以及預設值說明:http://kafka.apache.org/documentation/#configuration  還是官方文件

3、kafka是Java程序,因此可以用 jps 命令方便的檢視對應的埠。

4、以下配置為 監聽地址,預設是 localhost,這樣的話無法遠端連線,需要配置為 特定的IP地址,然後用配置的IP來連線。

listeners=PLAINTEXT://172.17.10.89:9092

如上配置,本地連線也需要 使用 172.17.10.89:9092這個地址,而不是 localhost。

二、kafka介紹

參考文件:

http://kafka.apache.org/intro 官方文件  

                  https://www.jianshu.com/p/d3e963ff8b70  網友的中文版本,很詳細,但要注意有些配置在新版本中發生了變化,比如:                              auto.create.topics.auto 變成了  auto.create.topics.enable  並且預設值是 true topic不存在時,按照預設配置建立topic

                  https://www.jianshu.com/p/4e00dff97f39  關閉 offset自動提交,讓  spring-kafka 來提交offset

                  https://blog.csdn.net/lishuangzhe7047/article/details/74530417  kafka auto.offset.reset值詳解,offset缺失情況下的消費者消                      費策略

                  網上文件很多,不要猶豫該看哪些,而是都看下,對照著看。

三、Spring Boot 整合 kafka 

pom spring-kafka 的版本要稍微注意下 我用的 kafka_2.11-2.0.0 配合 spring-kafka-1.1.3.RELEASE 可以正常使用,但是用  spring-kafka-2.xxx的時候 專案無法啟動,會報錯:not found

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.3.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <version>1.18.0</version>
   <scope>provided</scope>
</dependency>

yml配置:

spring:
  kafka:
    bootstrap-servers: 172.17.10.89:9092,172.17.10.89:9093,172.17.10.89:9094
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 65536
      buffer-memory: 524288
    consumer:
      group-id: default-group   #預設組id  後面會配置多個消費者組
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: latest  
      enable-auto-commit: false   #關閉自動提交 改由spring-kafka提交
      auto-commit-interval: 100
      max-poll-records: 20      #批量消費 一次接收的最大數量

bootstrap-servers 這邊配置的地址 可以事先看下 是否可以訪問到對應的埠

配置讀取類:

import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * @author fandong
 * @create 2018/11/1
 */
@Configuration
@Data
public class KafkaConsumerProps {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String defaultGroupId;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private String enableAutoCommit;

    @Value("${spring.kafka.consumer.auto-commit-interval}")
    private String autoCommitInterval;

    @Value("${spring.kafka.consumer.max-poll-records}")
    private String maxPollRecords;

    public KafkaConsumerProps() {
    }
}

消費者配置類,配置多個消費者組、批量消費、併發數

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author fandong
 * @create 2018/11/1
 */
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private KafkaConsumerProps kafkaConsumerProps;
    
    private static final String GROUP0_ID = "group0";
    private static final String GROUP1_ID = "group1";

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory0() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory0());
        //對應topic分割槽數
        factory.setConcurrency(3);
        //設定為批量消費,每個批次數量在Kafka配置引數中設定ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory0() {
        Map<String, Object> map = consumerConfigs();
        map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP0_ID);
        return new DefaultKafkaConsumerFactory<>(map);
    }

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory1());
        //對應topic分割槽數
        factory.setConcurrency(3);
        //設定為批量消費,每個批次數量在Kafka配置引數中設定ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory1() {
        Map<String, Object> map = consumerConfigs();
        map.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP1_ID);
        return new DefaultKafkaConsumerFactory<>(map);
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>(16);
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProps.getBootstrapServers());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaConsumerProps.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaConsumerProps.getAutoCommitInterval());
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProps.getDefaultGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaConsumerProps.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerProps.getMaxPollRecords());
        return propsMap;
    }

}

注意:

1、factory.setConcurrency(3);  此處設定的目的在於:假設 topic test 下有 0、1、2三個 partition,Spring Boot中只有一個 @KafkaListener() 消費者訂閱此 topic,此處設定併發為3,啟動後 會有三個不同的消費者分別訂閱 p0、p1、p2,本地實際有三個消費者執行緒。而 factory.setConcurrency(1); 的話 本地只有一個消費者執行緒, p0、p1、p2被同一個消費者訂閱。由於 一個partition只能被同一個消費者組下的一個消費者訂閱,對於只有一個 partition的topic,即使設定 併發為3,也只會有一個消費者,多餘的消費者沒有 partition可以訂閱。

2、factory.setBatchListener(true); 設定批量消費 ,每個批次數量在Kafka配置引數ConsumerConfig.MAX_POLL_RECORDS_CONFIG中配置,限制的是 一次批量接收的最大條數,而不是 等到達到最大條數才接收,這點容易被誤解。實際測試時,接收是實時的,當生產者大量寫入時,一次批量接收的訊息數量為 配置的最大條數。

生產者我們藉助自動配置,在yml檔案中加入生產者配置之後,直接注入 KafkaTemplate 即可使用。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFuture;

import java.util.List;

/**
 * @author fandong
 * @create 2018/11/1
 */
@Service
public class KafkaServiceImpl implements KafkaService {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final Logger logger = LoggerFactory.getLogger(this.getClass().getName());

    @Autowired
    public KafkaServiceImpl(KafkaTemplate kafkaTemplate) {
        this.kafkaTemplate = (KafkaTemplate<String, String>) kafkaTemplate;
    }

    @Override
    public void send(String topic, String value) {
        ListenableFuture<SendResult<String, String>> resultListenableFuture = kafkaTemplate.send(topic, value);
        resultListenableFuture.addCallback(
                successCallback -> logger.info("傳送成功:topic= " + topic + " value= " + value),
                failureCallback -> logger.info("傳送失敗:topic= " + topic + " value= " + value));
    }

    @Override
    @KafkaListener(topics = {"test"}, containerFactory = "kafkaListenerContainerFactory0")
    public void kafkaConsumerTest(String msg) {
        logger.info("接收到訊息--" + msg);
    }

    @Override
    @KafkaListener(topics = {"3-test"}, containerFactory = "kafkaListenerContainerFactory0")
    public void listenPartition0(List<ConsumerRecord<String, String>> records) {
        System.out.println(records.size());
        for (ConsumerRecord<String, String> consumerRecord : records){
            String value = consumerRecord.value();
            logger.info("a 訊息:partition " + consumerRecord.partition() + " value " + consumerRecord.value() );
        }
    }

    @Override
    @KafkaListener(topics = {"3-test"}, containerFactory = "kafkaListenerContainerFactory1")
    public void listenPartition2(List<ConsumerRecord<String, String>> records) {
        System.out.println(records.size());
        for (ConsumerRecord<String, String> consumerRecord : records){
            String value = consumerRecord.value();
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            logger.info("c 訊息:partition " + consumerRecord.partition() + " value " + consumerRecord.value() + " thread id " + Thread.currentThread().getName());
        }
    }


}

指定  containerFactory = "kafkaListenerContainerFactory1" 引數給消費者分組,值為 之前定義的 KafkaListenerContainerFactory的 Bean 名稱,不指定的情況下 預設是 方法名稱。

注:topic  3-test下有 3個partition,由於之前配置了 factory.setConcurrency(3); 專案啟動之後,本地會有三個消費者執行緒。

使用如下命令可以檢視各個消費者組的情況  以下為檢視  group0消費組,可以看到每個partition由不同的消費者訂閱。

bin/kafka-consumer-groups.sh --bootstrap-server 172.17.10.89:9092 --describe --group group0

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
test            0          12              12              0               consumer-1-9419e037-1501-4f33-85c9-25df75e3a5a9 /172.17.10.33   consumer-1
3-test          1          1400            1400            0               consumer-8-1a520c8e-c412-4178-a76f-77c68e7472b7 /172.17.10.33   consumer-8
3-test          0          1398            1398            0               consumer-7-ede99b63-ea10-48dd-be62-61139360e39c /172.17.10.33   consumer-7
3-test          2          1397            1397            0               consumer-9-4bacc023-7de7-4e98-aefd-bb39d0bf6547 /172.17.10.33   consumer-9

四、關於提高消費者消費能力的思考

kafka寫具有很好的效能,而消費者在消費時往往會有相對耗時的操作,所以經常出現 消費者效能跟不上的情況。

思路:

1、在topic下適當建立多個 partition,然後使用多個消費者來消費多個partition

2、使用批量消費,一次接收 多條訊息,相比一個一個接收,(猜)可以減少IO次數,提高速度

3、消費者再使用 執行緒池配合適當長度的阻塞佇列,進一步提高處理能力(需要分析任務型別以及考慮處理器的能力)。