廣播模式下的生產者與消費者fanout模式
阿新 • • 發佈:2018-03-27
生成 ack word 需要 bin 隊列 highlight time host
生產者
#coding=utf-8 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘)) #建立socket連接可以加各種參數,端口,用戶名,等 channel = connection.channel() #聲明一個管道 channel.exchange_declare(exchange=‘logs‘,exchange_type=‘fanout‘) #聲明廣播 channel.basic_publish(exchange=‘logs‘,routing_key=‘‘, body = ‘hellow,word‘,#隊列的名字、消息內容 ) print ‘send hellow,word‘ connection.close()
消費者
#coding=utf-8 import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters(host = ‘localhost‘)) channel = connection.channel() channel.exchange_declare(exchange=‘logs‘
exchange_type=‘fanout‘) #直接加位置參數會出錯,必須指定參數 result = channel.queue_declare(exclusive=True)#生成一個隨機隊列,用完之後刪除 queue_name = result.method.queue channel.queue_bind(exchange=‘logs‘,queue=queue_name) def callback(ch,method,properties,body): #回調函數 print ‘收到消息:‘,body time.sleep(30) print ‘消息處理完畢:‘, body channel.basic_consume(callback,queue=queue_name #no_ack=True 需要客戶端確認,如果正在處理消息的時候客戶端掛掉就會轉到下一個客戶端,會等待消息完整的處理完 )#如果收到消息就調用callback來處理消息 print ‘等待接收消息。。。。。‘ channel.start_consuming() #循環持續運行下去 #消息持久化
廣播模式下的生產者與消費者fanout模式