1. 程式人生 > >Python 訊息佇列rabbitmq使用之 更加細緻的 有選擇的 釋出訊息/接收訊息

Python 訊息佇列rabbitmq使用之 更加細緻的 有選擇的 釋出訊息/接收訊息

1、釋出端程式碼

# new_topic_p.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# 指定使用頭交換機
channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

# 獲取執行指令碼時的輸入 
routing_key = sys.argv[1] if len(sys.argv) > 2
else 'anonymous.info' # 獲取要傳入型別的訊息 message = ' '.join(sys.argv[2:]) or 'Hello World!' # 釋出訊息 channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()

2、消費端程式碼

# new_topic_c.py
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

# 獲取要繫結的key
binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) # 迴圈繫結key,可以繫結多個 for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()

3、如何執行 消費端: 執行下邊命令 接收所有日誌:

python new_topic_c.py "#"

執行下邊命令 接收來自”kern“裝置的日誌:

python new_topic_c.py "kern.*"

執行下邊命令 只接收嚴重程度為”critical“的日誌:

python new_topic_c.py "*.critical"

執行下邊命令 建立多個繫結:

python new_topic_c.py "kern.*" "*.critical"

釋出端:

python new_topic_p.py "kern.critical" "A critical kernel error"