1. 程式人生 > >【攻克RabbitMQ】常見問題

【攻克RabbitMQ】常見問題

訊息什麼情況下會丟失?配合mandatory引數或備份交換器來提高程式的健壯性

  1. 傳送訊息的交換器並沒有繫結任何佇列,訊息將會丟失
  2. 交換器綁定了某個佇列,但是傳送訊息時的路由鍵無法與現存的佇列匹配

預估佇列的使用情況?

在後期執行過程中超過預定的閾值,可以根據實際情況對當前叢集進行擴容或者將相應的佇列遷移到其他叢集。

消費訊息?

推模式,拉模式

保證訊息的可靠性?

RabbitMQ 提供了訊息確認機制( message acknowledgement)。 消費者在訂閱佇列時,可以指定 autoAck 引數,當 autoAck 等於 false 時, RabbitMQ 會等待消費者顯式地回覆確認訊號後才從記憶體(或者磁碟)中移去訊息(實質上
是先打上刪除標記,之後再刪除)。當 autoAck 等於 true 時, RabbitMQ 會自動把傳送出去的 訊息置為確認,然後從記憶體(或者磁碟)中刪除,而不管消費者是否真正地消費到了這些訊息。

在ack為false的情況下,消費者獲取訊息遲遲沒有傳送消費者確認訊息的訊號或者消費者斷開,怎麼辦?

當 autoAck 引數置為 false,對於 RabbitMQ 服務端而言,佇列中的訊息分成了兩個部分: 一部分是等待投遞給消費者的訊息:一部分是己經投遞給消費者,但是還沒有收到消費者確認訊號的訊息。 如果 RabbitMQ 一直沒有收到消費者的確認訊號,並且消費此訊息的消費者己經 斷開連線,則 RabbitMQ 會安排該訊息重新進入佇列,等待投遞給下一個消費者,當然也有可 能還是原來的那個消費者。RabbitMQ 不會為未確認的訊息設定過期時間,它判斷此訊息是否需要重新投遞給消費者的唯一依據是消費該訊息的消費者連線是否己經斷開,這麼設計的原因是 RabbitMQ 允許消費者 消費一條訊息的時間可以很久很久。

在消費者接收到訊息後,如果想明確拒絕當前的訊息而不是確認,那麼應該怎麼做呢?

RabbitMQ 在 2.0.0 版本開始引入了 Basic .Reject 這個命令,消費者客戶端可以呼叫與其對 應的 channel.basicReject 方法來告訴 RabbitMQ 拒絕這個訊息。

//Channel 類中的 basicReject 方法定義如下:
//其中 deliveryTag 可以看作訊息的編號 ,它是一個 64 位的長整型值,最大值是 9223372036854775807。如果 //requeue 引數設定為 true,則 RabbitMQ 會重新將這條訊息存入 佇列,以便可以傳送給下一個訂閱的消費者;如果 //requeue 引數設定為 false,則 RabbitMQ 立即會把訊息從佇列中移除,而不會把它傳送給新的消費者。
void basicReject(long deliveryTag, boolean requeue) throws IOException

注意:

Basic.Reject 命令一次只能拒絕一條訊息 ,如果想要批量拒絕訊息 ,則可以使用 Basic.Nack 這個命令

//消費者客戶端可以呼叫 channel.basicNack 方法來實現,方法定 義如下:
//其中 deliveryTag 和 requeue 的含義可以參考 basicReject 方法。 multiple 引數
//設定為 false 則表示拒絕編號為 deliveryT坷的這一條訊息,這時候 basicNack 和 basicReject 方法一樣; //multiple 引數設定為 true 則表示拒絕 deliveryTag 編號之前所 有未被當前消費者確認的訊息。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException

注意:

將 channel.basicReject 或者 channel.basicNack 中的 requeue 設直為 false,可以啟用”死信佇列”的功能。死信佇列可以通過檢測被拒絕或者未送達的訊息來追蹤問題

