1. 程式人生 > >Spring Boot教程十二:整合Kafka

Spring Boot教程十二:整合Kafka

Kafka是最初由Linkedin公司開發,是一個分散式、支援分割槽的(partition)、多副本的(replica),基於zookeeper協調的分散式訊息系統,它的最大的特性就是可以實時的處理大量資料以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,訊息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源 專案。Kafka中釋出訂閱的物件是topic。我們可以為每類資料建立一個topic,把向topic釋出訊息的客戶端稱作producer,從topic訂閱訊息的客戶端稱作consumer。Producers和consumers可以同時從多個topic讀寫資料。一個kafka叢集由一個或多個broker伺服器組成,它負責持久化和備份具體的kafka訊息。
  • Broker
    :Kafka節點,一個Kafka節點就是一個broker,多個broker可以組成一個Kafka叢集。
  • Topic:一類訊息,訊息存放的目錄即主題,例如page view日誌、click日誌等都可以以topic的形式存在,Kafka叢集能夠同時負責多個topic的分發。
  • Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列
  • Segment:partition物理上由多個segment組成,每個Segment存著message資訊
  • Producer : 生產message傳送到topic
  • Consumer : 訂閱topic消費message, consumer作為一個執行緒來消費
  • Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置檔案中配置好的。各個consumer(consumer 執行緒)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 執行緒 )消費,如果一個message可以被多個consumer(consumer 執行緒 ) 消費的話,那麼這些consumer必須在不同的組。Kafka不支援一個partition中的message由兩個或兩個以上的consumer thread來處理,即便是來自不同的consumer group的也不行。它不能像AMQ那樣可以多個BET作為consumer去處理message,這是因為多個BET去消費一個Queue中的資料的時候,由於要保證不能多個執行緒拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的效能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許一個consumer執行緒去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴充套件,那麼再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴充套件性,吞吐量極高。這也就形成了分散式消費的概念。

springboot專案中使用kafka的配置如下:

首先引入必須的依賴jar:

 <!-- springboot整合kafka -->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
            <version>4.3.6.RELEASE</version>
            <classifier>sources</classifier>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.7.RELEASE</version>
        </dependency>

配置檔案如下:

#============== kafka ===================
#kafka相關配置
spring.kafka.bootstrap-servers=39.105.104.132:9092
#設定一個預設組
spring.kafka.consumer.group-id=alarmTopic
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量傳送訊息的數量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288

訊息生產者:

/**
 * @author Shuyu.Wang
 * @package:com.ganinfo.kafka
 * @className:
 * @description:生產者
 * @date 2018-07-05 16:30
 **/
@Component
public class KafkaSender {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;
    /** * 傳送訊息到kafka */
    public void sendChannelMess(String channel, String message) {
        kafkaTemplate.send(channel,message);
    }
訊息消費者:
/**
 * @author Shuyu.Wang
 * @package:com.ganinfo.kafka
 * @className:
 * @description:消費者
 * @date 2018-07-05 16:31
 **/
@Component
public class KafkaConsumer {
    @Autowired
    private SendMessageUtil sendMessageUtil;
    /**
     * 監聽alarmTopic主題,有訊息就讀取 *
     * @param message
     */
    @KafkaListener(topics = {"alarmTopic"})
    public void receiveMessage(String message) {
        //收到通道的訊息之後執行秒殺操作
        System.out.println("KafkaConsumer的訂閱訊息:"+message);
        sendMessageUtil.send("h1","1",message);
    }

}

測試類:

    @Autowired
    private KafkaSender kafkaSender;
    private static final String ALRAM_TOPIC = "alarmTopic";

    @Autowired
    private AlarmService alarmService;

    @ResponseBody
    @RequestMapping(value = "/kafka", method = RequestMethod.GET)
    public ApiResult getCarInout(@RequestParam(value = "refId") String refId, @RequestParam(value = "passType") Integer passType, @RequestParam(value = "type") Integer type) {
        ApiResult apiResult = new ApiResult();
        try {
            Map<String, Object> map = new HashMap<>();
            map.put("refId", refId);
            map.put("platformCode", "650106");
            map.put("passType", passType);
            map.put("type", type);
            map.put("level", 2);
            map.put("Gettime", 1000);
            kafkaSender.sendChannelMess(ALRAM_TOPIC, GsonUtil.GsonString(map));
            System.out.println("傳送資料:" + GsonUtil.GsonString(map));
//            kafkaTemplate.send(ALRAM_TOPIC, "alarm", GsonUtil.GsonString(map));
        } catch (Exception e) {
            e.printStackTrace();
        }
        return apiResult;
    }
kafka的相關程式碼就完成了,請求測試方法,就會在消費者的類中列印相關的引數了,另外可以通過外掛檢視kafka的相關主題和消費者情況。