1. 程式人生 > >RabbitMqx訊息中心_訊息中心一致性

RabbitMqx訊息中心_訊息中心一致性

訊息中心一致性解決方案

1、介紹

訊息傳送一致性是指產生訊息的業務動作和訊息的傳送一致,兩者要不同時成功或失敗。在確定使用rabbitmq作為訊息中心的實現框架後,訊息傳送的一致性應結合實際的框架實現。

rabbitmq官方推薦不使用事務實現訊息傳送的一致性,而是採用非同步的confirm結合mandatory和immediateRabbitMQ 3.0採用TTL和DLX替代immediate實現)標誌位進行實現。

2解決方案

訊息傳送者的一致性包括broker持久化訊息和publisher知道訊息已經成功持久化。主要有同步和非同步兩種方式實現,如下:

21採用事務方式

每個訊息都必須經歷以上兩個步驟,就算一次事務成功,如果業務型別要求資料一致性非常高,可以採用低效率的事務型解決方案,官方給出來的效能是:

It takes a bit more than 4 minutes to publish 10000 messages(釋出10000條訊息需要4分鐘多一點)。

採用事物的方式固然能解決傳送的一致性問題,但是訊息傳送變成了業務應用的必要依賴,如果訊息儲存或應用於訊息的網路異常都會導致業務操作無法正常完成,所有可將業務系統與訊息傳送進行解耦合。實現的大致設計架構流程如下:

如果採用標準的 AMQP 協議,則唯一能夠保證訊息不會丟失的方式是利用事務機制 -- 令channel處於transactional模式、向其 publish 訊息、執行 commit 動作。在這種方式下,事務機制會帶來大量的多餘開銷,並會導致吞吐量下降 250% 。為了補救事務帶來的問題,引入了 confirmation 機制(即 Publisher Confirm)。一般不使用事務提交的方式。

對於要求強一致性的應用結合選型實現框架Rabbitmq,應用的接入需要channel-transacted="true",在訊息傳送端實現訊息傳送和業務放在同一事務中。結合spring如下:

22非同步的方式監聽實現

預設情況RabbitMQ流轉如下publisher->server,server不會將publisher的請求的執行情況,返回給publisher。換句話說,預設,publisher只知道執行了生產訊息的動作,不知道server是否已成功儲存msg,更不知道msg是否已被consumer消費。

非同步的方式是指publisher傳送訊息後,不進行等待,而是非同步監聽是否成功。這種方式又分為兩種模式,一種是return,另一種是confirm. 前一種是publisher傳送到exchange後,非同步收到訊息。第二種是publisher傳送訊息到exchange,queue,consumer收到訊息後才會收到非同步收到訊息。可見,第二種方式更加安全可靠(如果一旦出現

broker掛機或者網路不穩定,broker已經成功接收訊息,但是publisher並沒有收到confirm或return,就會引數重複發生訊息的問題,後面會對重複訊息進行詳細講解)。官方給出的效能是:It takes a bit more than 2 seconds to publish 10000 messages(釋出10000條訊息需要2秒多一點)。結構圖大致如下:


具體的非同步返回的幾個標誌位置介紹如下:

1)confirm

如果使用confirm模式,publisher->server,server只會告知publisher,是否接收到了請求。publisher只知道server接收到了msg,但不知道msg是否成功儲存到queue。

可見僅僅使用confirm並不能完全保證訊息傳送的一致性,需要引入更遠的標誌位。Mandatory和immediate是AMQP協議中basic.pulish方法中的兩個標誌位,它們都有當訊息傳遞過程中不可達目的地時將訊息返回給生產者的功能。

2)mandatory
如果使用了mandatory標誌位,publisher->server,server會告知publisher,是否正確找到對應的queue,並把msg儲存到了queue中。mandatory標誌位設定為true時,如果exchange根據自身型別和訊息routeKey無法找到一個符合條件的queue,那麼會呼叫basic.return方法將訊息返還給生產者;當mandatory設為false時,出現上述情形broker會直接將訊息扔掉。

mandatory屬性的使用和Publisher Confirm機制沒有必然關係,只有將mandatory屬性和Publisher Confirm機制結合使用,才能真正實現訊息的可靠投遞。(如果只使用Publisher Confirm機制,訊息丟失了,卻仍舊可以收到來自伺服器的Ack,這也是實際使用中容易犯的錯誤。官方說明:“對於無法路由的信息,broker會在確認了通過exchange無法將訊息路由到任何queue後,傳送回客戶端basic.ack進行確認(其中包含空的queue列表)。如果客戶端傳送訊息時使用了mandatory屬性,則會發送回客戶basic.return + basic.ack資訊。” 其中說,發回的basic.ack中會包含一個空的queue列表,但是確實沒看到。既然如此還是最好使用mandatory屬性)。

3)immediate
如果使用了immediate標誌位,publisher->server,server上如果該訊息關聯的queue上有消費者,則馬上將訊息投遞給它,如果所有queue都沒有消費者,直接把訊息返還給生產者,不用將訊息入佇列等待消費者了。immediate標誌位設定為true時,如果exchange在將訊息route到queue(s)時發現對應的queue上沒有消費者,那麼這條訊息不會放入佇列中。當與訊息routeKey關聯的所有queue(一個或多個)都沒有消費者時,該訊息會通過basic.return方法返還給生產者。RabbitMQ 3.0.0以後的版本中去掉immediate引數支援,因為immediate標記會影響映象佇列效能,增加程式碼複雜性,並建議採用"設定訊息TTL"和"DLX"等方式替代(詳細說明參見《RabbitMQ的immediate代替方案之TTL 詳解》和《RabbitMQ的immediate代替方案之DLE)。具體使用中結合spring如下:

1. <rabbit:queue id="zkcloud.subsystem.dlx.queue" name="#{dlxNaming['zkcloud.subsystem.dlx.queue']}">  

2.         <rabbit:queue-arguments>  

3.             <entry key="x-message-ttl">  

4.                 <value type="java.lang.Long">86400000</value>  

5.             </entry>  

6.             <entry key="x-max-length">  

7.                 <value type="java.lang.Long">100</value>  

8.             </entry>  

9.         </rabbit:queue-arguments>  

10.     </rabbit:queue>  

11.   

12.     <rabbit:fanout-exchange id="zkcloud.subsystem.dlx.exchange" name="#{dlxNaming['zkcloud.subsystem.dlx.exchange']}">  

13.         <rabbit:bindings>  

14.             <rabbit:binding queue="zkcloud.subsystem.dlx.queue" />  

15.         </rabbit:bindings>  

16.     </rabbit:fanout-exchange>