Spring Boot and Kafka Integration: An Example Tutorial

1. Create a new project in IDEA named springboot-kafka-producer.

The project's POM file is as follows:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.miniooc</groupId>
    <artifactId>springboot-kafka-producer</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>springboot-kafka-producer</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.3.RELEASE</version>
        <relativePath/>
    </parent>

    <properties>
        <spring-cloud.version>Finchley.RELEASE</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- Add the gson dependency -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.8.5</version>
        </dependency>
        <!-- Add the lombok dependency -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

The dependencies marked with comments (gson and lombok) were added manually.

2. Create the message entity class

package com.miniooc.kafka.message;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;
import java.util.List;

@Data
public class OrderBasic implements Serializable {

    /**
     * Order ID
     */
    private String orderId;
    /**
     * Order number
     */
    private String orderNumber;
    /**
     * Order date
     */
    private Date date;
    /**
     * Order description lines
     */
    private List<String> desc;

}
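
For readers unfamiliar with Lombok, the sketch below is a hand-written approximation of what `@Data` contributes to this class: getters, setters, `equals`, `hashCode` and `toString`. The class name `OrderBasicPlain` is hypothetical and used only for illustration; the code Lombok actually generates differs in detail.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;

// Hypothetical plain-Java counterpart of OrderBasic, written without Lombok.
class OrderBasicPlain implements Serializable {

    private String orderId;
    private String orderNumber;
    private Date date;
    private List<String> desc;

    // Accessors that @Data would otherwise generate
    public String getOrderId() { return orderId; }
    public void setOrderId(String orderId) { this.orderId = orderId; }
    public String getOrderNumber() { return orderNumber; }
    public void setOrderNumber(String orderNumber) { this.orderNumber = orderNumber; }
    public Date getDate() { return date; }
    public void setDate(Date date) { this.date = date; }
    public List<String> getDesc() { return desc; }
    public void setDesc(List<String> desc) { this.desc = desc; }

    // Two orders are equal when all four fields are equal
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof OrderBasicPlain)) return false;
        OrderBasicPlain other = (OrderBasicPlain) o;
        return Objects.equals(orderId, other.orderId)
                && Objects.equals(orderNumber, other.orderNumber)
                && Objects.equals(date, other.date)
                && Objects.equals(desc, other.desc);
    }

    @Override
    public int hashCode() {
        return Objects.hash(orderId, orderNumber, date, desc);
    }

    // Mirrors the ClassName(field=value, ...) layout of Lombok's toString
    @Override
    public String toString() {
        return "OrderBasicPlain(orderId=" + orderId + ", orderNumber=" + orderNumber
                + ", date=" + date + ", desc=" + desc + ")";
    }

    public static void main(String[] args) {
        OrderBasicPlain order = new OrderBasicPlain();
        order.setOrderId("1");
        order.setOrderNumber("N-1");
        order.setDesc(Arrays.asList("a", "b"));
        System.out.println(order);
    }
}
```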

3. Create the message producer class

package com.miniooc.kafka.producer;

import com.google.gson.GsonBuilder;
import com.miniooc.kafka.message.OrderBasic;
import lombok.extern.java.Log;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * Kafka message producer
 */
@Log
@Component
public class KafkaProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.topic.order}")
    private String topicOrder;

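    /**
     * Send an order message
     *
     * @param orderBasic order information
     */
    public void sendOrderMessage(OrderBasic orderBasic) {
        // Serialize the order to pretty-printed JSON with a fixed date format
        GsonBuilder builder = new GsonBuilder();
        builder.setPrettyPrinting();
        builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
        String message = builder.create().toJson(orderBasic);
        // Publish the JSON string to the topic configured in kafka.topic.order
        kafkaTemplate.send(topicOrder, message);
        log.info("\nProduced message to Kafka\n" + message);
    }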
    }
}
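
The date pattern `yyyy-MM-dd HH:mm:ss` carries neither milliseconds nor a time zone, which matters once the same string is parsed again on the consumer side. The sketch below illustrates this with the JDK's `SimpleDateFormat` and the same pattern string. It does not use Gson, and it assumes that Gson formats `Date` fields in an equivalent way. The class name `DateFormatSketch` and the pinned UTC time zone are choices made only for this illustration.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Illustration only: round-trips a Date through the tutorial's date pattern.
class DateFormatSketch {

    // Same pattern string that the tutorial passes to setDateFormat(...)
    static final String PATTERN = "yyyy-MM-dd HH:mm:ss";

    // SimpleDateFormat is not thread-safe, so a fresh instance is built per call.
    // UTC is pinned here (an assumption) so the example prints the same text everywhere.
    private static SimpleDateFormat newFormat() {
        SimpleDateFormat format = new SimpleDateFormat(PATTERN);
        format.setTimeZone(TimeZone.getTimeZone("UTC"));
        return format;
    }

    static String format(Date date) {
        return newFormat().format(date);
    }

    static Date parse(String text) {
        try {
            return newFormat().parse(text);
        } catch (ParseException e) {
            throw new IllegalArgumentException("Not in " + PATTERN + " form: " + text, e);
        }
    }

    public static void main(String[] args) {
        Date original = new Date(1530000000123L); // 123 ms past the full second
        String text = format(original);
        Date restored = parse(text);
        System.out.println(text);               // 2018-06-26 08:00:00
        System.out.println(restored.getTime()); // 1530000000000, the 123 ms are gone
    }
}
```

Without a pinned time zone the text is produced and parsed in each JVM's default zone, so a producer and a consumer running in different zones would disagree about the instant. A pattern that includes an offset avoids that.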

4. Edit the resource configuration file application.properties

server.port=9527
spring.application.name=kafka-producer
kafka.bootstrap.servers=localhost:9092
kafka.topic.order=topic-order
kafka.group.id=group-order
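
Of the keys above, only `kafka.topic.order` is read by the code shown in this tutorial (through `@Value`). `kafka.bootstrap.servers` and `kafka.group.id` are application-defined names, so they take effect only if a configuration class that reads them exists, and none is shown here. If the projects rely on Spring Boot's Kafka auto-configuration instead, the equivalent standard keys would look roughly like the following fragment, given as a sketch under that assumption:

```properties
# Broker address used by the auto-configured producer and consumer
spring.kafka.bootstrap-servers=localhost:9092
# Consumer group for @KafkaListener methods (consumer project)
spring.kafka.consumer.group-id=group-order
```

With auto-configuration, keys and values default to string serialization, which matches the `KafkaTemplate<String, String>` used by the producer.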

5. Start ZooKeeper (run from the Kafka installation directory)

bin/zookeeper-server-start.sh config/zookeeper.properties

6. Start Kafka (run from the Kafka installation directory)

bin/kafka-server-start.sh config/server.properties

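7. Run the project and call the message producer class through a controller to send a message to Kafka.

When the producer's log line with the JSON message appears in the console (the content boxed in red in the original screenshot), the message was sent successfully.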

8. Using the IDEA new-project wizard again, create the message consumer project springboot-kafka-consumer.

9. Create the message consumer class and listen on the topic.

package com.miniooc.kafka.consumer;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.miniooc.kafka.message.OrderBasic;
import lombok.extern.java.Log;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

/**
 * Kafka message consumer
 */
@Log
@Component
public class KafkaConsumer {

    /**
     * Consume an order message from the configured topic
     *
     * @param message order message as a JSON string
     */
    @KafkaListener(topics = "${kafka.topic.order}", containerFactory = "kafkaListenerContainerFactory")
    public void consume(@Payload String message) {
        // Use the same date format as the producer so the date field parses
        GsonBuilder builder = new GsonBuilder();
        builder.setPrettyPrinting();
        builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
        Gson gson = builder.create();
        // OrderBasic is not generic, so its class literal is enough and no TypeToken is needed
        OrderBasic orderBasic = gson.fromJson(message, OrderBasic.class);
        String json = gson.toJson(orderBasic);
        log.info("\nReceived and consumed message\n" + json);
    }
}

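10. Run the project.

When the consumer's log line with the JSON message appears in the console (the content boxed in red in the original screenshot), the message was consumed successfully.

The Spring Boot and Kafka integration is complete!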

