1. 程式人生 > >RabbitMQ教程之php-amqplib(六)主題

RabbitMQ教程之php-amqplib(六)主題

主題 (topics)

using php-amqplib

在上一節教程中,我們改進了我們的日誌記錄系統。我們使用可以選擇性接收資訊的 direct 型別交換機,而不是使用只能進行虛擬廣播的 fanout 型別交換機。

雖然使用 direct 型別交換改進了我們的系統,但它仍有限制 - 它不能基於對多個標準進行路由選擇。

在我們的日誌系統中,我們可能不僅要根據日誌的嚴重性訂閱日誌,還可以基於發出日誌的源進行訂閱。你也許在 unix syslog 中瞭解了這個概念(不理解也沒關係,接著往下看,自然就明白了)。

這將給我們的系統帶來很大的靈活性 - 我們可能要監聽來自 cron的嚴重錯誤,也要監聽 kern

的所有日誌。

為了在我們的系統中實現這樣的功能,我們需要學習一個更復雜的 topic exchange

Topic exchange

傳送到 topic exchange 的訊息不能任意命名一個 routing key - 它必須是由一個.劃分單詞列表。這些單詞可以是任意的,但它們通常指定與訊息相關聯的一些功能。這裡有幾個有效的 routing key: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit" , routing key 中可以包含多個單詞,最多可以達到255個位元組。

binding key也必須是相同的形式。topic exchange

背後的邏輯類似於 direct exchange - 使用特定routing key1 傳送的訊息將被傳遞到與binding key繫結的所有佇列。但是,binding key 有兩個重要的特殊情況:

  1. *(星號) 可以代替一個單詞
  2. #(雜湊) 可以匹配0個或多個單詞

在下面的例子中簡單解釋一下(類似於正則):

python-five

在本例中,我們將傳送所有描述動物的訊息。訊息將使用由三個單詞(兩個點)組成的routing key傳送。其中第一個單詞描述 速度,第二個描述顏色,第三個描述種類:”..”。

我們接著建立三個繫結:Q1 binding key “*.orange.*”, Q2 binding key “*.*.rabbit” “lazy.#”。

這三個繫結可以解釋為:

Q1 對所有橙色的動物感興趣。
Q2 想要獲取有關兔子的一切訊息,以及所有惰性動物的一切。

一條 routing key 為 “quick.orange.rabbit” 的訊息將傳遞上面的到兩個對列。routing key 為 “lazy.orange.elephant” 的訊息也將傳遞上面的到兩個對列。另外 “lazy.pink.rabbit” 訊息將只會被傳遞到Q2一次, 即使它匹配了兩個 binding key。”quick.brown.fox” 不匹配任何 binding key, 所以它將被丟棄。

如果我們不遵守以上的規則傳送 routing key 為一個或者四個單詞的訊息會發生什麼? 比如,”orange” 或者 “quick.orange.male.rabbit”。那麼我們將丟失這些訊息,因為它們不匹配任何 binding key

另一方面,”lazy.orange.male.rabbit” 即使它有四個單詞,但它能匹配最後一個繫結,並且將被傳遞到Q2中。

注意事項
topic exchange 很強大,並且表現得和其他型別交換一樣
當佇列與 “#” 繫結時,它將接收所有訊息而不管routing key的值,如同 fanout exchange
當不使用 “*” 和 “#” 特殊字元時,topic exchange 如同 direct exchange

整合程式碼

我們將在日誌記錄系統中使用主題交換。我們將假設 routing key中有兩個單詞開始工作,比如:”.”。

程式碼幾乎和上一節一樣。

emit_log_topic.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

$routing_key = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'anonymous.info';
$data = implode(' ', array_slice($argv, 2));
if(empty($data)) $data = "Hello World!";

$msg = new AMQPMessage($data);

$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent ",$routing_key,':',$data," \n";

$channel->close();
$connection->close();

?>

receive_logs_topic.php:

<?php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->exchange_declare('topic_logs', 'topic', false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$binding_keys = array_slice($argv, 1);
if( empty($binding_keys )) {
    file_put_contents('php://stderr', "Usage: $argv[0] [binding_key]\n");
    exit(1);
}

foreach($binding_keys as $binding_key) {
    $channel->queue_bind($queue_name, 'topic_logs', $binding_key);
}

echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

$callback = function($msg){
  echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

?>

收聽所有日誌:

php receive_logs_topic.php "#"

收聽所有來自 kern 的日誌:

php receive_logs_topic.php "kern.*"

只收聽 “critical” 型別的日誌:

php receive_logs_topic.php "*.critical"

你也可以建立多個繫結:

php receive_logs_topic.php "kern.*" "*.critical"

發出 routing key為 “kern.critical”型別的日誌

php emit_log_topic.php "kern.critical" "A critical kernel error"

至此你可以盡情鼓弄這些程式。需要注意的是,這些程式碼沒有對 binding keyrouting key 賦預設值,除此之外你可能需要路由不止兩個單詞。

接下來,我們需要弄明白怎樣做一個往返資訊的遠端程式:遠端呼叫