sprigboot 整合使用 kafka
阿新 • • 發佈:2018-12-18
1、環境
Windows10、jdk1.8、idea、zookeeper3.4.12、kafka_2.12-1.0.0
2、zookeeper 叢集搭建
3、kafka叢集搭建
4、整合使用
springboot的配置:
#kafka #kafka地址 brokers叢集地址用,隔開 spring.kafka.bootstrap-servers=127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093 #生產者的配置,大部分我們可以使用預設的,這裡列出幾個比較重要的屬性 #每批次傳送訊息的數量 spring.kafka.producer.batch-size=16 #傳送失敗重試次數 spring.kafka.producer.retries=2 #即32MB的批處理緩衝區 spring.kafka.producer.buffer-memory=33554432 #key序列化方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer #消費者的配置 ##Kafka中沒有初始偏移或如果當前偏移在伺服器上不再存在時,預設區最新 ,有三個選項 【latest, earliest, none】 spring.kafka.consumer.auto-offset-reset=latest #是否開啟自動提交 spring.kafka.consumer.enable-auto-commit=true #自動提交的時間間隔 spring.kafka.consumer.auto-commit-interval=100 #key的解碼方式 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer #value的解碼方式 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer #在kafka/config檔案的consumer.properties中有配置 spring.kafka.consumer.group-id=test-consumer-group
kafka訊息topic 自定義:
package com.zh.service.kafka; /** * @author zhangH * @date 2018/10/29 */ public class KFKTopic { // default public static final String K_DEFAULT = "k_default"; // user public static final String USER_BASE = "USER_BASE"; public static final String USER_ADDRESS = "USER_ADDRESS"; }
kafka訊息生產者
package com.zh.service.kafka; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * @author zhangH * @date 2018/10/29 */ @Component public class KFKMessageProduce { @Resource private KafkaTemplate kafkaTemplate; public void sendMsg(String topic, Object context) { try { kafkaTemplate.send(topic, context); } catch (Exception e) { e.printStackTrace(); } } }
kafka訊息消費者:
package com.zh.service.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.slf4j.LoggerFactory; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; /** * @author zhangH * @date 2018/10/29 */ @Component public class KFKMessageConsumer { @KafkaListener(topics = {KFKTopic.K_DEFAULT}) public void listenT(ConsumerRecord record) { LoggerFactory.getLogger("kafka").info("當前監聽主題:" + KFKTopic.K_DEFAULT + ", msg : " + record.value().toString()); } @KafkaListener(topics = {KFKTopic.USER_BASE, KFKTopic.USER_ADDRESS}) public void listenT2(ConsumerRecord record) { LoggerFactory.getLogger("kafka").info("當前監聽主題:" + KFKTopic.USER_BASE + ", msg : " + record.value().toString()); } }
測試類:
package com.zh.kafkas; import com.zh.common.BaseTest; import com.zh.service.kafka.DataBean; import com.zh.service.kafka.KFKMessageProduce; import com.zh.service.kafka.KFKTopic; import org.junit.Test; import javax.annotation.Resource; /** * @author zhangH * @date 2018/10/29 */ public class KaFKaTest extends BaseTest { @Resource private KFKMessageProduce kfkMessageProduce; @Test public void test() { for (int i = 0; i < 10; i++) { if (i % 3 == 1) { kfkMessageProduce.sendMsg(KFKTopic.USER_BASE, new DataBean("test1", i).toString()); } else if (i % 3 == 2) { kfkMessageProduce.sendMsg(KFKTopic.USER_ADDRESS, new DataBean("test2", i).toString()); } else { kfkMessageProduce.sendMsg(KFKTopic.K_DEFAULT, new DataBean("test", i).toString()); } } } }
基礎測試類BaseTest :
package com.zh.common; import com.zh.SpringBootDemoApplication; import org.junit.runner.RunWith; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import org.springframework.test.context.web.WebAppConfiguration; /** * @author zhangH * @date 2018/10/29 * 進行測試,可使用註解等 */ @RunWith(SpringRunner.class) @SpringBootTest(classes = SpringBootDemoApplication.class) @WebAppConfiguration public class BaseTest { }
總結:
kafka跟activemq的使用大同小異,建議針對性學習,一通百通,同類型的針對一個學習,其餘的對比學習,見效快。
一天一點積累,就是想要的,我為我代言。
!!!