1. 程式人生 > >介紹Spring Cloud Stream與RabbitMQ整合

介紹Spring Cloud Stream與RabbitMQ整合

Spring Cloud Stream是一個建立在Spring Boot和Spring Integration之上的框架,有助於建立事件驅動或訊息驅動的微服務。在本文中,我們將通過一些簡單的例子來介紹Spring Cloud Stream的概念和構造。

1 Maven依賴

在開始之前,我們需要新增Spring Cloud Stream與RabbitMQ訊息中介軟體的依賴。

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId
>
</dependency>

同時為支援Junit單元測試,在pom.xml檔案中新增

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-test-support</artifactId>
    <scope>test</scope>
</dependency>

2 主要概念

微服務架構遵循“智慧端點和啞管道”的原則。端點之間的通訊由訊息中介軟體(如RabbitMQ或Apache Kafka)驅動。服務通過這些端點或通道釋出事件來進行通訊。

讓我們通過下面這個構建訊息驅動服務的基本範例,來看看Spring Cloud Stream框架的一些主要概念。

2.1 服務類

通過Spring Cloud Stream建立一個簡單的應用,從Input通道監聽訊息然後返回應答到Output通道。

@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(MyLoggerServiceApplication.class, args);
    }

    @StreamListener
(Processor.INPUT) @SendTo(Processor.OUTPUT) public LogMessage enrichLogMessage(LogMessage log) { return new LogMessage(String.format("[1]: %s", log.getMessage())); } }

註解@EnableBinding聲明瞭這個應用程式綁定了2個通道:INPUT和OUTPUT。這2個通道是在介面Processor中定義的(Spring Cloud Stream預設設定)。所有通道都是配置在一個具體的訊息中介軟體或繫結器中。

讓我們來看下這些概念的定義:

  • Bindings — 宣告輸入和輸出通道的介面集合。
  • Binder — 訊息中介軟體的實現,如Kafka或RabbitMQ
  • Channel — 表示訊息中介軟體和應用程式之間的通訊管道
  • StreamListeners — bean中的訊息處理方法,在中介軟體的MessageConverter特定事件中進行物件序列化/反序列化之後,將在通道上的訊息上自動呼叫訊息處理方法。
  • Message Schemas — 用於訊息的序列化和反序列化,這些模式可以靜態讀取或者動態載入,支援物件型別的演變。

將訊息釋出到指定目的地是由釋出訂閱訊息模式傳遞。釋出者將訊息分類為主題,每個主題由名稱標識。訂閱方對一個或多個主題表示興趣。中介軟體過濾訊息,將感興趣的主題傳遞給訂閱伺服器。訂閱方可以分組,消費者組是由組ID標識的一組訂戶或消費者,其中從主題或主題的分割槽中的訊息以負載均衡的方式遞送。

2.2 測試類

測試類是一個繫結器的實現,允許與通道互動和檢查訊息。讓我們向上面的enrichLogMessage 服務傳送一條訊息,並檢查響應中是否包含文字“[ 1 ]:”:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationIntegrationTest {
    @Autowired
    private Processor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage_thenResponseShouldUpdateText() {
        pipe.input().send(MessageBuilder.withPayload(new LogMessage("This is my message")).build());

        Object payload = messageCollector.forChannel(pipe.output()).poll().getPayload();

        assertEquals("[1]: This is my message", payload.toString());
    }
}

2.3 RabbitMQ配置

我們需要在工程src/main/resources目錄下的application.yml檔案裡增加RabbitMQ繫結器的配置。

spring:
  cloud:
    stream:
      bindings:
        input:
          destination: queue.log.messages
          binder: local_rabbit
          group: logMessageConsumers
        output:
          destination: queue.pretty.log.messages
          binder: local_rabbit
      binders:
        local_rabbit:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
                virtual-host: /

input繫結使用名為queue.log.messages的訊息交換機,output繫結使用名為queue.pretty.log.messages的訊息交換機。所有的繫結都使用名為local_rabbit的繫結器。
請注意,我們不需要預先建立RabbitmQ交換機或佇列。執行應用程式時,兩個交換機都會自動建立。

3 自定義通道

在上面的例子裡,我們使用Spring Cloud提供的Processor介面,這個介面有一個input通道和一個output通道。

如果我們想建立一些不同,比如說一個input通道和兩個output通道,可以新建一個自定義處理器。

public interface MyProcessor {
    String INPUT = "myInput";

    @Input
    SubscribableChannel myInput();

    @Output("myOutput")
    MessageChannel anOutput();

    @Output
    MessageChannel anotherOutput();
}

3.1 服務類

Spring將為我們提供這個介面的實現。通道的名稱可以通過使用註解來設定,比如@Output(“myOutput”)。如果沒有設定的話,Spring將使用方法名來作為通道名稱。因此這裡有三個通道:myInput, myOutput, anotherOutput。

現在我們可以增加一些路由規則,如果接收到的值小於10則走一個output通道;如果接收到的值大於等於10則走另一個output通道。

@Autowired
private MyProcessor processor;

@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
    if (val < 10) {
        processor.anOutput().send(message(val));
    } else {
        processor.anotherOutput().send(message(val));
    }
}

private static final <T> Message<T> message(T val) {
    return MessageBuilder.withPayload(val).build();
}

3.2 測試類

傳送不同的訊息,判斷返回值是否是通過不同的通道獲得。

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MultipleOutputsServiceApplication.class)
@DirtiesContext
public class MultipleOutputsServiceApplicationIntegrationTest {
    @Autowired
    private MyProcessor pipe;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    public void whenSendMessage_thenResponseIsInAOutput() {
        whenSendMessage(1);
        thenPayloadInChannelIs(pipe.anOutput(), 1);
    }

    @Test
    public void whenSendMessage_thenResponseIsInAnotherOutput() {
        whenSendMessage(11);
        thenPayloadInChannelIs(pipe.anotherOutput(), 11);
    }

    private void whenSendMessage(Integer val) {
        pipe.myInput().send(MessageBuilder.withPayload(val).build());
    }

    private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
        Object payload = messageCollector.forChannel(channel).poll().getPayload();
        assertEquals(expectedValue, payload);
    }
}

4 根據條件分派

使用@StreamListener 註釋,我們還可以使用自定義的SpEL表示式來過濾使用者期望的訊息。下面這個例子,我們使用條件排程將訊息路由到不同的輸出。

@Autowired
private MyProcessor processor;

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
    processor.anOutput().send(message(val));
}

@StreamListener(
  target = MyProcessor.INPUT, 
  condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
    processor.anotherOutput().send(message(val));
}

5 總結