Stream是什麼及Binder介紹

什麼是Spring Cloud Stream?

官方定義Spring Cloud Stream是一個構建訊息驅動微服務的框架。

應用程式通過inputs或者 outputs 來與Spring Cloud Stream中binder物件互動。

通過我們配置來binding(繫結),而Spring Cloud Stream 的binder物件負責與訊息中介軟體互動。所以,我們只需要搞清楚如何與Spring Cloud Stream互動就可以方便使用訊息驅動的方式。

通過使用Spring Integration來連線訊息代理中介軟體以實現訊息事件驅動。

Spring Cloud Stream為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,引用了釋出-訂閱、消費組、分割槽的三個核心概念。

目前僅支援RabbitMQ、 Kafka。

總結:其實總體來說就是類似於JDBC的規範,通過這個Stream驅動元件去訪問訊息中介軟體,從而達到與中介軟體的分離

Stream的設計思想

標準MQ

  • 生產者/消費者之間靠訊息媒介傳遞資訊內容
  • 訊息必須走特定的通道 - 訊息通道 Message Channel
  • 訊息通道里的訊息如何被消費呢,誰負責收發處理 - 訊息通道MessageChannel的子介面SubscribableChannel,由MessageHandler訊息處理器所訂閱。

為什麼用Cloud Stream?

比方說我們用到了RabbitMQ和Kafka,由於這兩個訊息中介軟體的架構上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分割槽。

這些中介軟體的差異性導致我們實際專案開發給我們造成了一定的困擾,我們如果用了兩個訊息佇列的其中一種,後面的業務需求,我想往另外一種訊息佇列進行遷移,這時候無疑就是一個災難性的,一大堆東西都要重新推倒重新做,因為它跟我們的系統耦合了,這時候Spring Cloud Stream給我們提供了—種解耦合的方式。

Stream憑什麼可以統一底層差異?

在沒有繫結器這個概念的情況下,我們的SpringBoot應用要直接與訊息中介軟體進行資訊互動的時候,由於各訊息中介軟體構建的初衷不同,它們的實現細節上會有較大的差異性通過定義繫結器作為中間層,完美地實現了應用程式與訊息中介軟體細節之間的隔離。通過嚮應用程式暴露統一的Channel通道,使得應用程式不需要再考慮各種不同的訊息中介軟體實現。

通過定義繫結器Binder作為中間層,實現了應用程式與訊息中介軟體細節之間的隔離

Binder:

  • INPUT對應於消費者
  • OUTPUT對應於生產者

Stream中的訊息通訊方式遵循了釋出-訂閱模式

Topic主題進行廣播

  • 在RabbitMQ就是Exchange
  • 在Kakfa中就是Topic

Stream編碼常用註解簡介

Spring Cloud Stream標準流程套路

  • Binder - 很方便的連線中介軟體,遮蔽差異。
  • Channel - 通道,是佇列Queue的一種抽象,在訊息通訊系統中就是實現儲存和轉發的媒介,通過Channel對佇列進行配置。
  • Source和Sink - 簡單的可理解為參照物件是Spring Cloud Stream自身,從Stream釋出訊息就是輸出,接受訊息就是輸入。

編碼API和常用註解

組成

說明

Middleware

中介軟體,目前只支援RabbitMQ和Kafka

Binder

Binder是應用與訊息中介軟體之間的封裝,目前實行了Kafka和RabbitMQ的Binder,通過Binder可以很方便的連線中介軟體,可以動態的改變訊息型別(對應於Kafka的topic,RabbitMQ的exchange),這些都可以通過配置檔案來實現

@Input

註解標識輸入通道,通過該輸乎通道接收到的訊息進入應用程式

@Output

註解標識輸出通道,釋出的訊息將通過該通道離開應用程式

@StreamListener

監聽佇列,用於消費者的佇列的訊息接收

@EnableBinding

指通道channel和exchange繫結在一起

案例說明

準備RabbitMQ環境

工程中新建三個子模組

  • cloud-stream-rabbitmq-provider8801,作為生產者進行發訊息模組
  • cloud-stream-rabbitmq-consumer8802,作為訊息接收模組
  • cloud-stream-rabbitmq-consumer8803,作為訊息接收模組

Stream訊息驅動之生產者

新建8801工程

新建Module:cloud-stream-rabbitmq-provider8801

修改POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>com.dance.springcloud</artifactId>
<groupId>com.dance</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-provider8801</artifactId> <properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--基礎配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </project>

新建yml配置

(application.yml)

server:
port: 8801 spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: # 在此處配置要繫結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 訊息元件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
output: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為json,文字則設定“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定 eureka:
client: # 客戶端進行Eureka註冊的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka, http://eureka7002.com:7002/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
instance-id: send-8801.com # 在資訊列表時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址

新建主啟動類

package com.dance.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}

新建業務類

新建傳送訊息介面

package com.dance.springcloud.service;

public interface IMessageProvider {
public String send();
}

新建實現類

package com.dance.springcloud.service.impl;

import com.dance.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel; import javax.annotation.Resource;
import java.util.UUID; /**
* 定義訊息的推送管道
*/
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider { /**
* 訊息傳送管道
*/
@Resource
private MessageChannel output; @Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: " + serial);
return null;
}
}

新建Controller

package com.dance.springcloud.controller;

