1. 程式人生 > >使用訊息佇列RabbitMQ

使用訊息佇列RabbitMQ

RabbitMQ 即一個訊息佇列,主要是用來實現應用程式的非同步和解耦,同時也能起到訊息緩衝,訊息分發的作用。

RabbitMQ是實現AMQP(高階訊息佇列協議)的訊息中介軟體的一種,AMQP,即Advanced Message Queuing Protocol, 高階訊息佇列協議,是應用層協議的一個開放標準,為面向訊息的中介軟體設計。

在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間, 提高了系統的吞吐量。

本篇將詳細介紹RabbitMQ以及如何在SpringBoot中使用。

簡單概念

關於訊息中介軟體RabbitMQ的詳細介紹可以參考我的部落格系列教程:

RabbitMQ簡易教程

這裡我只對一些最核心的概念拿出來講一下,不理解這些就根本無法理解下面的程式碼。

幾個名詞術語:

  • Broker - 簡單來說就是訊息佇列伺服器的實體。
  • Exchange - 訊息路由器,轉發訊息到繫結的佇列上,指定訊息按什麼規則,路由到哪個佇列。
  • Queue - 訊息佇列,用來儲存訊息,每個訊息都會被投入到一個或多個佇列。
  • Binding - 繫結,它的作用就是把 Exchange 和 Queue 按照路由規則繫結起來。
  • RoutingKey - 路由關鍵字,Exchange 根據這個關鍵字進行訊息投遞。
  • Producter - 訊息生產者,產生訊息的程式。
  • Consumer - 訊息消費者,接收訊息的程式。
  • Channel - 訊息通道,在客戶端的每個連線裡可建立多個Channel,每個channel代表一個會話。

一般訊息佇列都是生產者將訊息傳送到佇列,消費者監聽佇列進行消費。rabbitmq中一個虛擬主機(預設 /)持有一個或者多個交換機(Exchange)。 使用者只能在虛擬主機的粒度進行許可權控制,交換機根據一定的策略(RoutingKey)繫結(Binding)到佇列(Queue)上, 這樣生產者和佇列就沒有直接聯絡,而是將訊息傳送的交換機,交換機再把訊息轉發到對應繫結的佇列上。

上面說了交換機(Exchange)作為rabbitmq的一個獨特的重要的概念,單獨拿出來講一下。我們用到的最常見的是4中型別:

  1. Direct: 先匹配, 再投送。即在繫結時設定一個routing_key, 訊息的routing_key匹配時, 才會被交換器投送到繫結的佇列中去. 交換機跟佇列必須是精確的對應關係,這種最為簡單。
  2. Topic: 轉發訊息主要是根據萬用字元。在這種交換機下,佇列和交換機的繫結會定義一種路由模式,那麼,萬用字元就要在這種路由模式和路由鍵之間匹配後交換機才能轉發訊息 這種可以認為是Direct 的靈活版
  3. Headers: 也是根據規則匹配, 相較於 direct 和 topic 固定地使用 routingkey , headers則是一個自定義匹配規則的型別, 在佇列與交換器繫結時會設定一組鍵值對規則,訊息中也包括一組鍵值對( headers屬性),當這些鍵值對有一對或全部匹配時,訊息被投送到對應佇列
  4. Fanout : 訊息廣播模式,不管路由鍵或者是路由模式,會把訊息發給繫結給它的全部佇列,如果配置了routingkey會被忽略

理解交換機如何投送訊息的原理後,後面的就比較簡單了。

Ack機制

在RabbitMQ的訊息佇列程式設計中,有個非常重要的概率叫訊息確認機制,也就是Ack機制,這裡我需要單獨講一下。

每個Consumer可能需要一段時間才能處理完收到的資料。如果在這個過程中,Consumer出錯了或者異常退出了,而資料還沒有處理完成,那麼非常不幸,這段資料就丟失了。

因為我們採用no-ack的方式進行確認,也就是說,每次Consumer接到資料後,而不管是否處理完成,RabbitMQ Server會立即把這個Message標記為完成,然後從queue中刪除了。 如果一個Consumer異常退出了,它處理的資料能夠被另外的Consumer處理,這樣資料在這種情況下就不會丟失了(注意是這種情況下)。

