1. 程式人生 > >RabbitMq初探——發布與訂閱

RabbitMq初探——發布與訂閱

9.png 數據 sent 發布 需要 關心 targe create day

publish and subscribe

前言


前面的例子 我們都是用到的都是消息單一消費,即一條消息被單個消費者消費。像微博系統的消息推送,是一條消息推送給所有訂閱到該頻道的用戶。

這裏我們就需要用到rabbitmq的發布與訂閱(publish and subscribe)

原理


前面我們弱化rabbitmq,只抽象出了 生產者、隊列、消費者三個概念。

現在需要介紹rabbitmq的整體數據流轉過程。

數據由生產者發送給交換機,交換機接收數據並把它發送給與自己綁定好的隊列,隊列接收消息並且把它發送給消費者。

事實上,生產者根本不會知道消息是發送給誰的,也不需要關心。who cares?!

技術分享

exchange的類型

移步 RabbitMQ各種交換機類型Exchange Types介紹

發布與訂閱

一個中心生產者,多個消費者。

生產者生產消息給類型為fanout的exchange,多個queue與該exchange綁定,消費者從queue中獲取消息。

代碼


1. 生產者、消費者聲明類型為fanout、名稱為logs的交換機

2. 消費者進程聲明名稱隨機的queue(用於每new 一個進程就會產生一個隊列),將queue與logs exchange綁定

整體代碼如下

fanout_sender.php

<?php
/**
 * Created by PhpStorm.
 * User: wangdaxi
 * Date: 2017/10/20
 * Time: 14:20
 
*/ require_once __DIR__ . ‘/vendor/autoload.php‘; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection(‘127.0.0.1‘, 5672, ‘guest‘, ‘guest‘); $channel = $connection->channel(); $channel->exchange_declare(‘logs‘, ‘fanout‘, false, false
, false); $data = implode(‘ ‘, array_slice($argv, 1)); empty($data) && $data = ‘Hello World‘; $msg = new AMQPMessage($data); $channel->basic_publish($msg, ‘logs‘); echo " [x] Sent $data \n"; $channel->close(); $connection->close();

fanout_receive.php

<?php
/**
 * Created by PhpStorm.
 * User: wangdaxi
 * Date: 2017/10/20
 * Time: 14:32
 */
require_once __DIR__ . ‘/vendor/autoload.php‘;
use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection(‘127.0.0.1‘, 5672, ‘guest‘, ‘guest‘);
$channel = $connection->channel();

$channel->exchange_declare(‘logs‘, ‘fanout‘, false, false, false);

list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

$channel->queue_bind($queue_name, ‘logs‘);

echo ‘ [*] Waiting for logs. To exit press CTRL+C‘, "\n";

$callback = function($msg){
    echo "\n [x] " . $msg->body;
};

//消費,關閉ack
$channel->basic_consume($queue_name, ‘‘, false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

$channel->close();
$connection->close();

測試


開啟一個終端作為生產者P,兩個消費者作為消費者C1,C2。

生產者生產消息,會發現每個消費者都會收到同樣的消息。很簡單,不上圖。

以上。

RabbitMq初探——發布與訂閱