Installing Kafka on CentOS, plus Spring Boot integration
阿新 • Published: 2018-12-10
- Download Kafka from http://kafka.apache.org/downloads. The version I downloaded here is kafka-1.1.0; the file is kafka_2.11-1.1.0.tgz.
- After downloading, upload it to /usr/local/kafka and extract it; the extracted path is /usr/local/kafka/kafka_2.11-1.1.0. Kafka needs ZooKeeper to run, so install ZooKeeper first; you can refer to my earlier installation write-up at https://blog.csdn.net/u011890101/article/details/82491770. Kafka's configuration file connects to the local ZooKeeper by default; if you want to connect to a ZooKeeper on another machine, open server.properties under /usr/local/kafka/kafka_2.11-1.1.0/config and change the address in zookeeper.connect=localhost:2181.
- Start Kafka: go to /usr/local/kafka/kafka_2.11-1.1.0/bin and run ./kafka-server-start.sh -daemon ../config/server.properties.
- Create a topic: also in /usr/local/kafka/kafka_2.11-1.1.0/bin, run ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_topic.
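As an alternative to the shell script, the kafka-clients library that ships with this Kafka version also lets you create topics programmatically via AdminClient. A minimal sketch, assuming the broker is reachable at localhost:9092 (replace with your own address); it needs a running broker, so it is not runnable standalone:

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumed broker address; change to match your environment
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1, same as the CLI command above
            NewTopic topic = new NewTopic("test_topic", 1, (short) 1);
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}
```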
- Spring Boot integration, producer side: after creating the project, add the Kafka dependency
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
Add the following to the application configuration file (replace the placeholder with your Kafka broker's IP address):
```properties
kafka.producer.servers=<kafka-broker-ip>:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
```
Create a KafkaProducerConfig class with the following code:
```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```
Create a test controller and inject the KafkaTemplate:
```java
@Resource
private KafkaTemplate<String, String> template;

@RequestMapping("test")
public String test() {
    template.send("test_topic", "helloworld");
    return "success";
}
```
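Note that KafkaTemplate.send() is asynchronous: it returns immediately, before the broker acknowledges the record. If you want to log success or failure, spring-kafka 2.x (current at the time of writing) returns a ListenableFuture you can attach a callback to. A sketch, meant to replace the bare send() call inside the controller method above:

```java
// Inside the controller's test() method; spring-kafka 2.x API
ListenableFuture<SendResult<String, String>> future =
        template.send("test_topic", "helloworld");
future.addCallback(
        result -> System.out.println("sent, offset = "
                + result.getRecordMetadata().offset()),
        ex -> System.err.println("send failed: " + ex.getMessage()));
```

This requires imports for org.springframework.util.concurrent.ListenableFuture and org.springframework.kafka.support.SendResult; it only produces output once a broker is actually reachable.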
- Spring Boot integration, consumer side: after creating the project, add the Kafka dependency
```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```
Add the following to the application configuration file (replace the placeholder with your Kafka broker's IP address):
```yaml
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      group-id: applog
      auto-offset-reset: latest
      bootstrap-servers: <kafka-broker-ip>:9092
```
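If you prefer Java configuration on the consumer side too, mirroring KafkaProducerConfig above, you can declare the consumer factory and listener container factory explicitly instead of using YAML. A sketch for spring-kafka 2.x, with the broker address hard-coded as an assumption (in practice you would inject it with @Value as on the producer side):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Same settings as the YAML above; replace with your broker address
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "applog");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```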
Write a test message consumer class, KafkaConsumer:
```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = {"test_topic"})
    public void receive(String msg) {
        System.out.println("receive: " + msg);
    }
}
```
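If the listener needs the message key, partition, or offset in addition to the payload, @KafkaListener methods can also take the full ConsumerRecord. A sketch (the class name KafkaRecordConsumer is illustrative, not from the original project):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaRecordConsumer {

    @KafkaListener(topics = {"test_topic"})
    public void receive(ConsumerRecord<String, String> record) {
        // topic(), partition(), offset() and key() all come with the record
        System.out.println("received " + record.value()
                + " from partition " + record.partition()
                + " at offset " + record.offset());
    }
}
```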
- Finally, hit the producer controller's test API to send a test message; the consumer should print the received message to the console.