請求RabbitMQ重新發送還未被確認的訊息?

 //Basic.Recover 具備可重入佇列的特性
 Basic.RecoverOk basicRecover() throws IOException; 
 Basic.RecoverOk basicRecover(boolean requeue) throws IOException; 

channel.basicRecover 方法用來請求 RabbitMQ 重新發送還未被確認的訊息。 如果 requeue 引數設定為 true,則未被確認的訊息會被重新加入到佇列中,這樣對於同一條訊息 來說,可能會被分配給與之前不同的消費者。如果 requeue 引數設定為 false,那麼同一條消 息會被分配給與之前相同的消費者。預設情況下,如果不設定 requeue 這個引數,相當於
channel.basicRecover(true) ,即 requeue 預設為 true

交換器無法根據自身的型別和路由鍵找到一個符合條件 的佇列

當 mandatory 引數設為 true 時,交換器無法根據自身的型別和路由鍵找到一個符合條件 的佇列,那麼 RabbitMQ 會呼叫 Basic.Return 命令將訊息返回給生產者。當 mandatory 參 數設定為 false 時,出現上述情形,則訊息直接被丟棄

生產者如何獲取到沒有被正確路由到合適佇列的訊息呢?

可以通過呼叫channel.addReturnListener來新增ReturnListener監聽器實現。RabbitMQ 會通過 Basic . Return 返回 “mandatory test” 這條訊息,之後生產者客戶端通過 ReturnListener 監昕到了這個事 件,上面程式碼的最後輸出應該是”Basic.Retum 返回的結果是: mandatory test”

mandatory和immediate引數的區別

mandatory 引數告訴伺服器至少將該訊息路由到一個佇列中, 否則將訊息返 回給生產者。 immediate 引數告訴伺服器, 如果該訊息關聯的佇列上有消費者, 則立刻投遞: 如果所有匹配的佇列上都沒有消費者,則直接將訊息返還給生產者, 不用將訊息存入佇列而等 待消費者了。

未被路由到的訊息應該怎麼處理?

  1. 傳送訊息的時候設定mandatory引數,新增ReturnListener監聽器接收未被路由到的返回訊息
  2. 採用備份交換器AE,可以將未被路由的訊息儲存在RabbitMQ中,通過宣告交換器的時候新增AE引數實現,或者通過策略的方式實現,同時使用,前者優先順序高,會覆蓋掉Policy的設定

備份交換器需要注意?

  1. 如果設定的備份交換器不存在,客戶端和RabbitMQ服務端都不會有異常出現,此時訊息會丟失
  2. 如果備份交換器沒有繫結任何佇列,客戶端和RabbitMQ服務端都不會有異常出現,此時訊息會丟失
  3. 如果備份交換器沒有任何匹配的佇列,客戶端和RabbitMQ服務端都不會有異常出現,此時訊息會丟失
  4. 如果備份交換器和mandatory引數一起使用,那麼mandatory引數無效

怎麼為訊息設定過期時間TTL?

  1. 通過佇列屬性設定,佇列中所有訊息都有相同的過期時間,宣告佇列的時候在channel.queueDeclare加入TTL引數
  2. 對訊息本身進行單獨設定,每條訊息的TTL可以不同,在channel.basicPublish方法引數中設定
  3. 同時使用以上兩種方式設定過期時間,以較小的為準
  4. 訊息在佇列中的生存時間一旦超過設定的TTL值,就變成死信,消費者無法再收到該訊息(不是絕對的)
  5. 如果不設定 TTL.則表示此訊息不會過期;如果將 TTL 設定為 0,則表示除非此時可以直接將訊息投遞到消費者,否則該訊息會被立即丟棄,這個特性可以部分替代 RabbitMQ 3.0 版本之前的 immediate 引數

對過期訊息處理?

  1. 設定佇列 TTL 屬性的方法,一旦訊息過期,就會從佇列中抹去,佇列中己過期的訊息肯定在隊 列頭部, RabbitMQ 只要定期從隊頭開始掃描是否有過期的訊息即可,
  2. 訊息本身進行單獨設定,即使訊息過期,也不會馬上從佇列中抹去,因為每條訊息是否過期是在即將投遞到消費者之前判定的。每條訊息的過期時間不同,如果要刪除所有過期訊息勢必要掃描整個佇列,所以不如等到此訊息即將 被消費時再判定是否過期, 如果過期再進行刪除即可。