import com.dance.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController
public class SendMessageController {
@Resource
private IMessageProvider messageProvider; @GetMapping(value = "/sendMessage")
public String sendMessage() {
return messageProvider.send();
} }

測試

  • 啟動 Eureka叢集
  • 啟動 rabbitmq
  • 後臺將列印serial: UUID字串

Stream訊息驅動之消費者

新建8802工程

新建Module:cloud-stream-rabbitmq-consumer8802

修改POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>com.dance.springcloud</artifactId>
<groupId>com.dance</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion> <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId> <properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties> <dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--基礎配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies> </project>

新建yml配置

server:
port: 8802 spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要繫結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 訊息元件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定 eureka:
client: # 客戶端進行Eureka註冊的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka, http://eureka7002.com:7002/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
instance-id: receive-8802.com # 在資訊列表時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址

新建主啟動類

package com.dance.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class, args);
}
}

新建業務類Controller

package com.dance.springcloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component; @Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort; @StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消費者1號,----->接受到的訊息: " + message.getPayload() + "\t port: " + serverPort);
}
}

測試

  • 啟動Eureka叢集
  • 啟動StreamMQMain8801
  • 啟動StreamMQMain8802
  • 8801傳送8802接收訊息

Stream之訊息重複消費

參考8802,建立8803

依照8802,克隆出來一份執行8803名稱為:cloud-stream-rabbitmq-consumer8803

測試

啟動

  • RabbitMQ
  • 服務註冊 - Eureka叢集
  • 訊息生產 - 8801
  • 訊息消費 - 8802
  • 訊息消費 - 8802

執行後有兩個問題

  1. 有重複消費問題
    1. 提供者
    1. 消費者
  1. 訊息持久化問題

消費

  • http://localhost:8801/sendMessage
  • 目前是8802/8803同時都收到了,存在重複消費問題
  • 如何解決:分組和持久化屬性group(重要)

生產實際案例

比如在如下場景中,訂單系統我們做叢集部署,都會從RabbitMQ中獲取訂單資訊,那如果一個訂單同時被兩個服務獲取到,那麼就會造成資料錯誤,我們得避免這種情況。這時我們就可以使用Stream中的訊息分組來解決

注意在Stream中處於同一個group中的多個消費者是競爭關係,就能夠保證訊息只會被其中一個應用消費一次。不同組是可以全面消費的(重複消費)。

Stream之group解決訊息重複消費

原理

微服務應用放置於同一個group中,就能夠保證訊息只會被其中一個應用消費一次。

不同的組是可以重複消費的,同一個組內會發生競爭關係,只有其中一個可以消費。

8802/8803都變成不同組,group兩個不同

group: A_Group、B_Group

8802修改YML

server:
port: 8802 spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要繫結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 訊息元件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定
group: A_Group # <--------------------分組設定 eureka:
client: # 客戶端進行Eureka註冊的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka, http://eureka7002.com:7002/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
instance-id: receive-8802.com # 在資訊列表時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址

8803修改YML

server:
port: 8803 spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: # 在此處配置要繫結的rabbitmq的服務資訊;
defaultRabbit: # 表示定義的名稱,用於於binding整合
type: rabbit # 訊息元件型別
environment: # 設定rabbitmq的相關的環境配置
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # 服務的整合處理
input: # 這個名字是一個通道的名稱
destination: studyExchange # 表示要使用的Exchange名稱定義
content-type: application/json # 設定訊息型別,本次為物件json,如果是文字則設定“text/plain”
binder: defaultRabbit # 設定要繫結的訊息服務的具體設定
group: B_Group # <--------------------分組設定 eureka:
client: # 客戶端進行Eureka註冊的配置
service-url:
defaultZone: http://eureka7001.com:7001/eureka, http://eureka7002.com:7002/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 設定心跳的時間間隔(預設是30秒)
lease-expiration-duration-in-seconds: 5 # 如果現在超過了5秒的間隔(預設是90秒)
instance-id: receive-8803.com # 在資訊列表時顯示主機名稱
prefer-ip-address: true # 訪問的路徑變為IP地址

測試

結論:還是重複消費,應為設定的是不同組,所以不存在競爭關係

修改8803為A_Group

8802/8803都變成相同組,group兩個相同

group: A_Group

8802修改YMLgroup: A_Group

8803修改YMLgroup: A_Group

測試

傳送8次訊息

檢視結果

消費者1

消費者2

結論:同一個組的多個微服務例項,每次只會有一個拿到

8802/8803實現了輪詢分組,每次只有一個消費者,8801模組的發的訊息只能被8802或8803其中一個接收到,這樣避免了重複消費。

Stream之訊息持久化

新增分組後自動支援持久化

測試

  • 啟動Eureka叢集
  • 啟動8801 傳送4條訊息
  • 刪除8802的分組配置後啟動
  • 可以發現 在啟動過程中 完全沒有 消費之前傳送的四條訊息,所以會導致訊息丟失
  • 8803沒有刪除分組,直接啟動
  • 可以看到 訊息被消費了,沒有丟失 ID不一樣是應為我之前的8803是啟動者的,後來停止了 8801重新發送完成訊息後,啟動的8803

作者:彼岸舞

時間:2021\09\05

內容關於:Spring Cloud H版

本文屬於作者原創,未經允許,禁止轉發