1. 程式人生 > >springboot 整合kafka系列 一、springboot整合kafka生產者

springboot 整合kafka系列 一、springboot整合kafka生產者

1、新建springboot腳手架工程,pom檔案如下,其中引入了kafka需要的依賴,注意這裡的kafka版本號需要和之前安裝的kafka版本一致,要不然會有問題

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.zeshan</groupId>
	<artifactId>kafka-producer</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>kafka-producer</name>
	<description>kafka整合</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.2.RELEASE</version>
		<relativePath/>
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>2.1.10.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>
2、在application.properties中配置producer基本資訊
kafka.producer.servers=127.0.0.1:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960

3.編寫ProducerController

package com.zeshan.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@RestController
@RequestMapping("kafka-producer")
public class ProducerController {
    @Autowired
    private  KafkaTemplate kafkaTemplate;
    @RequestMapping("send")
    @ResponseBody
    public String  sengMessage(HttpServletRequest request, HttpServletResponse response){
        String message = request.getParameter("message");
        try {
            kafkaTemplate.send("demo",message);
            return "success";
        }catch (Exception e){
            e.printStackTrace();
            return  "error";
        }
    }
}

4.啟動專案,訪問 http://127.0.0.1:6060/kafka-producer/send?message=hello kafka,訪問結果如下

通過觀察我們傳送的訊息已經被consumer消費,至此springboot整合kafka producer成功。

下一篇檔案會介紹springboot 整合kafka consumer