1. 程式人生 > >【springcloud】使用 Spring Cloud Stream 構建訊息驅動微服務

【springcloud】使用 Spring Cloud Stream 構建訊息驅動微服務

微服務的目的: 鬆耦合

事件驅動的優勢:高度解耦

Spring Cloud Stream 的幾個概念

Spring Cloud Stream is a framework for building message-driven microservice applications.

官方定義 Spring Cloud Stream 是一個構建訊息驅動微服務的框架。

Spring Cloud Stream Application

應用程式通過 inputs 或者 outputs 來與 Spring Cloud Stream 中binder 互動,通過我們配置來 binding ,而 Spring Cloud Stream 的 binder 負責與中介軟體互動。所以,我們只需要搞清楚如何與 Spring Cloud Stream 互動就可以方便使用訊息驅動的方式

Binder

Binder 是 Spring Cloud Stream 的一個抽象概念,是應用與訊息中介軟體之間的粘合劑。目前 Spring Cloud Stream 實現了 Kafka 和 Rabbit MQ 的binder。

通過 binder ,可以很方便的連線中介軟體,可以動態的改變訊息的
destinations(對應於 Kafka 的topic,Rabbit MQ 的 exchanges),這些都可以通過外部配置項來做到。

甚至可以任意的改變中介軟體的型別而不需要修改一行程式碼。

Publish-Subscribe

訊息的釋出(Publish)和訂閱(Subscribe)是事件驅動的經典模式。Spring Cloud Stream 的資料互動也是基於這個思想。生產者把訊息通過某個 topic 廣播出去(Spring Cloud Stream 中的 destinations)。其他的微服務,通過訂閱特定 topic 來獲取廣播出來的訊息來觸發業務的進行。

這種模式,極大的降低了生產者與消費者之間的耦合。即使有新的應用的引入,也不需要破壞當前系統的整體結構。

Consumer Groups

“Group”,如果使用過 Kafka 的童鞋並不會陌生。Spring Cloud Stream 的這個分組概念的意思基本和 Kafka 一致。

微服務中動態的縮放同一個應用的數量以此來達到更高的處理能力是非常必須的。對於這種情況,同一個事件防止被重複消費,只要把這些應用放置於同一個 “group” 中,就能夠保證訊息只會被其中一個應用消費一次。

Durability

訊息事件的持久化是必不可少的。Spring Cloud Stream 可以動態的選擇一個訊息佇列是持久化,還是 present。

Bindings

bindings 是我們通過配置把應用和spring cloud stream 的 binder 繫結在一起,之後我們只需要修改 binding 的配置來達到動態修改topic、exchange、type等一系列資訊而不需要修改一行程式碼。

基於 RabbitMQ 使用

訊息接收

Spring Cloud Stream 基本用法,需要定義一個介面,如下是內建的一個介面。

public interface Sink {
    String INPUT = "input";

    @Input("input")
    SubscribableChannel input();
}

註釋__ @Input__ 對應的方法,需要返回 __ SubscribableChannel __ ,並且參入一個引數值。

這就介面聲明瞭一個__ binding __命名為 “input” 。

其他內容通過配置指定:

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: mqTestDefault   

destination:指定了訊息獲取的目的地,對應於MQ就是 exchange,這裡的exchange就是 mqTestDefault

@SpringBootApplication
@EnableBinding(Sink.class)
public class Application {

    // 監聽 binding 為 Sink.INPUT 的訊息
    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("一般監聽收到:" + message.getPayload());
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
}

定義一個 class (這裡直接在啟動類),並且添加註解@EnableBinding(Sink.class) ,其中 Sink 就是上述的介面。同時定義一個方法(此處是 input)標明註解為 __ @StreamListener(Processor.INPUT) __,方法引數為 Message 。

啟動後,預設是會建立一個臨時佇列,臨時佇列繫結的exchange為 “mqTestDefault”,routing key為 “#”。

所有傳送 exchange 為“mqTestDefault” 的MQ訊息都會被投遞到這個臨時佇列,並且觸發上述的方法。

以上程式碼就完成了最基本的消費者部分。

訊息傳送

訊息的傳送同訊息的接受,都需要定義一個介面,不同的是介面方法的返回物件是 MessageChannel,下面是 Spring Cloud Stream 內建的介面:

public interface Source {
    String OUTPUT = "output";

    @Output("output")
    MessageChannel output();
}

這就介面聲明瞭一個 binding 命名為 “output” ,不同於上述的 “input”,這個binding 聲明瞭一個訊息輸出流,也就是訊息的生產者。

spring:
  cloud:
    stream:
      bindings:
        output:
          destination: mqTestDefault
          contentType: text/plain

destination:指定了訊息傳送的目的地,對應 RabbitMQ,會發送到 exchange 是 mqTestDefault 的所有訊息佇列中。

程式碼中呼叫:

@SpringBootApplication
@EnableBinding(Source.class)
public class Application implements CommandLineRunner {

