1. 程式人生 > >Spring Boot 實現 RabbitMQ 延遲消費和延遲重試佇列

Spring Boot 實現 RabbitMQ 延遲消費和延遲重試佇列

並增加了自己的一些理解,記錄下來,以便日後查閱。

專案原始碼:

背景

何為延遲佇列?

顧名思義,延遲佇列就是進入該佇列的訊息會被延遲消費的佇列。而一般的佇列,訊息一旦入隊了之後就會被消費者馬上消費。
延遲佇列能做什麼?延遲佇列多用於需要延遲工作的場景。最常見的是以下兩種場景:

  • 延遲消費。比如:使用者生成訂單之後,需要過一段時間校驗訂單的支付狀態,如果訂單仍未支付則需要及時地關閉訂單;使用者註冊成功之後,需要過一段時間比如一週後校驗使用者的使用情況,如果發現使用者活躍度較低,則傳送郵件或者簡訊來提醒使用者使用。
  • 延遲重試。比如消費者從佇列裡消費訊息時失敗了,但是想要延遲一段時間後自動重試。

如果不使用延遲佇列,那麼我們只能通過一個輪詢掃描程式去完成。這種方案既不優雅,也不方便做成統一的服務便於開發人員使用。但是使用延遲佇列的話,我們就可以輕而易舉地完成。

實現思路

在介紹具體的實現思路之前,我們先來介紹一下RabbitMQ的兩個特性,一個是Time-To-Live Extensions,另一個是Dead Letter Exchanges。

Time-To-Live Extensions

RabbitMQ允許我們為訊息或者佇列設定TTL(time to live),也就是過期時間。TTL表明了一條訊息可在佇列中存活的最大時間,單位為毫秒。也就是說,當某條訊息被設定了TTL或者當某條訊息進入了設定了TTL的佇列時,這條訊息會在經過TTL秒後“死亡”,成為Dead Letter。如果既配置了訊息的TTL,又配置了佇列的TTL,那麼較小的那個值會被取用。更多資料請查閱

官方文件

Dead Letter Exchange

剛才提到了,被設定了TTL的訊息在過期後會成為Dead Letter。其實在RabbitMQ中,一共有三種訊息的“死亡”形式:

  • 訊息被拒絕。通過呼叫basic.reject或者basic.nack並且設定的requeue引數為false。
  • 訊息因為設定了TTL而過期。
  • 訊息進入了一條已經達到最大長度的佇列。

如果佇列設定了Dead Letter Exchange(DLX),那麼這些Dead Letter就會被重新publish到Dead Letter Exchange,通過Dead Letter Exchange路由到其他佇列。更多資料請查閱

官方文件

流程圖

聰明的你肯定已經想到了,如何將RabbitMQ的TTL和DLX特性結合在一起,實現一個延遲佇列。

針對於上述的延遲佇列的兩個場景,我們分別有以下兩種流程圖:

延遲消費

延遲消費是延遲佇列最為常用的使用模式。如下圖所示,生產者產生的訊息首先會進入緩衝佇列(圖中紅色佇列)。通過RabbitMQ提供的TTL擴充套件,這些訊息會被設定過期時間,也就是延遲消費的時間。等訊息過期之後,這些訊息會通過配置好的DLX轉發到實際消費佇列(圖中藍色佇列),以此達到延遲消費的效果。

延遲重試

延遲重試本質上也是延遲消費的一種,但是這種模式的結構與普通的延遲消費的流程圖較為不同,所以單獨拎出來介紹。

如下圖所示,消費者發現該訊息處理出現了異常,比如是因為網路波動引起的異常。那麼如果不等待一段時間,直接就重試的話,很可能會導致在這期間內一直無法成功,造成一定的資源浪費。那麼我們可以將其先放在緩衝佇列中(圖中紅色佇列),等訊息經過一段的延遲時間後再次進入實際消費佇列中(圖中藍色佇列),此時由於已經過了“較長”的時間了,異常的一些波動通常已經恢復,這些訊息可以被正常地消費。

程式碼實現

配置佇列

從上述的流程圖中我們可以看到,一個延遲佇列的實現,需要一個緩衝佇列以及一個實際的消費佇列。又由於在RabbitMQ中,我們擁有兩種訊息過期的配置方式,所以在程式碼中,我們一共配置了三條佇列:

  • delay_queue_per_message_ttl:TTL配置在訊息上的緩衝佇列。
  • delay_queue_per_queue_ttl:TTL配置在佇列上的緩衝佇列。
  • delay_process_queue:實際消費佇列。

