1. 程式人生 > >Spring Cloud Bus(訊息匯流排)(1)

Spring Cloud Bus(訊息匯流排)(1)

訊息代理

訊息代理是一種訊息驗證、傳輸、路由的架構模式。它在應用程式之間起到通訊排程並最小化應用之間的依賴作用,使得應用程式可以高效地解耦通訊過程。訊息代理是一箇中間件產品,它的核心是一個訊息的路由程式,用來實現接受和分發訊息,並根據設定好的訊息處理流來轉發給正確的應用。它包括獨立的通訊和訊息傳遞協議,能夠實現組織內部和組織間的網路通訊。常用的場景有:

1.將訊息路由到一個或多個目的地。

2.訊息轉化為其他的表現方式。

3.執行訊息的聚集、訊息的分解,並將結果傳送到它們的目的地,然後重新組合響應返回給訊息使用者。

4.呼叫Web服務來檢索資料。

5.響應事件或錯誤。

6.使用釋出訂閱模式來提供內容或基於主題的訊息路由。

現成的有:

ActiveMQ

Kafka

RabbitMQ

RocketMQ

……

當前版本的Spring Cloud Bus僅支援RibbitMQ和Kafka。

RabbitMQ

基本概念

Broker:可以理解為訊息佇列伺服器的實體,他是一箇中間件應用,負載接受訊息生產者的訊息,然後將訊息傳送至訊息接受者或者其他的Broker。

Exchange:訊息交換機,是訊息第一個到達的地方,訊息通過它指定路由規則,分發到不同的訊息佇列中。

Queue:訊息佇列,訊息通過傳送和路由之後最終到達的地方,到達Queue的訊息即進入邏輯上等待消費的狀態。每個訊息都會被髮送到一個或多個佇列。

Binding:繫結,它的作用就是把Exchange和Queue按照路由規則繫結起來,也就是Exchange和Queue之間的虛擬連線。

Routing Key:路由關鍵字,Exchange根據這個關鍵字進行訊息投遞。

Virtual host:虛擬主機,它是對Broker的虛擬劃分,將消費者、生產者和它們依賴的AMQP相關結構進行隔離,一般都是為了安全考慮。比如在Broker中設定多個虛擬主機,對不同使用者進行許可權的分離。

Connection:連線,代表生產者、消費者、Broker之間進行通訊的物理網路。

Channel:訊息通道,用於連線生產者、消費者、Broker之間進行通訊的物理網路。

Producer:訊息生產者,製造訊息併發送訊息的程式。

Consumer:訊息消費者,接受訊息並處理訊息的程式。

訊息投遞到佇列的過程如下:

1.客戶端連線到訊息佇列伺服器,開啟一個Channel。

2.客戶端宣告一個Exchange,並設定相關屬性。

3.客戶端宣告一個Queue,並設定相關屬性。

4.客戶端使用Routing Key,在Exchange和Queue之間建立好繫結關係。

5.客戶端投遞訊息到Exchange。

6.Exchange接受到訊息後,根據訊息的Key和已設定的Binding,進行訊息路由,將訊息投遞到一個或多個Queue裡。

Exchange有三種類型:

1.Direct交換機:完全根據Key進行投遞。比如繫結時設定Routing Key為abc,那麼客戶端提交資訊時,只有設定了Key為abc才會被投遞到佇列。

2.Topic交換機:對Key進行模式匹配後進行投遞,可以使用#匹配一個或多個詞,符號*匹配正好一個詞。

3.Fanout交換機:不需要任何Key,它採取廣播的模式,一個訊息進來時,投遞到與該交換機繫結的所有佇列。

SpringBoot整合RabbitMQ

(在此之前需要先行安裝RabbitMQ,安裝:RabbitMQ安裝

1.新建一個Spring Boot工程,命名為rabbitmq-hello。

2.在pom.xml中引入spring-boot-starter-amqp支援RabbitMQ。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-hello</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>rabbitmq-hello</name>
    <description>Demo project for Spring Boot</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>


</project>

3.在application.properties中配置關於RabbitMQ的連線資訊和使用者資訊。

spring.application.name=rabbitmq-hello

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=SpringCloud
spring.rabbitmq.password=123456789

4.建立訊息生產者,通過注入AmqpTemplate介面的例項來實現訊息的傳送,AmqpTemplate介面定義了一套針對AMQP協議的基礎操作。

@Component
public class Sender {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send(){
        String context="hello "+new Date();
        System.out.println();
        System.out.println("--------------------------------------------------------");
        System.out.println("Sender:"+context);
        System.out.println("--------------------------------------------------------");
        System.out.println();
        amqpTemplate.convertAndSend("hello",context);
    }
}

5.建立訊息消費者Receiver。通過@RabbitListener註解定義該類對hello佇列的監聽,並用@RabbitHandler註解來指定對訊息的處理方法:

@Component
@RabbitListener(queues = "hello")
public class Receiver {

    @RabbitHandler
    public void process(String hello){
        System.out.println();
        System.out.println("--------------------------------------------------------");
        System.out.println("Receiver:"+hello);
        System.out.println("--------------------------------------------------------");
        System.out.println();
    }

}

建立RabbitMQ的配置類RabbitConfig用來配置佇列,交換器路由等資訊:

@Configuration
public class RabbitConfig {

    @Bean
    public Queue helloQueue(){
        return new Queue("hello");
    }
}

最後的目錄結構如下:

下面建立一個單元測試類用於呼叫訊息生產:

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqHelloApplicationTests {

    @Autowired
    private Sender sender;

    @Test
    public void contextLoads() {
    }

    @Test
    public void hello(){
        sender.send();
    }

}

啟動應用主類後:

執行單元測試類:

可以看到它傳送了一條佇列訊息。

然後轉到應用主類的控制檯,可以看到訊息消費者對hello佇列的監聽程式執行了並輸出接收到的訊息。

Ok一個簡單的RabbitMQ與Spring Boot的程式就簡單整合了。

整合Spring Cloud Bus

1.在config-server和config-service中新增:

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>

在配置檔案中加上配置資訊:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=SpringCloud
spring.rabbitmq.password=123456789

management.endpoints.web.exposure.include=*

然後修改github上的檔案後訪問:http://localhost:7001/bus/refresh

它會對檔案進行更新:

然後再訪問客戶端就可以發現配置已經更新了。

如果需要服務端對某個客戶端單個重新整理則需要這麼訪問:http://localhost:7001/bus/refresh?destination=configClient:7002

destination引數除了可以定位具體的例項之外還能用來定位具體的服務。如:/bus/refresh?destination=customers:**,該請求會觸發customers服務的所有例項進行重新整理。

參考《Spring Cloud 微服務實戰》