1. 程式人生 > >python之RabbitMQ

python之RabbitMQ

一個 基礎上 產生 不能 params info rap 發布 關閉

RabbitMQ是一個在AMQP基礎上完整的,可復用的企業消息系統。他遵循Mozilla Public License開源協議。
MQ全稱為Message Queue, 消息隊列(MQ)是一種應用程序對應用程序的通信方法。應用程序通過讀寫出入隊列的消息(針對應用程序的數據)來通信,而無需專用連接來鏈接它們。消 息傳遞指的是程序之間通過在消息中發送數據進行通信,而不是通過直接調用彼此來通信,直接調用通常是用於諸如遠程過程調用的技術。排隊指的是應用程序通過 隊列來通信。隊列的使用除去了接收和發送應用程序同時執行的要求。

RabbitMQ安裝

1.linux
安裝配置epel源
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release
-6-8.noarch.rpm 安裝erlang $ yum -y install erlang 安裝RabbitMQ $ yum -y install rabbitmq-server service rabbitmq-server start/stop
2.安裝python API
pip install pika
or
easy_install pika

先來一個基於Queue實現生產者消費者模型試試水

#!/usr/bin/env python3
#coding:utf8

import queue
import threading
message = queue.Queue(10)
def producer
(i):
‘‘‘廚師,生產包子放入隊列‘‘‘ while True: message.put(i) def consumer(i): ‘‘‘消費者,從隊列中取包子吃‘‘‘ while True: msg = message.get() for i in range(12): 廚師的線程包子 t = threading.Thread(target=producer, args=(i,)) t.start() for i in range(10): 消費者的線程吃包子 t = threading.Thread(target=consumer, args=(i,)) t.start()

開始rabbitMQ

對於RabbitMQ來說,生產和消費不再針對內存裏的一個Queue對象,而是某臺服務器上的RabbitMQ Server實現的消息隊列。

一、最基本的生產者消費者

1.生產者代碼

#!/usr/bin/env python
import pika
# ######################### 生產者 #########################
#鏈接rabbit服務器(localhost是本機,如果是其他服務器請修改為ip地址)
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#創建一個隊列名叫hello
channel.queue_declare(queue=‘hello‘)
#exchange -- 它使我們能夠確切地指定消息應該到哪個隊列去。
#向隊列插入數值 routing_key是隊列名 body是要插入的內容

channel.basic_publish(exchange=‘‘,
                  routing_key=‘hello‘,
                  body=‘Hello World!‘)
print("開始隊列")
#緩沖區已經flush而且消息已經確認發送到了RabbitMQ中,關閉鏈接
connection.close()

2.消費者代碼

#!/usr/bin/env python
import pika
# ########################## 消費者 ##########################
#鏈接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#如果生產者沒有運行創建隊列,那麽消費者也許就找不到隊列了。為了避免這個問題
#所有消費者也創建這個隊列
channel.queue_declare(queue=‘hello‘)
#接收消息需要使用callback這個函數來接收,他會被pika庫來調用
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
#從隊列取數據 callback是回調函數 如果拿到數據 那麽將執行callback函數
channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=True)
print(‘ [*] 等待信息. To exit press CTRL+C‘)
#永遠循環等待數據處理和callback處理的數據
channel.start_consuming()
二、acknowledgment 消息不丟失的方法

no-ack = False,如果生產者遇到情況(關閉通道,連接關閉或TCP連接丟失))掛掉了,那麽,RabbitMQ會重新將該任務添加到隊列中。
1.生產者不變,但是還是復制上來吧

#!/usr/bin/env python
import pika
# ######################### 生產者 #########################
#鏈接rabbit服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#創建一個隊列名叫hello
channel.queue_declare(queue=‘hello‘)
#向隊列插入數值 routing_key是隊列名 body是要插入的內容
channel.basic_publish(exchange=‘‘,
                  routing_key=‘hello‘,
                  body=‘Hello World!‘)
print("開始隊列")
connection.close()

2.消費者

import pika
#鏈接rabbit
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#如果生產者沒有運行創建隊列,那麽消費者創建隊列
channel.queue_declare(queue=‘hello‘)
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print ‘ok‘
    ch.basic_ack(delivery_tag = method.delivery_tag) #主要使用此代碼
    
channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=False)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()

