1. 程式人生 > >Spring Boot (十三): Spring Boot 整合 RabbitMQ

Spring Boot (十三): Spring Boot 整合 RabbitMQ

1. 前言

RabbitMQ 是一個訊息佇列,說到訊息佇列,大家可能多多少少有聽過,它主要的功能是用來實現應用服務的非同步與解耦,同時也能起到削峰填谷、訊息分發的作用。

訊息佇列在比較主要的一個作用是用來做應用服務的解耦,訊息從訊息的生產者傳遞到訊息佇列,消費者從訊息佇列中獲取訊息並進行消費,生產者不需要管是誰在消費訊息,消費者也無需關注訊息是由誰來生產的。在分散式的系統中,訊息佇列也會被用在其他地方,比如分散式事務的支援,代表如阿里開源的 RocketMQ 。

當然,我們本篇文章的主角還是 RabbitMQ 。

2. RabbitMQ 介紹

RabbitMQ 是實現 AMQP(高階訊息佇列協議)的訊息中介軟體的一種,最初起源於金融系統,用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。 RabbitMQ 主要是為了實現系統之間的雙向解耦而實現的。當生產者大量產生資料時,消費者無法快速消費,那麼需要一箇中間層。儲存這個資料。

AMQP,即 Advanced Message Queuing Protocol,高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,反之亦然。AMQP 的主要特徵是面向訊息、佇列、路由(包括點對點和釋出/訂閱)、可靠性、安全。

RabbitMQ 是一個開源的 AMQP 實現,伺服器端用Erlang語言編寫,支援多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支援 AJAX。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面表現不俗。

3. 概念介紹

在普通的訊息佇列的設計中,一般會有這麼幾個概念:生產者、消費者和我們的佇列。但是在 RabbitMQ 中,中間增加了一層,叫交換機(Exchange),這樣,訊息的投遞就不由生產者來決定投遞至哪個佇列,而訊息是直接投遞至交換機的,由交換機根據排程策略來決定這個訊息需要投遞到哪個佇列。如圖:

  • 左側的 P 代表訊息的生產者
  • 紫色的 X 代表交換機
  • 右側紅色的代表隊列

4. 交換機(Exchange)

那麼為什麼我們需要 Exchange 而不是直接將訊息傳送至佇列呢?

AMQP 協議中的核心思想就是生產者和消費者的解耦,生產者從不直接將訊息傳送給佇列。生產者通常不知道是否一個訊息會被髮送到佇列中,只是將訊息傳送到一個交換機。先由 Exchange 來接收,然後 Exchange 按照特定的策略轉發到 Queue 進行儲存。

Exchange 收到訊息時,他是如何知道需要傳送至哪些 Queue 呢?這裡就需要了解 Binding 和 RoutingKey 的概念:

Binding 表示 Exchange 與 Queue 之間的關係,我們也可以簡單的認為佇列對該交換機上的訊息感興趣,繫結可以附帶一個額外的引數 RoutingKey。Exchange 就是根據這個 RoutingKey 和當前 Exchange 所有繫結的 Binding 做匹配,如果滿足匹配,就往 Exchange 所繫結的 Queue 傳送訊息,這樣就解決了我們向 RabbitMQ 傳送一次訊息,可以分發到不同的 Queue。RoutingKey 的意義依賴於交換機的型別。

下面就來了解一下 Exchange 的三種主要型別:Fanout、Direct 和 Topic。

4.1 Direct Exchange

Direct Exchange 是 RabbitMQ 預設的 Exchange,完全根據 RoutingKey 來路由訊息。設定 Exchange 和 Queue 的 Binding 時需指定 RoutingKey(一般為 Queue Name),發訊息時也指定一樣的 RoutingKey,訊息就會被路由到對應的Queue。

4.2 Topic Exchange

Topic Exchange 和 Direct Exchange 類似,也需要通過 RoutingKey 來路由訊息,區別在於Direct Exchange 對 RoutingKey 是精確匹配,而 Topic Exchange 支援模糊匹配。分別支援 *# 萬用字元,* 表示匹配一個單詞, # 則表示匹配沒有或者多個單詞。

4.3 Headers Exchange

Headers Exchange 會忽略 RoutingKey 而根據訊息中的 Headers 和建立繫結關係時指定的 Arguments 來匹配決定路由到哪些 Queue。

Headers Exchange 的效能比較差,而且 Direct Exchange 完全可以代替它,所以不建議使用。

4.4 Default Exchange

Default Exchange 是一種特殊的 Direct Exchange。當你手動建立一個佇列時,後臺會自動將這個佇列繫結到一個名稱為空的 Direct Exchange 上,繫結 RoutingKey 與佇列名稱相同。有了這個預設的交換機和繫結,使我們只關心佇列這一層即可,這個比較適合做一些簡單的應用。

5. Spring Boot 整合 RabbitMQ

