1. 程式人生 > >springboot整合kafka實現訊息的生產與消費--訊息的生產

springboot整合kafka實現訊息的生產與消費--訊息的生產

由於工作需要,最近在研究springboot整合kafka。做一個分散式的同步應用程式。springboot整合kafka須注意版本。對於springboot 1.5版本之前的話,需要自己去配置java configuration,而1.5版本以後則提供了auto config,具體詳見org.springframework.boot.autoconfigure.kafka這個包,springboot整合kafka的預設配置都在這個包裡面。


springboot實現kafka的訊息生產者

    從kafka的角度來看,訊息生產者要做的事情無非就是把訊息傳送到指定的topic。這個流程還是比較簡單的。

    spring-kafka提供了KafkaTemplate包裝了一個生產者,並提供了方便的方法將資料傳送到kafka的主題。

     KafkaTemplate提供的傳送訊息方法:

    從方法傳的引數我們知道KafkaTemplate可以向指定主題、分割槽,還有主題中的Key(如果有的話)傳送訊息。利用Springboot(1.5版本及以上)整合kafka實現訊息傳送比較簡單。

1、建立一個springboot工程。匯入kafka相關依賴,注意spring版本與kafka對應版本需要對應。我的springboot版本是2.0.2,spring-kafka的版本是2.1.0。

    

新增spring-kafka依賴:

<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>2.1.0.RELEASE</version>
</dependency>
Kafka生產者配置類:
package com.rose.kafka.config;
import org.apache.kafka.clients.producer.ProducerConfig;
import 
org.apache.kafka.common.serialization.StringSerializer; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka public class KafkaProducerConfig { //從配置檔案中引入Kafka生產這的相關配置的值 //kakfa服務端地址 @Value("${kafka.producer.servers}") private String servers; //訊息傳送失敗重試次數 @Value("${kafka.producer.retries}") private int retries; //訊息批量傳送容量 @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; //快取容量 @Value("${kafka.producer.buffer.memory}") private int bufferMemory; /** * 生產者相關配置 * @return */ public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); System.out.println("-----------------servers---------"); System.out.println(servers); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } /** * 生產者建立工廠 * @return */ public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } /** * kafkaTemplate 覆蓋預設配置類中的kafkaTemplate * @return */ @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } }

其中@EnableKafka表明使用springboot預設的Kafka配置,對應KafkaAutoConfiguration這個配置類。這個類中配置了一些建立Kafka生產者與消費者的Bean。如果使用預設則不作更改,如需更改覆蓋相同的Bean即可。這也符合springboot的約定優於配置的原則。


之前的部落格已經介紹過如何搭建簡單的kafka叢集以及建立Topic等。Kafka叢集簡單部署

建立Topic kafkaTest。

[[email protected] kafka_2.11-1.0.0]$ bin/kafka-topics.sh --create --zookeeper 192.168.66.94:2181 --replication-factor 1 --partitions 1 --topic kafkaTest

下面進行簡單的傳送訊息測試,通過springboot中的生產這向kafka叢集傳送訊息。

訪問http://localhost:7004/sendData/sendMessageTest?message=kafkaTest Message

訊息傳送成功。在kafka客戶端用命令檢視kafkaTest主題下的訊息:

[[email protected] kafka_2.11-1.0.0]# bin/kafka-console-consumer.sh --zookeeper 192.168.66.94:2181 --topic kafkaTest --from-beginning


訊息傳送成功。springboot整合Kafka實現訊息傳送還是比較簡單的。我在嘗試的時候也遇到過一些小問題,比如專案跑起來後傳送訊息報org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms的錯誤。這種錯誤一般是一下三種原因造成的。我的是由於linux防火牆為關閉(或者開放相應的埠也可)。

問題原因:

1. java client 包與kafka server 版本不一致


2、/kafka_2.11-0.9.0.0/config/server.properties 


listerners 需配置ip ,不能配置主機名,因本地Hosts中不存在對應的Ip配置,導致producer 無法連線

解決辦法:linux防火牆未關閉,導致連不上kafka伺服器。

  linux防火牆關閉: service iptables stop 

springboot整合Kafka實現訊息的生產與消費專案地址:專案Github地址。專案中已經完成了springboot整合kafka實現訊息傳送與生產。本文中只介紹了訊息的傳送,訊息的消費將在後續進行總結。

小結:學習新技術最好先了解一下技術的大體框架和設計思想。多看看相關的官方API文件。官方文件應該是比較靠譜的。就是英文看著有點頭疼!下個詞典就好了,意思也能看個大概。spring_kafka官方文件