RabbitMq初探——發布與訂閱
阿新 • • 發佈:2017-10-20
9.png 數據 sent 發布 需要 關心 targe create day
publish and subscribe
前言
前面的例子 我們都是用到的都是消息單一消費,即一條消息被單個消費者消費。像微博系統的消息推送,是一條消息推送給所有訂閱到該頻道的用戶。
這裏我們就需要用到rabbitmq的發布與訂閱(publish and subscribe)
原理
前面我們弱化rabbitmq,只抽象出了 生產者、隊列、消費者三個概念。
現在需要介紹rabbitmq的整體數據流轉過程。
數據由生產者發送給交換機,交換機接收數據並把它發送給與自己綁定好的隊列,隊列接收消息並且把它發送給消費者。
事實上,生產者根本不會知道消息是發送給誰的,也不需要關心。who cares?!
exchange的類型
移步 RabbitMQ各種交換機類型Exchange Types介紹
發布與訂閱
一個中心生產者,多個消費者。
生產者生產消息給類型為fanout的exchange,多個queue與該exchange綁定,消費者從queue中獲取消息。
代碼
1. 生產者、消費者聲明類型為fanout、名稱為logs的交換機
2. 消費者進程聲明名稱隨機的queue(用於每new 一個進程就會產生一個隊列),將queue與logs exchange綁定
整體代碼如下
fanout_sender.php
<?php /** * Created by PhpStorm. * User: wangdaxi * Date: 2017/10/20 * Time: 14:20*/ require_once __DIR__ . ‘/vendor/autoload.php‘; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection(‘127.0.0.1‘, 5672, ‘guest‘, ‘guest‘); $channel = $connection->channel(); $channel->exchange_declare(‘logs‘, ‘fanout‘, false, false, false); $data = implode(‘ ‘, array_slice($argv, 1)); empty($data) && $data = ‘Hello World‘; $msg = new AMQPMessage($data); $channel->basic_publish($msg, ‘logs‘); echo " [x] Sent $data \n"; $channel->close(); $connection->close();
fanout_receive.php
<?php /** * Created by PhpStorm. * User: wangdaxi * Date: 2017/10/20 * Time: 14:32 */ require_once __DIR__ . ‘/vendor/autoload.php‘; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection(‘127.0.0.1‘, 5672, ‘guest‘, ‘guest‘); $channel = $connection->channel(); $channel->exchange_declare(‘logs‘, ‘fanout‘, false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, ‘logs‘); echo ‘ [*] Waiting for logs. To exit press CTRL+C‘, "\n"; $callback = function($msg){ echo "\n [x] " . $msg->body; }; //消費,關閉ack $channel->basic_consume($queue_name, ‘‘, false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close();
測試
開啟一個終端作為生產者P,兩個消費者作為消費者C1,C2。
生產者生產消息,會發現每個消費者都會收到同樣的消息。很簡單,不上圖。
以上。
RabbitMq初探——發布與訂閱