Spring Boot 整合 RabbitMQ 非常簡單,如果只是簡單的使用配置非常少,Spring Boot 提供了 spring-boot-starter-amqp 專案對訊息各種支援。

5.1 簡單使用

引入依賴

程式碼清單:spring-boot-rabbitmq/pom.xml
***

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置檔案 application.yml 如下:

程式碼清單:spring-boot-rabbitmq/src/main/resources/application.yml
***

server:
  port: 8080
spring:
  application:
    name: spring-boot-rabbitmq
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin

佇列配置

程式碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/QueueConfig.java
***

@Configuration
public class QueueConfig {
    @Bean
    public Queue simpleQueue() {
        return new Queue("simple");
    }

    @Bean
    public Queue simpleOneToMany() {
        return new Queue("simpleOneToMany");
    }
}

訊息提供者

程式碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleSend.java
***

@Component
public class SimpleSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
  
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void send() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String message = "Hello Spring Boot " + simpleDateFormat.format(new Date());
        amqpTemplate.convertAndSend("simple", message);
        logger.info("訊息推送成功!");
    }
}

訊息消費者

程式碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/simple/SimpleReceive.java
***

@Component
@RabbitListener(queues = "simple")
public class SimpleReceive {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @RabbitHandler
    public void process(String message) {
        logger.info("Receive :{}", message);
    }

}

測試

程式碼清單:spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java
***

@RunWith(SpringRunner.class)
@SpringBootTest
public class DemoApplicationTests {

    @Autowired
    SimpleSend simpleSend;

    @Test
    public void simpleSend() {
        simpleSend.send();
    }

}

5.2 一對多使用

如果有一個訊息的生產者,有 N 個訊息的消費者,會發生什麼呢?

對上面的程式碼稍作改動,增加一個訊息的消費者。

測試程式碼如下:

@Test
public void simpleOneSend() {
    for (int i = 0; i < 100; i ++) {
        simpleManySend.send(i);
    }
}

測試可以看到結果是兩個消費者平均的消費了生產者生產的訊息。

5.3 多對多使用

我們再增加一個訊息的生產者,測試程式碼如下:

@Test
public void simpleManySend() {
    for (int i = 0; i < 100; i ++) {
        simpleManySend.send(i);
        simpleManySend1.send(i);
    }
}

測試可以看到結果是兩個消費者平均的消費了兩個生產者生產的訊息。

5.4 Topic Exchange

首先還是先配置 Topic ,配置程式碼如下:

程式碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/TopicConfig.java
***

@Configuration
public class TopicConfig {

    private final String message = "topic.message";
    private final String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(this.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(this.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("topicExchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}

這裡佇列 queueMessages 可以同時匹配兩個 route_key ,而佇列 queueMessage 只能匹配 topic.message 。

訊息的生產者程式碼如下:

程式碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/topic/TopicSend.java
***

@Component
public class TopicSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send1() {
        String message = "message 1";
        logger.info("send:{}", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.message", message);
    }

    public void send2() {
        String message = "message 2";
        logger.info("send:{}", message);
        rabbitTemplate.convertAndSend("topicExchange", "topic.messages", message);
    }
}

呼叫 send1() 訊息會由 Exchange 同時轉發到兩個佇列, 而呼叫 send2() 則只會轉發至 receive2 。

5.5 Fanout Exchange

Fanout 就是我們熟悉的廣播模式或者訂閱模式,給 Fanout 交換機發送訊息,綁定了這個交換機的所有佇列都收到這個訊息。

Fanout 配置如下:

程式碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/config/FanoutConfig.java
***

@Configuration
public class FanoutConfig {
    @Bean
    public Queue MessageA() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue MessageB() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue MessageC() {
        return new Queue("fanout.C");
    }

    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    @Bean
    Binding bindingExchangeA(Queue MessageA, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageA).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue MessageB, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageB).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue MessageC, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(MessageC).to(fanoutExchange);
    }
}

訊息生產者程式碼如下:

程式碼清單:spring-boot-rabbitmq/src/main/java/com/springboot/springbootrabbitmq/fanout/FanoutSend.java
***

@Component
public class FanoutSend {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String message = "Hello FanoutSend.";
        logger.info("send:{}", message);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", message);
    }
}

測試程式碼如下:

程式碼清單:spring-boot-rabbitmq/src/test/java/com/springboot/springbootrabbitmq/DemoApplicationTests.java
***

@Test
public void fanoutSend() {
    fanoutSend.send();
}

測試結果為繫結到 fanout 交換機上面的佇列都收到了訊息。

6. 示例程式碼

示例程式碼-Github

示例程式碼-Gitee

7. 參考

http://www.ityouknow.com/springboot/2016/11/30/spring-boot-rabbitMQ.html

https://blog.csdn.net/y4x5M0nivSrJaY3X92c/article/details/80416