1. 程式人生 > >Spring Cloud 系列之 Spring Cloud Stream

Spring Cloud 系列之 Spring Cloud Stream

Spring Cloud Stream 是訊息中介軟體元件,它集成了 kafka 和 rabbitmq 。本篇文章以 Rabbit MQ 為訊息中介軟體系統為基礎,介紹 Spring Cloud Stream 的使用。如果你沒有用過訊息中介軟體,可以到 RabbitMQ 的官網看一下,或者參考這個 http://rabbitmq.mr-ping.com/。理解了訊息中介軟體的設計,才能更好的使用它。

訊息中間的幾大應用場景

1、非同步處理

比如使用者在電商網站下單,下單完成後會給使用者推送簡訊或郵件,發簡訊和郵件的過程就可以非同步完成。因為下單付款是核心業務,發郵件和簡訊並不屬於核心功能,並且可能耗時較長,所以針對這種業務場景可以選擇先放到訊息佇列中,有其他服務來非同步處理。

2、應用解耦:

假設公司有幾個不同的系統,各系統在某些業務有聯動關係,比如 A 系統完成了某些操作,需要觸發 B 系統及 C 系統。如果 A 系統完成操作,主動呼叫 B 系統的介面或 C 系統的介面,可以完成功能,但是各個系統之間就產生了耦合。用訊息中介軟體就可以完成解耦,當 A 系統完成操作將資料放進訊息佇列,B 和 C 系統去訂閱訊息就可以了。這樣各系統只要約定好訊息的格式就好了。

3、流量削峰

比如秒殺活動,一下子進來好多請求,有的服務可能承受不住瞬時高併發而崩潰,所以針對這種瞬時高併發的場景,在中間加一層訊息佇列,把請求先入佇列,然後再把佇列中的請求平滑的推送給服務,或者讓服務去佇列拉取。

4、日誌處理

kafka 最開始就是專門為了處理日誌產生的。

當碰到上面的幾種情況的時候,就要考慮用訊息隊列了。如果你碰巧使用的是 RabbitMQ 或者 kafka ,而且同樣也是在使用 Spring Cloud ,那可以考慮下用 Spring Cloud Stream。

使用 Spring Cloud Stream && RabbitMQ

介紹下面的例子之前,假定你已經對 RabbitMQ 有一定的瞭解。

首先來認識一下 Spring Cloud Stream 中的幾個重要概念。

Destination Binders:目標繫結器,目標指的是 kafka 還是 RabbitMQ,繫結器就是封裝了目標中介軟體的包。如果操作的是 kafka 就使用 kafka binder ,如果操作的是 RabbitMQ 就使用 rabbitmq binder。

Destination Bindings:外部訊息傳遞系統和應用程式之間的橋樑,提供訊息的“生產者”和“消費者”(由目標繫結器建立)

Message:一種規範化的資料結構,生產者和消費者基於這個資料結構通過外部訊息系統與目標繫結器和其他應用程式通訊。

可能看完了上面的三個概念仍然是一頭霧水,沒有關係,實踐過程中自然就明白了。

先來一個最簡單的例子

因為用到的是 rabbitmq,所以在本地搭好 rabbitmq 環境,然後裝好 rabbitmq-management 外掛,這樣就可以訪問 web UI 介面了,預設是 15672 埠。

1、引用對應 rabbitmq 的 stream 包

<dependency> 
  <groupId>org.springframework.cloud</groupId>  
  <artifactId>spring-cloud-starter-stream-rabbit</artifactId> 
</dependency>

2、在 application.yml 中增加配置

spring:
  profiles: stream-rabbit-customer-group1
  cloud:
    stream:
      bindings:
        input:
          destination: default.messages
          binder: local_rabbit
        output:
          destination: default.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 32775
                username: guest
                password: guest
server:
  port: 8201

理解配置檔案很重要,基本上理解清楚了配置,也就明白 spring cloud stream 是怎麼回事了。

spring.cloud.stream.binders,上面提到了 stream 的 3 個重要概念的第一個 「Destination binders」。上面的配置檔案中就配置了一個 binder,命名為 local_rabbit,指定 type 為 rabbit ,表示使用的是 rabbitmq 訊息中介軟體,如果用的是 kafka ,則 type 設定為 kafka。environment 就是設定使用的訊息中介軟體的配置資訊,包括 host、port、使用者名稱、密碼等。可以設定多了個 binder,適配不同的場景。