我們通過Java Config的方式將上述的佇列配置為Bean。由於我們添加了spring-boot-starter-amqp擴充套件,Spring Boot在啟動時會根據我們的配置自動建立這些佇列。為了方便接下來的測試,我們將delay_queue_per_message_ttl以及delay_queue_per_queue_ttl的DLX配置為同一個,且過期的訊息都會通過DLX轉發到delay_process_queue。

delay_queue_per_message_ttl

首先介紹delay_queue_per_message_ttl的配置程式碼:

@Bean
Queue delayQueuePerMessageTTL() {
    return QueueBuilder.durable(DELAY_QUEUE_PER_MESSAGE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX,dead letter傳送到的exchange
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
                       .build();
}

其中,x-dead-letter-exchange聲明瞭佇列裡的死信轉發到的DLX名稱,x-dead-letter-routing-key聲明瞭這些死信在轉發時攜帶的routing-key名稱。

delay_queue_per_queue_ttl

類似地,delay_queue_per_queue_ttl的配置程式碼:

@Bean
Queue delayQueuePerQueueTTL() {
    return QueueBuilder.durable(DELAY_QUEUE_PER_QUEUE_TTL_NAME)
                       .withArgument("x-dead-letter-exchange", DELAY_EXCHANGE_NAME) // DLX
                       .withArgument("x-dead-letter-routing-key", DELAY_PROCESS_QUEUE_NAME) // dead letter攜帶的routing key
                       .withArgument("x-message-ttl", QUEUE_EXPIRATION) // 設定佇列的過期時間
                       .build();
}

delay_queue_per_queue_ttl佇列的配置比delay_queue_per_message_ttl佇列的配置多了一個x-message-ttl,該配置用來設定佇列的過期時間。

delay_process_queue

delay_process_queue的配置最為簡單:

@Bean
Queue delayProcessQueue() {
    return QueueBuilder.durable(DELAY_PROCESS_QUEUE_NAME)
                       .build();
}

配置Exchange

配置DLX

首先,我們需要配置DLX,程式碼如下:

@Bean
DirectExchange delayExchange() {
    return new DirectExchange(DELAY_EXCHANGE_NAME);
}

然後再將該DLX繫結到實際消費佇列即delay_process_queue上。這樣所有的死信都會通過DLX被轉發到delay_process_queue:

@Bean
Binding dlxBinding(Queue delayProcessQueue, DirectExchange delayExchange) {
    return BindingBuilder.bind(delayProcessQueue)
                         .to(delayExchange)
                         .with(DELAY_PROCESS_QUEUE_NAME);
}

配置延遲重試所需的Exchange

從延遲重試的流程圖中我們可以看到,訊息處理失敗之後,我們需要將訊息轉發到緩衝佇列,所以緩衝佇列也需要繫結一個Exchange。在本例中,我們將delay_process_per_queue_ttl作為延遲重試裡的緩衝佇列。

定義消費者

我們建立一個最簡單的消費者ProcessReceiver,這個消費者監聽delay_process_queue佇列,對於接受到的訊息,他會:

  • 如果訊息裡的訊息體不等於FAIL_MESSAGE,那麼他會輸出訊息體。
  • 如果訊息裡的訊息體恰好是FAIL_MESSAGE,那麼他會模擬丟擲異常,然後將該訊息重定向到緩衝佇列(對應延遲重試場景)。

另外,我們還需要新建一個監聽容器用於存放消費者,程式碼如下:

@Bean
SimpleMessageListenerContainer processContainer(ConnectionFactory connectionFactory, ProcessReceiver processReceiver) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(DELAY_PROCESS_QUEUE_NAME); // 監聽delay_process_queue
    container.setMessageListener(new MessageListenerAdapter(processReceiver));
    return container;
}

至此,我們前置的配置程式碼已經全部編寫完成,接下來我們需要編寫測試用例來測試我們的延遲佇列。

編寫測試用例

延遲消費場景

首先我們編寫用於測試TTL設定在訊息上的測試程式碼。

