RabbitMQ (七) : 訂閱者模式之主體模式 ( topic )
主體模式和路由模式很像
路由模式是精確匹配
主體模式是模糊匹配
依然先通過管理後臺新增一個交換機.
生產者
public class Producer { private const string ExchangeName = "test_exchange_topic"; public static void Send() { //獲取一個連線 IConnection connection = ConnectionHelper.GetConnection(); //從連線中獲取一個通道 IModel channel = connection.CreateModel(); //宣告交換機 //channel.ExchangeDeclare(ExchangeName, "topic", false, false, null); //每次只向消費者傳送一條訊息,消費者使用後,手動確認後,才會傳送另外一條 channel.BasicQos(0, 1, false); string msg = "hello world "; //傳送訊息,只發送到路由鍵為"product.delete" 或者 "product.#"的佇列. //# 匹配一個或多個 //* 匹配一個 //上限為 255 個位元組 channel.BasicPublish(ExchangeName, "product.delete", null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"send {msg}"); channel.Close(); connection.Close(); } }
消費者1
public class Consumer1 { private const string QueueName = "test_exchange1_queue"; private const string ExchangeName = "test_exchange_topic"; public static void Receive() { //獲取連線 RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection(); //建立通道 RabbitMQ.Client.IModel channel = connection.CreateModel(); //宣告佇列 channel.QueueDeclare(QueueName, false, false, false, null); //將佇列繫結到交換機上 channel.QueueBind(QueueName, ExchangeName, "product.add", null); channel.QueueBind(QueueName, ExchangeName, "product.update", null); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //註冊事件 consumer.Received += (s, e) => { byte[] bytes = e.Body; string str = Encoding.Default.GetString(bytes); Console.WriteLine("consumer1 : " + str); channel.BasicAck(e.DeliveryTag, false);//手動應答 }; //監聽佇列 //bool autoAck = true;//自動確認,一旦mq將訊息分發給了消費者,就會從記憶體中刪除該訊息 bool autoAck = false;//手動應答. channel.BasicConsume(QueueName, autoAck, "", false, false, null, consumer); } }
消費者2
public class Consumer2 { private const string QueueName = "test_exchange2_queue"; private const string ExchangeName = "test_exchange_topic"; public static void Receive() { //獲取連線 RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection(); //建立通道 RabbitMQ.Client.IModel channel = connection.CreateModel(); //宣告佇列 channel.QueueDeclare(QueueName, false, false, false, null); //將佇列繫結到交換機上 channel.QueueBind(QueueName, ExchangeName, "product.#", null);//新增消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //註冊事件 consumer.Received += (s, e) => { byte[] bytes = e.Body; string str = Encoding.Default.GetString(bytes); Console.WriteLine("consumer2 : " + str); channel.BasicAck(e.DeliveryTag, false);//手動應答 }; //監聽佇列 //bool autoAck = true;//自動確認,一旦mq將訊息分發給了消費者,就會從記憶體中刪除該訊息 bool autoAck = false;//手動應答. channel.BasicConsume(QueueName, autoAck, "", false, false, null, consumer); } }
執行結果:
由於消費者1的路由鍵只有 "product.add" 和 "product.update" ,不包含"product.delete",
而消費者2的路由鍵是"product.#",可以模糊匹配上"product.delete",
所以交換機只會把訊息轉發到消費者2宣告的佇列中.