1. 程式人生 > >springcloud系列—Stream—第8章-1: Spring Cloud Stream 訊息驅動

springcloud系列—Stream—第8章-1: Spring Cloud Stream 訊息驅動

參考:http://blog.didispace.com/spring-cloud-starter-dalston-7-2/

目錄

快速入門


Spring Cloud Stream是一個用來為微服務應用構建訊息驅動能力的框架。它可以基於Spring Boot來建立獨立的、可用於生產的Spring應用程式。它通過使用Spring Integration來連線訊息代理中介軟體以實現訊息事件驅動的微服務應用。Spring Cloud Stream為一些供應商的訊息中介軟體產品提供了個性化的自動化配置實現,並且引入了釋出-訂閱、消費組以及訊息分割槽這三個核心概念。簡單的說,Spring Cloud Stream本質上就是整合了Spring Boot和Spring Integration,實現了一套輕量級的訊息驅動的微服務框架。通過使用Spring Cloud Stream,可以有效地簡化開發人員對訊息中介軟體的使用複雜度,讓系統開發人員可以有更多的精力關注於核心業務邏輯的處理。由於Spring Cloud Stream基於Spring Boot實現,所以它秉承了Spring Boot的優點,實現了自動化配置的功能幫忙我們可以快速的上手使用,但是目前為止Spring Cloud Stream只支援下面兩個著名的訊息中介軟體的自動化配置:

  • RabbitMQ
  • Kafka

快速入門

下面我們通過構建一個簡單的示例來對Spring Cloud Stream有一個初步認識。該示例主要目標是構建一個基於Spring Boot的微服務應用,這個微服務應用將通過使用訊息中介軟體RabbitMQ來接收訊息並將訊息列印到日誌中。所以,在進行下面步驟之前請先確認已經在本地安裝了RabbitMQ,具體安裝步驟請參考此文

構建一個Spring Cloud Stream消費者

  • 建立一個基礎的Spring Boot工程,命名為:stream-hello

  • 編輯pom.xml中的依賴關係,引入Spring Cloud Stream對RabbitMQ的支援,具體如下:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.9.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>     
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Dalston.SR4</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
  • 建立用於接收來自RabbitMQ訊息的消費者SinkReceiver,具體如下:
@EnableBinding(Sink.class)
public class SinkReceiver {

    private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class);

    @StreamListener(Sink.INPUT)
    public void receive(Object payload) {
        logger.info("Received: " + payload);
    }

}
  • 建立應用主類,這裡同其他Spring Boot一樣,沒有什麼特別之處,具體如下:
@SpringBootApplication
public class SinkApplication {

    public static void main(String[] args) {
        SpringApplication.run(SinkApplication.class, args);
    }

}

到這裡,我們快速入門示例的編碼任務就已經完成了。下面我們分別啟動RabbitMQ以及該Spring Boot應用,然後做下面的試驗,看看它們是如何運作的。

手工測試驗證

  • 我們先來看一下Spring Boot應用的啟動日誌。
