1. 程式人生 > >基於Python語言使用RabbitMQ消息隊列(二)

基於Python語言使用RabbitMQ消息隊列(二)

封裝 queue mqc 想是 pat csdn 技術 描述 knowledge

工作隊列

在第一節我們寫了程序來向命名隊列發送和接收消息 。在本節我們會創建一個工作隊列(Work Queue)用來在多個工人(worker)中分發時間消耗型任務(time-consuming tasks)。

工作隊列(又叫做: Task Queues)背後的主體思想是 避免立刻去執行耗時任務並且等待它們完成。 相反我們可以安排這樣的任務稍後執行. 我們可以把任務封裝成一個消息並發送到隊列中. 一個在後臺運行的工人進程會接收任務並最終執行工作。當你使很多工人(workers)程序運行時,多個任務就會由它們共同承擔。
這個概念在web應用中尤其有用,因為在一次短期的HTTP請求中處理復雜任務幾乎是不可能的。

準備

在前一節我們發送了消息 “Hello World!”. 現在我們會發送一個代表復雜任務的字符串. 目前我們沒有一個真實情境下的任務,像重置圖片大小或者pdf文件渲染,所以我們就做一個偽裝,假裝我們很忙就行了:通過time.sleep()方法的使用,我們讓字符串中存在的點(.)的數量代表任務的復雜性,一個點占用一個工作的一秒鐘。例如,“Hello…”會耗用三秒鐘。

我們將會稍微修改先前的 send.py 代碼, 允許從命令行發送任意的消息. 這個程序會安排任務給我們的工作隊列,所以重命名為new_task.py:

import sys

message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
                      routing_key=‘task_queue‘,
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message                     #persistent
                      ))
print(" [x] Sent %r" % message)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

我們之前的 receive.py 腳本也需要做些改變: 假裝讓消息體中的每個點”.”耗費一秒鐘的工作。它需要從隊列中提取消息並且完成任務 ,我們把它命名為worker.py:

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b‘.‘))
    print(" [x] Done")
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

輪詢派發(Round-robin dispatching)

使用任務隊列的一個優勢是簡化並行任務的能力。如果我們正在建立一個後臺記錄的任務,只需要多添加些工人(worker),這很容易做到。

首先我們同時運行起兩個worker.py腳本,它們都會從隊列中獲取消息,到底是怎麽回事呢,我們來看一下 。

你需要打開三個控制臺,兩個運行worker.py腳本。這兩個控制臺會成為我們的兩個消費者–C1和C2。

# shell 1
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

# shell 2
python worker.py
# => [*] Waiting for messages. To exit press CTRL+C

# shell 3
python new_task.py First message.
python new_task.py Second message..
python new_task.py Third message...
python new_task.py Fourth message....
python new_task.py Fifth message.....
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

下圖為在我的Ubuntu終端上的運行結果:
shell1
技術分享圖片
shell2
技術分享圖片
shell3
技術分享圖片

消息通知

RabbitMQ會默認把每條消息按次序發送給下一個消費者,平均每個消費者會獲取到相同數量的消息,這種分發消息的方式就是輪詢(round-robin),你可以使用三個或者更多工人試一下效果。
做一件任務需要耗費數秒鐘的時間。你可能疑惑如果一個消費者開展了一個長時間任務,但只完成了一部分時就死掉了,這時候會發生什麽呢? 就我們當前的代碼來說,一旦RabbitMQ把消息傳遞給了它的客戶,RabbitMQ會立刻從內存中把這條消息刪除掉,這樣的話如果你殺死掉一個工人進程,我們就會丟掉它正在處理的這條消息。我們也會丟掉所有派發給這個特定工人進程的還有沒被處理的消息。

但我們不想丟掉任何任務,如果一個工人進程死掉了,我們希望任務會被傳遞給另一個工人。
為了確保消息沒有丟,RabbitMQ支持消息通知機制(message acknowledgments)。一條通知(ack)會從消費者處返回來告知RabbitMQ特定的消息已經被接收,被處理並且RabbitMQ可以刪掉它。

如果一個消費者掛了(它的渠道(channel)被關閉,連接被關閉或者TCP連接丟失)但沒有發送通知,會理解為消息沒有被完整地處理並且會重新把它推入隊列。這時如果有其他消費者存在,它會迅速重新把它傳遞給其他消費者。這樣的話你就可以確定不會有消息被丟掉,哪怕是工人進程意外掛了。

