1. 程式人生 > >SpringBoot Kafka 整合實例教程

SpringBoot Kafka 整合實例教程

內容 string n) spring pic win tst app ken

1、使用IDEA新建工程引導方式,創建消息生產工程 springboot-kafka-producer。

技術分享圖片

工程POM文件代碼如下:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 3          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
> 4 <modelVersion>4.0.0</modelVersion> 5 6 <groupId>com.miniooc</groupId> 7 <artifactId>springboot-kafka-producer</artifactId> 8 <version>1.0.0-SNAPSHOT</version> 9 <packaging>jar</packaging> 10 11 <name>springboot-kafka-producer</
name> 12 <description>Demo project for Spring Boot</description> 13 14 <parent> 15 <groupId>org.springframework.boot</groupId> 16 <artifactId>spring-boot-starter-parent</artifactId> 17 <version>2.0.3.RELEASE</version>
18 <relativePath/> 19 </parent> 20 21 <properties> 22 <spring-cloud.version>Finchley.RELEASE</spring-cloud.version> 23 </properties> 24 25 <dependencies> 26 <dependency> 27 <groupId>org.springframework.boot</groupId> 28 <artifactId>spring-boot-starter-web</artifactId> 29 </dependency> 30 <dependency> 31 <groupId>org.springframework.boot</groupId> 32 <artifactId>spring-boot-starter-actuator</artifactId> 33 </dependency> 34 <dependency> 35 <groupId>org.springframework.kafka</groupId> 36 <artifactId>spring-kafka</artifactId> 37 </dependency> 38 <dependency> 39 <groupId>org.springframework.boot</groupId> 40 <artifactId>spring-boot-starter-test</artifactId> 41 <scope>test</scope> 42 </dependency> 43 44 <!-- 添加 gson 依賴 --> 45 <dependency> 46 <groupId>com.google.code.gson</groupId> 47 <artifactId>gson</artifactId> 48 <version>2.8.5</version> 49 </dependency> 50 <!-- 添加 lombok 依賴 --> 51 <dependency> 52 <groupId>org.projectlombok</groupId> 53 <artifactId>lombok</artifactId> 54 <version>1.16.22</version> 55 <scope>provided</scope> 56 </dependency> 57 </dependencies> 58 59 <dependencyManagement> 60 <dependencies> 61 <dependency> 62 <groupId>org.springframework.cloud</groupId> 63 <artifactId>spring-cloud-dependencies</artifactId> 64 <version>${spring-cloud.version}</version> 65 <type>pom</type> 66 <scope>import</scope> 67 </dependency> 68 </dependencies> 69 </dependencyManagement> 70 71 <build> 72 <plugins> 73 <plugin> 74 <groupId>org.springframework.boot</groupId> 75 <artifactId>spring-boot-maven-plugin</artifactId> 76 </plugin> 77 </plugins> 78 </build> 79 80 81 </project>

註釋部分為手動添加的 gson、lombok 依賴。

2、創建消息實體類

 1 package com.miniooc.kafka.message;
 2 
 3 import lombok.Data;
 4 
 5 import java.io.Serializable;
 6 import java.util.Date;
 7 import java.util.List;
 8 
 9 @Data
10 public class OrderBasic implements Serializable {
11 
12     /**
13      * 訂單ID
14      */
15     private String orderId;
16     /**
17      * 訂單編號
18      */
19     private String orderNumber;
20     /**
21      * 訂單日期
22      */
23     private Date date;
24     /**
25      * 訂單信息
26      */
27     private List<String> desc;
28 
29 }

3、創建消息生產類

 1 /**
 2  *
 3  */
 4 package com.miniooc.kafka.producer;
 5 
 6 import com.google.gson.GsonBuilder;
 7 import com.miniooc.kafka.message.OrderBasic;
 8 import lombok.extern.java.Log;
 9 import org.springframework.beans.factory.annotation.Value;
10 import org.springframework.kafka.core.KafkaTemplate;
11 import org.springframework.stereotype.Component;
12 
13 import javax.annotation.Resource;
14 
15 /**
16  * Kafka消息生產類
17  */
18 @Log
19 @Component
20 public class KafkaProducer {
21 
22     @Resource
23     private KafkaTemplate<String, String> kafkaTemplate;
24 
25     @Value("${kafka.topic.order}")
26     private String topicOrder;
27 
28     /**
29      * 發送訂單消息
30      *
31      * @param orderBasic 訂單信息
32      */
33     public void sendOrderMessage(OrderBasic orderBasic) {
34         GsonBuilder builder = new GsonBuilder();
35         builder.setPrettyPrinting();
36         builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
37         String message = builder.create().toJson(orderBasic);
38         kafkaTemplate.send(topicOrder, message);
39         log.info("\n" + message);
40     }
41 }

4、編輯資源配置文件 application.properties

1 server.port=9526
2 spring.application.name=kafka-producer
3 kafka.bootstrap.servers=localhost:9092
4 kafka.topic.order=topic-order
5 kafka.group.id=group-order

5、啟動 zookeeper

D:\kafka>bin\windows\zookeeper-server-start.bat config\zookeeper.properties

技術分享圖片

6、啟動 kafka

D:\kafka>bin\windows\kafka-server-start.bat config\server.properties

技術分享圖片

7、運行工程,通過控制器調用消息生產類,創建一條消息到kafka

技術分享圖片

看到紅框內容,說明消息發送成功。

8、再使用IDEA新建工程引導方式,創建消息消費工程 springboot-kafka-producer。

9、創建消息消費類,並監聽topic。

 1 package com.miniooc.kafka.consumer;
 2 
 3 import com.google.gson.Gson;
 4 import com.google.gson.GsonBuilder;
 5 import com.google.gson.reflect.TypeToken;
 6 import com.miniooc.kafka.message.OrderBasic;
 7 import lombok.extern.java.Log;
 8 import org.springframework.kafka.annotation.KafkaListener;
 9 import org.springframework.messaging.handler.annotation.Payload;
10 import org.springframework.stereotype.Component;
11 
12 @Log
13 @Component
14 public class KafkaConsumer {
15 
16     @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
17     public void consume(@Payload String message) {
18         GsonBuilder builder = new GsonBuilder();
19         builder.setPrettyPrinting();
20         builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
21         Gson gson = builder.create();
22         OrderBasic orderBasic = gson.fromJson(message, new TypeToken<OrderBasic>() {
23         }.getType());
24         String json = gson.toJson(orderBasic);
25         log.info("\n接受並消費消息\n" + json);
26     }
27 }

10、運行工程。

技術分享圖片

看到紅框內容,說明消息消費成功。

SpringBoot Kafka 整合完成!

SpringBoot Kafka 整合實例教程