1. 程式人生 > >python3 RabbitMQ (Work Queues!)

python3 RabbitMQ (Work Queues!)

Work Queues

與其他Python教程一樣,我們將使用Pika RabbitMQ客戶機版本0.11.0。

在這裡插入圖片描述

本章的指南關注什麼呢? 在第一個教程中,我們編寫了從一個命名佇列傳送和接收訊息的程式。在本例中,我們將建立一個工作佇列,用於在多個工作者之間分配耗時的任務。

工作佇列(又名:任務佇列)背後的主要思想是避免立即執行佔用大量資源的任務,並且必須等待它完成。相反,我們把任務安排在以後完成。我們將任務封裝為訊息並將其傳送到佇列。在後臺執行的worker程序將彈出任務並最終執行作業。當您執行許多工作者時,任務將在他們之間共享。

這個概念在web應用程式中尤其有用,在web應用程式中,在短HTTP請求視窗中不可能處理複雜任務。

在本教程的前一部分中,我們傳送了一條包含“Hello World!”的訊息。現在我們將傳送代表複雜任務的字串。我們沒有實際的任務,比如調整圖片大小或者渲染pdf檔案,所以讓我們假裝很忙——使用time.sleep()函式——來假裝它。我們把弦中的點的數量作為複雜度;每個點將佔“工作”的一秒鐘。例如,Hello…需要三秒鐘。

在本教程的前一部分中,我們傳送了一條包含“Hello World!”的訊息。現在我們將傳送代表複雜任務的字串。我們沒有實際的任務,比如調整圖片大小或者渲染pdf檔案,所以讓我們假裝很忙——使用time.sleep()函式——來假裝它。我們把弦中的點的數量作為複雜度;每個點將佔“工作”的一秒鐘。例如,Hello…需要三秒鐘。

import sys

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
print(" [x] Sent %r" % message)

我們的老。py指令碼還需要一些修改:它需要為訊息體中的每個點偽造一秒鐘的工作時間。它將從佇列中彈出訊息並執行任務,因此我們將其稱為worker.py:

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

迴圈排程 使用任務佇列的優點之一是能夠輕鬆地並行工作。如果我們正在積累積壓的工作,我們可以增加更多的工人,這樣就可以很容易地擴大規模。

首先,讓我們試著執行兩個worker。py指令碼同時出現。它們都將從佇列中獲取訊息,但具體如何呢?讓我們來看看。

你需要開啟三個控制檯。兩個將執行這個工人。py指令碼。這些控制檯將是我們的兩個消費者——C1和C2。

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C
# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

預設情況下,RabbitMQ將依次向下一個使用者傳送每條訊息。平均而言,每個消費者都會收到相同數量的訊息。這種訊息分發的方式稱為迴圈。在三個或更多的員工身上試試。

訊息確認 完成一項任務可能需要幾秒鐘。您可能想知道,如果某個消費者開始了一項很長的任務,但只完成了部分任務,那麼會發生什麼情況呢?使用我們的當前程式碼,一旦RabbitMQ將訊息傳遞給客戶,它立即標記為刪除。在這種情況下,如果您殺死一個工人,我們將失去訊息,它只是處理。我們還會丟失所有傳送給這個工人但尚未處理的訊息。

但我們不想失去任何任務。如果一個工人死了,我們希望把任務交給另一個工人。

為了確保訊息不會丟失,RabbitMQ支援訊息確認。使用者返回ack(nowledgement),告訴RabbitMQ已經接收、處理了特定的訊息,RabbitMQ可以隨意刪除它。

如果使用者在沒有傳送ack的情況下死亡(其通道關閉、連線關閉或TCP連線丟失),RabbitMQ將理解訊息沒有完全處理,並將重新排隊。如果在同一時間有其他消費者線上,它會迅速將其重新發送給另一個消費者。這樣你就可以確保沒有資訊丟失,即使工人偶爾會死去。

沒有任何訊息超時;當使用者死亡時,RabbitMQ將重新傳遞訊息。即使處理一條訊息需要非常非常長的時間,也沒有問題。

