介紹Spring Cloud Stream與RabbitMQ整合
Spring Cloud Stream是一個建立在Spring Boot和Spring Integration之上的框架,有助於建立事件驅動或訊息驅動的微服務。在本文中,我們將通過一些簡單的例子來介紹Spring Cloud Stream的概念和構造。
1 Maven依賴
在開始之前,我們需要新增Spring Cloud Stream與RabbitMQ訊息中介軟體的依賴。
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId >
</dependency>
同時為支援Junit單元測試,在pom.xml檔案中新增
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
2 主要概念
微服務架構遵循“智慧端點和啞管道”的原則。端點之間的通訊由訊息中介軟體(如RabbitMQ或Apache Kafka)驅動。服務通過這些端點或通道釋出事件來進行通訊。
讓我們通過下面這個構建訊息驅動服務的基本範例,來看看Spring Cloud Stream框架的一些主要概念。
2.1 服務類
通過Spring Cloud Stream建立一個簡單的應用,從Input通道監聽訊息然後返回應答到Output通道。
@SpringBootApplication
@EnableBinding(Processor.class)
public class MyLoggerServiceApplication {
public static void main(String[] args) {
SpringApplication.run(MyLoggerServiceApplication.class, args);
}
@StreamListener (Processor.INPUT)
@SendTo(Processor.OUTPUT)
public LogMessage enrichLogMessage(LogMessage log) {
return new LogMessage(String.format("[1]: %s", log.getMessage()));
}
}
註解@EnableBinding聲明瞭這個應用程式綁定了2個通道:INPUT和OUTPUT。這2個通道是在介面Processor中定義的(Spring Cloud Stream預設設定)。所有通道都是配置在一個具體的訊息中介軟體或繫結器中。
讓我們來看下這些概念的定義:
- Bindings — 宣告輸入和輸出通道的介面集合。
- Binder — 訊息中介軟體的實現,如Kafka或RabbitMQ
- Channel — 表示訊息中介軟體和應用程式之間的通訊管道
- StreamListeners — bean中的訊息處理方法,在中介軟體的MessageConverter特定事件中進行物件序列化/反序列化之後,將在通道上的訊息上自動呼叫訊息處理方法。
- Message Schemas — 用於訊息的序列化和反序列化,這些模式可以靜態讀取或者動態載入,支援物件型別的演變。
將訊息釋出到指定目的地是由釋出訂閱訊息模式傳遞。釋出者將訊息分類為主題,每個主題由名稱標識。訂閱方對一個或多個主題表示興趣。中介軟體過濾訊息,將感興趣的主題傳遞給訂閱伺服器。訂閱方可以分組,消費者組是由組ID標識的一組訂戶或消費者,其中從主題或主題的分割槽中的訊息以負載均衡的方式遞送。
2.2 測試類
測試類是一個繫結器的實現,允許與通道互動和檢查訊息。讓我們向上面的enrichLogMessage 服務傳送一條訊息,並檢查響應中是否包含文字“[ 1 ]:”:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MyLoggerServiceApplication.class)
@DirtiesContext
public class MyLoggerApplicationIntegrationTest {
@Autowired
private Processor pipe;
@Autowired
private MessageCollector messageCollector;
@Test
public void whenSendMessage_thenResponseShouldUpdateText() {
pipe.input().send(MessageBuilder.withPayload(new LogMessage("This is my message")).build());
Object payload = messageCollector.forChannel(pipe.output()).poll().getPayload();
assertEquals("[1]: This is my message", payload.toString());
}
}
2.3 RabbitMQ配置
我們需要在工程src/main/resources目錄下的application.yml檔案裡增加RabbitMQ繫結器的配置。
spring:
cloud:
stream:
bindings:
input:
destination: queue.log.messages
binder: local_rabbit
group: logMessageConsumers
output:
destination: queue.pretty.log.messages
binder: local_rabbit
binders:
local_rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
input繫結使用名為queue.log.messages的訊息交換機,output繫結使用名為queue.pretty.log.messages的訊息交換機。所有的繫結都使用名為local_rabbit的繫結器。
請注意,我們不需要預先建立RabbitmQ交換機或佇列。執行應用程式時,兩個交換機都會自動建立。
3 自定義通道
在上面的例子裡,我們使用Spring Cloud提供的Processor介面,這個介面有一個input通道和一個output通道。
如果我們想建立一些不同,比如說一個input通道和兩個output通道,可以新建一個自定義處理器。
public interface MyProcessor {
String INPUT = "myInput";
@Input
SubscribableChannel myInput();
@Output("myOutput")
MessageChannel anOutput();
@Output
MessageChannel anotherOutput();
}
3.1 服務類
Spring將為我們提供這個介面的實現。通道的名稱可以通過使用註解來設定,比如@Output(“myOutput”)。如果沒有設定的話,Spring將使用方法名來作為通道名稱。因此這裡有三個通道:myInput, myOutput, anotherOutput。
現在我們可以增加一些路由規則,如果接收到的值小於10則走一個output通道;如果接收到的值大於等於10則走另一個output通道。
@Autowired
private MyProcessor processor;
@StreamListener(MyProcessor.INPUT)
public void routeValues(Integer val) {
if (val < 10) {
processor.anOutput().send(message(val));
} else {
processor.anotherOutput().send(message(val));
}
}
private static final <T> Message<T> message(T val) {
return MessageBuilder.withPayload(val).build();
}
3.2 測試類
傳送不同的訊息,判斷返回值是否是通過不同的通道獲得。
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = MultipleOutputsServiceApplication.class)
@DirtiesContext
public class MultipleOutputsServiceApplicationIntegrationTest {
@Autowired
private MyProcessor pipe;
@Autowired
private MessageCollector messageCollector;
@Test
public void whenSendMessage_thenResponseIsInAOutput() {
whenSendMessage(1);
thenPayloadInChannelIs(pipe.anOutput(), 1);
}
@Test
public void whenSendMessage_thenResponseIsInAnotherOutput() {
whenSendMessage(11);
thenPayloadInChannelIs(pipe.anotherOutput(), 11);
}
private void whenSendMessage(Integer val) {
pipe.myInput().send(MessageBuilder.withPayload(val).build());
}
private void thenPayloadInChannelIs(MessageChannel channel, Integer expectedValue) {
Object payload = messageCollector.forChannel(channel).poll().getPayload();
assertEquals(expectedValue, payload);
}
}
4 根據條件分派
使用@StreamListener 註釋,我們還可以使用自定義的SpEL表示式來過濾使用者期望的訊息。下面這個例子,我們使用條件排程將訊息路由到不同的輸出。
@Autowired
private MyProcessor processor;
@StreamListener(
target = MyProcessor.INPUT,
condition = "payload < 10")
public void routeValuesToAnOutput(Integer val) {
processor.anOutput().send(message(val));
}
@StreamListener(
target = MyProcessor.INPUT,
condition = "payload >= 10")
public void routeValuesToAnotherOutput(Integer val) {
processor.anotherOutput().send(message(val));
}