python 64式: 第3式、rabbitmq訊息佇列使用
阿新 • • 發佈:2018-11-08
topicProductor.py內容如下
#!/usr/bin/env python # -*- coding: utf-8 -*- import pika import sys ''' 問題: 實現基於rabbitmq的生產者和消費者,消費者可以支援繫結路由鍵為notification.*, 則生產者如果繫結的路由鍵為notification.info,那麼生產者傳送的訊息hello,也可以 被消費者消費。 關鍵: 1 客戶端初始化rabbitmq過程: 1) 建立連線(指定host),獲取通道,通道的交換機宣告(指定交換機名稱和型別) 2) 設定路由鍵,訊息,通道的基本釋出(交換機,路由鍵,訊息體) 3) 關閉連線 總結: 建立連線,獲取通道->交換機宣告->訊息釋出 輸入輸出: 服務端啟動: python topicConsumer.py notifications.* 解釋: 上述輸入引數1: notifications.* 表示rabbitmq中的topic 客戶端啟動: python topicProductor.py notifications.info hello 解釋: 上述輸入引數1: notifications.info 表示rabbitmq中的topic 上述輸入引數2: hello 表示此次傳送訊息的內容 服務端輸出: receive notifications.info: hello ''' def startProduce(exchangeName, routingKey, exchangeType='topic', host='localhost', message=''): connection = pika.BlockingConnection(pika.ConnectionParameters( host=host )) channel = connection.channel() channel.exchange_declare(exchange=exchangeName, exchange_type=exchangeType) channel.basic_publish(exchange=exchangeName, routing_key=routingKey, body=message) print "send: %s: %s" % (routingKey, message) connection.close() def process(): if len(sys.argv) < 3: print "input parameter error, example usage: \n" \ "python topicProductor.py notifications.info hello\n" \ "the first parameter: notifications.info means the topic of rabbitmq\n" \ "the second parameter: hello means the sended content of rabbitmq" return exchangeName = 'topicExchange' routingKey = sys.argv[1] message = sys.argv[2] startProduce(exchangeName, routingKey, message=message) if __name__ == "__main__": process()
topicConsumer.py如下
#!/usr/bin/env python # -*- coding: utf-8 -*- import pika import sys ''' 關鍵: 1伺服器rabbitmq初始化過程: 1) 建立連線(指定host),獲取通道,通道的交換機宣告(指定交換機名稱和型別) 2) 獲取佇列名,設定繫結的key, 通道的佇列繫結(交換機,佇列名字,路由鍵) 3) 設定回掉函式(指定通道,方法,屬性,訊息體) 4) 通道基本消費(回掉函式,佇列名,不確認), 通道開始消費 總結: 建立連線,獲取通道->交換機宣告->繫結佇列到交換機->消費訊息 ''' def startConsume(exchangeName, routingKey, exchangeType='topic', host='localhost'): if not host or not exchangeName or not exchangeType or not routingKey: print "parameters empty, please check" return connection = pika.BlockingConnection(pika.ConnectionParameters( host=host )) channel = connection.channel() channel.exchange_declare(exchange=exchangeName, exchange_type=exchangeType) result = channel.queue_declare(exclusive=True) queueName = result.method.queue channel.queue_bind(exchange=exchangeName, queue=queueName, routing_key=routingKey) channel.basic_consume(callback, queue=queueName, no_ack=True) channel.start_consuming() def callback(channel , method, properties , body): print "receive %s: %s" % (method.routing_key , body) def process(): if len(sys.argv) < 2: print "input parameter error, example usage: \n" \ "python topicConsumer.py notifications.*\n" \ "the first parameter: notifications.* means the topic of rabbitmq" return routingKey = sys.argv[1] exchangeName = 'topicExchange' startConsume(exchangeName, routingKey) if __name__ == "__main__": process()
參考文章:
[1] http://www.rabbitmq.com/tutorials/tutorial-five-python.html