Spring Cloud Bus(訊息匯流排)(1)
訊息代理
訊息代理是一種訊息驗證、傳輸、路由的架構模式。它在應用程式之間起到通訊排程並最小化應用之間的依賴作用,使得應用程式可以高效地解耦通訊過程。訊息代理是一箇中間件產品,它的核心是一個訊息的路由程式,用來實現接受和分發訊息,並根據設定好的訊息處理流來轉發給正確的應用。它包括獨立的通訊和訊息傳遞協議,能夠實現組織內部和組織間的網路通訊。常用的場景有:
1.將訊息路由到一個或多個目的地。
2.訊息轉化為其他的表現方式。
3.執行訊息的聚集、訊息的分解,並將結果傳送到它們的目的地,然後重新組合響應返回給訊息使用者。
4.呼叫Web服務來檢索資料。
5.響應事件或錯誤。
6.使用釋出訂閱模式來提供內容或基於主題的訊息路由。
現成的有:
ActiveMQ
Kafka
RabbitMQ
RocketMQ
……
當前版本的Spring Cloud Bus僅支援RibbitMQ和Kafka。
RabbitMQ
基本概念
Broker:可以理解為訊息佇列伺服器的實體,他是一箇中間件應用,負載接受訊息生產者的訊息,然後將訊息傳送至訊息接受者或者其他的Broker。
Exchange:訊息交換機,是訊息第一個到達的地方,訊息通過它指定路由規則,分發到不同的訊息佇列中。
Queue:訊息佇列,訊息通過傳送和路由之後最終到達的地方,到達Queue的訊息即進入邏輯上等待消費的狀態。每個訊息都會被髮送到一個或多個佇列。
Binding:繫結,它的作用就是把Exchange和Queue按照路由規則繫結起來,也就是Exchange和Queue之間的虛擬連線。
Routing Key:路由關鍵字,Exchange根據這個關鍵字進行訊息投遞。
Virtual host:虛擬主機,它是對Broker的虛擬劃分,將消費者、生產者和它們依賴的AMQP相關結構進行隔離,一般都是為了安全考慮。比如在Broker中設定多個虛擬主機,對不同使用者進行許可權的分離。
Connection:連線,代表生產者、消費者、Broker之間進行通訊的物理網路。
Channel:訊息通道,用於連線生產者、消費者、Broker之間進行通訊的物理網路。
Producer:訊息生產者,製造訊息併發送訊息的程式。
Consumer:訊息消費者,接受訊息並處理訊息的程式。
訊息投遞到佇列的過程如下:
1.客戶端連線到訊息佇列伺服器,開啟一個Channel。
2.客戶端宣告一個Exchange,並設定相關屬性。
3.客戶端宣告一個Queue,並設定相關屬性。
4.客戶端使用Routing Key,在Exchange和Queue之間建立好繫結關係。
5.客戶端投遞訊息到Exchange。
6.Exchange接受到訊息後,根據訊息的Key和已設定的Binding,進行訊息路由,將訊息投遞到一個或多個Queue裡。
Exchange有三種類型:
1.Direct交換機:完全根據Key進行投遞。比如繫結時設定Routing Key為abc,那麼客戶端提交資訊時,只有設定了Key為abc才會被投遞到佇列。
2.Topic交換機:對Key進行模式匹配後進行投遞,可以使用#匹配一個或多個詞,符號*匹配正好一個詞。
3.Fanout交換機:不需要任何Key,它採取廣播的模式,一個訊息進來時,投遞到與該交換機繫結的所有佇列。
SpringBoot整合RabbitMQ
(在此之前需要先行安裝RabbitMQ,安裝:RabbitMQ安裝)
1.新建一個Spring Boot工程,命名為rabbitmq-hello。
2.在pom.xml中引入spring-boot-starter-amqp支援RabbitMQ。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbitmq-hello</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<name>rabbitmq-hello</name>
<description>Demo project for Spring Boot</description>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.在application.properties中配置關於RabbitMQ的連線資訊和使用者資訊。
spring.application.name=rabbitmq-hello
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=SpringCloud
spring.rabbitmq.password=123456789
4.建立訊息生產者,通過注入AmqpTemplate介面的例項來實現訊息的傳送,AmqpTemplate介面定義了一套針對AMQP協議的基礎操作。
@Component
public class Sender {
@Autowired
private AmqpTemplate amqpTemplate;
public void send(){
String context="hello "+new Date();
System.out.println();
System.out.println("--------------------------------------------------------");
System.out.println("Sender:"+context);
System.out.println("--------------------------------------------------------");
System.out.println();
amqpTemplate.convertAndSend("hello",context);
}
}
5.建立訊息消費者Receiver。通過@RabbitListener註解定義該類對hello佇列的監聽,並用@RabbitHandler註解來指定對訊息的處理方法:
@Component
@RabbitListener(queues = "hello")
public class Receiver {
@RabbitHandler
public void process(String hello){
System.out.println();
System.out.println("--------------------------------------------------------");
System.out.println("Receiver:"+hello);
System.out.println("--------------------------------------------------------");
System.out.println();
}
}
建立RabbitMQ的配置類RabbitConfig用來配置佇列,交換器路由等資訊:
@Configuration
public class RabbitConfig {
@Bean
public Queue helloQueue(){
return new Queue("hello");
}
}
最後的目錄結構如下:
下面建立一個單元測試類用於呼叫訊息生產:
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqHelloApplicationTests {
@Autowired
private Sender sender;
@Test
public void contextLoads() {
}
@Test
public void hello(){
sender.send();
}
}
啟動應用主類後:
執行單元測試類:
可以看到它傳送了一條佇列訊息。
然後轉到應用主類的控制檯,可以看到訊息消費者對hello佇列的監聽程式執行了並輸出接收到的訊息。
Ok一個簡單的RabbitMQ與Spring Boot的程式就簡單整合了。
整合Spring Cloud Bus
1.在config-server和config-service中新增:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-bus-amqp</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>
在配置檔案中加上配置資訊:
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=SpringCloud
spring.rabbitmq.password=123456789
management.endpoints.web.exposure.include=*
然後修改github上的檔案後訪問:http://localhost:7001/bus/refresh
它會對檔案進行更新:
然後再訪問客戶端就可以發現配置已經更新了。
如果需要服務端對某個客戶端單個重新整理則需要這麼訪問:http://localhost:7001/bus/refresh?destination=configClient:7002
destination引數除了可以定位具體的例項之外還能用來定位具體的服務。如:/bus/refresh?destination=customers:**,該請求會觸發customers服務的所有例項進行重新整理。
參考《Spring Cloud 微服務實戰》