關於使用Spring Boot的Kafka教程 - DZone大資料
關於Kafka背後的體系結構及其pub-sub模型的教程,以及我們如何使用流行的Java框架Spring Boot。
Apache Kafka是一個分散式流媒體平臺,具有釋出和訂閱記錄流,以容錯方式儲存記錄以及處理該記錄流等功能。
它用於構建實時流資料管道,可以執行功能,例如將記錄流從一個應用程式可靠地傳遞到另一個應用程式,以及處理記錄並將其傳輸到目標應用程式。
TOPIC
Kafka作為一個或多個伺服器中的叢集執行,叢集儲存/檢索名為Topics的Feed /類別中的記錄。主題中的每條記錄都儲存有鍵,值和時間戳。
主題可以包含零個,一個或多個使用者,他們將訂閱寫入該主題的資料。在Kafka術語中,主題Topic始終是多使用者multi-subscriber feed的一部分。
分割槽
Kafka群集為每個主題使用分割槽日誌。
分割槽維護資料的插入順序,一旦將記錄釋出到主題,它將保留在那裡,具體取決於保留期(可配置)。記錄始終附加在分割槽的末尾。它維護一個名為“offsets”的標誌,用於唯一標識分割槽中的每條記錄。
偏移量由消費者應用程式控制。使用偏移量,消費者可能會回溯到較舊的偏移量並在需要時重新處理記錄。
生產者
記錄流(即資料)由生產者釋出到主題。他們還可以在將資料釋出到主題時分配分割槽。生產者可以迴圈方式傳送資料,或者可以基於根據記錄的優先順序將記錄傳送到某些分割槽來實現優先順序系統。
消費者
消費者消費使用主題的記錄。它們是基於消費者組的概念,其中一些消費者被分配在群組中。釋出到主題的記錄是分配一個消費者組,再傳遞給其中消費者的一個例項。Kafka內部使用消費者組內消費記錄的機制。消費者的每個例項將獲得特定分割槽日誌,使得在消費者組內,記錄可以由每個消費者並行處理。
Spring Boot Kafka
Spring為Kafka提供了很好的支援,並提供了與原生Kafka Java客戶端一起使用的抽象層。
我們可以新增以下依賴項來開始使用Spring Boot和Kafka。
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.3.RELEASE</version> </dependency>
要下載並安裝Kafka,請參閱官方指南https://kafka.apache.org/quickstart 。
下載Kafka後,您可以發出命令來啟動ZooKeeper,Kafka使用它來儲存元資料。
zookeeper-server-start.bat .\config\zookeeper.properties
接下來,我們需要通過發出以下命令在本地啟動Kafka叢集。
kafka-server-start.bat .\config\server.properties
現在,預設情況下,Kafka伺服器啟動在localhost:9092
編寫一個簡單的REST控制器並公開一個端點, /publish如下所示。它用於將訊息釋出到主題。
<b>package</b> com.rahul.kafkaspringboot.controllers; <b>import</b> com.rahul.kafkaspringboot.services.Producer; <b>import</b> org.springframework.beans.factory.annotation.Autowired; <b>import</b> org.springframework.web.bind.annotation.PostMapping; <b>import</b> org.springframework.web.bind.annotation.RequestMapping; <b>import</b> org.springframework.web.bind.annotation.RequestParam; <b>import</b> org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping(value = <font>"/kafka"</font><font>) <b>public</b> <b>class</b> KafkaController { <b>private</b> <b>final</b> Producer producer; @Autowired <b>public</b> KafkaController(Producer producer) { <b>this</b>.producer = producer; } @PostMapping(value = </font><font>"/publish"</font><font>) <b>public</b> <b>void</b> sendMessageToKafkaTopic(@RequestParam(</font><font>"message"</font><font>) String message){ <b>this</b>.producer.sendMessage(message); } } </font>
然後我們可以編寫使用Spring的生產者 KafkaTemplate 將訊息傳送到一個名為的主題 users, 如下所示。
<b>package</b> com.rahul.kafkaspringboot.services; <b>import</b> org.slf4j.Logger; <b>import</b> org.slf4j.LoggerFactory; <b>import</b> org.springframework.beans.factory.annotation.Autowired; <b>import</b> org.springframework.kafka.core.KafkaTemplate; <b>import</b> org.springframework.stereotype.Service; @Service <b>public</b> <b>class</b> Producer { <b>private</b> <b>static</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Producer.<b>class</b>); <b>private</b> <b>static</b> <b>final</b> String TOPIC = <font>"users"</font><font>; @Autowired <b>private</b> KafkaTemplate<String,String> kafkaTemplate; <b>public</b> <b>void</b> sendMessage(String message){ logger.info(String.format(</font><font>"$$ -> Producing message --> %s"</font><font>,message)); <b>this</b>.kafkaTemplate.send(TOPIC,message); } } </font>
我們還可以編寫如下所示的消費者,消費者使用主題使用者的訊息並將日誌輸出到控制檯。
<b>package</b> com.rahul.kafkaspringboot.services; <b>import</b> org.slf4j.Logger; <b>import</b> org.slf4j.LoggerFactory; <b>import</b> org.springframework.kafka.annotation.KafkaListener; <b>import</b> org.springframework.stereotype.Service; @Service <b>public</b> <b>class</b> Consumer { <b>private</b> <b>final</b> Logger logger = LoggerFactory.getLogger(Consumer.<b>class</b>); @KafkaListener(topics = <font>"users"</font><font>, groupId = </font><font>"group_id"</font><font>) <b>public</b> <b>void</b> consume(String message){ logger.info(String.format(</font><font>"$$ -> Consumed Message -> %s"</font><font>,message)); } } </font>
現在,我們需要一種方法告訴我們的應用程式在哪裡找到Kafka伺服器並建立一個主題併發布到它。我們可以使用 application.yaml 如下所示的方法。
server: port: 9000 spring: kafka: consumer: bootstrap-servers: localhost:9092 group-id: group-id auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer producer: bootstrap-servers: localhost:9092 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
現在,如果我們執行應用程式並點選如下所示的端點,我們就已經發布了一條訊息。
如果我們從控制檯檢查日誌,它應該打印發送到釋出端點的訊息。
總結
在這篇文章中,我們已經看到了Kafka系統中使用的基本術語。我們還看到使用Spring Boot配置Kafka是多麼容易。大多數魔法都是由Spring Boot在幕後完成的。一種簡單快捷的方法是在application.yml檔案中配置與Kafka相關的詳細資訊,如果我們更改Kafka叢集並且必須將伺服器指向新的Kafka叢集地址,這是很好的。