1. 程式人生 > >基於Python語言使用RabbitMQ訊息佇列(二)

基於Python語言使用RabbitMQ訊息佇列(二)

工作佇列

在第一節我們寫了程式來向命名佇列傳送和接收訊息 。在本節我們會建立一個工作佇列(Work Queue)用來在多個工人(worker)中分發時間消耗型任務(time-consuming tasks)。

工作佇列(又叫做: Task Queues)背後的主體思想是 避免立刻去執行耗時任務並且等待它們完成。 相反我們可以安排這樣的任務稍後執行. 我們可以把任務封裝成一個訊息併發送到佇列中. 一個在後臺執行的工人程序會接收任務並最終執行工作。當你使很多工人(workers)程式執行時,多個任務就會由它們共同承擔。
這個概念在web應用中尤其有用,因為在一次短期的HTTP請求中處理複雜任務幾乎是不可能的。

準備

在前一節我們傳送了訊息 “Hello World!”. 現在我們會發送一個代表複雜任務的字串. 目前我們沒有一個真實情境下的任務,像重置圖片大小或者pdf檔案渲染,所以我們就做一個偽裝,假裝我們很忙就行了:通過time.sleep()方法的使用,我們讓字串中存在的點(.)的數量代表任務的複雜性,一個點佔用一個工作的一秒鐘。例如,“Hello…”會耗用三秒鐘。

我們將會稍微修改先前的 send.py 程式碼, 允許從命令列傳送任意的訊息. 這個程式會安排任務給我們的工作佇列,所以重新命名為new_task.py:

import sys

message = ' '.join(sys.argv[1
:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message #persistent )) print(" [x] Sent %r"
% message)

我們之前的 receive.py 指令碼也需要做些改變: 假裝讓訊息體中的每個點”.”耗費一秒鐘的工作。它需要從佇列中提取訊息並且完成任務 ,我們把它命名為worker.py:

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")

輪詢派發(Round-robin dispatching)

使用任務佇列的一個優勢是簡化並行任務的能力。如果我們正在建立一個後臺記錄的任務,只需要多新增些工人(worker),這很容易做到。

首先我們同時執行起兩個worker.py指令碼,它們都會從佇列中獲取訊息,到底是怎麼回事呢,我們來看一下 。

你需要開啟三個控制檯,兩個執行worker.py指令碼。這兩個控制檯會成為我們的兩個消費者–C1和C2。

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....

下圖為在我的Ubuntu終端上的執行結果:
shell1
這裡寫圖片描述
shell2
這裡寫圖片描述
shell3
這裡寫圖片描述

訊息通知

RabbitMQ會預設把每條訊息按次序傳送給下一個消費者,平均每個消費者會獲取到相同數量的訊息,這種分發訊息的方式就是輪詢(round-robin),你可以使用三個或者更多工人試一下效果。
做一件任務需要耗費數秒鐘的時間。你可能疑惑如果一個消費者開展了一個長時間任務,但只完成了一部分時就死掉了,這時候會發生什麼呢? 就我們當前的程式碼來說,一旦RabbitMQ把訊息傳遞給了它的客戶,RabbitMQ會立刻從記憶體中把這條訊息刪除掉,這樣的話如果你殺死掉一個工人程序,我們就會丟掉它正在處理的這條訊息。我們也會丟掉所有派發給這個特定工人程序的還有沒被處理的訊息。

但我們不想丟掉任何任務,如果一個工人程序死掉了,我們希望任務會被傳遞給另一個工人。
為了確保訊息沒有丟,RabbitMQ支援訊息通知機制(message acknowledgments)。一條通知(ack)會從消費者處返回來告知RabbitMQ特定的訊息已經被接收,被處理並且RabbitMQ可以刪掉它。

如果一個消費者掛了(它的渠道(channel)被關閉,連線被關閉或者TCP連線丟失)但沒有傳送通知,會理解為訊息沒有被完整地處理並且會重新把它推入佇列。這時如果有其他消費者存在,它會迅速重新把它傳遞給其他消費者。這樣的話你就可以確定不會有訊息被丟掉,哪怕是工人程序意外掛了。

不會出現任何的訊息超時問題,當消費者掛掉RabbitMQ會重新發送訊息即便處理一條訊息花費了很長很長時間。

訊息通知預設是開啟的。在前面的例子中我們通過設定no_ack=True 顯式地關閉了他們flag. 是時候把它拿掉了,並且一旦完成了一個任務就讓工人傳送一條通知。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue='task_queue')

