1. 程式人生 > >RabbitMQ - Publisher的消息確認機制

RabbitMQ - Publisher的消息確認機制

publisher exchange 將在 序列號 sum chang 將不 同時 true

queue和consumer之間的消息確認機制:通過設置ack。那麽Publisher能不到知道他post的Message有沒有到達queue,甚至更近一步,是否被某個Consumer處理呢?畢竟對於一些非常重要的數據,可能Publisher需要確認某個消息已經被正確處理。

1. 事務機制 VS Publisher Confirm

如果采用標準的 AMQP 協議,則唯一能夠保證消息不會丟失的方式是利用事務機制 -- 令 channel 處於 transactional 模式、向其 publish 消息、執行 commit 動作。在這種方式下,事務機制會帶來大量的多余開銷,並會導致吞吐量下降 250% 。為了補救事務帶來的問題,引入了 confirmation 機制(即 Publisher Confirm)。

為了使能 confirm 機制,client 首先要發送 confirm.select 方法幀。取決於是否設置了 no-wait 屬性,broker 會相應的判定是否以 confirm.select-ok 進行應答。一旦在 channel 上使用 confirm.select方法,channel 就將處於 confirm 模式。處於 transactional 模式的 channel 不能再被設置成 confirm 模式,反之亦然。
一旦 channel 處於 confirm 模式,broker 和 client 都將啟動消息計數(以 confirm.select 為基礎從 1 開始計數)。broker 會在處理完消息後,在當前 channel 上通過發送 basic.ack 的方式對其進行 confirm 。delivery-tag 域的值標識了被 confirm 消息的序列號。broker 也可以通過設置 basic.ack 中的 multiple 域來表明到指定序列號為止的所有消息都已被 broker 正確的處理了。

在異常情況中,broker 將無法成功處理相應的消息,此時 broker 將發送 basic.nack 來代替 basic.ack 。在這個情形下,basic.nack 中各域值的含義與 basic.ack 中相應各域含義是相同的,同時 requeue 域的值應該被忽略。通過 nack 一或多條消息,broker 表明自身無法對相應消息完成處理,並拒絕為這些消息的處理負責。在這種情況下,client 可以選擇將消息 re-publish 。

在 channel 被設置成 confirm 模式之後,所有被 publish 的後續消息都將被 confirm(即 ack) 或者被 nack 一次。但是沒有對消息被 confirm 的快慢做任何保證,並且同一條消息不會既被 confirm 又被 nack 。

2. 消息在什麽時候確認

broker 將在下面的情況中對消息進行 confirm :

  • broker 發現當前消息無法被路由到指定的 queues 中(如果設置了 mandatory 屬性,則 broker 會先發送 basic.return)
  • 非持久屬性的消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中)
  • 持久消息到達了其所應該到達的所有 queue 中(和鏡像 queue 中),並被持久化到了磁盤(被 fsync)
  • 持久消息從其所在的所有 queue 中被 consume 了(如果必要則會被 acknowledge)

broker 會丟失持久化消息,如果 broker 在將上述消息寫入磁盤前異常。在一定條件下,這種情況會導致 broker 以一種奇怪的方式運行。例如,考慮下述情景:

1. 一個 client 將持久消息 publish 到持久 queue 中
2. 另一個 client 從 queue 中 consume 消息(註意:該消息具有持久屬性,並且 queue 是持久化的),當尚未對其進行 ack
3. broker 異常重啟
4. client 重連並開始 consume 消息

在上述情景下,client 有理由認為消息需要被(broker)重新 deliver 。但這並非事實:重啟(有可能)會令 broker 丟失消息。為了確保持久性,client 應該使用 confirm 機制。如果 publisher 使用的 channel 被設置為 confirm 模式,publisher 將不會收到已丟失消息的 ack(這是因為 consumer 沒有對消息進行 ack ,同時該消息也未被寫入磁盤)。

3. 編程實現

首先要區別AMQP協議mandatory和immediate標誌位的作用。

mandatory和immediate是AMQP協議中basic.pulish方法中的兩個標誌位,它們都有當消息傳遞過程中不可達目的地時將消息返回給生產者的功能。具體區別在於:
1. mandatory標誌位
當mandatory標誌位設置為true時,如果exchange根據自身類型和消息routeKey無法找到一個符合條件的queue,那麽會調用basic.return方法將消息返還給生產者;當mandatory設為false時,出現上述情形broker會直接將消息扔掉。
2. immediate標誌位
當immediate標誌位設置為true時,如果exchange在將消息route到queue(s)時發現對應的queue上沒有消費者,那麽這條消息不會放入隊列中。當與消息routeKey關聯的所有queue(一個或多個)都沒有消費者時,該消息會通過basic.return方法返還給生產者。

RabbitMQ - Publisher的消息確認機制