1. 程式人生 > >python RabbitMQ消息隊列

python RabbitMQ消息隊列

ges 遠程 received exit images 通信 父進程 queue per

RabbitMQ

MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。其中較為成熟的MQ產品有IBM WEBSPHERE MQ等等。

與以下兩者不同的是,rabbitMq 可以跨進程。

線程queue:同一進程下不同線程之間的交互。

進程queue:父進程與子進程進行交互,或者同屬於同一父進程下多個子進程進行交互。不同進程之間不能交互。

基本操作

RabbitMQ可以同時維護很多的隊列,生產者可以把消息放到不同的隊列發送給消費者。

技術分享

send端

import pika
#相當於聲明一個socket
conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
#聲明一個管道
channel = conn.channel()

#聲明queue
channel.queue_declare(queue=‘hello‘)

channel.basic_publish(exchange=‘‘,routing_key=‘hello‘,body=‘Hello World!‘) # routing_key 消息的key  body 消息的內容

print(‘Sent "hello world"‘)

conn.close()

運行結果

Sent "hello world"

receive端

import pika
#相當於聲明一個socket
conn = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))


#聲明一個管道
channel = conn.channel()

#聲明queue  這裏可以不用聲明,但是如果消費者先運行,又不希望出錯,就要消費者先運行
channel.queue_declare(queue=‘hello‘)

def callback(ch,method,properties,body):
    print(‘[x] Received %r‘ % body )

channel.basic_consume(callback, queue=‘hello‘,no_ack=True  ) #消費消息 如果收到消息就調用CALLBACK函數處理

print(‘[*] Waiting for message.To exit press CTRL+C‘)

channel.start_consuming() #開始收消息

運行結果

[*] Waiting for message.To exit press CTRL+C
[x] Received b‘Hello World!‘

一個生產者對應多個消費者

代碼同上,啟動三個消費者和一個生產者,第一個消費者會接收到生產者的第一個消息,第二個消費者會接收到生產者的第二個消息,這樣依次輪訓。

如果某一個消費者在處理消息的過程中,斷電或者當機了,那麽這個消息狀態需要確認。消費者處理完成後發狀態給生產者,生產者把消息從隊列裏刪除。

no_ack=True (消費者端參數)

表示不確認消息狀態,生產者不關心消費者狀態。 如果把這個參數去掉,那麽生產者需要得到消費者的回應狀態,比如我啟動了3個消費者和一個生產者,生產者需要得到消費者的回應。如果第一個消費者得到消息,中斷了連接,那麽消息會發送到第二個消費者,依次類推,直到消費者給生產者一個狀態,生產者才會把消息從隊列裏刪除。

這個狀態要手動發送給生產者:

ch.basic_ack(delivery_tag=method.delivery_tag)

生產者接收到狀態就會把消息從隊列裏面刪除。

技術分享

消息持久化

從上面我們知道,消費者在發生當機或者其它情況,只要沒有把狀態返回給生產者,那麽這個消息一直都在。如果生產者當機了怎麽辦?消息還存不存在了呢?

如果生產者當機,那麽之前存的消息都會丟失。為了避免這種情況,那麽就要要求數據持久化。

隊列持久化

durable=True

在聲明隊列的時候 同時聲明隊列持久化。

channel.queue_declare(queue=‘hello‘,durable=True)

技術分享

這裏是隊列持久化了,但是消息還沒有了。

數據持久化

技術分享

在生產者發送消息的一端加上以上參數。

 properties=pika.BasicProperties(
                          delivery_mode=2, 
                      )

技術分享

查看hello2的消息。

技術分享

python RabbitMQ消息隊列