1. 程式人生 > >【RabbitMQ】一文帶你搞定RabbitMQ死信佇列

【RabbitMQ】一文帶你搞定RabbitMQ死信佇列

本文口味:爆炒魷魚   預計閱讀:15分鐘

一、說明

RabbitMQ是流行的開源訊息佇列系統,使用erlang語言開發,由於其社群活躍度高,維護更新較快,效能穩定,深得很多企業的歡心(當然,也包括我現在所在公司【手動滑稽】)。

為了保證訂單業務的訊息資料不丟失,需要使用到RabbitMQ的死信佇列機制,當訊息消費發生異常時,將訊息投入死信佇列中。但由於對死信佇列的概念及配置不熟悉,導致曾一度陷入百度的汪洋大海,無法自拔,很多文章都看起來可行,但是實際上卻並不能幫我解決實際問題。最終,在官網文件中找到了我想要的答案,通過官網文件的學習,才發現對於死信佇列存在一些誤解,導致配置死信佇列之路困難重重。

於是本著記錄和分享的精神,將死信佇列的概念和配置完整的寫下來,以便幫助遇到同樣問題的朋友。

二、本文大綱

以下是本文大綱:

本文閱讀前,需要對RabbitMQ有一個簡單的瞭解,偏向實戰配置講解。

三、死信佇列是什麼

死信,在官網中對應的單詞為“Dead Letter”,可以看出翻譯確實非常的簡單粗暴。那麼死信是個什麼東西呢?

“死信”是RabbitMQ中的一種訊息機制,當你在消費訊息時,如果佇列裡的訊息出現以下情況:

  1. 訊息被否定確認,使用 channel.basicNackchannel.basicReject ,並且此時requeue 屬性被設定為false
  2. 訊息在佇列的存活時間超過設定的TTL時間。
  3. 訊息佇列的訊息數量已經超過最大佇列長度。

那麼該訊息將成為“死信”。

“死信”訊息會被RabbitMQ進行特殊處理,如果配置了死信佇列資訊,那麼該訊息將會被丟進死信佇列中,如果沒有配置,則該訊息將會被丟棄。

四、如何配置死信佇列

這一部分將是本文的關鍵,如何配置死信佇列呢?其實很簡單,大概可以分為以下步驟:

  1. 配置業務佇列,繫結到業務交換機上
  2. 為業務佇列配置死信交換機和路由key
  3. 為死信交換機配置死信佇列

注意,並不是直接宣告一個公共的死信佇列,然後所以死信訊息就自己跑到死信佇列裡去了。而是為每個需要使用死信的業務佇列配置一個死信交換機,這裡同一個專案的死信交換機可以共用一個,然後為每個業務佇列分配一個單獨的路由key。

有了死信交換機和路由key後,接下來,就像配置業務佇列一樣,配置死信佇列,然後繫結在死信交換機上。也就是說,死信佇列並不是什麼特殊的佇列,只不過是繫結在死信交換機上的佇列。死信交換機也不是什麼特殊的交換機,只不過是用來接受死信的交換機,所以可以為任何型別【Direct、Fanout、Topic】。一般來說,會為每個業務佇列分配一個獨有的路由key,並對應的配置一個死信佇列進行監聽,也就是說,一般會為每個重要的業務佇列配置一個死信佇列。

有了前文這些陳述後,接下來就是驚險刺激的實戰環節,這裡省略了RabbitMQ環境的部署和搭建環節。

先建立一個Springboot專案。然後在pom檔案中新增 spring-boot-starter-amqpspring-boot-starter-web 的依賴,接下來建立一個Config類,這裡是關鍵:

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
    public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";

    // 宣告業務Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 宣告死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 宣告業務佇列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    這裡聲明當前佇列繫結的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  這裡聲明當前佇列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
    }

    // 宣告業務佇列B
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    這裡聲明當前佇列繫結的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  這裡聲明當前佇列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
    }

    // 宣告死信佇列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    // 宣告死信佇列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    // 宣告業務佇列A繫結關係
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    // 宣告業務佇列B繫結關係
    @Bean
    public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    // 宣告死信佇列A繫結關係
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                    @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    // 宣告死信佇列B繫結關係
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

這裡聲明瞭兩個Exchange,一個是業務Exchange,另一個是死信Exchange,業務Exchange下綁定了兩個業務佇列,業務佇列都配置了同一個死信Exchange,並分別配置了路由key,在死信Exchange下綁定了兩個死信佇列,設定的路由key分別為業務佇列裡配置的路由key。

下面是配置檔案application.yml:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
          default-requeue-rejected: false
          acknowledge-mode: manual

這裡記得將default-requeue-rejected屬性設定為false。

接下來,是業務佇列的消費程式碼:

