1. 程式人生 > >RabbitMQ的釋出訂閱模式(Publish/Subscribe)

RabbitMQ的釋出訂閱模式(Publish/Subscribe)

### 一、釋出/訂閱(Publish/Subscribe)模式 釋出訂閱是我們經常會用到的一種模式,生產者生產訊息後,所有訂閱者都可以收到。RabbitMQ的釋出/訂閱模型圖如下: ![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200512153748920-2070970018.png) 1、該模式下生產者並不是直接操作佇列,而是將資料傳送給交換機,由交換機將資料傳送給與之繫結的佇列; 2、該模式必須宣告交換機,並且設定模式:` channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);`   fanout指分發模式(將每一條訊息都發送到與交換機繫結的隊)。 3、佇列必須繫結交換機:`channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");` ### 二、釋出訊息 訊息生產者向交換機(exchange)傳送訊息,程式碼如下: ``` // 定義交換機名稱 static string EXCHANGE_NAME = "ps_exchange_fanout"; public static void PublishMessage() { try { var conn = RabbitMQHelper.GetConnection(); var channel = conn.CreateModel(); // 定義exchange channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout); string msg = "hello ps!"; var body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(EXCHANGE_NAME, "", null, body); Console.WriteLine("send msg:" + msg); channel.Close(); conn.Close(); } catch (Exception ex) { throw ex; } } ``` 訊息傳送成功,截圖如下: ![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200512151631893-1286800138.png) ### 三、訂閱訊息 在這裡需要兩個消費者,訊息傳送後,所有的訂閱者都可以收到訊息; ####3.1 消費者1 和輪詢分發以及公平分發不同的是,消費者需要將佇列繫結到交換機,來訂閱訊息;實現程式碼如下: ``` static string EXCHANGE_NAME = "ps_exchange_fanout"; static string QUEUE_NAME = "ps_queue_sub1"; /// /// 訂閱消費者1 /// static void SubscribeConsumer1() { var conn = RabbitMQHelper.GetConnection(); var channel = conn.CreateModel(); // 定義exchange channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout); // 繫結queue channel.QueueDeclare(queue: QUEUE_NAME); channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 定義Consumer var consumer = new EventingBasicConsumer(channel); consumer.Received += (model,ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine($"SubscribeConsumer1 收到訊息: {message},時間:{DateTime.Now}"); }; //啟動消費者 設定為手動應答訊息 channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer); Console.WriteLine("Subscribe Consumer1 消費者已啟動"); Console.ReadKey(); channel.Dispose(); conn.Close(); } ``` ####3.2 消費者2 消費者2的程式碼和1的基本相同,大家可以將1的修改一下,就可以使用,在此就不重複貼出了; ####3.3 接收訊息結果 消費者1接收訊息截圖: ![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200512152613503-679270531.png) 消費者2接收訊息截圖: ![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200512152635009-1229922947.png) 通過上圖,我們可以看到,釋出者釋出訊息後,訂閱者1、2均受到了相同的訊息,至此功能已經完成; ### 四、小結 ####4.1 訂閱者程式碼的主要流程 根據消費者的程式碼,我們可以提煉流程如下 (1)建立連線 (2)宣告exchange (3)繫結佇列到exchange (4)宣告消費者 (5)繫結消費者到channel,監聽處理訊息 (6)關閉連線 ####4.2 訂閱成功後,我們開啟mq的管理地址可以看到,有兩個queue繫結到exchange上了: ![](https://img2020.cnblogs.com/blog/653404/202005/653404-20200512153247045-1824866531.png) 注意:如果我們的exchange沒有消費者訂閱,釋出的訊息將不會被儲存到任何佇列,直接丟失了; 參考連結:https://www.rabbitmq.com/tutorials/tutorial-three-dotn