當生產者生成一條數據,被消費者接收,消費者中斷後如果不超過10秒,連接的時候數據還在。當超過10秒之後,重新鏈接,數據將消失。消費者等待鏈接。

三、durable 消息不丟失 (消息持久化)

這個 queue_declare 需要在 生產者(producer) 和消費方(consumer) 代碼中都進行設置。
1.生產者

#!/usr/bin/env python
import pika
#鏈接rabbit服務器
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#創建隊列,使用durable方法
channel.queue_declare(queue=‘hello‘, durable=True)
                    #如果想讓隊列實現持久化那麽加上durable=True
channel.basic_publish(exchange=‘‘,
                  routing_key=‘hello‘,
                  body=‘Hello World!‘,
                  properties=pika.BasicProperties(
                      delivery_mode=2, 
                  #標記我們的消息為持久化的 - 通過設置 delivery_mode 屬性為 2
                  #這樣必須設置,讓消息實現持久化
                  ))
#這個exchange參數就是這個exchange的名字. 空字符串標識默認的或者匿名的exchange:如果存在routing_key, 消息路由到routing_key指定的隊列中。
print(" [x] 開始隊列‘")
connection.close()

2.消費者

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
#創建頻道
channel = connection.channel()
#創建隊列,使用durable方法
channel.queue_declare(queue=‘hello‘, durable=True)


def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print ‘ok‘
    ch.basic_ack(delivery_tag = method.delivery_tag)

    channel.basic_consume(callback,
                    queue=‘hello‘,
                    no_ack=False)

    print(‘ [*] 等待隊列. To exit press CTRL+C‘)
    channel.start_consuming()

註:標記消息為持久化的並不能完全保證消息不會丟失,盡管告訴RabbitMQ保存消息到磁盤,當RabbitMQ接收到消息還沒有保存的時候仍然有一個短暫的時間窗口. RabbitMQ不會對每個消息都執行同步fsync(2) --- 可能只是保存到緩存cache還沒有寫入到磁盤中,這個持久化保證不是很強,但這比我們簡單的任務queue要好很多,如果你想很強的保證你可以使用 publisher confirms

四、消息獲取順序

默認消息隊列裏的數據是按照順序被消費者拿走,例如:消費者1 去隊列中獲取 奇數 序列的任務,消費者1去隊列中獲取 偶數 序列的任務。
channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列

1.生產者

import pika  
import sys  

connection = pika.BlockingConnection(pika.ConnectionParameters(  
    host=‘localhost‘))  
channel = connection.channel()  
# 設置隊列為持久化的隊列  
channel.queue_declare(queue=‘task_queue‘, durable=True)
message = ‘ ‘.join(sys.argv[1:]) or "Hello World!"  
channel.basic_publish(exchange=‘‘,  
                  routing_key=‘task_queue‘,  
                  body=message,  
                  properties=pika.BasicProperties(  
                     delivery_mode = 2, # 設置消息為持久化的  
                  ))  
print(" [x] Sent %r" % message)  
connection.close()  

2.消費者

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
channel = connection.channel()
channel.queue_declare(queue=‘hello‘durable=True)  # 設置隊列持久化 

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    import time
    time.sleep(10)
    print ‘ok‘
    ch.basic_ack(delivery_tag = method.delivery_tag)
#表示誰來誰取,不再按照奇偶數排列
channel.basic_qos(prefetch_count=1)# 消息未處理完前不要發送信息的消息  

channel.basic_consume(callback,
                  queue=‘hello‘,
                  no_ack=False)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()

交換 (Exchanges)

exchange類型可用: direct , topic , headers 和 fanout 。 我們將要對最後一種進行講解 --- fanout

一、消息發布訂閱

發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。

exchange type = fanout

1.發布者

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,
                     type=‘fanout‘)

message = ‘ ‘.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange=‘logs‘,
                  routing_key=‘‘,
                  body=message)
print(" [x] Sent %r" % message)
connection.close()

2.訂閱者

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘logs‘,
                     type=‘fanout‘)

result = channel.queue_declare(exclusive=True) #隊列斷開後自動刪除臨時隊列  
queue_name = result.method.queue            # 隊列名采用服務端分配的臨時隊列  

channel.queue_bind(exchange=‘logs‘,
               queue=queue_name)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                  queue=queue_name,
                  no_ack=True)

