1. 程式人生 > >RabbitMQ 釋出訂閱-實現延時重試佇列(參考)

RabbitMQ 釋出訂閱-實現延時重試佇列(參考)

RabbitMQ訊息處理失敗,我們會讓失敗訊息進入重試佇列等待執行,因為在重試佇列距離真正執行還需要定義的時間間隔,因此,我們可以將重試佇列設定成延時處理。今天參考網上其他人的實現,簡單梳理下訊息延時重試執行的思路。

消費失敗後,自動延時將訊息重新投遞,當達到一定的重試次數後,將訊息投遞到失敗訊息佇列,等待人工介入處理。在這裡我們一步一步實現一個帶有失敗重試功能的釋出訂閱元件,使用該元件後可以非常簡單的實現訊息的釋出訂閱。

業務背景

  1. 結合RabbitMQ的Topic模式和Work Queue模式實現生產方產生訊息,消費方按需訂閱,訊息投遞到消費方的佇列之後,多個worker同時對訊息進行消費
  2. 結合RabbitMQ的 Message TTL 和 Dead Letter Exchange 實現訊息的延時重試功能
  3. 訊息達到最大重試次數之後,將其投遞到失敗佇列,等待人工介入處理bug後,重新將其加入佇列消費

執行流程圖

  1. 生產者釋出訊息到主Exchange
  2. 主Exchange根據Routing Key將訊息分發到對應的訊息佇列
  3. 多個消費者的worker程序同時對佇列中的訊息進行消費,因此它們之間採用“競爭”的方式來爭取訊息的消費
  4. 訊息消費後,不管成功失敗,都要返回ACK消費確認訊息給佇列,避免訊息消費確認機制導致重複投遞,同時,如果訊息處理成功,則結束流程,否則進入重試階段
  5. 如果重試次數小於設定的最大重試次數(預設為3次),則將訊息重新投遞到Retry Exchange的重試佇列
  6. 重試佇列不需要消費者直接訂閱,它會等待訊息的有效時間過期之後,重新將訊息投遞給Dead Letter Exchange,我們在這裡將其設定為主Exchange,實現延時後重新投遞訊息,這樣消費者就可以重新消費訊息
  7. 如果三次以上都是消費失敗,則認為訊息無法被處理,直接將訊息投遞給Failed Exchange的Failed Queue,這時候應用可以觸發報警機制,以通知相關責任人處理
  8. 等待人工介入處理(解決bug)之後,重新將訊息投遞到主Exchange,這樣就可以重新消費了

技術實現:

建立Exchange

為了實現訊息的延時重試和失敗儲存,我們需要建立三個Exchange來處理訊息。

  • master 主Exchange,釋出訊息時釋出到該Exchange
  • master.retry 重試Exchange,訊息處理失敗時(3次以內),將訊息重新投遞給該Exchange
  • master.failed 失敗Exchange,超過三次重試失敗後,訊息投遞到該Exchange

所有的Exchange宣告(declare)必須使用以下引數

引數 說明
exchange - Exchange名稱
type topic Exchange 型別
passive false 如果Exchange已經存在,則返回成功,不存在則建立
durable true 持久化儲存Exchange,這裡僅僅是Exchange本身持久化,訊息和佇列需要單獨指定其持久化
no-wait false 該方法需要應答確認

 

在RabbitMQ的管理介面中,我們可以看到建立的三個Exchange

訊息釋出

訊息釋出時,使用basic_publish方法,引數如下

引數 說明
message - 釋出的訊息物件
exchange master 訊息釋出到的Exchange
routing-key - 路由KEY,用於標識訊息型別
mandatory false 是否強制路由,指定了該選項後,如果沒有訂閱該訊息,則會返回路由不可達錯誤
immediate false 指定了當訊息無法直接路由給消費者時如何處理

釋出訊息時,對於message物件,其內容使用json編碼後的字串,同時訊息進行持久化

訊息訂閱

訊息訂閱的實現相對複雜一些,需要完成佇列的宣告以及佇列和Exchange的繫結

Declare Queue