spring.cloud.stream.bindings ,對應上面提到到 「Destination Bindings」。這裡面可以配置多個 input 或者 output,分別表示訊息的接收通道和傳送通道,對應到 rabbitmq 上就是不同的 exchange。這個配置檔案裡定義了兩個input 、兩個output,名稱分別為 input、log_input、output、log_output。這個名稱不是亂起的,在我們的程式程式碼中會用到,用來標示某個方法接收哪個 exchange 或者傳送到哪個 exchange 。

每個通道下的 destination 屬性指 exchange 的名稱,binder 指定在 binders 裡設定的 binder,上面配置中指定了 local_rabbit 。

可以看到 input、output 對應的 destination 是相同的,log_input、log_output 對應的 destination 也相同, 也就是對應相同的 exchange。一個表示訊息來源,一個表示訊息去向。

另外還可以設定 group 。因為服務很可能不止一個例項,如果啟動多個例項,那麼沒必要每個例項都消費同一個訊息,只要把功能相同的例項的 group 設定為同一個,那麼就會只有一個例項來消費訊息,避免重複消費的情況。如果設定了 group,那麼 group 名稱就會成為 queue 的名稱,如果沒有設定 group ,那麼 queue 就會根據 destination + 隨機字串的方式命名。

3、接下來做一個最簡單的例子,來演示如何接收訊息。

首先來介紹一下 stream 內建的簡單訊息通道(訊息通道也就是指訊息的來源和去向)介面定義,一個 Source 和 一個 Sink 。

Source.java

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

訊息傳送通道定義,定義了一個 MessageChannel 型別的 output() 方法,用 @Output 註解標示,並指定了 binding 的名稱為 output。

Sink.java

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

訊息接收通道定義,定義了一個 SubscribableChannel 型別的 input() 方法,表示訂閱一個訊息的方法,並用 @Input 註解標識,並且指定了 binging 的名稱為 input 。

建立一個簡單的訊息接收方法:

@SpringBootApplication
@EnableBinding(value = {Processor.class})
@Slf4j
public class DefaultApplication {

    public static void main(String[] args) {
        SpringApplication.run(DefaultApplication.class, args);
    }
}

在專案啟動類上加上註解 @EnableBinding(value = {Processor.class}) ,表明啟用 stream ,並指定定義的 Channel 定義介面類。

然後,建立一個 service 服務類,用來訂閱訊息,並對訊息進行處理。

@Slf4j
@Component
public class DefaultMessageListener {

    @StreamListener(Processor.INPUT)
    public void processMyMessage(String message) {
        log.info("接收到訊息:" + message);
    }
}

在方法 processMyMessage() 上使用 @StreamListener 註解,表示對訊息進行訂閱監控,指定 binding 的名稱,其中 Processor.INPUT 就是 Sink 的 input ,也就是字串 input ,對應的上面的配置檔案,就是 spring.cloud.stream.bindings.input。

啟動 DefaultApplication ,可以在 rabbitmq 管理控制檯的 exchanges 中看到增加的這幾個 bindings 。

可以看到 exchange 的名稱對應的就是 bindings 的兩個 input 和 兩個 output 的 destination 的值。

用 rabbitmq UI 控制檯傳送訊息測試

點選上圖的 default.input.messages 進入 exchange 詳請頁面,在 publish message 部分填寫上 Payload ,然後點選 Publish message 按鈕。

之後回到 DefaultApplication 的輸出控制檯,會看到訊息已經被接收。

模擬一個日誌處理

接下來模擬生產者和消費者處理訊息的過程,模擬一個日誌處理的過程。

  • 原始日誌傳送到 kite.log.messages exchange
  • 接收器在 kite.log.messages exchange 接收原始日誌,經過處理格式化,傳送到 kite.log.format.messages exchange
  • 接收器在 kite.log.format.messages exchange 接收格式化後的日誌

1、自定義訊息通道介面,上面介紹了 stream 自帶的 Sink 和 Source,也僅僅能做個演示,真正的業務中還是需要自己定義更加靈活的介面。

@Component
public interface MyProcessor {

    String MESSAGE_INPUT = "log_input";

    String MESSAGE_OUTPUT = "log_output";

    String LOG_FORMAT_INPUT = "log_format_input";

    String LOG_FORMAT_OUTPUT = "log_format_output";

    @Input(MESSAGE_INPUT)
    SubscribableChannel logInput();

    @Output(MESSAGE_OUTPUT)
    MessageChannel logOutput();

    @Input(LOG_FORMAT_INPUT)
    SubscribableChannel logFormatInput();

    @Output(LOG_FORMAT_OUTPUT)
    MessageChannel logFormatOutput();

}

