1. 程式人生 > >python採用pika庫使用rabbitmq(七)Publish\Subscribe(訊息釋出\訂閱)

python採用pika庫使用rabbitmq(七)Publish\Subscribe(訊息釋出\訂閱)

之前的例子都基本都是1對1的訊息傳送和接收,即訊息只能傳送到指定的queue裡,但有些時候你想讓你的訊息被所有的Queue收到,類似廣播的效果,這時候就要用到exchange了,

Exchange在定義的時候是有型別的,以決定到底是哪些Queue符合條件,可以接收訊息


fanout: 所有bind到此exchange的queue都可以接收訊息
direct: 通過routingKey和exchange決定的那個唯一的queue可以接收訊息
topic:所有符合routingKey(此時可以是一個表示式)的routingKey所bind的queue可以接收訊息

   表示式符號說明:#代表一個或多個字元,*代表任何字元
      例:#.a會匹配a.a,aa.a,aaa.a等
          *.a會匹配a.a,b.a,c.a等
     注:使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout 

headers: 通過headers 來決定把訊息發給哪些queue

 1 import pika
 2 import sys
 3  
 4 connection = pika.BlockingConnection(pika.ConnectionParameters(
 5         host='localhost'))
 6 channel = connection.channel()
 7  
 8 channel.exchange_declare(exchange='logs',
 9                          type='
fanout') 10 11 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 12 channel.basic_publish(exchange='logs', 13 routing_key='', 14 body=message) 15 print(" [x] Sent %r" % message) 16 connection.close()
publisher.py
 1 import pika
 2
3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host='localhost')) 5 channel = connection.channel() 6 7 channel.exchange_declare(exchange='logs', 8 type='fanout') 9 10 result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit會隨機分配一個名字,exclusive=True會在使用此queue的消費者斷開後,自動將queue刪除 11 queue_name = result.method.queue 12 13 channel.queue_bind(exchange='logs', 14 queue=queue_name) 15 16 print(' [*] Waiting for logs. To exit press CTRL+C') 17 18 def callback(ch, method, properties, body): 19 print(" [x] %r" % body) 20 21 channel.basic_consume(callback, 22 queue=queue_name, 23 no_ack=True) 24 25 channel.start_consuming()
subscriber