1. 程式人生 > >RabbitMQ應用例項Python版-訊息確認和訊息持久化

RabbitMQ應用例項Python版-訊息確認和訊息持久化

訊息確認

當處理一個比較耗時得任務的時候,你也許想知道消費者(consumers)是否執行到一半就掛掉。當前的程式碼中,當訊息被RabbitMQ傳送給消費者(consumers)之後,馬上就會在記憶體中移除。這種情況,你只要把一個工作者(worker)停止,正在處理的訊息就會丟失。同時,所有傳送到這個工作者的還沒有處理的訊息都會丟失。

我們不想丟失任何任務訊息。如果一個工作者(worker)掛掉了,我們希望任務會重新發送給其他的工作者(worker)。

為了防止訊息丟失,RabbitMQ提供了訊息響應(acknowledgments)。消費者會通過一個ack(響應),告訴RabbitMQ已經收到並處理了某條訊息,然後RabbitMQ就會釋放並刪除這條訊息。

如果消費者(consumer)掛掉了,沒有傳送響應,RabbitMQ就會認為訊息沒有被完全處理,然後重新發送給其他消費者(consumer)。這樣,及時工作者(workers)偶爾的掛掉,也不會丟失訊息。

訊息是沒有超時這個概念的;當工作者與它斷開連的時候,RabbitMQ會重新發送訊息。這樣在處理一個耗時非常長的訊息任務的時候就不會出問題了。

訊息響應預設是開啟的。之前的例子中我們可以使用no_ack=True標識把它關閉。是時候移除這個標識了,當工作者(worker)完成了任務,就傳送一個響應。

12345678def callback(ch,method,properties,
body):print" [x] Received %r"%(body,)time.sleep(body.count('.'))print" [x] Done"ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_consume(callback,queue='hello')

執行上面的程式碼,我們發現即使使用CTRL+C殺掉了一個工作者(worker)程序,訊息也不會丟失。當工作者(worker)掛掉這後,所有沒有響應的訊息都會重新發送。

忘記確認

一個很容易犯的錯誤就是忘了basic_ack,後果很嚴重。訊息在你的程式退出之後就會重新發送,如果它不能夠釋放沒響應的訊息,RabbitMQ就會佔用越來越多的記憶體。

為了排除這種錯誤,你可以使用rabbitmqctl命令,輸出messages_unacknowledged欄位:

1 2 3 4 $rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues... hello00 ...done.

訊息持久化

如果你沒有特意告訴RabbitMQ,那麼在它退出或者崩潰的時候,將會丟失所有佇列和訊息。為了確保資訊不會丟失,有兩個事情是需要注意的:我們必須把”佇列”和”訊息”設為持久化。

首先,為了不讓佇列消失,需要把佇列宣告為持久化(durable):

1channel.queue_declare(queue='hello',durable=True)

儘管這行程式碼本身是正確的,但是仍然不會正確執行。因為我們已經定義過一個叫hello的非持久化佇列。RabbitMq不允許你使用不同的引數重新定義一個佇列,它會返回一個錯誤。但我們現在使用一個快捷的解決方法——用不同的名字,例如task_queue。

1 channel.queue_declare(queue='task_queue',durable=True)

這個queue_declare必須在生產者(producer)和消費者(consumer)對應的程式碼中修改。

這時候,我們就可以確保在RabbitMq重啟之後queue_declare佇列不會丟失。另外,我們需要把我們的訊息也要設為持久化——將delivery_mode的屬性設為2。

1 2 3 4 5 6 channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode=2,# make message persistent ))

注意:訊息持久化

將訊息設為持久化並不能完全保證不會丟失。以上程式碼只是告訴了RabbitMq要把訊息存到硬碟,但從RabbitMq收到訊息到儲存之間還是有一個很小的間隔時間。因為RabbitMq並不是所有的訊息都使用fsync(2)——它有可能只是儲存到快取中,並不一定會寫到硬碟中。並不能保證真正的持久化,但已經足夠應付我們的簡單工作佇列。如果你一定要保證持久化,你需要改寫你的程式碼來支援事務(transaction)。

轉自:http://www.ywnds.com/?p=5394&viewuser=41