python RabbitMQ消息隊列
RabbitMQ
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。其中較為成熟的MQ產品有IBM WEBSPHERE MQ等等。
與以下兩者不同的是,rabbitMq 可以跨進程。
線程queue:同一進程下不同線程之間的交互。
進程queue:父進程與子進程進行交互,或者同屬於同一父進程下多個子進程進行交互。不同進程之間不能交互。
基本操作
RabbitMQ可以同時維護很多的隊列,生產者可以把消息放到不同的隊列發送給消費者。
send端
import pika #相當於聲明一個socket conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) #聲明一個管道 channel = conn.channel() #聲明queue channel.queue_declare(queue=‘hello‘) channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,body=‘Hello World!‘) # routing_key 消息的key body 消息的內容 print(‘Sent "hello world"‘) conn.close()
運行結果
Sent "hello world"
receive端
import pika #相當於聲明一個socket conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) #聲明一個管道 channel = conn.channel() #聲明queue 這裏可以不用聲明,但是如果消費者先運行,又不希望出錯,就要消費者先運行 channel.queue_declare(queue=‘hello‘) def callback(ch,method,properties,body): print(‘[x] Received %r‘ % body ) channel.basic_consume(callback, queue=‘hello‘,no_ack=True ) #消費消息 如果收到消息就調用CALLBACK函數處理 print(‘[*] Waiting for message.To exit press CTRL+C‘) channel.start_consuming() #開始收消息
運行結果
[*] Waiting for message.To exit press CTRL+C [x] Received b‘Hello World!‘
一個生產者對應多個消費者
代碼同上,啟動三個消費者和一個生產者,第一個消費者會接收到生產者的第一個消息,第二個消費者會接收到生產者的第二個消息,這樣依次輪訓。
如果某一個消費者在處理消息的過程中,斷電或者當機了,那麽這個消息狀態需要確認。消費者處理完成後發狀態給生產者,生產者把消息從隊列裏刪除。
no_ack=True (消費者端參數)
表示不確認消息狀態,生產者不關心消費者狀態。 如果把這個參數去掉,那麽生產者需要得到消費者的回應狀態,比如我啟動了3個消費者和一個生產者,生產者需要得到消費者的回應。如果第一個消費者得到消息,中斷了連接,那麽消息會發送到第二個消費者,依次類推,直到消費者給生產者一個狀態,生產者才會把消息從隊列裏刪除。
這個狀態要手動發送給生產者:
ch.basic_ack(delivery_tag=method.delivery_tag)
生產者接收到狀態就會把消息從隊列裏面刪除。
消息持久化
從上面我們知道,消費者在發生當機或者其它情況,只要沒有把狀態返回給生產者,那麽這個消息一直都在。如果生產者當機了怎麽辦?消息還存不存在了呢?
如果生產者當機,那麽之前存的消息都會丟失。為了避免這種情況,那麽就要要求數據持久化。
隊列持久化
durable=True
在聲明隊列的時候 同時聲明隊列持久化。
channel.queue_declare(queue=‘hello‘,durable=True)
這裏是隊列持久化了,但是消息還沒有了。
數據持久化
在生產者發送消息的一端加上以上參數。
properties=pika.BasicProperties( delivery_mode=2, )
查看hello2的消息。
python RabbitMQ消息隊列