channel.start_consuming()

六、關鍵字發送

exchange type = direct

之前事例,發送消息時明確指定某個隊列並向其中發送消息,RabbitMQ還支持根據關鍵字發送,即:隊列綁定關鍵字,發送者將數據根據關鍵字發送到消息exchange,exchange根據 關鍵字 判定應該將數據發送至指定隊列。

1.生產者:

#!/usr/bin/env python3
#coding:utf8
#######################生產者#################
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))  
channel = connection.channel()

channel.exchange_declare(exchange=‘direct_logs‘,
                     type=‘direct‘)

severity = sys.argv[1] if len(sys.argv) > 1 else ‘info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘direct_logs‘,
                      routing_key=severity,
                      body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()

2.消費者:

#!/usr/bin/env python3
#coding:utf8
import pika
import sys
############消費者####
connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()
channel.exchange_declare(exchange=‘direct_logs‘,
                         type=‘direct‘)
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
severities = sys.argv[1:]
if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
    sys.exit(1)
for severity in severities:
    channel.queue_bind(exchange=‘direct_logs‘,
                       queue=queue_name,
                       routing_key=severity)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)
channel.start_consuming()
七、模糊匹配

exchange type = topic

在topic類型下,可以讓隊列綁定幾個模糊的關鍵字,之後發送者將數據發送到exchange,exchange將傳入”路由值“和 ”關鍵字“進行匹配,匹配成功,則將數據發送到指定隊列。

# 表示可以匹配 0 個 或 多個 單詞
  • 表示只能匹配 一個 單詞

    發送者路由值 隊列中

    old.boy.python old.* -- 不匹配

    old.boy.python old.# -- 匹配

1.消費者

#!/usr/bin/env python3
#coding:utf8
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘topic_logs‘,
                         type=‘topic‘)

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

binding_keys = sys.argv[1:]
if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
    sys.exit(1)

for binding_key in binding_keys:
    channel.queue_bind(exchange=‘topic_logs‘,
                       queue=queue_name,
                       routing_key=binding_key)

print(‘ [*] Waiting for logs. To exit press CTRL+C‘)

def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

2.生產者

#!/usr/bin/env python3
#coding:utf8
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()

channel.exchange_declare(exchange=‘topic_logs‘,
                        type=‘topic‘)

routing_key = sys.argv[1] if len(sys.argv) > 1 else ‘anonymous.info‘
message = ‘ ‘.join(sys.argv[2:]) or ‘Hello World!‘
channel.basic_publish(exchange=‘topic_logs‘,
                      routing_key=routing_key,
                      body=message) 
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()

更多內容:以下參考:
http://blog.csdn.net/songfreeman/article/details/50945025

work queue (用來在多個workers之間分發消息)

1.循環調度(Round-robin dispatching)

使用多個消費者來接收並處理消息
默認,RabbitMQ將循環的發送每個消息到下一個Consumer , 平均每個Consumer都會收到同樣數量的消息。 這種分發消息的方式成為 循環調度(round-robin)

  • 生產者:

    #!/usr/bin/env python3
    #coding:utf8
    import pika
    import sys
    #鏈接
    connec = pika.BlockingConnection(pika.ConnectionParameters(host=‘localhost‘))
    channel = connec.channel()
    #創建隊列
    channel.queue_declare(queue=‘worker‘)
    #插入數據
    message = ‘ ‘.join(sys.argv[1:]) or "Hello World"
    channel.basic_publish(exchange=‘‘,
                          routing_key=‘worker‘,
                          body=message,
                          properties=pika.BasicProperties(delivery_mode = 2,)
                          )
    print(" [x] Send %r " % message)
  • 消費者:

    #!/usr/bin/env python3
    #coding:utf8
    import time
    import pika
    
    connect = pika.BlockingConnection(pika.ConnectionParameters (host=‘localhost‘))
    channel = connect.channel()
    
    channel.queue_declare(‘worker‘)
    
    def callback(ch, method, properties,body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b‘.‘))
        print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                      queue=‘worker‘,
                      )
    channel.start_consuming()

執行的時候兩個消費者等待接收消息,
第一次生產者產生消息的時候被消費者1接收
第二次生產者產生消息的時候被消費者2接收

python之RabbitMQ