1. 程式人生 > >手把手教你Spring Boot2.x整合kafka

手把手教你Spring Boot2.x整合kafka

#首先得自己搭建一個kafka,搭建教程請自行百度,本人是使用docker搭建了一個單機版的zookeeper+kafka作為演示,文末會有完整程式碼包提供給大家下載參考 ![](https://img2020.cnblogs.com/blog/1543487/202103/1543487-20210302195200449-1015025413.png) #廢話不多說,教程開始 ##一、老規矩,先在pom.xml中新增kafka相關依賴 ##二、在application.yml中新增相關配置 spring: #kafka配置 kafka: #這裡改為你的kafka伺服器ip和埠號 bootstrap-servers: 10.24.19.237:9092 #=============== producer ======================= producer: #如果該值大於零時,表示啟用重試失敗的傳送次數 retries: 0 #每當多個記錄被髮送到同一分割槽時,生產者將嘗試將記錄一起批量處理為更少的請求,預設值為16384(單位位元組) batch-size: 16384 #生產者可用於緩衝等待發送到伺服器的記錄的記憶體總位元組數,預設值為3355443 buffer-memory: 33554432 #key的Serializer類,實現類實現了介面org.apache.kafka.common.serialization.Serializer key-serializer: org.apache.kafka.common.serialization.StringSerializer #value的Serializer類,實現類實現了介面org.apache.kafka.common.serialization.Serializer value-serializer: org.apache.kafka.common.serialization.StringSerializer #=============== consumer ======================= consumer: #用於標識此使用者所屬的使用者組的唯一字串 group-id: test-consumer-group #當Kafka中沒有初始偏移量或者伺服器上不再存在當前偏移量時該怎麼辦,預設值為latest,表示自動將偏移重置為最新的偏移量 #可選的值為latest, earliest, none auto-offset-reset: earliest #消費者的偏移量將在後臺定期提交,預設值為true enable-auto-commit: true #如果'enable-auto-commit'為true,則消費者偏移自動提交給Kafka的頻率(以毫秒為單位),預設值為5000。 auto-commit-interval: 100 #金鑰的反序列化器類,實現類實現了介面org.apache.kafka.common.serialization.Deserializer key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #值的反序列化器類,實現類實現了介面org.apache.kafka.common.serialization.Deserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer ##三、新增操作kafka的工具類KafkaUtils.java(這裡我只是簡單的封裝了一些方法,大家可以根據需要自行新增需要的方法) package com.example.study.util; import com.google.common.collect.Lists; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartitionInfo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; /** * 操作kafka的工具類 * * @author [email protected] * @date 2021/3/2 14:52 */ @Component public class KafkaUtils { @Value("${spring.kafka.bootstrap-servers}") private String springKafkaBootstrapServers; private AdminClient adminClient; @Autowired private KafkaTemplate kafkaTemplate; /** * 初始化AdminClient * '@PostConstruct該註解被用來修飾一個非靜態的void()方法。 * 被@PostConstruct修飾的方法會在伺服器載入Servlet的時候執行,並且只會被伺服器執行一次。 * PostConstruct在建構函式之後執行,init()方法之前執行。 */ @PostConstruct private void initAdminClient() {