使用上面的程式碼我們可以確保什麼也不會丟失,即便你通過CTRL+C退出了一個正在處理訊息的工人程序。工人程序掛掉後,所有未返回通知的訊息都會被重新發送。

忘了通知

一個常見錯誤是我們忘了basic_ack ,這看上去是個小錯誤,
但後果很嚴重。當你退出客戶端時訊息會重新發送(看上去像是隨機發送),但RabbitMQ會吃掉越來越多的記憶體,因為它不會釋放任何未返回通知的訊息。

除錯這種型別的錯誤你可以使用rabbitmqctl列印messages_unacknowledged欄位

sudo rabbitmqctl list_queues name messages_ready
messages_unacknowledged

在 Windows上, 不用 sudo:

rabbitmqctl.bat list_queues name messages_ready
messages_unacknowledged

訊息持久化(durability)

我們已經瞭解如何確保即便消費者死掉任務也不會丟失,但是如果RabbitMQ服務停止我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它會遺忘掉佇列和訊息,除非你告訴它不要這樣做。確保訊息不會丟失我們有兩件事需要做:把佇列和訊息都標記為持久化的。

首先,我們確保RabbitMQ不會丟失我們的佇列,為了達到這個目的需要把它宣告為持久化的:

channel.queue_declare(queue='hello', durable=True)

就這條命令自身來說它是正確的,但在我們的設定中它無法正常工作。因為我們已經定義了一個叫做hello的非持久化的佇列。RabbitMQ 不允許你使用不同的引數重新定義一個已經存在的佇列並且會向任何試圖那樣做的程式返回一個錯誤。 但有一個變通方案(workaround)-我們用不同的名字宣告一個佇列,例如 task_queue:

channel.queue_declare(queue='task_queue', durable=True)

這queue_declare 的改變 需要應用到生產者和消費者程式碼上面(其實我在前面早已經這樣做了)
這樣我們確定task_queue 佇列不會被丟掉即便 RabbitMQ 重啟。 現在我們需要標記我們的訊息為持久化——通過提供一個值為2的delivery_mode 屬性。

channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, 
                      ))

公平派發

你可能已經注意到派發過程仍然不太合適。例如有兩個工人的情況, 當所有編號為偶數的訊息是重量級,奇數訊息是輕量級時,一個工人程序會持續繁忙,另一個卻沒做什麼工作。好吧,RabbitMQ對此一無所知,並且繼續若無其事地派發訊息。
發生這種情況是因為當訊息進入佇列時,RabbitMQ只是進行派發,它不會檢視一個消費者的未返回通知的數量。它只是忙目地把第n條訊息派發給第n條消費者。
這裡寫圖片描述
為了應對這種情況,我們可以使用basic.qos方法,設定prefetch_count=1 。這會告訴 RabbitMQ 不要同時給一個工人超過一條訊息。或者換句話說,在一個工人處理完先前的訊息並且返回通知前不要給他派發新的訊息。相反的,它會把訊息派發給下一個不忙的工人。

channel.basic_qos(prefetch_count=1)

注意佇列大小

如果所有工人都在繁忙中, 你的佇列可能會被填滿. 你會留意到這種情況,並且可能新增更多工人或者使用 message TTL(一個佇列和訊息存活時間的擴充套件,在此不做過多介紹)

整合

new_task.py指令碼完整程式碼:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

worker.py指令碼完成程式碼:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

使用訊息通知和prefetch_count你可以建立一個工作佇列 ,持久化選項會使任務仍然存在即便RabbitMQ重啟。
下一節我們會了解如何把相同的訊息傳遞給多個消費者。