
Using the RabbitMQ Message Queue in Python (Advanced)

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

# Declare the queue
channel.queue_declare(queue='hello')

# In RabbitMQ a message can never be sent directly to the queue; it always needs to go through an exchange.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()
Sender
__author__ = 'hardy'
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

# You may ask why we declare the queue again; we have already declared it in our previous code.
# We could avoid that if we were sure that the queue already exists, for example if the send.py
# program was run before. But we're not yet sure which program to run first. In such cases it's
# good practice to repeat declaring the queue in both programs.
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# This is the pika < 1.0 signature. In pika >= 1.0 the equivalent call is
# channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.basic_consume(callback, queue='hello', no_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
Receiver
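
The sender's comment says a message always goes through an exchange, yet the code passes exchange=''. The empty string names RabbitMQ's default exchange, to which every queue is bound under its own name, so routing_key='hello' ends up in the queue 'hello'. The sketch below is a toy in-memory model of that routing rule, not pika code; the class DefaultExchangeModel and its methods are made up for illustration.

```python
from collections import deque


class DefaultExchangeModel:
    """Toy model of RabbitMQ's default exchange (illustrative only)."""

    def __init__(self):
        self.queues = {}  # queue name -> pending messages

    def queue_declare(self, queue):
        # Declaring is idempotent: an existing queue is left untouched.
        self.queues.setdefault(queue, deque())

    def basic_publish(self, routing_key, body):
        # The default exchange routes to the queue whose name equals the
        # routing key. With no matching queue the message is dropped
        # (a real broker only reports this for mandatory messages).
        target = self.queues.get(routing_key)
        if target is None:
            return False
        target.append(body)
        return True


broker = DefaultExchangeModel()
broker.queue_declare('hello')
broker.queue_declare('hello')  # declaring again is harmless
delivered = broker.basic_publish('hello', 'Hello World!')
dropped = broker.basic_publish('missing', 'nobody is listening')
```

This is also why both programs declare the queue first: publishing to a name that has not been declared yet loses the message.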

Sender workflow

  1. Connect

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

  2. Declare the queue

channel.queue_declare(queue='hello')

  Make the queue durable, so that it survives a broker restart

channel.queue_declare(queue='hello', durable=True)

  

  3. Send a message

channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')

  Make the message persistent (the queue must be durable as well)

channel.basic_publish(exchange='',
                      routing_key="hello",
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))

 

  4. Close the connection

connection.close()
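
Steps 2 and 3 can be folded into one helper. The following is a minimal sketch, assuming `channel` is the object returned by connection.channel() in step 1; the name publish_to_queue and its parameters are hypothetical, and only queue_declare and basic_publish are real pika calls.

```python
def publish_to_queue(channel, queue, body, durable=False, properties=None):
    """Declare `queue`, then publish `body` to it through the default exchange.

    `channel` is assumed to be a pika channel (hypothetical helper, not
    part of pika itself).
    """
    # Re-declaring is safe as long as the arguments match the existing queue.
    channel.queue_declare(queue=queue, durable=durable)
    # exchange='' is the default exchange, which routes by queue name.
    channel.basic_publish(exchange='',
                          routing_key=queue,
                          body=body,
                          properties=properties)
```

For a message that should survive a restart, pass durable=True together with properties=pika.BasicProperties(delivery_mode=2), as in the snippets above. Keeping the channel as a parameter lets the caller decide when to open and close the connection.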

Receiver workflow

  1. Connect

connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()

  2. Declare the queue

channel.queue_declare(queue='hello')

  3. Define a callback function (to process the data)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

  4. Register the consumer

channel.basic_consume(callback,
                      queue='hello',
                      no_ack=True)

  5. Start receiving data

channel.start_consuming()

  6. Acknowledge that the message was consumed

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))  # simulate work: one second per '.' in the body
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # tell RabbitMQ the message was handled
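
The acknowledgement is sent last on purpose: if processing fails before basic_ack runs, the message stays unacknowledged and RabbitMQ requeues it once the consumer's channel or connection closes. A minimal sketch of that pattern as a reusable wrapper follows; the name ack_on_success and the `handler` parameter are hypothetical, and `ch` and `method` are the arguments pika passes to every consumer callback.

```python
def ack_on_success(handler):
    """Wrap `handler(body)` into a pika-style consumer callback.

    Hypothetical helper: the message is acknowledged only after
    `handler` returns without raising.
    """
    def callback(ch, method, properties, body):
        handler(body)  # if this raises, the ack below is never sent
        ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

The returned function has the same (ch, method, properties, body) signature as the callback above, so it can be passed to basic_consume in its place.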

  

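# The callback above calls basic_ack, so acknowledgements must stay enabled.
# no_ack=True means messages need no acknowledgement; the default, no_ack=False, requires one.
channel.basic_consume(callback,
                      queue='task_queue',
                      no_ack=False)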
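
What the no_ack flag changes is easiest to see when a consumer dies halfway through a task. The sketch below is a toy in-memory model, not pika code and not a description of RabbitMQ's internals; QueueModel, consumer_died and surviving_messages are names invented for this illustration.

```python
from collections import deque


class QueueModel:
    """Toy model of one queue and its unacknowledged deliveries."""

    def __init__(self, messages):
        self.ready = deque(messages)
        self.unacked = {}  # delivery_tag -> body
        self._next_tag = 1

    def deliver(self, no_ack):
        """Hand the next message to a consumer and return (tag, body)."""
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        if not no_ack:
            # With acknowledgements on, the broker keeps the message
            # until the consumer acks it.
            self.unacked[tag] = body
        return tag, body

    def basic_ack(self, delivery_tag):
        del self.unacked[delivery_tag]

    def consumer_died(self):
        """Requeue everything that was delivered but never acknowledged."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


def surviving_messages(no_ack):
    q = QueueModel(['task-1', 'task-2'])
    q.deliver(no_ack)   # the consumer receives task-1 ...
    q.consumer_died()   # ... and crashes before finishing it
    return list(q.ready)


with_acks = surviving_messages(no_ack=False)
without_acks = surviving_messages(no_ack=True)
```

In this model with_acks is ['task-1', 'task-2'] and without_acks is ['task-2']: with acknowledgements the interrupted task goes back on the queue, while with no_ack=True it is gone as soon as it has been delivered.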