2、建立消費者應用

配置檔案如下 :

spring:
  profiles: stream-rabbit-customer-group1
  cloud:
    stream:
      bindings:
        log_input:
          destination: kite.log.messages
          binder: local_rabbit
          group: logConsumer-group1
        log_output:
          destination: kite.log.messages
          binder: local_rabbit
          group: logConsumer-group1
        log_format_input:
          destination: kite.log.format.messages
          binder: local_rabbit
          group: logFormat-group1
        log_format_input:
          destination: kite.log.format.messages
          binder: local_rabbit
          group: logFormat-group1
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 32775
                username: guest
                password: guest
server:
  port: 8201

此配置檔案要參照 MyProcessor 介面檢視,定義了 4 個 binding,但是 destination 兩兩相同,也就是兩個 exchange。

建立 spring boot 啟動類

@SpringBootApplication
@EnableBinding(value = {MyProcessor.class})
@Slf4j
public class CustomerApplication {

    public static void main(String[] args) {
        SpringApplication.run(CustomerApplication.class, args);
    }
}

用 @EnableBinding(value = {MyProcessor.class}) 註解引入 MyProcessor

建立訊息接收處理服務

@Slf4j
@Component
public class LogMessageListener {

    /**
     * 通過 MyProcessor.MESSAGE_INPUT 接收訊息
     * 然後通過 SendTo 將處理後的訊息傳送到 MyProcessor.LOG_FORMAT_OUTPUT
     * @param message
     * @return
     */
    @StreamListener(MyProcessor.MESSAGE_INPUT)
    @SendTo(MyProcessor.LOG_FORMAT_OUTPUT)
    public String processLogMessage(String message) {
        log.info("接收到原始訊息:" + message);
        return "「" + message +"」";
    }

    /**
     * 接收來自 MyProcessor.LOG_FORMAT_INPUT 的訊息
     * 也就是加工後的訊息,也就是通過上面的 SendTo 傳送來的
     * 因為 MyProcessor.LOG_FORMAT_OUTPUT 和 MyProcessor.LOG_FORMAT_INPUT 是指向同一 exchange
     * @param message
     */
    @StreamListener(MyProcessor.LOG_FORMAT_INPUT)
    public void processFormatLogMessage(String message) {
        log.info("接收到格式化後的訊息:" + message);
    }
}

3、建立一個訊息生產者,用於傳送原始日誌訊息

配置檔案:

spring:
  cloud:
    stream:
      bindings:
        log_output:
          destination: kite.log.messages
          binder: local_rabbit
          group: logConsumer-group1
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 32775
                username: guest
                password: guest
server:
  port: 8202

僅僅指定了一個 binding log_output,用來發送訊息,如果只做生產者就不要指定 log_input,如果指定了 log_input ,應用就會認為這個生產者服務也會消費訊息,如果這時沒有在此服務中訂閱訊息,當訊息被髮送到這個服務時,因為並沒有訂閱訊息,也就是沒有 @StreamListener 註解的方法,就會出現如下異常:

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel

建立 spring boot 啟動類

@Slf4j
@RestController
@EnableBinding(value = {MyProcessor.class})
public class MyMessageController {

    @Autowired
    private MyProcessor myProcessor;

    @GetMapping(value = "sendLogMessage")
    public void sendLogMessage(String message){
        Message<String> stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(message).build();
        myProcessor.logOutput().send(stringMessage);
    }
}

同樣的引入 @EnableBinding(value = {MyProcessor.class})

建立一個 Controller 用來發送訊息

@Slf4j
@RestController
@EnableBinding(value = {MyProcessor.class})
public class MyMessageController {

    @Autowired
    private MyProcessor myProcessor;

    @GetMapping(value = "sendLogMessage")
    public void sendLogMessage(String message){
        Message<String> stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(message).build();
        myProcessor.logOutput().send(stringMessage);
    }
}

之後,訪問連結:

http://localhost:8202/sendLogMessage?message=原始日誌

可以在消費服務端看到如下輸出:

其他

訊息除了可以是字串型別,還可以是其他型別,也可以是實體型別,例如

@GetMapping(value = "sendObjectLogMessage") 
public void sendObjectLogMessage() {
    LogInfo logInfo = new LogInfo();
    logInfo.setClientIp("192.168.1.111");
    logInfo.setClientVersion("1.0");
    logInfo.setUserId("198663383837434");
    logInfo.setTime(Date.from(Instant.now()));
    Message < LogInfo > stringMessage = org.springframework.messaging.support.MessageBuilder.withPayload(logInfo).build();
    myProcessor.logOutput().send(stringMessage);
}

