1. 程式人生 > >python 64式: 第3式、rabbitmq訊息佇列使用

python 64式: 第3式、rabbitmq訊息佇列使用

topicProductor.py內容如下

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pika
import sys

'''
問題:
實現基於rabbitmq的生產者和消費者,消費者可以支援繫結路由鍵為notification.*,
則生產者如果繫結的路由鍵為notification.info,那麼生產者傳送的訊息hello,也可以
被消費者消費。

關鍵:
1 客戶端初始化rabbitmq過程:
1) 建立連線(指定host),獲取通道,通道的交換機宣告(指定交換機名稱和型別)
2) 設定路由鍵,訊息,通道的基本釋出(交換機,路由鍵,訊息體)
3) 關閉連線
總結: 
建立連線,獲取通道->交換機宣告->訊息釋出

輸入輸出:

服務端啟動:
python topicConsumer.py notifications.*
解釋:
上述輸入引數1:
notifications.*
表示rabbitmq中的topic

客戶端啟動:
python topicProductor.py notifications.info hello
解釋:
上述輸入引數1:
notifications.info
表示rabbitmq中的topic
上述輸入引數2:
hello
表示此次傳送訊息的內容

服務端輸出:
receive notifications.info: hello
'''



def startProduce(exchangeName, routingKey, exchangeType='topic', host='localhost', message=''):
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=host
    ))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchangeName, exchange_type=exchangeType)
    channel.basic_publish(exchange=exchangeName,
                          routing_key=routingKey,
                          body=message)
    print "send: %s: %s" % (routingKey, message)
    connection.close()

def process():
    if len(sys.argv) < 3:
        print "input parameter error, example usage: \n" \
              "python topicProductor.py notifications.info hello\n" \
              "the first parameter: notifications.info means the topic of rabbitmq\n" \
              "the second parameter: hello means the sended content of rabbitmq"
        return
    exchangeName = 'topicExchange'
    routingKey = sys.argv[1]
    message = sys.argv[2]
    startProduce(exchangeName, routingKey, message=message)

if __name__ == "__main__":
    process()

topicConsumer.py如下

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import pika
import sys

'''
關鍵:
1伺服器rabbitmq初始化過程:
1) 建立連線(指定host),獲取通道,通道的交換機宣告(指定交換機名稱和型別)
2) 獲取佇列名,設定繫結的key, 通道的佇列繫結(交換機,佇列名字,路由鍵)
3) 設定回掉函式(指定通道,方法,屬性,訊息體)
4) 通道基本消費(回掉函式,佇列名,不確認), 通道開始消費
總結:
建立連線,獲取通道->交換機宣告->繫結佇列到交換機->消費訊息
'''


def startConsume(exchangeName, routingKey, exchangeType='topic', host='localhost'):
    if not host or not exchangeName or not exchangeType or not routingKey:
        print "parameters empty, please check"
        return
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=host
    ))
    channel = connection.channel()
    channel.exchange_declare(exchange=exchangeName,
                             exchange_type=exchangeType)
    result = channel.queue_declare(exclusive=True)
    queueName = result.method.queue
    channel.queue_bind(exchange=exchangeName,
                       queue=queueName,
                       routing_key=routingKey)
    channel.basic_consume(callback,
                          queue=queueName,
                          no_ack=True)
    channel.start_consuming()


def callback(channel , method, properties , body):
    print "receive %s: %s" % (method.routing_key , body)


def process():
    if len(sys.argv) < 2:
        print "input parameter error, example usage: \n" \
              "python topicConsumer.py notifications.*\n" \
              "the first parameter: notifications.* means the topic of rabbitmq"
        return
    routingKey = sys.argv[1]
    exchangeName = 'topicExchange'
    startConsume(exchangeName, routingKey)

if __name__ == "__main__":
    process()

參考文章:

[1] http://www.rabbitmq.com/tutorials/tutorial-five-python.html