1. 程式人生 > >sprigboot 整合使用 kafka

sprigboot 整合使用 kafka

1、環境

Windows10、jdk1.8、idea、zookeeper3.4.12、kafka_2.12-1.0.0

2、zookeeper 叢集搭建

3、kafka叢集搭建

4、整合使用

springboot的配置:

#kafka
#kafka地址 brokers叢集地址用,隔開
spring.kafka.bootstrap-servers=127.0.0.1:9091,127.0.0.1:9092,127.0.0.1:9093
#生產者的配置,大部分我們可以使用預設的,這裡列出幾個比較重要的屬性
#每批次傳送訊息的數量
spring.kafka.producer.batch-size=16
#傳送失敗重試次數
spring.kafka.producer.retries=2
#即32MB的批處理緩衝區
spring.kafka.producer.buffer-memory=33554432
#key序列化方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#消費者的配置
##Kafka中沒有初始偏移或如果當前偏移在伺服器上不再存在時,預設區最新 ,有三個選項 【latest, earliest, none】
spring.kafka.consumer.auto-offset-reset=latest
#是否開啟自動提交
spring.kafka.consumer.enable-auto-commit=true
#自動提交的時間間隔
spring.kafka.consumer.auto-commit-interval=100
#key的解碼方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#value的解碼方式
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#在kafka/config檔案的consumer.properties中有配置
spring.kafka.consumer.group-id=test-consumer-group

kafka訊息topic 自定義:

package com.zh.service.kafka;

/**
 * @author zhangH
 * @date 2018/10/29
 */
public class KFKTopic {
    // default
    public static final String K_DEFAULT = "k_default";
    // user
    public static final String USER_BASE = "USER_BASE";
    public static final String USER_ADDRESS = "USER_ADDRESS";
}

kafka訊息生產者

package com.zh.service.kafka;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * @author zhangH
 * @date 2018/10/29
 */
@Component
public class KFKMessageProduce {

    @Resource
    private KafkaTemplate kafkaTemplate;

    public void sendMsg(String topic, Object context) {
        try {
            kafkaTemplate.send(topic, context);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

kafka訊息消費者:

package com.zh.service.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author zhangH
 * @date 2018/10/29
 */
@Component
public class KFKMessageConsumer {

    @KafkaListener(topics = {KFKTopic.K_DEFAULT})
    public void listenT(ConsumerRecord record) {
        LoggerFactory.getLogger("kafka").info("當前監聽主題:" + KFKTopic.K_DEFAULT + ", msg : " + record.value().toString());
    }

    @KafkaListener(topics = {KFKTopic.USER_BASE, KFKTopic.USER_ADDRESS})
    public void listenT2(ConsumerRecord record) {
        LoggerFactory.getLogger("kafka").info("當前監聽主題:" + KFKTopic.USER_BASE + ", msg : " + record.value().toString());
    }
}

測試類:

package com.zh.kafkas;

import com.zh.common.BaseTest;
import com.zh.service.kafka.DataBean;
import com.zh.service.kafka.KFKMessageProduce;
import com.zh.service.kafka.KFKTopic;
import org.junit.Test;

import javax.annotation.Resource;

/**
 * @author zhangH
 * @date 2018/10/29
 */

public class KaFKaTest extends BaseTest {
    @Resource
    private KFKMessageProduce kfkMessageProduce;

    @Test
    public void test() {
        for (int i = 0; i < 10; i++) {
            if (i % 3 == 1) {
                kfkMessageProduce.sendMsg(KFKTopic.USER_BASE, new DataBean("test1", i).toString());
            } else if (i % 3 == 2) {
                kfkMessageProduce.sendMsg(KFKTopic.USER_ADDRESS, new DataBean("test2", i).toString());
            } else {
                kfkMessageProduce.sendMsg(KFKTopic.K_DEFAULT, new DataBean("test", i).toString());
            }
        }
    }
}

基礎測試類BaseTest :

package com.zh.common;

import com.zh.SpringBootDemoApplication;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.context.web.WebAppConfiguration;

/**
 * @author zhangH
 * @date 2018/10/29
 * 進行測試,可使用註解等
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringBootDemoApplication.class)
@WebAppConfiguration
public class BaseTest {
}

總結:

kafka跟activemq的使用大同小異,建議針對性學習,一通百通,同類型的針對一個學習,其餘的對比學習,見效快。

一天一點積累,就是想要的,我為我代言。

!!!