我們藉助spring-rabbit包下提供的RabbitTemplate類來發送訊息。由於我們添加了spring-boot-starter-amqp擴充套件,Spring Boot會在初始化時自動地將RabbitTemplate當成bean載入到容器中。

解決了訊息的傳送問題,那麼又該如何為每個訊息設定TTL呢?這裡我們需要藉助MessagePostProcessor。MessagePostProcessor通常用來設定訊息的Header以及訊息的屬性。我們新建一個ExpirationMessagePostProcessor類來負責設定訊息的TTL屬性:

/**
 * 設定訊息的失效時間
 */
public class ExpirationMessagePostProcessor implements MessagePostProcessor {
    private final Long ttl; // 毫秒

    public ExpirationMessagePostProcessor(Long ttl) {
        this.ttl = ttl;
    }

    @Override
    public Message postProcessMessage(Message message) throws AmqpException {
        message.getMessageProperties()
               .setExpiration(ttl.toString()); // 設定per-message的失效時間
        return message;
    }
}

然後在呼叫RabbitTemplate的convertAndSend方法時,傳入ExpirationMessagePostPorcessor即可。我們向緩衝佇列中傳送3條訊息,過期時間依次為1秒,2秒和3秒。具體的程式碼如下所示:

@Test
public void testDelayQueuePerMessageTTL() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(3);
    for (int i = 1; i <= 3; i++) {
        long expiration = i * 1000;
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_MESSAGE_TTL_NAME,
                (Object) ("Message From delay_queue_per_message_ttl with expiration " + expiration), new ExpirationMessagePostProcessor(expiration));
    }
    ProcessReceiver.latch.await();
}

細心的朋友一定會問,為什麼要在程式碼中加一個CountDownLatch呢?這是因為如果沒有latch阻塞住測試方法的話,測試用例會直接結束,程式退出,我們就看不到訊息被延遲消費的表現了。

那麼類似地,測試TTL設定在佇列上的程式碼如下:

@Test
public void testDelayQueuePerQueueTTL() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(3);
    for (int i = 1; i <= 3; i++) {
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_QUEUE_PER_QUEUE_TTL_NAME,
                "Message From delay_queue_per_queue_ttl with expiration " + QueueConfig.QUEUE_EXPIRATION);
    }
    ProcessReceiver.latch.await();
}

我們向緩衝佇列中傳送3條訊息。理論上這3條訊息會在4秒後同時過期。

延遲重試場景

我們同樣還需測試延遲重試場景。

@Test
public void testFailMessage() throws InterruptedException {
    ProcessReceiver.latch = new CountDownLatch(6);
    for (int i = 1; i <= 3; i++) {
        rabbitTemplate.convertAndSend(QueueConfig.DELAY_PROCESS_QUEUE_NAME, ProcessReceiver.FAIL_MESSAGE);
    }
    ProcessReceiver.latch.await();
}

我們向delay_process_queue傳送3條會觸發FAIL的訊息,理論上這3條訊息會在4秒後自動重試。

我的理解

延遲消費過程(每個訊息可以單獨設定失效時間):

  • 1. 宣告 delay_queue_per_message_ttl 佇列:死信佇列,設定 DLX 引數,包含 x-dead-letter-exchange 表示失效後進入的 exchange(值為 delay_exchange,即實際消費交換機)、x-dead-letter-routing-key 表示失效後的路由鍵(值為 delay_process_queue,即實際消費佇列)。
  • 2. 宣告 delay_process_queue 佇列:實際消費佇列。
  • 3. 宣告 delay_exchange 交換機:實際消費交換機,型別為 Direct(一一對應)。
  • 4. 宣告 dlx_binding 繫結:將實際消費佇列和實際消費交換機繫結(路由鍵規則值為 delay_process_queue)。
  • 5. 釋出一個訊息,路由鍵為 delay_queue_per_message_ttl(傳送到死信佇列),並通過 header 單獨設定每個訊息的過期時間:當過期時間生效後,訊息會轉到實際消費佇列。
  • 6. 宣告一個消費者,監聽 delay_process_queue 佇列(即實際消費佇列):訊息正常被消費掉,達到延遲消費的目的。

