1. 程式人生 > >Python-RabbitMQ消息分發機制

Python-RabbitMQ消息分發機制

連接 處的 code top exclusive exc 但是 現在 pika

上一篇中的例子是一個生產者對應一個消費者,那能不能一個生產者對應一個消費者呢? 下面來測試一下,順便觀察一下它的分發策略。。。

步驟一:先編輯生產者代碼(rabbit_send.py)

#top1:導入pika模塊
import os
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
import pika

#top2:建立socket
connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))

#top3:聲明管道
channel = connection.channel()

#top4:在管道中聲明Queue,Queue的名字是‘exclusive‘(隨意)
channel.queue_declare(queue=‘exclusive‘)

#top5:在管道內發送消息
channel.basic_publish(exchange=‘‘,
                      routing_key=‘exclusive‘,  #queue名稱
                      body=‘Let s go!‘)             #消息內容

#top6:關閉隊列
connection.close()

步驟二:編輯消費者代碼(rabbit_receive.py)

#top1:導入pika模塊
import os
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
import pika

#top2:建立scoket
connection = pika.BlockingConnection(pika.ConnectionParameters(
    ‘localhost‘))

#top3:聲明管道
channel = connection.channel()

#top4:聲明Queue
channel.queue_declare(queue=‘exclusive‘)

#top5:定義一個處理消息的函數(所說的回調函數)
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

#top6:接收消息
channel.basic_consume(#消費消息
                      callback,   #如果收到消息,就調用callback函數來處理消息
                      queue=‘exclusive‘,
                      no_ack=True)

#top7:此處的start只要一起動就一直運行了,因為它不止收一條
channel.start_consuming()
定義好生產者和消費者後,執行一個生產者多個消費者進行測試。測試結果是消息的接收機制是輪詢的,生產者每發送一次消息,都由消費者輪流來接收。

接下來考慮一個情況,現在的代碼是消費者接收到消息後調用callback函數去處理消息立刻打印,但是如果我的處理過程需要30秒的時間,恰好在這30秒的時間內消費者宕機了,這個消息還沒有處理完,比如我有一個轉賬的業務,那轉到一半宕機了,那咋整?應該有一個確認機制來確定到底是不是處理完了,消費者應該發送一個確認給生產者,然後生產者才把消息從消息隊列裏刪除;還是糾結。。。。那消費者處理到一半宕機了,還怎麽給生產者發確認。。。。
還用剛才的代碼來測試,把在消費者處理消息的函數中加入一個time.sleep(30),再print一句話來模擬處理時間,再執行生產者和多個消費者,假如第一個消費者接收到消息我們把它停止,再觀察別的消費者,沒反應。。。。什麽鬼?消息丟了!!!
那我們回過頭來把no_ack=True註釋掉,這個的意思是"不確認",再測試。結果是把第一個消費者斷了,第二個消費者繼續處理消息,保證消息被處理完,那為什麽生產者知道消費者宕機了呢?因為socket斷了,它是連接RabbitMQ的,它斷了自然而然就知道消費者宕機了。。
一般我們不需要加no_ack=True參數,只有那些對生產者不關心的消息可以加上。

Python-RabbitMQ消息分發機制