Python 訊息佇列rabbitmq使用之工作佇列使用多個worker接收訊息
阿新 • • 發佈:2018-12-10
前面已經介紹過怎麼安裝rabbitmq以及要使用的三方庫
因此這裡直接進入例項
1、釋出端程式碼
# new_task.py
import pika # 匯入pika
import sys # 匯入系統模組
# 開啟連結,連結本地的rabbitmq
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
# 建立一個連結
channel = connection.channel()
# 指定一個佇列,名為task_queue durable=true表示 為了不讓佇列消失,需要把佇列宣告為持久化(durable)
# 這樣做的目的是:確保在RabbitMq重啟之後queue_declare佇列不會丟失
channel.queue_declare(queue='task_queue', durable=True)
# 獲取執行指令碼時的輸入
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',# exchange引數就是交換機的名稱。空字串代表預設或者匿名交換機:訊息將會根據指定的routing_key分發到指定的佇列。
routing_key='task_queue' , # 指明佇列
body=message, # 傳送的訊息
properties=pika.BasicProperties(
delivery_mode = 2, # 另外,我們需要把我們的訊息也要設為持久化——將delivery_mode的屬性設為2
))
print (" [x] Sent %r" % message)
# 關閉連結
connection.close()
2、消費段程式碼
# new_worker.py
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
time.sleep(body.decode().count('.') )
print(" [x] Done")
# 配置訊息確認
# 一個很容易犯的錯誤就是忘了basic_ack,後果很嚴重。訊息在你的程式退出之後就會重新發送,
# 如果它不能夠釋放沒響應的訊息,RabbitMQ就會佔用越來越多的記憶體。
ch.basic_ack(delivery_tag = method.delivery_tag)
# 使用公平排程,這樣是告訴RabbitMQ,再同一時刻,不要傳送超過1條訊息給一個工作者(worker),
# 直到它已經處理了上一條訊息並且作出了響應。這樣,RabbitMQ就會把訊息分發給下一個空閒的工作者(worker)。
channel.basic_qos(prefetch_count=1)
# 這裡我們已經將no_ack=True刪掉了,表示開啟訊息確認,但是必須要在訊息執行完後進行確認,看 ch.basic_ack配置即可
# 意義:防止某個工作者執行一半意外終止後訊息丟失和它掌管的訊息都丟失,配置後就是在執行完畢後會告訴釋出者然後執行下一個
channel.basic_consume(callback,queue='task_queue')
channel.start_consuming()
3、如何執行
消費端:
python new_worker.py
注意:這裡儘量多執行即可消費端,便於觀察執行原理 釋出端:
python new_task.py hello.
python new_task.py hello..
python new_task.py hello...
python new_task.py hello....
python new_task.py hello.....
執行多次,檢視消費端的輸出