1. 程式人生 > >基於Python語言使用RabbitMQ消息隊列(一)

基於Python語言使用RabbitMQ消息隊列(一)

receiving block lose exit 想要 哪些 命名 bin 代碼

介紹

RabbitMQ 是一個消息中間人(broker): 它接收並且發送消息. 你可以把它想象成一個郵局: 當你把想要寄出的信放到郵筒裏時, 你可以確定郵遞員會把信件送到收信人那裏. 在這個比喻中, RabbitMQ 就是一個郵筒, 同時也是郵局和郵遞員 .
和郵局的主要不同點在於RabbitMQ不處理紙質信件, 而是 接收(accepts), 存儲(stores) 和轉發(forwards)二進制數據塊 —— 消息(messages).
在RabbitMQ中有一些自己的行業術語要了解 .
生產(producing)在這裏的意思就是發送(sending). 一個發送消息的程序就是生產者( producer) :

技術分享圖片
隊列(queue) 可以看做是郵筒的別名 ,它存在於RabbitMQ中. 雖然消息在RabbitMQ和你的應用程序中流轉, 但它只能被存儲在隊列當中. 一個隊列只受到主機的內存和磁盤的限制, 它實際上是個大的消息緩沖區. 許多生產者可以發送消息到一個隊列, 許多消費者可以從隊列中接收數據. 下面是隊列的示意圖:
技術分享圖片
消費(consuming) 與接收(receiving)有相似的含義. 消費者(consumer)就是等待接收消息的程序 :
技術分享圖片
要註意的是 生產者, 消費者, 和中間人不必在相同的主機上,實際上大多數情況下它們都不在同一臺主機上
(using the pika 0.10.0 Python client)

在教程的這部分裏我們用Python寫兩個小程序; 一個 發送消息的生產者 (sender), 一個接收消息並把它打印出來的消費者consumer (receiver)

在下面的圖例中, “P” 代表我們的生產者 , “C”代表我們的消費者. 中間的盒子是一個隊列—由RabbitMQ 維持的消息緩沖區.

我們的整體設計大致如下圖所示:
技術分享圖片
生產者發送消息到名為 “hello”的 隊列. 消費者從那個隊列中接收消息

RabbitMQ 庫

RabbitMQ遵循 AMQP 0.9.1, 這是一個開源的, 多用途(general-purpose)的消息發送協議.

針對RabbitMQ,在不同語言中有多種客戶端可用. 在本教程系列中我們將使用 Pika, 這是由RabbitMQ團隊推薦的 Python客戶端. 你可以使用pip安裝.

發送
技術分享圖片
我們的第一個程序send.py 將會發送一條消息到隊列中. 我們要做的第一件事是和 RabbitMQ 服務建立連接.

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(‘localhost‘))
channel = connection.channel()
  • 1
  • 2
  • 3
  • 4
  • 5

現在我們已經建立了一個到本地機器的中間人(broker)的連接, 如果想要連接到不同的機器上的中間人,只要把‘localhost’替換成指定的名字和IP地址即可.

下一步, 在發送前我們要確保接收的隊列存在. 如果我們發送消息到一個不存在的地址, RabbitMQ 會把消息丟棄掉. 我們創建一個名為‘hello’的隊列 ,把消息發送到這個隊列中:

channel.queue_declare(queue=‘hello‘)
  • 1

到這裏我們準備好要發送消息了,第一條消息只是一個簡單的字符串“hello world!”,把它發送到隊列中

In RabbitMQ 一條消息從不會被直接發送到隊列, 它會先經過一個交換所(exchange). 但我我們不要被細節纏住 ? 你會在教程的第三部分了解更多關於交換所的內容. 目前我們需要知道的就是如何使用有空字符串所指定的默認交換所。這個交換所允許我們準確指定消息應該前往哪個隊列。 隊列名由 “routing_key”參數指定:

channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,
                      body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
  • 1
  • 2
  • 3
  • 4

退出程序前我們需要確保網絡緩沖區(network buffers)被沖刷(flushed),並且我們的消息真的被發送到了RabbitMQ. 這只需要通過關閉連接來完成:

connection.close()
  • 1

接收
技術分享圖片
我們的第二個程序 receive.py 將會從隊列接收消息並且打印出來。

同樣,我們首先要連接到RabbitMQ 服務。 連接到Rabbit的代碼同前面的一樣 。

下一步,同先前一樣,要確保隊列存在. 使用queue_declare 創建隊列是一個冪等(idempotent)操作 ? 我們想運行多少次這個命令都可以, 但只有一個隊列被創建.

channel.queue_declare(queue=‘hello‘)
  • 1

你可能會問為什麽又一次聲明隊列 ? 我們在前面的代碼中已經聲明過一次. 如果我們確定隊列存在的話的話可以避免那麽做. 例如 send.py 已經運行了. 但我們不確定哪個程序先運行. 在這種情況下最好在兩個程序中都聲明一下,這是一個好的習慣。

列出所有隊列

如果你想查看RabbitMQ 擁有哪些隊列,有多少消息在其中.你可以使用 rabbitmqctl 工具:

sudo rabbitmqctl list_queues
在 Windows中:

rabbitmqctl.bat list_queues

從隊列中接收消息會稍微復雜一些. 通過給隊列提供一個callback 函數來實現. 無論何時接收到消息, 這個callback 函數都會被 Pika 庫調用. 在我們這裏,這個函數會打印出接收到的消息.

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
  • 1
  • 2

下一步, 我們需要告訴 RabbitMQ 這個callback函數應該從我們的 “hello”隊列中接收消息:

channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=True)
  • 1
  • 2
  • 3

這裏的“no_ack ”參數會在後面有介紹.

最後我們加一個等待接收數據並且在必要時運行回調函數的永遠不會終止的循環.

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
  • 1
  • 2

整合
send.py的完整代碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()


channel.queue_declare(queue=‘hello‘)

channel.basic_publish(exchange=‘‘,
                      routing_key=‘hello‘,
                      body=‘Hello World!‘)
print(" [x] Sent ‘Hello World!‘")
connection.close()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

receive.py的完整代碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=‘localhost‘))
channel = connection.channel()


channel.queue_declare(queue=‘hello‘)

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue=‘hello‘,
                      no_ack=True)

print(‘ [*] Waiting for messages. To exit press CTRL+C‘)
channel.start_consuming()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

現在在終端運行我們的程序. 首先,啟動一個消費者程序, 這會持續運行來等待接收消息:

python receive.py
  • 1

下面是在我的Ubuntu終端上的運行結果:
技術分享圖片
現在來啟動生產者. 生產者程序在運行完會退出:

python send.py
  • 1

技術分享圖片

在回頭看之前打開的消費者程序終端,已經接到了消息:
技術分享圖片

我們已經學會了如何向一個命名隊列中發送和接收消息. 下一節我們來構建一個簡單的工作隊列(work queue)

基於Python語言使用RabbitMQ消息隊列(一)