1. 程式人生 > >基於Python語言使用RabbitMQ訊息佇列(三)

基於Python語言使用RabbitMQ訊息佇列(三)

釋出/訂閱

前面的教程中我們已經建立了一個工作佇列。在一個工作佇列背後的假設是每個任務恰好會傳遞給一個工人。在這一部分裡我們會做一些完全不同的東西——我們會發送訊息給多個消費者。這就是所謂的“釋出/訂閱”模式。

為了解釋這種模式,我們將會構建一個簡單的日誌系統。它包含兩個程式——第一個產生日誌訊息,第二個接收並把他們打印出來。

在我們的日誌系統中,每一個接收程式的正在執行的拷貝都會獲知訊息,那樣我們將能夠執行一個接收者把日誌指向磁碟;同時我們將能夠執行另一個接收者在螢幕上檢視日誌。

實質上,被髮布的訊息將會廣播給所有接收。

交易所

在前面的教程中我們向一個佇列中傳送和接收訊息. 現在來介紹Rabbit中的完整的訊息模型 .

我們快速回顧一下前面的教程:

  • 生產者是一個傳送訊息的使用者應用。
  • 佇列是一個儲存訊息的緩衝區。
  • 消費者是一個接收訊息的使用者應用。

RabbitMQ中訊息模型的核心思想是,生產者從不直接傳送任何訊息給一個佇列。實際上,生產者通常甚至一點都不知道一條訊息會被髮送給什麼佇列。
相反,生產者只能傳送訊息給一個交易所(exchange)。交易所是一個很簡單的東西。它一面接收來自生產者的訊息,一面把訊息推送給佇列。交易所必須準確知道如何對待它接收到的訊息。它該被追加到一個特定佇列中嗎?還是應該把它追加到多個佇列?又或者它該被忽略?這些規則都是由交易型別(exchange type)所定義的。
這裡寫圖片描述


有幾種可用的交易型別: direct, topic, headers 和 fanout. 我們會關注最後一個——fanout. 我們來建立一個這種型別的交易所,命名為 logs:

channel.exchange_declare(exchange='logs',
                         type='fanout')

fanout型別交易所非常簡單。從名字中你可能已經猜到(其實英語非母語的也不大好猜到,fan(風扇)+out(外、出)的組合,我覺得在這裡可以理解為扇出去、揚出去的意思),它只是廣播它接到的所有訊息給它知道的所有佇列 ,這恰恰是我們的日誌程式所需要的。

列出所有交際所

為了列出所有交易所,你可以執行Trabbitmqctl:

sudo rabbitmqctl list_exchanges 在這個列表中會有些像amq.* 的交易所和預設的(未命名的)交易所
這些是被預設建立的,但你目前不太可能需要用到它們。

預設交易所

在之前的教程中我們隊交易所一無所知。但仍然能夠給佇列傳送訊息。這是因為我們使用了預設交易所,我們通過空字串 (“”)來標識。

回顧一下我們之前如何釋出一條訊息:

channel.basic_publish(exchange='',
routing_key='hello',
body=message)

exchange 引數就是交易所的名字。空字串代表預設或者無名交易所:
訊息路由到名字被routing_key所指定的佇列,如果佇列存在的話。

現在我們可以把訊息釋出到命名交易所了:

channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)

臨時佇列

你可能記得我們先前使用了有特定名字的佇列 (記得 hello 和 task_queue?)。能給佇列命名對我們來說很關鍵——我們需要把工人們指向同一個佇列。當你想要在生產者和消費者中分享佇列時,給佇列一個名字就很重要。

但那不符合我們的日誌程式需要。我們想獲取所有日誌訊息,不只是他們的一個子集。我們也僅對當前活躍的訊息感興趣,而不是舊有的,解決這個問題我們需要做兩件事。

第一,無論何時連線到Rabbit我們都需要一個新鮮的(fresh)空的(empty)佇列 ,為實現這一點我們使用隨機的名字建立一個佇列,或者更好的——讓伺服器隨機選一個佇列名給我們。 我們不給queue_declare提供佇列引數就可以做到:

result = channel.queue_declare()

這樣的話, result.method.queue 就會包含一個任意的佇列名,例如它可能看上去是這樣的 : amq.gen-JzTY20BRgKO-HjmUJj0wLg.

第二, 一旦我們斷開消費者連結,佇列就該被刪除掉。通過一個exclusive 標誌實現:

result = channel.queue_declare(exclusive=True)

繫結

這裡寫圖片描述
我們已經建立了一個fanout 交易所和一個佇列。現在我們要通知交易所傳送訊息給我們的佇列。交易所和一個佇列之間的關係叫做繫結。

channel.queue_bind(exchange='logs',
                   queue=result.method.queue)

從現在開始logs交易所會追加訊息給我們的佇列。

列出所有繫結

你可以列出所有存在的繫結,使用…好吧,你已經猜到了:
rabbitmqctl list_bindings

整合

這裡寫圖片描述
用來發送日誌訊息的生產者程式,看起來和之前並沒有多大不同。最大的變化是我們現在想要傳送訊息給logs交易所而不是無名交易所。當傳送時我們需要提供一個routing_key,但對於fanout型別交易所來說它的值是被忽略的。 下面是emit_log.py 指令碼程式碼:

#!/usr/bin/env python
import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs',
                      routing_key='',
                      body=message)
print(" [x] Sent %r" % message)
connection.close()

如你所見,建立完連線後我們生命了交易所。這一步是必須的,因為釋出到一個不存在的交易所是被禁止的。

如果沒有佇列連線(bound)到交易所,訊息就會丟失,但對我們來說沒有關係;如果沒有消費者監聽我們可以安全地忽略掉這個訊息。

receive_logs.py程式碼:

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='logs',
                         type='fanout')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

channel.queue_bind(exchange='logs',
                   queue=queue_name)

print(' [*] Waiting for logs. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] %r" % body)

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()

大功告成!如果你想把日誌儲存到一個檔案中只需要開啟控制檯,鍵入:

python receive_logs.py > logs_from_rabbit.log

如果你想在螢幕檢視日誌, 開啟一個新的控制檯執行:

python receive_logs.py

下兩圖分別為在我的Ubuntu終端使用cat命令輸出的存入到日誌檔案的日誌和螢幕顯示的日誌
圖1
1
圖2
這裡寫圖片描述
當然需要執行emit_log.py
這裡寫圖片描述
使用 rabbitmqctl list_bindings 你可以確認程式碼建立了繫結和佇列,就像我們想要的那樣。在兩個執行的情況下你應該會看到如下內容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

結果的解釋很直白: 來自logs交易所的資料去往兩個伺服器指定了名字的佇列 ,這正是我們所預想的。
若想知道如何監聽訊息子集,請前往下一節。