延遲消費過程(所有訊息統一設定失效時間):

  • 1. 宣告 delay_queue_per_queue_ttl 佇列:死信佇列,設定 DLX 引數,包含 x-dead-letter-exchange 表示失效後進入的 exchange(值為 delay_exchange,即實際消費交換機)、x-dead-letter-routing-key 表示失效後的路由鍵(值為 delay_process_queue,即實際消費佇列)、x-message-ttl 表示佇列訊息過期時間。
  • 2. 宣告 delay_process_queue 佇列:實際消費佇列。
  • 3. 宣告 delay_exchange 交換機:實際消費交換機,型別為 Direct(一一對應)。
  • 4. 宣告 dlx_binding 繫結:將實際消費佇列和實際消費交換機繫結(路由鍵規則值為 delay_process_queue)。
  • 5. 釋出一個訊息,路由鍵為 delay_queue_per_queue_ttl(傳送到死信佇列):當過期時間生效後,訊息會轉到實際消費佇列。
  • 6. 宣告一個消費者,監聽 delay_process_queue佇列(即實際消費佇列):訊息正常被消費掉,達到延遲消費的目的。

延遲重試過程

  • 1. 宣告 delay_process_queue 佇列:實際消費佇列。
  • 2. 宣告 delay_queue_per_queue_ttl 佇列:死信佇列,設定 DLX 引數,包含 x-dead-letter-exchange 表示失效後進入的 exchange(值為 delay_exchange,即實際消費交換機)、x-dead-letter-routing-key 表示失效後的路由鍵(值為 delay_process_queue,即實際消費佇列)、x-message-ttl 表示佇列訊息過期時間。
  • 3. 宣告 delay_exchange 交換機:實際消費交換機,型別為 Direct(一一對應)。
  • 4. 宣告 per_queue_ttl_exchange 交換機:死信交換機,型別為 Direct(一一對應)。
  • 5. 宣告 dlx_binding 繫結:將實際消費佇列和實際消費交換機繫結(路由鍵規則值為 delay_process_queue)。
  • 6. 宣告 queue_ttl_binding 繫結:將死信佇列和死信交換機繫結(路由鍵規則值為 delay_queue_per_queue_ttl)。
  • 7. 釋出一個訊息,路由鍵為 delay_process_queue(傳送到實際消費佇列)。
  • 8. 宣告一個消費者,監聽 delay_process_queue 佇列(即實際消費佇列):消費者監聽到訊息,當處理過程中發生異常,訊息重新發送到私信佇列,然後等待過期時間生效後,訊息再轉到實際消費佇列,重新消費,以達到延遲重試的目的。

需要注意:在延遲消費的過程中,我們是沒有建立死信交換機的,那為什麼還可以釋出訊息呢?原因是 RabbitMQ 會使用預設的 Exchange,並且建立一個預設的 Binding(型別為 Direct),通過rabbitmqadmin list bindings命令,可以看到結果。

相關推薦

Spring Boot 實現 RabbitMQ 延遲消費延遲佇列

並增加了自己的一些理解,記錄下來,以便日後查閱。 專案原始碼: 背景 何為延遲佇列? 顧名思義,延遲佇列就是進入該佇列的訊息會被延遲消費的佇列。而一般的佇列,訊息一旦入隊了之後就會被消費者馬上消費。 延遲佇列能做什麼?延遲佇列多用於需要延遲工作的場景。最常見的是以下兩種場景: 延遲消費。比如:使用者生成

Apache Camel繼承Spring Boot 實現檔案遠端複製轉移

pom.xml <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ftp</artifactId> <version>2.

如何使用Spring BootRabbitMQ結合實現延遲佇列

背景 何為延遲佇列? 顧名思義,延遲佇列就是進入該佇列的訊息會被延遲消費的佇列。而一般的佇列,訊息一旦入隊了之後就會被消費者馬上消費。 延遲佇列能做什麼?延遲佇列多用於需要延遲工作的場景。最常見的是以下兩種場景:延遲消費。比如: 使用者生成訂單之後

RabbitMQ延遲消費重複消費

轉載自 https://blog.csdn.net/quliuwuyiz/article/details/79301054 使用RabbitMQ實現延遲任務 場景一:物聯網系統經常會遇到向終端下發命令,如果命令一段時間沒有應答,就需要設定成超時。 場景二:訂單下單之後30分鐘後,如

spring mvcspring boot實現AOP

spring boot實現AOP 首先建立切面類需要@Aspect,@Component註解 然後建立@Pointcut確定什麼方法實現aop @Pointcut("execution(* com.air_baocl.controller.selectApi.*(..))")