手動訊息確認在預設情況下是開啟的。在前面的示例中,我們通過no_ack=True標誌顯式地關閉了它們。當我們完成一項任務時,是時候移除此標誌並從工作人員傳送適當的確認資訊了。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='hello')

使用這段程式碼,我們可以確保即使您在處理訊息時使用CTRL+C殺死了一個工人,也不會丟失任何東西。在工人死後不久,所有未確認的資訊將被重新發送。

確認必須通過相同的通道傳送,這是為了在同一通道上接收。嘗試承認使用不同的通道將導致通道級別的協議異常。要了解更多資訊,請參閱《doc指南》。

被遺忘的確認
錯過basic_ack是一個常見的錯誤。這是一個容易犯的錯誤,但後果是嚴重的。當您的客戶端退出時,訊息將被重新傳遞(這可能看起來像隨機的重新傳遞),但是RabbitMQ將佔用越來越多的記憶體,因為它不能釋放任何未被新增的訊息。

為了除錯這種錯誤,可以使用rabbitmqctl列印messages_unrecognized欄位:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

On Windows, drop the sudo: 
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

訊息持久化儲存

我們已經學會了如何確保即使使用者死亡,任務也不會丟失。但是如果RabbitMQ伺服器停止,我們的任務仍然會丟失。

當RabbitMQ退出或崩潰時,它將忘記佇列和訊息,除非您告訴它不要這樣做。需要做兩件事情來確保訊息不會丟失:我們需要將佇列和訊息都標記為持久的。

首先,我們需要確保RabbitMQ永遠不會丟失佇列。為了做到這一點,我們需要宣告它是持久化的:

channel.queue_declare(queue='hello', durable=True)

雖然這個命令本身是正確的,但是在我們的設定中它不能工作。這是因為我們已經定義了一個名為hello的佇列,它不持久。RabbitMQ不允許重新定義具有不同引數的現有佇列,並且會向試圖這樣做的任何程式返回一個錯誤。但是有一個快速的解決方案——讓我們宣告一個具有不同名稱的佇列,例如task_queue:

channel.queue_declare(queue='task_queue', durable=True)

這個queue_declare修改需要同時應用於生產者程式碼和使用者程式碼。

至此,我們確信即使RabbitMQ重新啟動,task_queue佇列也不會丟失。現在我們需要將我們的訊息標記為持久化——通過提供一個值為2的delivery_mode屬性。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

訊息永續性注意事項

將訊息標記為永續性並不能完全保證訊息不會丟失。雖然它告訴RabbitMQ將訊息儲存到磁碟,但是仍然有一個短時間視窗,當RabbitMQ接受了一條訊息並且還沒有儲存它。此外,RabbitMQ不會對每條訊息執行fsync(2)操作——它可能只是被儲存為快取,而不是真正寫入磁碟。永續性保證並不強,但對於我們的簡單任務佇列來說已經足夠了。如果您需要更強的保證,那麼您可以使用publisher confirmed。

公平分發

您可能已經注意到,排程仍然不能完全按照我們的要求工作。例如,在有兩個工人的情況下,當所有奇怪的資訊都很重,偶數資訊都很輕時,一個工人會一直很忙,而另一個幾乎不做任何工作。好吧,RabbitMQ對此一無所知,它仍然會均勻地傳送訊息。

這是因為RabbitMQ只是在訊息進入佇列時傳送訊息。它不檢視消費者未確認訊息的數量。它只是盲目地將第n個訊息傳送給第n個消費者。

在這裡插入圖片描述

為了打敗它,我們可以使用basic語言。設定了prefetch_count=1的qos方法。這告訴RabbitMQ不要一次給一個工人傳送多個訊息。或者,換句話說,在處理並確認前一條訊息之前,不要向工作人員傳送新訊息。相反,它會把它傳送給下一個不太忙的工人。

channel.basic_qos(prefetch_count=1)

關於佇列大小的注意事項

如果所有的工人都很忙,你的隊伍就會排滿。您將希望關注這一點,並可能新增更多的工人,或使用訊息TTL。

所有的程式碼

new_task.py script:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

And our worker:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()