不會出現任何的消息超時問題,當消費者掛掉RabbitMQ會重新發送消息即便處理一條消息花費了很長很長時間。

消息通知默認是打開的。在前面的例子中我們通過設置no_ack=True 顯式地關閉了他們flag. 是時候把它拿掉了,並且一旦完成了一個任務就讓工人發送一條通知。

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count(‘.‘) )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_consume(callback,
                      queue=‘task_queue‘)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

使用上面的代碼我們可以確保什麽也不會丟失,即便你通過CTRL+C退出了一個正在處理消息的工人進程。工人進程掛掉後,所有未返回通知的消息都會被重新發送。

忘了通知

一個常見錯誤是我們忘了basic_ack ,這看上去是個小錯誤,
但後果很嚴重。當你退出客戶端時消息會重新發送(看上去像是隨機發送),但RabbitMQ會吃掉越來越多的內存,因為它不會釋放任何未返回通知的消息。

調試這種類型的錯誤你可以使用rabbitmqctl打印messages_unacknowledged字段

sudo rabbitmqctl list_queues name messages_ready
messages_unacknowledged

在 Windows上, 不用 sudo:

rabbitmqctl.bat list_queues name messages_ready
messages_unacknowledged

消息持久化(durability)

我們已經了解如何確保即便消費者死掉任務也不會丟失,但是如果RabbitMQ服務停止我們的任務仍然會丟失。
當RabbitMQ退出或崩潰時,它會遺忘掉隊列和消息,除非你告訴它不要這樣做。確保消息不會丟失我們有兩件事需要做:把隊列和消息都標記為持久化的。

首先,我們確保RabbitMQ不會丟失我們的隊列,為了達到這個目的需要把它聲明為持久化的:

channel.queue_declare(queue=‘hello‘, durable=True)
  • 1

就這條命令自身來說它是正確的,但在我們的設置中它無法正常工作。因為我們已經定義了一個叫做hello的非持久化的隊列。RabbitMQ 不允許你使用不同的參數重新定義一個已經存在的隊列並且會向任何試圖那樣做的程序返回一個錯誤。 但有一個變通方案(workaround)-我們用不同的名字聲明一個隊列,例如 task_queue:

channel.queue_declare(queue=‘task_queue‘, durable=True)
  • 1

這queue_declare 的改變 需要應用到生產者和消費者代碼上面(其實我在前面早已經這樣做了)
這樣我們確定task_queue 隊列不會被丟掉即便 RabbitMQ 重啟。 現在我們需要標記我們的消息為持久化——通過提供一個值為2的delivery_mode 屬性。

channel.basic_publish(exchange=‘‘,
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, 
                      ))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

公平派發

你可能已經註意到派發過程仍然不太合適。例如有兩個工人的情況, 當所有編號為偶數的消息是重量級,奇數消息是輕量級時,一個工人進程會持續繁忙,另一個卻沒做什麽工作。好吧,RabbitMQ對此一無所知,並且繼續若無其事地派發消息。
發生這種情況是因為當消息進入隊列時,RabbitMQ只是進行派發,它不會查看一個消費者的未返回通知的數量。它只是忙目地把第n條消息派發給第n條消費者。
技術分享圖片
為了應對這種情況,我們可以使用basic.qos方法,設置prefetch_count=1 。這會告訴 RabbitMQ 不要同時給一個工人超過一條消息。或者換句話說,在一個工人處理完先前的消息並且返回通知前不要給他派發新的消息。相反的,它會把消息派發給下一個不忙的工人。

channel.basic_qos(prefetch_count=1)
  • 1

註意隊列大小

如果所有工人都在繁忙中, 你的隊列可能會被填滿. 你會留意到這種情況,並且可能添加更多工人或者使用 message TTL(一個隊列和消息存活時間的擴展,在此不做過多介紹)

整合

new_task.py腳本完整代碼:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘task_queue‘, durable=True)

message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange=‘‘,
                      routing_key=‘task_queue‘,
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

worker.py腳本完成代碼:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()

channel.queue_declare(queue=‘task_queue‘, durable=True)
print(‘ [*] Waiting for messages. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b‘.‘))
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue=‘task_queue‘)

channel.start_consuming()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

使用消息通知和prefetch_count你可以建立一個工作隊列 ,持久化選項會使任務仍然存在即便RabbitMQ重啟。
下一節我們會了解如何把相同的消息傳遞給多個消費者。

基於Python語言使用RabbitMQ消息隊列(二)