怎麼設定佇列的過期時間?

  1. 通過 channel . queueDeclare 方法中的 x-expires 引數可以控制佇列被自動刪除前處 於未使用狀態的時間。未使用的意思是佇列上沒有任何的消費者,佇列也沒有被重新宣告,並且在過期時間段內也未呼叫過 Basic . Get 命令。
  2. RabbitMQ 會確保在過期時間到達後將佇列刪除,但是不保障刪除的動作有多及時 。在 RabbitMQ 重啟後,持久化的佇列的過期時間會被重新計算。

什麼是死信佇列?

  1. DLX,全稱為 Dead-Letter-Exchange,可以稱之為死信交換器,也有人稱之為死信郵箱。當訊息在一個佇列中變成死信 (dead message) 之後,它能被重新被髮送到另一個交換器中,這個交換器就是 DLX,繫結 DLX 的佇列就稱之為死信佇列。
  2. DLX 也是一個正常的交換器,和一般的交換器沒有區別,它能在任何的佇列上被指定, 實 際上就是設定某個佇列的屬性。當這個佇列中存在死信時 , RabbitMQ 就會自動地將這個訊息重新發布到設定的 DLX 上去,進而被路由到另一個佇列,即死信佇列。

什麼是延遲佇列?

​ 延遲佇列儲存的物件是對應的延遲訊息,所謂“延遲訊息”是指當訊息被髮送後,並不想讓消費者立刻拿到訊息,而是等待特定時間後,消費者才能拿到這個訊息進行消費

延遲佇列應用場景?

  1. 訂單系統,用延遲佇列處理超時訂單
  2. 使用者希望通過手機遠端遙控家裡的智慧裝置在指定的時間進行工作。這時候就可以將 使用者指令傳送到延遲佇列,當指令設定的時間到了再將指令推送到智慧裝置。

持久化?

  1. 交換器的持久化

    交換器的持久化是通過在宣告交換器時將 durable 引數置為 true 實現的,如果交換器不設定持久化,那麼在 RabbitMQ 服務重啟之後,相關的交換器元資料會丟失, 不過訊息不會丟失,只是不能將訊息傳送到這個交換器中了。對一個長期使用的交換器來說,建議將其置為持久化的。

  2. 佇列的持久化

    佇列的持久化是通過在宣告佇列時將 durable 引數置為 true 實現的,如果佇列不設定持久化,那麼在 RabbitMQ 服務重啟之後,相關佇列的元資料會丟失,此時資料也會丟失。

  3. 訊息的持久化

    通過將訊息的投遞模式 (BasicProperties 中的 deliveryMode 屬性)設定為 2 即可實現訊息的持久化。

在這段時間內 RabbitMQ 服務節點發生了巖機、重啟等異常情況,訊息儲存還沒來得及落盤,那麼這些訊息將RabbitMQ 實戰指南會丟失。這個問題怎麼解決呢?

​ 可以引入 RabbitMQ 的映象佇列機制,相當於配置了副本,如果主節點 Cmaster) 在此特殊時間內掛掉,可以自動切換到從節點 Cslave ), 這樣有效地保證了高可用性

當訊息的生產者將訊息傳送出去之後,訊息到底有沒有正確地到達伺服器呢?

  1. 通過事務機制實現,比較消耗效能

    這裡寫圖片描述

    • 客戶端傳送 Tx.Select. 將通道置為事務模式;
    • Broker 回覆 Tx. Select-Ok. 確認己將通道置為事務模式:
    • 在傳送完訊息之後,客戶端傳送 Tx.Commit 提交事務;
    • Broker 回覆 Tx. Commi t-Ok. 確認事務提交。
  2. 通過傳送方確認機制實現

