1. 程式人生 > >SpringBoot2 整合Kafka元件,應用案例和流程詳解

SpringBoot2 整合Kafka元件,應用案例和流程詳解

本文原始碼:GitHub·點這裡 || GitEE·點這裡

一、搭建Kafka環境

1、下載解壓

-- 下載
wget http://mirror.bit.edu.cn/apache/kafka/2.2.0/kafka_2.11-2.2.0.tgz
-- 解壓
tar -zxvf kafka_2.11-2.2.0.tgz
-- 重新命名
mv kafka_2.11-2.2.0 kafka2.11

2、啟動Kafka服務

kafka依賴ZooKeeper服務,需要本地安裝並啟動ZooKeeper。

參考文章:Linux系統搭建ZooKeeper3.4中介軟體,常用命令總結

-- 執行位置
-- /usr/local/mysoft/kafka2.11
bin/kafka-server-start.sh config/server.properties

3、檢視服務

ps -aux |grep kafka

4、開放地址埠

-- 基礎路徑
-- /usr/local/mysoft/kafka2.11/config
vim server.properties
-- 新增下面註釋
advertised.listeners=PLAINTEXT://192.168.72.130:9092

二、Kafka基礎概念

1、基礎描述

Kafka是由Apache開源,具有分散式、分割槽的、多副本的、多訂閱者,基於Zookeeper協調的分散式處理平臺,由Scala和Java語言編寫。通常用來蒐集使用者在應用服務中產生的動作日誌資料,並高速的處理。日誌類的資料需要高吞吐量的效能要求,對於像Hadoop一樣的日誌資料和離線分析系統,但又要求實時處理的限制,這是一個可行的解決方案。Kafka的目的是通過Hadoop的並行載入機制來統一線上和離線的訊息處理,也是為了通過叢集來提供實時的訊息。

2、功能特點

(1)、通過磁碟資料結構提供訊息的持久化,訊息儲存也能夠保持長時間穩定性;

(2)、高吞吐量,即使是非常普通的硬體Kafka也可以支援每秒超高的併發量;

(3)、支援通過Kafka伺服器和消費機叢集來分割槽訊息;

(4)、支援Hadoop並行資料載入;

(5)、API包封裝的非常好,簡單易用,上手快 ;

(6)、分散式訊息佇列。Kafka對訊息儲存時根據Topic進行歸類,傳送訊息者稱為Producer,訊息接受者稱為Consumer;

3、訊息功能

  • 點對點模式

點對點模型通常是一個基於拉取或者輪詢的訊息傳遞模型,消費者主動拉取資料,訊息收到後從佇列移除訊息,這種模型不是將訊息推送到客戶端,而是從佇列中請求訊息。特點是傳送到佇列的訊息被一個且只有一個消費者接收處理,即使有多個消費者監聽佇列也是如此。

  • 釋出訂閱模式

釋出訂閱模型則是一個基於推送的訊息傳送模型,訊息產生後,推送給所有訂閱者。釋出訂閱模型可以有多種不同的訂閱者,臨時訂閱者只在主動監聽主題時才接收訊息,而持久訂閱者則監聽主題的所有訊息,即使當前訂閱者不可用,處於離線狀態。

4、訊息佇列作用

  • 程式解耦,生產者和消費者獨立,各自非同步執行;
  • 訊息資料進行持久化儲存,直到被全部消費,規避了資料丟失風險;
  • 流量削峰,使用訊息佇列承接訪問壓力,儘量避免程式雪崩 ;
  • 降低程序間的耦合度,系統部分元件崩潰時,不會影響到整個系統;
  • 保證訊息順序執行,解決特定場景業務需求 ;

5、專業術語簡介

  • Broker

一臺kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic。

  • Producer

訊息生產者,就是向kafka broker發訊息的客戶端。

  • Consumer

訊息消費者,向kafka broker取訊息的客戶端。

  • Topic

每條釋出到Kafka叢集的訊息都有一個類別,這個類別被稱為Topic,可以理解為一個佇列。

  • Consumer Group

每個Consumer屬於一個特定的Consumer Group,可為每個Consumer指定group name,若不指定group name則屬於預設的分組。

  • Partition