    @Autowired
    @Qualifier("output")
    MessageChannel output;

    @Override
    public void run(String... strings) throws Exception {
        // 字串型別傳送MQ
        System.out.println("字串資訊傳送");
        output.send(MessageBuilder.withPayload("大家好").build());
    }
    
    public static void main(String[] args) {
        SpringApplication.run(Application.class);
    }
    
}

通過注入MessageChannel的方式,傳送訊息。

通過注入Source 介面的方式,傳送訊息。 具體可以檢視樣例

以上程式碼就完成了最基本的生產者部分。

自定義訊息傳送接收

自定義介面

Spring Cloud Stream 內建了兩種介面,分別定義了 binding 為 “input” 的輸入流,和 “output” 的輸出流,而在我們實際使用中,往往是需要定義各種輸入輸出流。使用方法也很簡單。

interface OrderProcessor {

    String INPUT_ORDER = "inputOrder";
    String OUTPUT_ORDER = "outputOrder";

    @Input(INPUT_ORDER)
    SubscribableChannel inputOrder();

    @Output(OUTPUT_ORDER)
    MessageChannel outputOrder();
}

一個介面中,可以定義無數個輸入輸出流,可以根據實際業務情況劃分。上述的介面,定義了一個訂單輸入,和訂單輸出兩個 binding。

使用時,需要在 @EnableBinding 註解中,新增自定義的介面。
使用 @StreamListener 做監聽的時候,需要指定 OrderProcessor.INPUT_ORDER

spring:
  cloud:
    stream:
      defaultBinder: defaultRabbit
      bindings:
        inputOrder:
          destination: mqTestOrder
        outputOrder:
          destination: mqTestOrder

如上配置,指定了 destination 為 mqTestOrder 的輸入輸出流。

分組與持久化

上述自定義的介面配置中,Spring Cloud Stream 會在 RabbitMQ 中建立一個臨時的佇列,程式關閉,對應的連線關閉的時候,該佇列也會消失。而在實際使用中,我們需要一個持久化的佇列,並且指定一個分組,用於保證應用服務的縮放。

只需要在消費者端的 binding 新增配置項 spring.cloud.stream.bindings.[channelName].group = XXX 。對應的佇列就是持久化,並且名稱為:mqTestOrder.XXX

rabbitMQ routing key 繫結

用慣了 rabbitMQ 的童鞋,在使用的時候,發現 Spring Cloud Stream 的訊息投遞,預設是根據 destination + group 進行區分,所有的訊息都投遞到 routing key 為 “#‘’ 的訊息佇列裡。

如果我們需要進一步根據 routing key 來進行區分訊息投遞的目的地,或者訊息接受,需要進一步配,Spring Cloud Stream 也提供了相關配置:

spring:
  cloud:
    stream:
      bindings:
        inputProductAdd:
          destination: mqTestProduct
          group: addProductHandler      # 擁有 group 預設會持久化佇列
        outputProductAdd:
          destination: mqTestProduct
      rabbit:
        bindings:
          inputProductAdd:
            consumer:
              bindingRoutingKey: addProduct.*       # 用來繫結消費者的 routing key
          outputProductAdd:
            producer:
              routing-key-expression: '''addProduct.*'''  # 需要用這個來指定 RoutingKey

spring.cloud.stream.rabbit.bindings.[channelName].consumer.bindingRoutingKey
指定了生成的訊息佇列的routing key

spring.cloud.stream.rabbit.bindings.[channelName].producer.routing-key-expression 指定了生產者訊息投遞的routing key

DLX 佇列

DLX 作用

DLX:Dead-Letter-Exchange(死信佇列)。利用DLX, 當訊息在一個佇列中變成死信(dead message)之後,它能被重新publish到另一個Exchange,這個Exchange就是DLX。訊息變成死信一向有一下幾種情況:

訊息被拒絕(basic.reject/ basic.nack)並且requeue=false
訊息TTL過期(參考:RabbitMQ之TTL(Time-To-Live 過期時間)
佇列達到最大長度

DLX也是一個正常的Exchange,和一般的Exchange沒有區別,它能在任何的佇列上被指定,實際上就是設定某個佇列的屬性,當這個佇列中有死信時,RabbitMQ就會自動的將這個訊息重新發布到設定的Exchange上去,進而被路由到另一個佇列,可以監聽這個佇列中訊息做相應的處理。

Spring Cloud Stream 中使用

spring.cloud.stream.rabbit.bindings.[channelName].consumer.autoBindDlq=true

spring.cloud.stream.rabbit.bindings.[channelName].consumer.republishToDlq=true

結論

Spring Cloud Stream 最大的方便之處,莫過於抽象了事件驅動的一些概念,對於訊息中介軟體的進一步封裝,可以做到程式碼層面對中介軟體的無感知,甚至於動態的切換中介軟體,切換topic。使得微服務開發的高度解耦,服務可以關注更多自己的業務流程。

相關文件