為了保證資料不被丟失,RabbitMQ支援訊息確認機制,即acknowledgments。為了保證資料能被正確處理而不僅僅是被Consumer收到,那麼我們不能採用no-ack。而應該是在處理完資料後傳送ack。 在處理資料後傳送的ack,就是告訴RabbitMQ資料已經被接收,處理完成,RabbitMQ可以去安全的刪除它了。

如果Consumer退出了但是沒有傳送ack,那麼RabbitMQ就會把這個Message傳送到下一個Consumer。這樣就保證了在Consumer異常退出的情況下資料也不會丟失。

這裡並沒有用到超時機制。RabbitMQ僅僅通過Consumer的連線中斷來確認該Message並沒有被正確處理。也就是說,RabbitMQ給了Consumer足夠長的時間來做資料處理。 這樣即使你通過Ctr-C中斷了Recieve.cs,那麼Message也不會丟失了,它會被分發到下一個Consumer。

如果忘記了ack,那麼後果很嚴重。當Consumer退出時,Message會重新分發。然後RabbitMQ會佔用越來越多的記憶體,由於RabbitMQ會長時間執行,因此這個“記憶體洩漏”是致命的。

下面的例子中我會使用手動Ack模式。

環境準備

RabbitMQ需要安裝erlang和RabbitMQ Server, 安裝教程請參考:RabbitMQ簡易教程 - 安裝

請先按照教程安裝好RabbitMQ後,新增一個使用者spring/123456,後面配置要用到。

SpringBoot中整合

接下來正式開始講解如何在SpringBoot中集成了。

maven依賴

第一版當然是先新增maven依賴了,有官方現成的starter依賴,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>2.9.6</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <exclusions>
            <exclusion>
                <groupId>com.vaadin.external.google</groupId>
                <artifactId>android-json</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

配置檔案

在application.yml中新增rabbitmq相關配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
spring:
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: spring
    password: 123456
    publisher-confirms: true #支援釋出確認
    publisher-returns: true  #支援釋出返回
    listener:
      simple:
        acknowledge-mode: manual #採用手動應答
        concurrency: 1 #指定最小的消費者數量
        max-concurrency: 1 #指定最大的消費者數量
        retry:
          enabled: true #是否支援重試

配置類

最核心的類就是RabbitMQ的配置類,在裡面定製模版類、宣告交換機、佇列、繫結交換機到佇列。

這裡我聲明瞭一個Direct型別的交換機,並通過路由鍵繫結到一個佇列中來測試Direct模式, 另外還聲明瞭Fanout型別的交換機,並繫結到2個佇列來測試廣播模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
@Configuration
public class RabbitConfig {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 定製化amqp模版      可根據需要定製多個
     * <p>
     * <p>
     * 此處為模版類定義 Jackson訊息轉換器
     * ConfirmCallback介面用於實現訊息傳送到RabbitMQ交換器後接收ack回撥   即訊息傳送到exchange  ack
     * ReturnCallback介面用於實現訊息傳送到RabbitMQ 交換器,但無相應佇列與交換器繫結時的回撥  即訊息傳送不到任何一個佇列中  ack
     *
     * @return the amqp template
     */
    // @Primary
    @Bean
    public AmqpTemplate amqpTemplate() {
        Logger log = LoggerFactory.getLogger(RabbitTemplate.class);
        // 使用jackson 訊息轉換器
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        rabbitTemplate.setEncoding("UTF-8");
        // 訊息傳送失敗返回到佇列中,yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
            String correlationId = message.getMessageProperties().getCorrelationIdString();
            log.debug("訊息:{} 傳送失敗, 應答碼:{} 原因:{} 交換機: {}  路由鍵: {}", correlationId, replyCode, replyText, exchange, routingKey);
        });
        // 訊息確認,yml需要配置 publisher-confirms: true
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
            if (ack) {
                log.debug("訊息傳送到exchange成功,id: {}", correlationData.getId());
            } else {
                log.debug("訊息傳送到exchange失敗,原因: {}", cause);
            }
        });
        return rabbitTemplate;
    }

    /* ----------------------------------------------------------------------------Direct exchange test--------------------------------------------------------------------------- */

    /**
     * 宣告Direct交換機 支援持久化.
     *
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange("DIRECT_EXCHANGE").durable(true).build();
    }

    /**
     * 宣告一個佇列 支援持久化.
     *
     * @return the queue
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").build();
    }

    /**
     * 通過繫結鍵 將指定佇列繫結到一個指定的交換機 .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding directBinding(@Qualifier("directQueue") Queue queue,
                                 @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("DIRECT_ROUTING_KEY").noargs();
    }

    /* ----------------------------------------------------------------------------Fanout exchange test--------------------------------------------------------------------------- */

    /**
     * 宣告 fanout 交換機.
     *
     * @return the exchange
     */
    @Bean("fanoutExchange")
    public FanoutExchange fanoutExchange() {
        return (FanoutExchange) ExchangeBuilder.fanoutExchange("FANOUT_EXCHANGE").durable(true).build();
    }

    /**
     * Fanout queue A.
     *
     * @return the queue
     */
    @Bean("fanoutQueueA")
    public Queue fanoutQueueA() {
        return QueueBuilder.durable("FANOUT_QUEUE_A").build();
    }

    /**
     * Fanout queue B .
     *
     * @return the queue
     */
    @Bean("fanoutQueueB")
    public Queue fanoutQueueB() {
        return QueueBuilder.durable("FANOUT_QUEUE_B").build();
    }

    /**
     * 繫結佇列A 到Fanout 交換機.
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingA(@Qualifier("fanoutQueueA") Queue queue,
                            @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    /**
     * 繫結佇列B 到Fanout 交換機.
     *
     * @param queue          the queue
     * @param fanoutExchange the fanout exchange
     * @return the binding
     */
    @Bean
    public Binding bindingB(@Qualifier("fanoutQueueB") Queue queue,
                            @Qualifier("fanoutExchange") FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}

