RabbitMQ的釋出訂閱模式(Publish/Subscribe)
阿新 • • 發佈:2020-05-12
### 一、釋出/訂閱(Publish/Subscribe)模式
釋出訂閱是我們經常會用到的一種模式,生產者生產訊息後,所有訂閱者都可以收到。RabbitMQ的釋出/訂閱模型圖如下:
![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200512153748920-2070970018.png)
1、該模式下生產者並不是直接操作佇列,而是將資料傳送給交換機,由交換機將資料傳送給與之繫結的佇列;
2、該模式必須宣告交換機,並且設定模式:` channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);`
fanout指分發模式(將每一條訊息都發送到與交換機繫結的隊)。
3、佇列必須繫結交換機:`channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");`
### 二、釋出訊息
訊息生產者向交換機(exchange)傳送訊息,程式碼如下:
```
// 定義交換機名稱
static string EXCHANGE_NAME = "ps_exchange_fanout";
public static void PublishMessage()
{
try
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
// 定義exchange
channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
string msg = "hello ps!";
var body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(EXCHANGE_NAME, "", null, body);
Console.WriteLine("send msg:" + msg);
channel.Close();
conn.Close();
}
catch (Exception ex)
{
throw ex;
}
}
```
訊息傳送成功,截圖如下:
![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200512151631893-1286800138.png)
### 三、訂閱訊息
在這裡需要兩個消費者,訊息傳送後,所有的訂閱者都可以收到訊息;
####3.1 消費者1
和輪詢分發以及公平分發不同的是,消費者需要將佇列繫結到交換機,來訂閱訊息;實現程式碼如下:
```
static string EXCHANGE_NAME = "ps_exchange_fanout";
static string QUEUE_NAME = "ps_queue_sub1";
///
/// 訂閱消費者1
///
static void SubscribeConsumer1()
{
var conn = RabbitMQHelper.GetConnection();
var channel = conn.CreateModel();
// 定義exchange
channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
// 繫結queue
channel.QueueDeclare(queue: QUEUE_NAME);
channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 定義Consumer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model,ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine($"SubscribeConsumer1 收到訊息: {message},時間:{DateTime.Now}");
};
//啟動消費者 設定為手動應答訊息
channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
Console.WriteLine("Subscribe Consumer1 消費者已啟動");
Console.ReadKey();
channel.Dispose();
conn.Close();
}
```
####3.2 消費者2
消費者2的程式碼和1的基本相同,大家可以將1的修改一下,就可以使用,在此就不重複貼出了;
####3.3 接收訊息結果
消費者1接收訊息截圖:
![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200512152613503-679270531.png)
消費者2接收訊息截圖:
![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200512152635009-1229922947.png)
通過上圖,我們可以看到,釋出者釋出訊息後,訂閱者1、2均受到了相同的訊息,至此功能已經完成;
### 四、小結
####4.1 訂閱者程式碼的主要流程
根據消費者的程式碼,我們可以提煉流程如下
(1)建立連線
(2)宣告exchange
(3)繫結佇列到exchange
(4)宣告消費者
(5)繫結消費者到channel,監聽處理訊息
(6)關閉連線
####4.2 訂閱成功後,我們開啟mq的管理地址可以看到,有兩個queue繫結到exchange上了:
![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200512153247045-1824866531.png)
注意:如果我們的exchange沒有消費者訂閱,釋出的訊息將不會被儲存到任何佇列,直接丟失了;
參考連結:https://www.rabbitmq.com/tutorials/tutorial-three-dotn