1. 程式人生 > >SpringBoot開發案例之整合Kafka實現訊息佇列

SpringBoot開發案例之整合Kafka實現訊息佇列

  前言

  最近在做一款秒殺的案例,涉及到了同步鎖、資料庫鎖、分散式鎖、程序內佇列以及分散式訊息佇列,這裡對SpringBoot整合Kafka實現訊息佇列做一個簡單的記錄。

  Kafka簡介

  Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。 這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。 這些資料通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 對於像Hadoop的一樣的日誌資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的訊息處理,也是為了通過叢集來提供實時的訊息。

  Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,有如下特性:

  通過O(1)的磁碟資料結構提供訊息的持久化,這種結構對於即使數以TB的訊息儲存也能夠保持長時間的穩定效能。

  高吞吐量:即使是非常普通的硬體Kafka也可以支援每秒數百萬的訊息。

  支援通過Kafka伺服器和消費機叢集來分割槽訊息。

  支援Hadoop並行資料載入。

 

  術語介紹

  Broker

  Kafka叢集包含一個或多個伺服器,這種伺服器被稱為broker

  Topic

  每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic。(物理上不同Topic的訊息分開儲存,邏輯上一個Topic的訊息雖然保存於一個或多個broker上但使用者只需指定訊息的Topic即可生產或消費資料而不必關心資料存於何處)

  Partition

  Partition是物理上的概念,每個Topic包含一個或多個Partition.

  Producer

  負責釋出訊息到Kafka broker

  Consumer

  訊息消費者,向Kafka broker讀取訊息的客戶端。

  Consumer Group

  每個Consumer屬於一個特定的Consumer Group(可為每個Consumer指定group name,若不指定group name則屬於預設的group)。Kafka安裝

  Kafka需要依賴JAVA環境執行,如何安裝JDK這裡不做介紹。

  下載kafka:

  wget http://mirror.bit.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

  將包下載到執行目錄並解壓:

  cd /usr/local/tar -xzvf kafka_2.11-0.10.0.1.tgz

  修改kafka配置檔案:

  cd kafka_2.11-0.10.0.1/config/#編輯配置檔案vi server.propertiesbroker.id=0#埠號、記得開啟埠,雲伺服器要開放安全組port=9092#伺服器IP地址,修改為自己的伺服器IPhost.name=127.0.0.1#zookeeper地址和埠, Kafka支援內建的Zookeeper和引用外部的Zookeeperzookeeper.connect=localhost:2181

  分別啟動 kafka 和 zookeeper:

  ./zookeeper-server-start.sh /usr/local/kafka_2.11-0.10.0.1/config/zookeeper.properties ./kafka-server-start.sh /usr/local/kafka_2.11-0.10.0.1/config/server.properties

  SpringBoot整合

  pom.xml引入:

  !--kafka支援--dependency groupIdorg.springframework.kafka/groupId artifactIdspring-kafka/artifactId version1.3.5.RELEASE/version!--$NO-MVN-MAN-VER$--/dependency

  application.properties配置:

  #kafka相關配置spring.kafka.bootstrap-servers=192.168.1.180:9092#設定一個預設組spring.kafka.consumer.group-id=0#key-value序列化反序列化spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializerspring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializerspring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer#每次批量傳送訊息的數量spring.kafka.producer.batch-size=65536spring.kafka.producer.buffer-memory=524288

  生產者KafkaSender:

  /** * 生產者 * @author 科幫網 By https://blog.52itstyle.com */@Componentpublic class KafkaSender { @Autowired private KafkaTemplateString,String kafkaTemplate; /** * 傳送訊息到kafka */ public void sendChannelMess(String channel, String message){ kafkaTemplate.send(channel,message); }}

  消費者:

  /** * 消費者 spring-kafka 2.0 + 依賴JDK8 * @author 科幫網 By https://blog.52itstyle.com */@Componentpublic class KafkaConsumer { /** * 監聽seckill主題,有訊息就讀取 * @param message */ @KafkaListener(topics = {seckill}) public void receiveMessage(String message){ //收到通道的訊息之後執行秒殺操作 }}