對於每一個訂閱訊息的服務,都必須建立一個該服務對應的佇列,將該佇列繫結到關注的路由規則,這樣之後,訊息生產者將訊息投遞給Exchange之後,就會按照路由規則將訊息分發到對應的佇列供消費者消費了。

消費服務需要declare三個佇列

  • [queue_name] 佇列名稱,格式符合 [服務名稱]@訂閱服務標識
  • [queue_name]@retry 重試佇列
  • [queue_name]@failed 失敗佇列

Declare佇列時,引數規定規則如下

引數 說明
queue - 佇列名稱
passive false 佇列不存在則建立,存在則直接成功
durable true 佇列持久化
exclusive false 排他,指定該選項為true則佇列只對當前連線有效,連線斷開後自動刪除
no-wait false 該方法需要應答確認
auto-delete false 當不再使用時,是否自動刪除

對於@retry重試佇列,需要指定額外引數

'x-dead-letter-exchange'    => 'master'
'x-dead-letter-routing-key' => [queue_name],
'x-message-ttl'              => 30 * 1000 // 重試時間設定為30s

這裡的兩個header欄位的含義是,在佇列中延遲30s後,將該訊息重新投遞到x-dead-letter-exchange對應的Exchange中,並且routing key指定為消費佇列的名稱,這樣就可以實現訊息只投遞給原始出錯時的佇列,避免訊息重新投遞給所有關注當前routing key的消費者了。

在RabbitMQ的管理介面中,Queues部分可以看到我們建立的三個佇列

檢視佇列的詳細資訊,我們可以看到 [email protected] 佇列與其它兩個佇列的不同

 


佇列和Exchange繫結
建立完佇列之後,需要將佇列與Exchange繫結(bind),不同佇列需要繫結到之前建立的對應的Exchange上面
Queue Exchange
[queue_name] master
[queue_name]@retry master.retry
[queue_name]@failed master.failed

繫結時,需要提供訂閱的路由KEY,該路由KEY與訊息釋出時的路由KEY對應,區別是這裡可以使用萬用字元同時訂閱多種型別的訊息。

引數 說明
queue - 繫結的佇列
exchange - 繫結的Exchange
routing-key - 訂閱的訊息路由規則
no-wait false 該方法需要應答確認


在RabbitMQ的管理介面中,我們可以看到該佇列與Exchange和routing-key的繫結關係

 

 

訊息消費實現

使用 basic_consume 對訊息進行消費的時候,需要注意下面引數

引數 說明
queue - 消費的佇列名稱
consumer-tag - 消費者標識,留空即可
no_local false 如果設定了該欄位,伺服器將不會發布訊息到 釋出它的客戶端
no_ack false 需要消費確認應答
exclusive false 排他訪問,設定後只允許當前消費者訪問該佇列
nowait false 該方法需要應答確認

消費端在消費訊息時,需要從訊息中獲取訊息被消費的次數,以此判斷該訊息處理失敗時重試還是傳送到失敗佇列。

在訊息傳送到重試佇列和失敗佇列時,我們在訊息的headers中添加了一個名為x-orig-routing-key的欄位,該欄位是實現訊息重試的關鍵欄位,由於我們的訊息需要在不同的Exchange,Queue之間流轉,為了避免訊息在重新投遞到主Exchange時,被所有的消費者佇列重新消費,在重試過程中,我們將訊息的routing-key修改為佇列名稱,直接投遞給原始消費訊息的佇列。x-orig-routing-key用於在之後能夠重新獲取到最開始的routing-key。

這裡的重複消費是指 某個訊息被兩個消費方A和B消費了,其中A消費失敗,B成功,這時候,訊息由A消費者重新投遞到主Exchange後,B消費佇列也會獲取到該訊息,因此就會導致B消費者重複消費已經消費國的訊息

 

本文實現延時重試,使用了三個重試Exchange,Exchange如果訂閱特別多的話,Exchange的壓力會非常大,因此在非常極端的情況下,訊息大批量失敗,且訊息收發非常快,那麼Exchange的效能可能會有問題。

本文是使用釋出訂閱實現延時重試的訊息執行,也會有其他思路。