Using the RabbitMQ Message Queue in Python (Advanced)
阿新 • Published: 2019-01-06
Sender

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # Declare the queue
    channel.queue_declare(queue='hello')

    # In RabbitMQ a message can never be sent directly to a queue; it always
    # needs to go through an exchange. exchange='' selects the default
    # exchange, which routes to the queue named by routing_key.
    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    connection.close()
Receiver

    __author__ = 'hardy'

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # You may ask why we declare the queue again when the sender already
    # declared it. We could skip this if we were sure the queue exists, for
    # example if send.py had been run first. But we do not yet know which
    # program will run first, so it is good practice to declare the queue
    # in both programs.
    channel.queue_declare(queue='hello')

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    # Call signature used by pika releases before 1.0
    channel.basic_consume(callback, queue='hello', no_ack=True)

    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
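The receiver above uses the positional `basic_consume(callback, queue=..., no_ack=...)` form from pika releases before 1.0. As a minimal sketch, assuming pika 1.0 or later, the same receive loop can be written with the keyword form `on_message_callback` and `auto_ack`. The `receive` function, the `handle_text` parameter and the `FakeChannel` class are hypothetical names introduced here so the sketch can be tried without a broker, and decoding the body as UTF-8 is an assumption about what the sender published.

```python
class FakeChannel:
    """Hypothetical stand-in for a pika 1.0+ channel with preloaded bodies."""

    def __init__(self, bodies):
        self.bodies = list(bodies)
        self.callback = None

    def queue_declare(self, queue):
        pass  # nothing to create in the stand-in

    def basic_consume(self, queue, on_message_callback, auto_ack=False):
        self.callback = on_message_callback

    def start_consuming(self):
        # A real channel blocks here; the stand-in drains its list and returns.
        for body in self.bodies:
            self.callback(self, None, None, body)


def receive(channel, queue, handle_text):
    """Declare the queue, register a callback and start consuming."""
    channel.queue_declare(queue=queue)

    def callback(ch, method, properties, body):
        # body arrives as bytes; UTF-8 is an assumption about the sender.
        handle_text(body.decode("utf-8"))

    # Keyword form accepted by pika 1.0 and later.
    channel.basic_consume(queue=queue,
                          on_message_callback=callback,
                          auto_ack=True)
    channel.start_consuming()


received = []
receive(FakeChannel([b"Hello World!"]), "hello", received.append)
print(received)
```

With a broker running on localhost, the same function could be handed a real channel obtained from `pika.BlockingConnection(pika.ConnectionParameters('localhost')).channel()`; in that case `start_consuming()` blocks until interrupted.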
Sender workflow
1. Connect

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
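2. Declare the queue

    channel.queue_declare(queue='hello')

Durable queue (the queue definition survives a broker restart)

    # A queue that already exists as non-durable cannot be redeclared with
    # durable=True; RabbitMQ rejects the mismatched declaration.
    channel.queue_declare(queue='hello', durable=True)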
3. Send the message

    channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')

Persistent message (the queue must be durable as well)

    # message holds the payload to publish
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))
4. Close the connection

    connection.close()
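The sender steps above can be gathered into one function. This is a minimal sketch, not a definitive implementation: `send_message`, `RecordingConnection` and `RecordingChannel` are hypothetical names, and the recording classes only log calls so the order of steps 2 to 4 can be checked without a running broker. The calls made on the channel (`queue_declare`, `basic_publish`) and on the connection (`close`) are the ones used in the article.

```python
class RecordingChannel:
    """Hypothetical stand-in for a pika channel; it only records calls."""

    def __init__(self, log):
        self.log = log

    def queue_declare(self, queue, durable=False):
        self.log.append(("queue_declare", queue, durable))

    def basic_publish(self, exchange, routing_key, body, properties=None):
        self.log.append(("basic_publish", exchange, routing_key, body))


class RecordingConnection:
    """Hypothetical stand-in for pika.BlockingConnection."""

    def __init__(self):
        self.log = []

    def channel(self):
        return RecordingChannel(self.log)

    def close(self):
        self.log.append(("close",))


def send_message(connection, queue, body, durable=False, properties=None):
    """Run sender steps 2 to 4 on an already-open connection (step 1)."""
    try:
        channel = connection.channel()
        # Step 2: declaring an existing queue with the same arguments
        # is harmless.
        channel.queue_declare(queue=queue, durable=durable)
        # Step 3: publish through the default exchange (exchange=''),
        # which routes on the queue name given as routing_key.
        channel.basic_publish(exchange='', routing_key=queue,
                              body=body, properties=properties)
    finally:
        # Step 4: close the connection even if publishing raised.
        connection.close()


demo = RecordingConnection()
send_message(demo, "hello", "Hello World!")
for call in demo.log:
    print(call)
```

With a broker on localhost, a persistent message could be sent by passing a real connection together with `durable=True` and `properties=pika.BasicProperties(delivery_mode=2)`.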
Receiver workflow
1. Connect

    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
2. Declare the queue

    channel.queue_declare(queue='hello')
3. Define the callback (processes the data)

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
4. Register the consumer

    channel.basic_consume(callback, queue='hello', no_ack=True)
5. Start receiving

    # Blocks and dispatches each incoming message to the callback
    channel.start_consuming()
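6. Acknowledge that the message was consumed

    import time

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))  # simulate one second of work per dot
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # no_ack=True means messages need no acknowledgement. The default,
    # no_ack=False, requires one, so it is used here to match the
    # basic_ack call in the callback.
    channel.basic_consume(callback, queue='task_queue', no_ack=False)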
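The point of manual acknowledgement is that `basic_ack` is sent only after the work has finished. RabbitMQ requeues an unacknowledged message once the consumer's channel or connection closes. A minimal sketch of that ordering follows; `acking_callback`, `RecordingChannel` and `fail` are hypothetical names, and the recording class stands in for a real channel so the behaviour can be checked without a broker.

```python
from types import SimpleNamespace


def acking_callback(handler):
    """Wrap handler so the message is acknowledged only after it returns."""

    def callback(ch, method, properties, body):
        handler(body)
        # Reached only if handler did not raise, so a failed message
        # is never acknowledged.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    return callback


class RecordingChannel:
    """Hypothetical stand-in that records acknowledged delivery tags."""

    def __init__(self):
        self.acked = []

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


def fail(body):
    raise ValueError("could not process %r" % body)


channel = RecordingChannel()

# First delivery: the handler succeeds, so the message is acknowledged.
ok = acking_callback(lambda body: None)
ok(channel, SimpleNamespace(delivery_tag=1), None, b"task.")

# Second delivery: the handler raises, so no acknowledgement is sent.
bad = acking_callback(fail)
try:
    bad(channel, SimpleNamespace(delivery_tag=2), None, b"task..")
except ValueError:
    pass

print(channel.acked)  # only the first delivery was acknowledged
```

With the pre-1.0 API used in this article, such a wrapper could be registered as `channel.basic_consume(acking_callback(handler), queue='task_queue')`, leaving `no_ack` at its default, where `handler` is whatever function processes the body.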