上面程式碼傳送了一個 LogInfo 實體物件,在消費者端依然可以用字串型別接收,因為 @StreamListener 註解會預設把實體轉為 json 字串。

另外,可以試著啟動兩個消費者端,把 group 設定成相同的,這時,傳送的訊息只會被一個消費者接收。

如果把 group 設定成不一樣的,那麼傳送的訊息會被兩個消費者接收。

還可以看其他 Spring Cloud 系列:

如果你也打算學習和使用 Spring Cloud

Spring Cloud Eureka 實現服務註冊與發現

為 Eureka 服務註冊中心實現安全控制

Spring Cloud Eureka 實現高可用服務發現註冊中心

Spring Cloud Config 配置中心 看這一篇就夠了

服務註冊發現、配置中心集一體的 Spring Cloud Consul

不要吝惜你的「推薦」呦

歡迎關注,不定期更新本系列和其他文章
古時的風箏 ,進入公眾號可以加入交流群

相關推薦

Spring Cloud 系列 Spring Cloud Stream

Spring Cloud Stream 是訊息中介軟體元件,它集成了 kafka 和 rabbitmq 。本篇文章以 Rabbit MQ 為訊息中介軟體系統為基礎,介紹 Spring Cloud Stream 的使用。如果你沒有用過訊息中介軟體,可以到 RabbitMQ 的官網看一下,或者參考這個 http

Spring Security系列Spring Social社交登入的繫結與解綁(十五)