@Slf4j
@Component
public class BusinessMessageReceiver {

    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到業務訊息A:{}", msg);
        boolean ack = true;
        Exception exception = null;
        try {
            if (msg.contains("deadletter")){
                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e){
            ack = false;
            exception = e;
        }
        if (!ack){
            log.error("訊息消費發生異常,error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到業務訊息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

然後配置死信佇列的消費者:

@Component
public class DeadLetterMessageReceiver {


    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信訊息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到死信訊息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

最後,為了方便測試,寫一個簡單的訊息生產者,並通過controller層來生產訊息。

@Component
public class BusinessMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg){
        rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
    }
}
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {

    @Autowired
    private BusinessMessageSender sender;

    @RequestMapping("sendmsg")
    public void sendMsg(String msg){
        sender.sendMsg(msg);
    }
}

一切準備就緒,啟動!

可以從RabbitMQ的管理後臺中看到一共有四個佇列,除預設的Exchange外還有宣告的兩個Exchange。

接下來,訪問一下url,來測試一下:

http://localhost:8080/rabbitmq/sendmsg?msg=msg

日誌:

收到業務訊息A:msg
收到業務訊息B:msg

表示兩個Consumer都正常收到了訊息。這代表正常消費的訊息,ack後正常返回。然後我們再來測試nck的訊息。

http://localhost:8080/rabbitmq/sendmsg?msg=deadletter

這將會觸發業務佇列A的NCK,按照預期,訊息被NCK後,會拋到死信佇列中,因此死信佇列將會出現這個訊息,日誌如下:

收到業務訊息A:deadletter
訊息消費發生異常,error msg:dead letter exception
java.lang.RuntimeException: dead letter exception
...

收到死信訊息A:deadletter

可以看到,死信佇列的Consumer接受到了這個訊息,所以流程到此為止就打通了。

五、死信訊息的變化

那麼“死信”被丟到死信佇列中後,會發生什麼變化呢?

如果佇列配置了引數 x-dead-letter-routing-key 的話,“死信”的路由key將會被替換成該引數對應的值。如果沒有設定,則保留該訊息原有的路由key。

舉個栗子:

如果原有訊息的路由key是testA,被髮送到業務Exchage中,然後被投遞到業務佇列QueueA中,如果該佇列沒有配置引數x-dead-letter-routing-key,則該訊息成為死信後,將保留原有的路由keytestA,如果配置了該引數,並且值設定為testB,那麼該訊息成為死信後,路由key將會被替換為testB,然後被拋到死信交換機中。

另外,由於被拋到了死信交換機,所以訊息的Exchange Name也會被替換為死信交換機的名稱。

訊息的Header中,也會新增很多奇奇怪怪的欄位,修改一下上面的程式碼,在死信佇列的消費者中新增一行日誌輸出:

log.info("死信訊息properties:{}", message.getMessageProperties());

然後重新執行一次,即可得到死信訊息Header中被新增的資訊:

死信訊息properties:MessageProperties [headers={x-first-death-exchange=dead.letter.demo.simple.business.exchange, x-death=[{reason=rejected, count=1, exchange=dead.letter.demo.simple.business.exchange, time=Sun Jul 14 16:48:16 CST 2019, routing-keys=[], queue=dead.letter.demo.simple.business.queuea}], x-first-death-reason=rejected, x-first-death-queue=dead.letter.demo.simple.business.queuea}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAREVTS1RPUC1DUlZGUzBOAAAPQAAAAAAB.bLbsdR1DnuRSwiKKmtdOGw==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dead.letter.demo.simple.deadletter.exchange, receivedRoutingKey=dead.letter.demo.simple.deadletter.queuea.routingkey, deliveryTag=1, consumerTag=amq.ctag-NSp18SUPoCNvQcoYoS2lPg, consumerQueue=dead.letter.demo.simple.deadletter.queuea]

Header中看起來有很多資訊,實際上並不多,只是值比較長而已。下面就簡單說明一下Header中的值:

欄位名 含義
x-first-death-exchange 第一次被拋入的死信交換機的名稱
x-first-death-reason 第一次成為死信的原因,rejected:訊息在重新進入佇列時被佇列拒絕,由於default-requeue-rejected 引數被設定為falseexpired :訊息過期。maxlen : 佇列內訊息數量超過佇列最大容量
x-first-death-queue 第一次成為死信前所在佇列名稱
x-death 歷次被投入死信交換機的資訊列表,同一個訊息每次進入一個死信交換機,這個陣列的資訊就會被更新

六、死信佇列應用場景

通過上面的資訊,我們已經知道如何使用死信隊列了,那麼死信佇列一般在什麼場景下使用呢?

一般用在較為重要的業務佇列中,確保未被正確消費的訊息不被丟棄,一般發生消費異常可能原因主要有由於訊息資訊本身存在錯誤導致處理異常,處理過程中引數校驗異常,或者因網路波動導致的查詢異常等等,當發生異常時,當然不能每次通過日誌來獲取原訊息,然後讓運維幫忙重新投遞訊息(沒錯,以前就是這麼幹的= =)。通過配置死信佇列,可以讓未正確處理的訊息暫存到另一個佇列中,待後續排查清楚問題後,編寫相應的處理程式碼來處理死信訊息,這樣比手工恢復資料要好太多了。

七、總結

死信佇列其實並沒有什麼神祕的地方,不過是繫結在死信交換機上的普通佇列,而死信交換機也只是一個普通的交換機,不過是用來專門處理死信的交換機。

總結一下死信訊息的生命週期:

  1. 業務訊息被投入業務佇列
  2. 消費者消費業務佇列的訊息,由於處理過程中發生異常,於是進行了nck或者reject操作
  3. 被nck或reject的訊息由RabbitMQ投遞到死信交換機中
  4. 死信交換機將訊息投入相應的死信佇列
  5. 死信佇列的消費者消費死信訊息

死信訊息是RabbitMQ為我們做的一層保證,其實我們也可以不使用死信佇列,而是在訊息消費異常時,將訊息主動投遞到另一個交換機中,當你明白了這些之後,這些Exchange和Queue想怎樣配合就能怎麼配合。比如從死信佇列拉取訊息,然後傳送郵件、簡訊、釘釘通知來通知開發人員關注。或者將訊息重新投遞到一個佇列然後設定過期時間,來進行延時消費。

本篇文章中的demo專案已上傳至github,有需要的朋友可以自行下載查閱​。https://github.com/MFrank2016/dead-letter-demo​

如果本文對你有幫助,記得點個贊,也希望能分享給更多的朋友。也歡迎關注我的公眾號進行留言交流。

相關推薦

RabbitMQRabbitMQ死信佇列

本文口味:爆炒魷魚   預計閱讀:15分鐘 一、說明 RabbitMQ是流行的開源訊息佇列系統,使用erlang語言開發,由於其社群活躍度高,維護更新較快,效能穩定,深得很多企業的歡心(當然,也包括我現在所在公司【手動滑稽】)。 為了保證訂單業務的訊息資料不丟失,需要使用到RabbitMQ的死信佇列機制,當訊

RabbitMQRabbitMQ延遲佇列

本文口味:魚香肉絲   預計閱讀:10分鐘 一、說明 在上一篇中,介紹了RabbitMQ中的死信佇列是什麼,何時使用以及如何使用RabbitMQ的死信佇列。相信通過上一篇的學習,對於死信佇列已經有了更多的瞭解,這一篇的內容也跟死信佇列息息相關,如果你還不瞭解死信佇列,那麼建議你先進行上一篇文章的閱讀。 這一篇

專案實踐頁面許可權、按鈕許可權以及資料許可權

![許可權授權.png](https://img2020.cnblogs.com/blog/1496775/202101/1496775-20210108142933342-1432167452.jpg) > 以專案驅動學習,以實踐檢驗真知 # 前言 許可權這一概念可以說是隨處可見:等級不夠進入不了某個

專案實踐Spring Security + JWT

![首圖.png](https://img2020.cnblogs.com/news/1496775/202101/1496775-20210112085019858-1640273167.jpg) > 以專案驅動學習,以實踐檢驗真知 # 前言 關於認證和授權,R之前已經寫了兩篇文章: [

最小生成樹演算法圖解--理解什麼是Prim演算法和Kruskal演算法

假設以下情景,有一塊木板,板上釘上了一些釘子,這些釘子可以由一些細繩連線起來。假設每個釘子可以通過一根或者多根細繩連線起來,那麼一定存在這樣的情況,即用最少的細繩把所有釘子連線起來。 更為實際的情景是這樣的情況,在某地分佈著N個村莊,現在需要在N個村莊之間修路,每個村莊之前的距離不同,問怎麼修最短的路,將各個

工業大資料讀懂《工業大資料白皮書》

來源:工信部、工業網際網路城市物聯網智庫 整理髮布轉載請註明來源和出處------   【導讀】

併發程式設計讀懂深入理解Java記憶體模型(面試必備)

併發程式設計這一塊內容,是高階資深工程師必備知識點,25K起如果不懂併發程式設計,那基本到頂。但是併發程式設計內容龐雜,如何系統學

懂什麼是測試開發!

01  開始前說點什麼 需要說明的是,原文發表於作者的公眾號中,文章篇幅雖長,但內容樸實、且能幫助讀者進一步理解測試開發工作,請讀者耐心品完~    1. 自我反省       公眾號開通了也有兩年多了,除了剛開通的那段時間發文比

剛體驗完RabbitMQSpringBoot+RabbitMQ方式收發訊息

>人生終將是場單人旅途,孤獨之前是迷茫,孤獨過後是成長。 ## 楔子 這篇是訊息佇列RabbitMQ的第二彈。 [上一篇](https://juejin.im/post/6856571028496351239)的結尾我也預告了本篇的內容:利用RabbitTemplate和註解進行收發訊息,還有一

從定義到AST及其遍歷方式,懂Antlr4

摘要:本文將首先介紹Antlr4 grammer的定義方式,如何通過Antlr4 grammer生成對應的AST,以及Antlr4 的兩種AST遍歷方式:Visitor方式和Listener方式。 1. Antlr4簡單介紹 Antlr4(Another Tool for Language Recogniti

面試都在問的「微服務」「RPC」「服務治理」「下一代微服務」徹底懂!

❝ 文章每週持續更新,各位的「三連」是對我最大的肯定。可以微信搜尋公眾號「 後端技術學堂 」第一時間閱讀(一般比部落格早更新一到兩篇) ❞ 單體式應用程式 與微服務相對的另一個概念是傳統的「單體式應用程式」( Monolithic application ),單體式應用內部包含了所有需要的服務。而且各個服務功

快速懂動態字串SDS,面試不再懵逼

目錄 redis原始碼分析系列文章 前言 API使用 embstr和raw的區別 SDSHdr的定義 SDS具體邏輯圖 SDS的優勢 更快速的獲取字串長度 資料安全,不會截斷 SDS關鍵程式碼分析 獲取常見值(抽象出常見方法) 建立物件 刪除 新增字元(擴容)重點!!! 總結 參考資料 redis原始碼分析

沒用過訊息佇列體驗RabbitMQ收發訊息

>人生終將是場單人旅途,孤獨之前是迷茫,孤獨過後是成長。 ## 楔子 先給大家說聲抱歉,最近一週都沒有發文,有一些比較要緊重要的事需要處理。 今天正好得空,本來說準備寫`SpringIOC`相關的東西,但是發現想要梳理一遍還是需要很多時間,所以我打算慢慢寫,先把MQ給寫了,再慢慢寫其他相關的,畢竟

乾貨!!十分鐘懂 Java AQS 核心設計與實現!!!

前言 這篇文章寫完放著也蠻久的了,今天終於釋出了,對於拖延症患者來說也真是不容易~哈哈哈。 言歸正傳,其實吧。。我覺得對於大部分想了解 AQS 的朋友來說,明白 AQS 是個啥玩意兒以及為啥需要 AQS,其實是最重要的。就像我一開始去看 AQS 的時候,抱著程式碼就啃,看不懂就去網上搜。。但是網上文章千篇一律

專案實踐手把手SSM

> 以專案驅動學習,以實踐檢驗真知 # 前言 現在使用Java後端開發使用的技術棧基本上比較統一:Spring + SpringMVC + Mybatis,即大家常說的SSM。雖然現在流行的做法是使用SpringBoot來快速搭建、配置好SSM專案,但還是有必要知道如何不用SpringBoot來組合好這三

深度解析騰訊雲直播答題方案

exc com erp 同學 col 測試 的確 影響 cep 歡迎大家前往雲+社區,獲取更多騰訊海量技術實踐幹貨哦~ 作者:騰訊視頻雲 進入2018年最火的新鮮事物無疑就是“直播答題”了,動輒上百萬的獎金更是吸引了大量用戶的參與。一場直播動輒幾百萬的獎金,每人可以分到

了解激光雷達重要指標及參數

因此 一個 https 速度 .com p s 展示 jpg left 博客轉載自:https://www.leiphone.com/news/201801/oySuWNzftbNrWwpv.html 雷鋒網(公眾號:雷鋒網)按:本文作者SLAMTEC(思嵐科技公號slam

吃透執行緒池

微信公眾號:[Amos部落格] 內容目錄 TreadPoolexecutor原始碼解析 類關係圖 Executor介面 ExecutorService介面 AbstractExecutorService 成員變數

快速瞭解最火的數字經濟(大資料、人工智慧等都有)

人工智慧行業應用加速(暴富機會由“網際網路+”轉向AI+) “網際網路+”紅利已開發將盡,未來,新的暴富紅利將由“人工智慧”接棒。從產業演進看,科技巨頭正加速全球化併購,打造AI生態閉環,開源化也將成為全球性趨勢。開源化使得人工智慧的行業運用門檻急遽降低,未來幾年將迎來人工智慧行業應用浪潮。 2

某高校計算機程式設計教授教如何快速入門python,進入程式設計

如何快速入門Python 學習任何一門語言都是從入門(1年左右),通過不間斷練習達到熟練水準(3到5年),少數人最終能精通語言,成為執牛耳者,他們是金字塔的最頂層。雖然萬事開頭難,但好的開始是成功的一半,今天這篇文章就來談談如何開始入門 Python。只要方向對了,就不怕路遠。 設定目標