RabbitMQ消息分發輪詢和Message Acknowledgment
一、消息分發
RabbitMQ中的消息都只能存儲在Queue中,生產者(下圖中的P)生產消息並最終投遞到Queue中,消費者(下圖中的C)可以從Queue中獲取消息並消費。
多個消費者可以訂閱同一個Queue,這時Queue中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息並處理。
啟動3個消費者
生產者依次生成3條消息
可見3條消息分別被3個消費者獲取,所以RabbitMQ是采用輪詢機制將消息隊列Queue中的消息依次發給不同的消費者
二、消息確認(Message Acknowledgment)
在實際應用中,可能會發生消費者收到Queue中的消息,但沒有處理完成就宕機(或出現其他意外)的情況,這種情況下就可能會導致消息丟失。為了避免這種情況發生,我們可以要求消費者在消費完消息後發送一個回執給RabbitMQ,RabbitMQ收到消息回執(Message acknowledgment)後才將該消息從Queue中移除;如果RabbitMQ沒有收到回執並檢測到消費者的RabbitMQ連接斷開,則RabbitMQ會將該消息發送給其他消費者(如果存在多個消費者)進行處理。這裏不存在timeout概念,一個消費者處理消息時間再長也不會導致該消息被發送給其他消費者,除非它的RabbitMQ連接斷開。
如何來實現呢?只需要將consumer消費者端中 no_ack = True去掉就行了
no_ack 就 no acknowlegment的意思,這個參數會導致RabbitMQ並不關心消費者有沒有處理完成,可能在消費者獲取消息後就將該消息從Queue中移除。去掉這個參數,如果在消費者執行過程當初出現了意外(宕機),RabbitMQ沒有收到消息回執,就會發送給其他消費者執行。
修改consumer端
def callback(ch, method, properties, body): print(‘--->>‘, ch, ‘\n‘, method, ‘\n‘, properties) time.sleep(30) # 讓消費者處理的時間長一點,可以用來模擬運行中斷開的情況 print(" [x] Received %r" % body) # ch: 聲明的管道channel對象內存地址 # channel.basic_consume(callback, # 如果收到消息就調用callback函數來處理消息 queue=‘hello‘, # no_ack=True )
運行3個消費者,接收生產者的數據,依次關閉消費者1和消費者2,最後RabbitMQ中的消息還是會被消費者3處理。
RabbitMQ消息分發輪詢和Message Acknowledgment