在之前的Spring Social系列中,我們只是實現了使用服務提供商賬號登入到業務系統中,但沒有與業務系統中的賬號進行關聯。本章承接之前社交系列來實現社交賬號與業務系統賬號的繫結與解綁。 UserConnection create table UserConnection ( user

Spring Cloud系列Eureka服務治理

寫在前面 Spring Cloud Eureka是基於Netflix Eureka做的二次封裝.主要包含兩部分: 服務註冊中心 eureka server 服務提供者 eureka client ps:Netflix

Spring Cloud 系列 Netflix Hystrix 服務容錯

   什麼是 Hystrix      Hystrix 源自 Netflix 團隊於 2011 年開始研發。2012年 Hystrix 不斷髮展和成熟,Netflix 內部的許多團隊都採用了它。如今,每天在 Netflix 上通過 Hystrix 執行數百億個執行緒隔離和數千億個訊號量隔離的呼叫。極大地提

Spring Cloud 系列 Alibaba Sentinel 服務哨兵

![](https://mrhelloworld.com/resources/articles/spring/spring-cloud/sentinel/43697219-3cb4ef3a-9975-11e8-9a9c-73f4f537442d.png)   前文中我們提到 Netflix 中多項開源產品已

Spring Cloud 系列 Gateway 服務閘道器(一)

什麼是 Spring Cloud Gateway      Spring Cloud Gateway 作為 Spring Cloud 生態系統中的閘道器,目標是替代 Netflix Zuul,其不僅提供統一的路由方式,並且還基於 Filter 鏈的方式提供了閘道器基本的功能。目前最新版 Spring Clou

Spring Cloud 系列 Gateway 服務閘道器(二)

本篇文章為系列文章,未讀第一集的同學請猛戳這裡:Spring Cloud 系列之 Gateway 服務閘道器(一) 本篇文章講解 Gateway 閘道器的多種路由規則、動態路由規則(配合服務發現的路由規則)。    路由規則      點選連結觀看:路由規則(獲取更多請關注公眾號「哈嘍沃德先生」)      

Spring Cloud 系列 Gateway 服務閘道器(三)

本篇文章為系列文章,未讀第一集的同學請猛戳這裡: Spring Cloud 系列之 Gateway 服務閘道器(一) Spring Cloud 系列之 Gateway 服務閘道器(二) 本篇文章講解 Gateway 閘道器過濾器和全域性過濾器以及自定義過濾器。    過濾器      Spring Clo

Spring Cloud 系列 Gateway 服務閘道器(四)

本篇文章為系列文章,未讀第一集的同學請猛戳這裡: Spring Cloud 系列之 Gateway 服務閘道器(一) Spring Cloud 系列之 Gateway 服務閘道器(二) Spring Cloud 系列之 Gateway 服務閘道器(三) 本篇文章講解 Gateway 閘道器如何實現限流、整

Spring Cloud 系列 Sleuth 鏈路追蹤(一)

  隨著微服務架構的流行,服務按照不同的維度進行拆分,一次請求往往需要涉及到多個服務。網際網路應用構建在不同的軟體模組集上,這些軟體模組,有可能是由不同的團隊開發、可能使用不同的程式語言來實現、有可能布在了幾千臺伺服器,橫跨多個不同的資料中心。因此,就需要一些可以幫助理解系統行為、用於分析效能問題的工具,以便

Spring Cloud 系列 Config 配置中心(一)

## 服務配置現狀      配置檔案是我們再熟悉不過的,在微服務系統中,每個微服務不僅僅只有程式碼,還需要**連線其他資源**,例如資料庫的配置或功能性的開關 MySQL、Redis 、Security 等相關的配置。除了專案執行的基礎配置之外,還有一些配置是與我們業務有關係的,比如說七牛儲存、簡訊和郵

Spring Cloud 系列 Config 配置中心(二)

本篇文章為系列文章,未讀第一集的同學請猛戳這裡:Spring Cloud 系列之 Config 配置中心(一) 本篇文章講解 Config 如何實現配置中心自動重新整理。 配置中心自動重新整理 點選連結觀看:配置中心自動重新整理視訊(獲取更多請關注公眾號「哈嘍沃德先生」) Spring Cloud Con

Spring Cloud 系列 Config 配置中心(三)

本篇文章為系列文章,未讀前幾集的同學請猛戳這裡: Spring Cloud 系列之 Config 配置中心(一)Spring Cloud 系列之 Config 配置中心(二) 本篇文章講解 Config 如何實現配置中心加解密,配置中心使用者安全認證。 配置中心加解密 考慮這樣一個問題:所有的配置檔案都

Spring Cloud 系列 Bus 訊息匯流排

什麼是訊息匯流排 訊息代理中介軟體構建一個共用的訊息主題讓所有微服務例項訂閱,當該訊息主題產生訊息時會被所有微服務例項監聽和消費。 訊息代理又是什麼?訊息代理是一個訊息驗證、傳輸、路由的架構模式,主要用來實現接收和分發訊息,並根據設定好的訊息處理流來轉發給正確的應用。它在微服務之間起到通訊排程作用,減少了服

Spring Cloud 系列 Consul 配置中心

前面我們已經學習過 Spring Cloud Config 了: Spring Cloud 系列之 Config 配置中心(一)Spring Cloud 系列之 Config 配置中心(二)Spring Cloud 系列之 Config 配置中心(三) 它提供了配置中心的功能,但是需要配合 git、svn

Spring Cloud 系列 Apollo 配置中心(一)

背景 隨著程式功能的日益複雜,程式的配置日益增多:各種功能的開關、引數的配置、伺服器的地址等等。 對程式配置的期望值也越來越高:配置修改後實時生效,灰度釋出,分環境、分叢集管理配置,完善的許可權、稽核機制等等。 在這樣的大環境下,傳統的通過配置檔案、資料庫等方式已經越來越無法滿足開發人員對配置管理

Spring Cloud 系列 Apollo 配置中心(二)

本篇文章為系列文章,未讀第一集的同學請猛戳這裡:Spring Cloud 系列之 Apollo 配置中心(一) 本篇文章講解 Apollo 部門管理、使用者管理、配置管理、叢集管理。 點選連結觀看:Apollo 部門管理、使用者管理、配置管理、叢集管理視訊(獲取更多請關注公眾號「哈嘍沃德先生」) 部門及使

Spring Cloud 系列 Apollo 配置中心(四)

本篇文章為系列文章,未讀前幾集的同學請猛戳這裡: Spring Cloud 系列之 Apollo 配置中心(一)Spring Cloud 系列之 Apollo 配置中心(二)Spring Cloud 系列之 Apollo 配置中心(三) 本篇文章講解 Apollo 高可用環境搭建,灰度釋出,教大家搭建企業中

Spring Cloud 系列 Alibaba Nacos 註冊中心(一)

前言 從本章節開始,我們學習 Spring Cloud Alibaba 相關微服務元件。 Spring Cloud Alibaba 介紹 Spring Cloud Alibaba 致力於提供微服務開發的一站式解決方案。此專案包含開發分散式應用微服務的必需元件,方便開發者通過 Spring Cloud

Spring Cloud 系列 Alibaba Nacos 配置中心

## Nacos 介紹    ![](https://user-gold-cdn.xitu.io/2020/6/16/172bb81969149073?w=2001&h=391&f=png&s=35417 " ")      Nacos 是 Alibaba 公司推出的開源工具,用於實現分散式系統的服務