...
INFO 16272 --- [main] o.s.c.s.b.r.RabbitMessageChannelBinder   : declaring queue for inbound: input.anonymous.Y8VsFILmSC27eS5StsXp6A, bound to: input
INFO 16272 --- [main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: [email protected] [delegate=amqp://[email protected]:5672/]
INFO 16272 --- [main] o.s.integration.channel.DirectChannel    : Channel 'input.anonymous.Y8VsFILmSC27eS5StsXp6A.bridge' has 1 subscriber(s).
INFO 16272 --- [main] o.s.i.a.i.AmqpInboundChannelAdapter      : started inbound.input.anonymous.Y8VsFILmSC27eS5StsXp6A
...

從上面的日誌內容中,我們可以獲得以下資訊:

  • 使用guest使用者建立了一個指向127.0.0.1:5672位置的RabbitMQ連線,在RabbitMQ的控制檯中我們也可以發現它。

  • 聲明瞭一個名為input.anonymous.Y8VsFILmSC27eS5StsXp6A的佇列,並通過RabbitMessageChannelBinder將自己繫結為它的消費者。這些資訊我們也能在RabbitMQ的控制檯中發現它們。

下面我們可以在RabbitMQ的控制檯中進入input.anonymous.Y8VsFILmSC27eS5StsXp6A佇列的管理頁面,通過Publish Message功能來發送一條訊息到該佇列中。

此時,我們可以在當前啟動的Spring Boot應用程式的控制檯中看到下面的內容:

INFO 16272 --- [C27eS5StsXp6A-1] com.didispace.HelloApplication           : Received: [[email protected]

我們可以發現在應用控制檯中輸出的內容就是SinkReceiverreceive方法定義的,而輸出的具體內容則是來自訊息佇列中獲取的物件。這裡由於我們沒有對訊息進行序列化,所以輸出的只是該物件的引用,在後面的小節中我們會詳細介紹接收訊息後的處理。

在順利完成上面快速入門的示例後,我們簡單解釋一下上面的步驟是如何將我們的Spring Boot應用連線上RabbitMQ來消費訊息以實現訊息驅動業務邏輯的。

首先,我們對Spring Boot應用做的就是引入spring-cloud-starter-stream-rabbit依賴,該依賴包是Spring Cloud Stream對RabbitMQ支援的封裝,其中包含了對RabbitMQ的自動化配置等內容。從下面它定義的依賴關係中,我們還可以知道它等價於spring-cloud-stream-binder-rabbit依賴。

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
    </dependency>
</dependencies>

接著,我們再來看看這裡用到的幾個Spring Cloud Stream的核心註解,它們都被定義在SinkReceiver中:

  • @EnableBinding,該註解用來指定一個或多個定義了@Input@Output註解的介面,以此實現對訊息通道(Channel)的繫結。在上面的例子中,我們通過@EnableBinding(Sink.class)綁定了Sink介面,該介面是Spring Cloud Stream中預設實現的對輸入訊息通道繫結的定義,它的原始碼如下:
public interface Sink {

    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();

}

它通過@Input註解綁定了一個名為input的通道。除了Sink之外,Spring Cloud Stream還預設實現了繫結output通道的Source介面,還有結合了SinkSourceProcessor介面,實際使用時我們也可以自己通過@Input@Output註解來定義繫結訊息通道的介面。當我們需要為@EnableBinding指定多個介面來繫結訊息通道的時候,可以這樣定義:@EnableBinding(value = {Sink.class, Source.class})

  • @StreamListener:該註解主要定義在方法上,作用是將被修飾的方法註冊為訊息中介軟體上資料流的事件監聽器,註解中的屬性值對應了監聽的訊息通道名。在上面的例子中,我們通過@StreamListener(Sink.INPUT)註解將receive方法註冊為對input訊息通道的監聽處理器,所以當我們在RabbitMQ的控制頁面中釋出訊息的時候,receive方法會做出對應的響應動作。

編寫消費訊息的單元測試用例

上面我們通過RabbitMQ的控制檯完成了傳送訊息來驗證了訊息消費程式的功能,雖然這種方法比較low,但是通過上面的步驟,相信大家對RabbitMQ和Spring Cloud Stream的訊息消費已經有了一些基礎的認識。下面我們通過編寫生產訊息的單元測試用例來完善我們的入門內容。

  • 在上面建立的工程中建立單元測試類:
@RunWith(SpringRunner.class)
@EnableBinding(value = {SinkApplicationTests.SinkSender.class})
public class SinkApplicationTests {

    @Autowired
    private SinkSender sinkSender;

    @Test
    public void sinkSenderTester() {
        sinkSender.output().send(MessageBuilder.withPayload("produce a message :http://blog.didispace.com").build());
    }

    public interface SinkSender {

        String OUTPUT = "input";

        @Output(SinkSender.OUTPUT)
        MessageChannel output();

    }

}
  • 在應用了上面的訊息消費者程式之後,執行這裡定義的單元測試程式,我們馬上就能在訊息消費者的控制檯中收到下面的內容:
INFO 50947 --- [L2W-c2AcChb2Q-1] com.didispace.stream.SinkReceiver        : Received: produce a message :http://blog.didispace.com

在上面的單元測試中,我們通過@Output(SinkSender.OUTPUT)定義了一個輸出通過,而該輸出通道的名稱為input,與前文中的Sink中定義的消費通道同名,所以這裡的單元測試與前文的消費者程式組成了一對生產者與消費者。到這裡,本文的內容就次結束,如果您能夠獨立的完成上面的例子,那麼對於Spring Cloud Stream的基礎使用算是入門了。