1. 程式人生 > >RabbitMq之訊息確認

RabbitMq之訊息確認

  最近閱讀了rabbitmq的官方文件,然後結合之前面試時被問到關於訊息佇列的問題來探索一下關於訊息佇列的訊息確認機制。

  其實訊息確認就是消費者確認訊息被消費了, 生產者確認訊息已經發送到了訊息佇列中了。

  我們知道rabbitmq有四種訊息機制,下圖是為了我們對訊息確認的理解從官網盜了一張工作佇列的圖如下:

 

一、 關於消費者確認方面問題

  在我們的mq推送了訊息給消費者後,我們怎麼知道訊息被消費者消費了呢?萬一消費者沒有消費該訊息,或者消費者掛了,這訊息是不是就永久丟失了,所以根據此有下列幾個關於消費者確認的相關問題。同時在這我們也糾正一個常見的誤區,mq是推訊息到消費者處的而不是消費者去mq中取訊息的,然後在mq中訊息充足的情況下,mq推訊息給消費者不是等消費者消費完一個再推一個,而是根據 prefetch_count引數來決定可以推多個訊息到消費者的快取裡面。

 

問題1: 如果其中一個消費者開始一項漫長的任務,而僅部分完成而死掉,會發生什麼情況。使用我們當前的程式碼,RabbitMQ一旦將訊息傳遞給消費者,便立即將其標記為刪除。在這種情況下,如果您殺死一個work,我們將丟失正在處理的訊息。我們還將丟失所有傳送給該特定工作人員但尚未處理的訊息。

答:為了確保訊息永不丟失,RabbitMQ支援 訊息確認。消費者傳送回一個確認(acknowledgement)以告知RabbitMQ已經接收,處理了特定的訊息,然後RabbitMQ會去刪除這條訊息,如果消費者在不傳送確認的情況下死亡(其通道已關閉,連線已關閉或TCP連線丟失),RabbitMQ將瞭解訊息未得到充分處理,並將重新排隊。如果同時有其他消費者線上,它將很快將其重新分發給另一個消費者。這樣,您可以確保即使工人偶爾死亡也不會丟失任何訊息

 

問題2:開啟了訊息持久化,訊息就一定不會丟失嗎?

答: 將訊息標記為永續性並不能完全保證不會丟失訊息。儘管它告訴RabbitMQ將訊息儲存到磁碟,但是仍有很短的時間RabbitMQ接受了訊息但尚未將其儲存。另外,RabbitMQ不會對每條訊息都執行fsync(2)-它可能只是儲存到快取中,而沒有真正寫入磁碟。永續性保證並不強,但是對於我們的簡單任務佇列而言,這已經綽綽有餘了。如果您需要更強有力的保證,則可以使用 釋出者確認。

  在這裡補充一個知識:如果mq開啟了持久化以後, 生產者把訊息推給訊息佇列, 訊息佇列會複製一份訊息到持久化佇列,然後有新執行緒從持久化佇列中把訊息持久化到磁碟中。

 

問題3: 兩個消費者同時消費一個佇列,是佇列分發訊息還是消費者去取訊息?

答:您可能已經注意到,排程仍然無法完全按照我們的要求進行。例如,在有兩名工人的情況下,當有的訊息都很重,有的訊息訊息很輕時,一位工人將一直忙碌而另一位工人將幾乎不做任何工作。好吧,RabbitMQ對此一無所知,並且仍將平均分配訊息。

發生這種情況是因為RabbitMQ在訊息進入佇列時才排程訊息。它不會檢視使用者的未確認訊息數。它只是盲目地將每第n條訊息傳送給第n個使用者。 為了解決這個問題,我們可以將Channel#basic_qos通道方法與 prefetch_count = 1設定一起使用。這使用basic.qos協議方法來告訴RabbitMQ一次不向工作人員傳送多條訊息。換句話說,在處理並確認上一條訊息之前,不要將新訊息傳送給工作人員。而是將其分派給不忙的下一個工作程式。     這裡補充一個知識:當兩個消費者同時訂閱了同一個佇列時,佇列會把訊息迴圈平均分配給兩個消費者。

 

問題4: 關於佇列大小的注意事項,如果佇列滿了怎麼處理?

答:如果佇列滿了以後,我們一方面可以增加消費者的數量,很淺顯消費者越多消費訊息就越快,還有一個是設定訊息的過期時間來控制。

  這裡補充個知識: 當訊息過期或者被消費者拒絕並且設定不返回佇列中,這訊息將加入死信佇列。

 

二、關於生產者確認方面問題

  同樣的,我們的生產者給mq push訊息的時候,我們怎麼知道這訊息放進了mq裡面了呢?所以這引發了mq確認訊息的相關問題。

問題1: 釋出者確認訊息機制是怎樣的?

答:首先訊息的傳遞機制是這樣,釋出者將訊息傳送到訊息佇列的exchange中,然後根據exchange的分發規則,分發到制定具體佇列,如果開啟了持久化,訊息會複製一份持久化佇列中, 持久化佇列在收到訊息後會給佇列返回ack確認資訊, 然後佇列給exchange返回確認資訊, exchange根據回撥函式給釋出者返回確認資訊,這樣釋出者確認就算完成了。

  當時你知道釋出者的確認規則,你是不是立馬想到確認要經過這麼多箇中間人,省略中間的環節行不行勒,哈哈答案是可以的,不過得自己去改造啦,這是效能優化的一項。

 

問題2:釋出者確認的方式有哪幾種?

答:有三種確認方式

1)同步確認,訊息發出後一直處於阻塞狀態等待確認訊息,可以設定超時時間,如果超過超時時間則認為訊息丟失, 如果超時或者訊息確認失敗則會丟擲異常,我們捕獲異常然後選擇是重發還是等其他處理方式。

2)批量確認,一次性發出一批訊息然後阻塞等待,形式和同步確認相似,有點則是一批確認所以效能上會有很大的提升,缺點是我們不能確認具體發生了什麼錯誤,並且我們得在記憶體中儲存這批訊息以確認傳送成功的訊息和重發失敗的訊息。

3)非同步確認,傳送訊息後註冊一個回撥函式,不阻塞執行緒,在訊息確認後會呼叫回撥函式

 

  如果想更詳細地瞭解其機制可以閱讀其官方文件,文件中有關於訊息確認的具體程式碼展示,可以更方便地理解其機制。

&n