監聽器

接下來編寫訊息佇列的監聽器類,監聽佇列訊息並做相應的處理,並通過Ack機制確認處理完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
/**
 * 訊息監聽器
 *
 * @author XiongNeng
 * @version 1.0
 */
@Component
public class Receiver {
    private static final Logger log = LoggerFactory.getLogger(Receiver.class);

    /**
     * FANOUT廣播佇列監聽一.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  這裡異常需要處理
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_A"})
    public void on(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_A " + new String(message.getBody()));
    }

    /**
     * FANOUT廣播佇列監聽二.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception   這裡異常需要處理
     */
    @RabbitListener(queues = {"FANOUT_QUEUE_B"})
    public void t(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("FANOUT_QUEUE_B " + new String(message.getBody()));
    }

    /**
     * DIRECT模式.
     *
     * @param message the message
     * @param channel the channel
     * @throws IOException the io exception  這裡異常需要處理
     */
    @RabbitListener(queues = {"DIRECT_QUEUE"})
    public void message(Message message, Channel channel) throws IOException {
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        log.debug("DIRECT " + new String(message.getBody()));
    }
}

訊息傳送者

再寫一個訊息傳送服務,用來向交換機發送訊息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * 訊息傳送服務
 */
@Service
public class SenderService {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 測試廣播模式.
     *
     * @param p the p
     * @return the response entity
     */
    public void broadcast(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("FANOUT_EXCHANGE", "", p, correlationData);
    }

    /**
     * 測試Direct模式.
     *
     * @param p the p
     * @return the response entity
     */
    public void direct(String p) {
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("DIRECT_EXCHANGE", "DIRECT_ROUTING_KEY", p, correlationData);
    }

}

執行測試

程式碼寫完後當然要寫個測試用例測一下嘛。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
 * SenderServiceTest
 *
 * @author XiongNeng
 * @version 1.0
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class SenderServiceTest {
    @Autowired
    private SenderService senderService;

    @Test
    public void testCache() {
        // 測試廣播模式
        senderService.broadcast("同學們集合啦!");
        // 測試Direct模式
        senderService.direct("定點訊息");
    }
}

執行結果:

1
2
3
[cTaskExecutor-1] com.xncoding.pos.mq.Receiver             : FANOUT_QUEUE_A "同學們集合啦!"
[cTaskExecutor-1] com.xncoding.pos.mq.Receiver             : DIRECT "定點訊息"
[cTaskExecutor-1] com.xncoding.pos.mq.Receiver             : FANOUT_QUEUE_B "同學們集合啦!"

可以看到,廣播訊息被兩個佇列收到了,Direct訊息只有一個收到。完美!