一個龐大大的topic可以分佈到多個broker上,一個topic可以分為多個partition,每個partition是一個有序的佇列。partition中的每條訊息都會被分配一個有序的id。kafka只保證按一個partition中的順序將訊息發給consumer,不保證一個topic的整體的順序。Partition是物理上的概念,方便在叢集中擴充套件,提高併發。

三、整合SpringBoot2框架

1、案例結構

  • 訊息生產者 : kafka-producer-server

  • 訊息消費方 : kafka-consumer-server

2、基礎依賴

<!-- SpringBoot依賴 -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka 依賴 -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.4.RELEASE</version>
</dependency>

3、生產者配置

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092

4、訊息生成

@RestController
public class ProducerWeb {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String sendMsg () {
        MsgLog msgLog = new MsgLog(1,"訊息生成",
                                 1,"訊息日誌",new Date()) ;
        String msg = JSON.toJSONString(msgLog) ;
        // 這裡Topic如果不存在,會自動建立
        kafkaTemplate.send("cicada-topic", msg);
        return msg ;
    }
}

5、消費者配置

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: test-consumer-group

6、訊息消費

@Component
public class ConsumerMsg {

    private static Logger LOGGER = LoggerFactory.getLogger(ConsumerMsg.class);

    @KafkaListener(topics = "cicada-topic")
    public void listenMsg (ConsumerRecord<?,String> record) {
        String value = record.value();
        LOGGER.info("ConsumerMsg====>>"+value);
    }
}

四、訊息流程分析

1、生產者分析

  • 寫入方式

生產者基於推push推模式將訊息釋出到broker,每條訊息都被追加到分割槽patition中,屬於磁碟順序寫,效率比隨機寫記憶體要高,保障kafka高吞吐量。

  • 分割槽概念

訊息傳送時都被髮送到一個topic,而topic是由Partition Logs(分割槽日誌)組成,其組織結構如下圖所示:

每個Partition中的訊息都是有序的,生產的訊息被不斷追加到Partitionlog上,其中的每一個訊息都被賦予了一個唯一的offset值。每個Partition可以通過調整以適配它所在的機器,而一個topic又可以有多個Partition組成,因此整個叢集就可以適應任意大小的資料。分割槽的原則:指定patition,則直接使用;未指定patition但指定key,通過對key的value進行hash出一個patition;patition和key都未指定,使用輪詢選出一個patition。

2、消費者分析

  • 消費圖解

消費者是以consumer group消費者組的方式工作,由一個或者多個消費者組成一個組,共同消費一個topic。每個分割槽在同一時間只能由group中的一個消費者讀取,但是多個group可以同時消費一個partition。

  • 消費方式

消費者採用pull拉模式從broker中讀取資料。對於Kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費訊息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的資料傳輸場景。

五、原始碼地址

GitHub·地址
https://github.com/cicadasmile/middle-ware-parent
GitEE·地址
https://gitee.com/cicadasmile/middle-ware-parent

推薦閱讀:SpringBoot2整合中介軟體

《整合 shard-jdbc 中介軟體,實現資料分庫分表》

《整合 JavaMail ,實現非同步傳送郵件功能》

《整合 RocketMQ ,實現請求非同步處理》

《整合 Swagger2 ,構建介面管理介面》

《整合 QuartJob ,實現定時器實時管理》

《整合 Redis叢集 ,實現訊息佇列場景》

《整合 Dubbo框架 ,實現RPC服務遠端呼叫》

《整合 ElasticSearch框架,實現高效能搜尋引擎》

《整合 JWT 框架,解決Token跨域驗證問題》

《整合 FastDFS 中介軟體,實現檔案分佈管理》

《整合 Shiro 框架,實現使用者許可權管理》

《整合 Security 框架,實現使用者許可權管理》

《整合 ClickHouse資料庫,實現資料高效能查詢分析》

《整合 Drools規則引擎,實現高效的業務規則》

《整合MybatisPlus增強外掛,配置多資料來源》

《整合 Zookeeper元件,管理架構中服務協調》

《整合Nacos元件,環境搭建和入門案例詳解