1. 程式人生 > >centos安裝kafka,以及springboot的整合

centos安裝kafka,以及springboot的整合

  1. 下載kafkkafka,http://kafka.apache.org/downloads 我這裡下載的版本是 kafka-1.1.0,檔案: kafka_2.11-1.1.0.tgz
  2. 下載後上傳到/usr/local/kafka,並解壓。解壓後文件路徑為:/usr/local/kafka/kafka_2.11-1.1.0,因為kafka的執行需要用到zookeeper,所以在此之前我們需要安裝下zookeeper,可以參考之前我寫的一篇安裝https://blog.csdn.net/u011890101/article/details/82491770,kafka的配置檔案中預設連線的是本機的zookeeper地址,如果你想連線其他機器上的zk,可進入/usr/local/kafka/kafka_2.11-1.1.0/config 下的server.properties,修改zookeeper.connect=localhost:2181 地址即可
  3. 啟動kafka,進入/usr/local/kafka/kafka_2.11-1.1.0/bin,執行./kafka-server-start.sh -daemon ../config/server.properties即可開啟kafka
  4. 建立topic,同樣在/usr/local/kafka/kafka_2.11-1.1.0/bin,執行./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic
  5. springboot整合kafka生產者,建立專案後新增kafka相關依賴
    		<dependency>
    			<groupId>org.springframework.kafka</groupId>
    			<artifactId>spring-kafka</artifactId>
    		</dependency>

    在application配置檔案中新增

    kafka.producer.servers=kafkaip地址:9092
    kafka.producer.retries=0
    kafka.producer.batch.size=4096
    kafka.producer.linger=1
    kafka.producer.buffer.memory=40960

    建立KafkaProducerConfig類,該類程式碼如下

    @Configuration
    @EnableKafka
    public class KafkaProducerConfig {
    
        @Value("${kafka.producer.servers}")
        private String servers;
        @Value("${kafka.producer.retries}")
        private int retries;
        @Value("${kafka.producer.batch.size}")
        private int batchSize;
        @Value("${kafka.producer.linger}")
        private int linger;
        @Value("${kafka.producer.buffer.memory}")
        private int bufferMemory;
    
        public Map<String, Object> producerConfigs() {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
            props.put(ProducerConfig.RETRIES_CONFIG, retries);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
            props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            return props;
        }
    
        public ProducerFactory<String, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }
    
        @Bean
        public KafkaTemplate<String, String> kafkaTemplate() {
            return new KafkaTemplate<String, String>(producerFactory());
        }
    }

    建立一個測試controller,注入KafkaTemplate

        @Resource
        private KafkaTemplate template;
    
        @RequestMapping("test")
        public String test() {
            template.send("test_topic", "helloworld");
            return "success";
        }

     

  6. springboot整合kafka生產者,建立專案後新增kafka依賴

    		<dependency>
    			<groupId>org.springframework.kafka</groupId>
    			<artifactId>spring-kafka</artifactId>
    		</dependency>

    在application配置檔案中新增

    spring:
      kafka:
        consumer:
          enable-auto-commit: true
          group-id: applog
          auto-offset-reset: latest
          bootstrap-servers: kafkaIP地址:9092

    編寫測試訊息消費類KafkaConsumer

    @Component
    public class KafkaConsumer {
    
        @KafkaListener(topics = {"test_topic"})
        public void receive(String msg) {
            System.out.print("receive" + msg);
        }
    }

     

  7. 對整合好的生產者controller測試api進行訪問,進行訊息測試傳送訪問