消費端對訊息的處理?

  1. 過推模式或者拉模式的方 式來獲取井消費訊息,當消費者處理完業務邏輯需要手動確認訊息己被接收,這RabbitMQ才能把當前訊息從佇列中標記清除
  2. 如果消費者由於某些原因無法處理當前接收到的訊息, 可以通過 channel . basicNack 或者 channel . basicReject 來拒絕掉。

消費端存在的問題?

  1. 訊息分發

    同一個佇列擁有多個消費者,會採用輪詢的方式分發訊息給消費者,若其中有的消費者任務重,有的消費者很快處理完訊息,導致程序空閒,這樣對導致整體應用吞吐量下降,為了解決上面的問題,用到channel.basicQos 方法允許限制通道上的消費者所能保持的最大未確認訊息的數量。Basic.Qos 的使用對於拉模式的消費方式無效.

    舉例如下:

    在訂閱消費佇列之前,消費端程式呼叫了 channel.basicQos(5) ,之後訂 閱了某個佇列進行消費。 RabbitMQ 會儲存一個消費者的列表,每傳送一條訊息都會為對應的消費者計數,如果達到了所設定的上限,那麼 RabbitMQ 就不會向這個消費者再發送任何訊息。 直到消費者確認了某條訊息之後 , RabbitMQ將相應的計數減1,之後消費者可以繼續接收訊息, 直到再次到達計數上限。這種機制可以類比於 TCP!IP中的”滑動視窗”。

  2. 訊息順序性

    • 生產者使用了事務機制可能會破壞訊息順序性
    • 生產者傳送訊息設定了不同的超時時間,並且設定了死信佇列
    • 訊息設定了優先順序

    可以考慮在訊息體內新增全域性有序標識來實現

  3. 棄用QueueingConsumer,Spring提供的RabbitMQ採用的是DefaultConsume

    • 記憶體溢位,由於某些原因,佇列之中堆積了比較多的訊息,可能導致消費者客戶端記憶體溢位假死,發生惡性迴圈,使用 Basic . Qos 來解決,一定要在呼叫 Basic . Consume 之前呼叫 Basic.Qos
      才能生效。
    • 會拖累同一個connection下的所有通道,使其效能降低
    • 同步遞迴呼叫QueueingConsumer會產生死鎖
    • RabbitMQ的自動連線恢復機制不支援QueueingConsumer這種形式
    • QueueingConsumer不是事件驅動的

訊息傳輸保障?

  • 一般訊息中介軟體的訊息傳輸保障分為三個等級
    1. At most once: 最多一次。訊息可能會丟失,但絕不會重複傳輸。
    2. At least once: 最少一次。訊息絕不會丟失,但可能會重複傳輸。
    3. Exactly once: 恰好一次。每條訊息肯定會被傳輸一次且僅傳輸一次。
  • RabbitMQ支援其中的“最多一次”和“最少一次”。
    • 其中”最少一次”投遞實現需要考慮 以下這個幾個方面的內容:
      1. 訊息生產者需要開啟事務機制或者 publisher confirm 機制,以確保訊息可以可靠地傳 輸到 RabbitMQ 中。
      2. 訊息生產者需要配合使用 mandatory 引數或者備份交換器來確保訊息能夠從交換器 路由到佇列中,進而能夠儲存下來而不會被丟棄。
      3. 訊息和佇列都需要進行持久化處理,以確保 RabbitMQ 伺服器在遇到異常情況時不會造成訊息丟失。
      4. 消費者在消費訊息的同時需要將 autoAck 設定為 false,然後通過手動確認的方式去 確認己經正確消費的訊息,以避免在消費端引起不必要的訊息丟失。
    • “最多一次”的方式就無須考慮以上那些方面,生產者隨意傳送,消費者隨意消費,不過這 樣很難確保訊息不會丟失。

提高資料可靠性途徑?

  1. 設定 mandatory 引數或者備份交換器 (immediate 引數己被陶汰);
  2. 設定 publisher conflITll機制或者事務;
  3. 設定交換器、佇列和訊息都為持久化;
  4. 設定消費端對應的 autoAck 引數為 false 井在消費完訊息之後再進行訊息確認