訊息驅動式微服務:Spring Cloud Stream & RabbitMQ
1. 概述
在本文中,我們將向您介紹Spring Cloud Stream
,這是一個用於構建訊息驅動的微服務應用程式的框架,這些應用程式由一個常見的訊息傳遞代理(如RabbitMQ
、Apache Kafka
等)連線。
Spring Cloud Stream
構建在現有Spring框架(如Spring Messaging
和Spring Integration
)之上。儘管這些框架經過了實戰測試,工作得非常好,但是實現與使用的message broker
緊密耦合。此外,有時對某些用例進行擴充套件是困難的。
Spring Cloud Stream
背後的想法是一個非常典型的Spring Boot
抽象地講,讓Spring根據配置和依賴關係管理在執行時找出實現自動注入
。這意味著您可以通過更改依賴項和配置檔案來更改message broker
。可以在這裡找到目前已經支援的各種訊息代理。
本文將使用RabbitMQ
作為message broker
。在此之前,讓我們瞭解一下broker
(代理)的一些基本概念,以及為什麼要在面向微服務的體系架構中需要它。
2. 微服務中的訊息
在微服務體系架構中,我們有許多相互通訊以完成請求的小型應用程式—它們的主要優點之一是改進了的可伸縮性。一個請求從多個下游微服務傳遞到完成是很常見的。例如,假設我們有一個Service-A
Service-B
和Service-C
來完成一個請求:[外鏈圖片轉存失敗(img-jzvHHRXw-1562549429195)(https://user-gold-cdn.xitu.io/2019/7/7/16bccd47c4051b28?w=511&h=347&f=png&s=11713)]
是的,還會有其他元件,比如Spring Cloud Eureka
、Spring Cloud Zuul
等等,但我們還是專注關心這類架構的特有問題。
假設由於某種原因Service-B
需要更多的時間來響應。也許它正在執行I/O操作
或長時間的DB事務
,或者進一步呼叫其它導致Service-B
現在,我們可以啟動更多的Service-B
例項來解決這個問題,這樣很好,但是Service-A
實際上是響應很快的,它需要等待Service-B
的響應來進一步處理。這將導致Service-A
無法接收更多的請求,這意味著我們還必須啟動Service-A
的多個例項。
另一種方法解決類似情況的是使用事件驅動的微服務體系架構。這基本上意味著Service-A
不直接通過HTTP
呼叫Service-B
或Service-C
,而是將請求或事件釋出給message broker
(訊息代理)。Service-B
和Service-C
將成為message broker
(訊息代理)上此事件的訂閱者。
與依賴HTTP呼叫的傳統微服務體系架構相比,這有許多優點:
- 提高可伸縮性和可靠性——現在我們知道哪些服務是整個應用程式中的真正瓶頸。
- 鼓勵鬆散耦合——
Service-A
不需要了解Service-B
和Service-C
。它只需要連線到message broker
併發布事件。事件如何進一步編排取決於代理設定。通過這種方式,Service-A
可以獨立地執行,這是微服務的核心概念之一。 - 與遺留系統互動——通常我們不能將所有東西都移動到一個新的技術堆疊中。我們仍然必須使用遺留系統,雖然速度很慢,但是很可靠。
3. RabbitMQ
高階訊息佇列協議(AMQP)
是RabbitMQ
用於訊息傳遞的協議。雖然RabbitMQ
支援其他一些協議,但是AMQP
由於相容性和它提供的大量特性而更受歡迎。
3.1 RabbitMQ架構設計
因此釋出者將訊息釋出到RabbitMQ
中稱為Exchange
(交換器)。Exchange
(交換器)接收訊息並將其路由到一個或多個Queues
(佇列)。路由演算法依賴於Exchange
(交換器)型別和routing
(路由)key/header(與訊息一起傳遞)。將Exchange
(交換器)連線到Queues
(佇列)的這些規則稱為bindings
(繫結)。
繫結可以有4種類型:
- Direct: 它根據
routing key
(路由鍵)將Exchange
(交換器)型別直接路由到特定的Queues
(佇列)。 - Fanout:它將訊息路由到繫結
Exchange
(交換器)中的所有Queues
(佇列)。 - Topic:它根據完全匹配或部分據
routing key
(路由鍵)匹配將訊息路由到(0、1或更多)的Queues
(佇列)。 - Headers:它類似於
Topic
(主題)交換型別,但是它是基routing header
(路由頭)而不是routing key
(路由鍵)來路由的。
來源: https://www.cloudamqp.com/
通過Exchange
(交換器)和Queues
(佇列)釋出和消費訊息的整個過程是通過一個Channel
(通道)完成的。
有關路由的詳細資訊,請訪問此連結。
3.2 RabbitMQ 設定
3.2.1 安裝
我們可以從這裡下載並安裝基於我們的作業系統的二進位制檔案。
然而,在本文中,我們將使用cloudamqp.com
提供的免費雲安裝。只需註冊服務並登入即可。
在主儀表板上單擊建立新例項
:
然後給你的例項起個名字,然後進入下一步:
然後選擇一個可用區:
最後,檢視例項資訊,點選右下角的建立例項
:
就是這樣。現在在雲中運行了一個RabbitMQ
例項。有關例項的更多資訊,請轉到您的儀表板並單擊新建立的例項
:
我們可以看到我們可以訪問RabbitMQ例項的主機,比如從我們的專案連線所需的使用者名稱和密碼:
我們將在Spring應用程式中使用AMQP URL
連線到這個例項,所以請在某個地方記下它。
您還可以通過單擊左上角的RabbitMQ manager
來檢視管理器控制檯。這將採用它來管理的您的RabbitMQ
例項。
Project 配置
現在我們的設定已經準備好了,讓我們建立我們的服務:
- cloud-stream-producer-rabbitmq: 作為一個釋出者,將訊息推送到
RabbitMQ
- cloud-stream-consumer-rabbitmq: 消費者消費訊息
使用Spring Initializr
建立一個腳手架專案。這將是我們的producer
專案,我們將使用REST
端點發布訊息。
選擇您喜歡的Spring Boot
版本,新增Web
和Cloud Stream
依賴項,生成Maven
專案:
注意:
請注意cloud-stream
依賴項。這也需要像RabbitMQ
、Kafka
等繫結器依賴項才能工作。
由於我們將使用RabbitMQ
,新增以下Maven
依賴項:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
或者,我們也可以將兩者結合起來使用spring-cloud-starter-stream-rabbit
:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
使用同樣的方法,建立消費者專案,但僅使用spring-cloud-starter-stream-rabbit
依賴項。
4. 建立生產者
如前所述,將訊息從釋出者傳遞到佇列的整個過程是通過通道完成的。因此,讓我們建立一個HelloBinding
介面,其中包含我們的訊息機制greetingChannel
:
interface HelloBinding {
@Output("greetingChannel")
MessageChannel greeting();
}
因為這將釋出訊息,所以我們使用@Output
註解。方法名可以是我們想要的任意名稱,當然,我們可以在一個介面中有多個Channel
(通道)。
現在,讓我們建立一個REST
,它將訊息推送到這個Channel
(通道)
@RestController
public class ProducerController {
private MessageChannel greet;
public ProducerController(HelloBinding binding) {
greet = binding.greeting();
}
@GetMapping("/greet/{name}")
public void publish(@PathVariable String name) {
String greeting = "Hello, " + name + "!";
Message<String> msg = MessageBuilder.withPayload(greeting)
.build();
this.greet.send(msg);
}
}
上面,我們建立了一個ProducerController
類,它有一個MessageChannel
型別的屬性 greet
。這是通過我們前面宣告的方法在建構函式中初始化的。
注意: 我們可以用簡潔的方式做同樣的事情,但是我們使用不同的名稱來讓您更清楚地瞭解事物是如何連線的。
然後,我們有一個簡單的REST
介面,它接收PathVariable
的name
,並使用MessageBuilder
建立一個String
型別的訊息。最後,我們使用MessageChannel
上的.send()
方法來發布訊息。
現在,我們將在的主類中新增@EnableBinding
註解,傳入HelloBinding
告訴Spring
載入。
@EnableBinding(HelloBinding.class)
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
最後,我們必須告訴Spring
如何連線到RabbitMQ
(通過前面的AMQP URL
),並將greetingChannel
連線到一可用的消費者。
這兩個都是在application.properties
中定義的:
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination = greetings
server.port=8080
5. 建立消費者
現在,我們需要監聽之前建立的通道greetingChannel
。讓我們為它建立一個繫結:
public interface HelloBinding {
String GREETING = "greetingChannel";
@Input(GREETING)
SubscribableChannel greeting();
}
與生產者繫結的兩個非常明顯區別。因為我們正在消費訊息,所以我們使用SubscribableChannel
和@Input
註解連線到greetingChannel
,訊息資料將被推送這裡。
現在,讓我們建立處理資料的方法:
@EnableBinding(HelloBinding.class)
public class HelloListener {
@StreamListener(target = HelloBinding.GREETING)
public void processHelloChannelGreeting(String msg) {
System.out.println(msg);
}
}
在這裡,我們建立了一個HelloListener
類,在processHelloChannelGreeting
方法上新增@StreamListener
註解。這個方法需要一個字串作為引數,我們剛剛在控制檯列印了這個引數。我們還在類新增@EnableBinding
啟用了HelloBinding
。
同樣,我們在這裡使用@EnableBinding
,而不是主類,以便告訴我們如何使用。
看看我們的主類,我們沒有任何修改:
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
在application.properties
配置檔案中,我們需要定義與生產者一樣的屬性,除了修改埠之外
spring.rabbitmq.addresses=<amqp url>
spring.cloud.stream.bindings.greetingChannel.destination=greetings
server.port=9090
6. 全部測試
讓我們同時啟動生產者和消費者服務。首先,讓我們通過點選端點http://localhost:8080/greet/john
來生產訊息。
在消費者日誌中看到訊息內容:
我們使用以下命令啟動另一個消費者服務例項(在另一個埠(9091)上):
$ mvn spring-boot:run -Dserver.port=9091
現在,當我們點選生產者REST
端點生產訊息時,我們看到兩個消費者都收到了訊息:
這可能是我們在一些用例中想要的。但是,如果我們只想讓一個消費者消費一條訊息呢?為此,我們需要在application.properties
中建立一個消費者組。消費者的配置檔案:
spring.cloud.stream.bindings.greetingChannel.group = greetings-group
現在,再次在不同的埠上執行消費者的2個例項,並通過生產者生產訊息再次檢視:
這一切也可以在RabbitMQ
管理器控制檯看到:
7. 結論
在本文中,我們解釋了訊息傳遞的主要概念、它在微服務中的角色以及如何使用Spring Cloud Stream
實現它。我們使用RabbitMQ
作為訊息代理,但是我們也可以使用其他流行的代理,比如Kafka
,只需更改配置和依賴項。
與往常一樣,本文使用的示例程式碼可以在GitHub獲得完整的原始碼。
原文:https://stackabuse.com/spring-cloud-stream-with-rabbitmq-message-driven-microservices/
作者:Dhananjay Singh
譯者:李東