如何在優雅地Spring實現訊息的傳送消費

本文將對rocktmq-spring-boot的設計實現做一個簡單的介紹,讀者可以通過本文了解將RocketMQ Client端整合為spring-boot-starter框架的開發細節,然後通過一個簡單的示例來一步一步的講解如何使用這個spring-boot-starter工具包來配置,傳送和消費Rocke

Spring Cloud Stream + RabbitMQ 訊息生成訊息消費

在本 DEMO中有兩個節點互為訊息的生產者和訊息消費者。 一、節點1 1. pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/P

spring boot +security+oauth認證伺服器資源伺服器(基於註解實現

部分程式碼是搬別人寫好、自己做了調整 一、認證伺服器配置 1.新建maven專案pom.xml <dependencies> <dependency> <groupId>

spring boot 實現熱部署,部署java檔案靜態資源

自己學習了spring boot發現很方便使用,加上熱部署功能,不需要改個樣式就重啟服務,浪費時間了.修改完檔案之後,spring boot 自動給你更新資源,很方便開發人員除錯. 接下來讓我們一步步來實現這個功能. 首先我們需要在gradle 裡面新增依賴 runt

spring boot實現圖片的上傳下載功能

              這篇部落格簡單介紹下spring boot下圖片上傳和下載,已經遇到的問題。首先需要建立一個spring boot專案。 1,核心的controller程式碼package com.qwrt.station.websocket.controller

分散式訊息系統:Kafka(九)應用Spring Boot實現消費者生產者

一、專案 (1)新建Spring Boot專案,參考以下建立過程; 建立一個Spring Boot專案 (2)pom檔案中新增spring-kafka框架 <dependency> <groupId>org.springf

Spring boot 實現jsonjsonp格式資料,介面共用

@ControllerAdvice(basePackages = {"com.eweiche"}) public class JSONPController extends AbstractJsonp

使用 Jenkins X、Kubernetes Spring Boot 實現 CI/CD

本文首發於:Jenkins 中文社群 過去五年中的變化,如遷移到公有云以及從虛擬機器向容器的轉變,已經徹底改變了構建和部署軟體的意

Spring框架學習筆記(7)——Spring Boot 實現上傳下載

最近忙著都沒時間寫部落格了,做了個專案,實現了下載功能,沒用到上傳,寫這篇文章也是順便參考學習瞭如何實現上傳,上傳和下載做一篇筆記吧 下載 主要有下面的兩種方式: 通過ResponseEntity實現 通過寫HttpServletResponse的OutputStream實現 我只測試了ResponseE

十五、Spring Boot 環境變量讀取 屬性對象的綁定

bean database 環境 autoconf lac autowire 屬性 boot title 凡是被spring管理的類,實現接口 EnvironmentAware 重寫方法 setEnvironment 可以在工程啟動時,獲取到系統環境變量和applicati

Spring Boot實現多個數據源教程收集(待實踐)

get shu 多個 href eos net -c smi tar 先收集,後續實踐。 http://blog.csdn.net/catoop/article/details/50575038 http://blog.csdn.net/neosmith/article

Spring Boot 實現電商系統 Web API (二)創建多模塊項目

ble jin play 正常 ota autowired ips 功能 bind 大型項目,需要將代碼按不同功能,分成不同模塊,這樣比較好管理和閱讀代碼,也有助於多人協作。 一、項目結構 1.1 模塊說明 項目分成5個模塊,分別如下: 模塊名稱 說明 webapi

Java Spring Boot 上傳文件預覽文件地址解析

jar包 tof form loader index res for catch div @RequestMapping(value ="/upload",method = RequestMethod.POST) @Permission(isAjax=fa

IntelliJ IDEA Spring boot實現熱部署

chrom 最重要的 配置文件 實現 auto 需要 blog 圖片 tom 一、spring-boot-devtools是一個為開發者服務的一個模塊,其中最重要的功能就是自動部署新代碼。 二、原理   使用了兩個ClassLoader,一個ClassLoader用來加載那

Spring Boot 環境變量讀取 屬性對象的綁定

out mar mis ring host 重寫方法 條件 popu ide 凡是被Spring管理的類,實現接口 EnvironmentAware 重寫方法 setEnvironment 可以在工程啟動時,獲取到系統環境變量和application配置文件