1. 程式人生 > >(八)RabbitMQ訊息佇列-通過Topic主題模式分發訊息

(八)RabbitMQ訊息佇列-通過Topic主題模式分發訊息

前兩章我們講了RabbitMQ的direct模式和fanout模式,本章介紹topic主題模式的應用。如果對direct模式下通過routingkey來匹配訊息的模式已經有一定了解那fanout也很好理解。簡單的可以理解成direct是通過routingkey精準匹配的,而topic是通過routingkey來模糊匹配。
在topic模式下支援兩個特殊字元的匹配。

* (星號) 代表任意 一個單詞
# (井號) 0個或者多個單詞

注意:上面說的是單詞不是字元。

如下圖所示,RabbitMQ direct模式通過RoutingKey來精準匹配,RoutingKey為red的投遞到Queue1,RoutingKey為black和white的投遞到Queue2。
RabbitMQ direct模式

我們可以假設一個場景,我們要做一個日誌模組來收集處理不同的日誌,日誌區分包含三個維度的標準:模組、日誌緊急程度、日誌重要程度。模組分為:red、black、white;緊急程度分為:critical、normal;把重要程度分為:medium、low、high在RoutingKey欄位中我們把這三個維度通過兩個“.“連線起來。
現在我們需要對black模組,緊急程度為critical,重要程度為high的日誌分配到佇列1列印到螢幕;對所以模組重要程度為high的日誌和white緊急程度為critical的日誌傳送到佇列2持久化到硬碟。如下示例:

RabbitMQ topic模式

  • RoutingKey為“black.critical.high”的日誌會投遞到queue1和queue2,。

  • RoutingKey為“red.critical.high”的日誌會只投遞到queue2。

  • RoutingKey為“white.critical.high”的日誌會投遞到queue2,並且雖然queue2的兩個匹配規則都符合但只會向queue2投遞一份。

新建topic.php用來發布三種routingkey的訊息。


<?php

/*
 * topic 模式
 * create by superrd
 */

$exchangeName = 'extopic';
$routeKey1 = "black.critical.high";
$routeKey2 = "red.critical.high"
; $routeKey3 = "white.critical.high"; $message1 = 'black-critical-high!'; $message2 = 'red-critical-high!'; $message3 = 'white-critical-high!'; $connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd')); $connection->connect() or die("Cannot connect to the broker!\n"); try { $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_TOPIC); $exchange->setFlags(AMQP_DURABLE); $exchange->declareExchange(); $exchange->publish($message1,$routeKey1); var_dump("[x] Sent ".$message1); $exchange->publish($message2,$routeKey2); var_dump("[x] Sent ".$message2); $exchange->publish($message3,$routeKey3); var_dump("[x] Sent ".$message3); } catch (AMQPConnectionException $e) { var_dump($e); exit(); } $connection->disconnect();

q1.php用來監聽queue1佇列:


<?php

/*
 * topic 模式
 * create by superrd
 */

$queueName = 'queue1';
$exchangeName = 'extopic';
$routeKey = "black.critical.high";

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();

$queue->bind($exchangeName, $routeKey);

//阻塞模式接收訊息

echo "Message:\n";
while(True){
        $queue->consume('processMessage');
//自動ACK應答
        //$queue->consume('processMessage', AMQP_AUTOACK);
}

$conn->disconnect();
/*
* 消費回撥函式
* 處理訊息
*/
function processMessage($envelope, $q) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //處理訊息
    $q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
}

q2.php用來監聽queue2佇列:

<?php

/*
 * topic 模式
 * create by superrd
 */

$queueName = 'queue2';
$exchangeName = 'extopic';
$routeKey1 = "#.high";
$routeKey2 = "white.critical.*";

$connection = new AMQPConnection(array('host' => '10.99.121.137', 'port' => '5672', 'vhost' => '/', 'login' => 'superrd', 'password' => 'superrd'));
$connection->connect() or die("Cannot connect to the broker!\n");

$channel = new AMQPChannel($connection);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_TOPIC);
$exchange->setFlags(AMQP_DURABLE);
$exchange->declareExchange();

$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$queue->setFlags(AMQP_DURABLE);
$queue->declareQueue();

$queue->bind($exchangeName, $routeKey1);
$queue->bind($exchangeName, $routeKey2);


//阻塞模式接收訊息

echo "Message:\n";
while(True){
        $queue->consume('processMessage');
//自動ACK應答
        //$queue->consume('processMessage', AMQP_AUTOACK);
}

$conn->disconnect();
/*
* 消費回撥函式
* 處理訊息
*/
function processMessage($envelope, $q) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //處理訊息
    $q->ack($envelope->getDeliveryTag()); //手動傳送ACK應答
}

先執行q1.php和q2.php指令碼保持訂閱狀態。然後執行topic.php指令碼釋出訊息。q1和q2收到的訊息如下:

RabbitMQ topic主題模式

RabbitMQ topic主題模式

如上截圖,驗證了我們之前的結論。

另外還有一些特殊情況例如:

  1. 如果binding_key 是 “#” - 它會接收所有的Message,不管routing_key是什麼,就像是fanout
    exchange。
  2. 如果 “*” and “#” 沒有被使用,那麼topic exchange就變成了direct exchange。

RabbitMQ技術交流QQ群:327034977(新增時請備註RabbitMQ)