1. 程式人生 > >RabbitMQ (五) : 訂閱者模式之分發模式 ( fanout )

RabbitMQ (五) : 訂閱者模式之分發模式 ( fanout )

class 是把 end con als connect 註冊事件 () inf

前面講到了簡單隊列和工作隊列.

這兩種隊列有個非常明顯的缺點 : 生產者發送的消息,只能進入到一個隊列.

消息只能進入到一個隊列就意味著消息只能被一個消費者消費.

盡管工作隊列模式中,一個隊列中的消息可以被多個消費者消費,但是,具體到每一條消息,卻只能被一個消費者消費.

如果想要一個消息被多個消費者消費,那麽生產者就必須把這條消息發送到多個隊列中去.

RabbitMQ 在這個點的設計是 :

在生產者和隊列兩者之間加入了一個叫做"交換機"的東西.

生產者發送消息時,不直接發送到隊列,而是發送到"交換機"(其實簡單隊列和工作隊列也是這樣的...前面的文章有提到,它們用的是默認的交換機).

"交換機"再根據聲明的類型(fanout,direct,topic,headers),轉發給符合要求的隊列.

技術分享圖片

這裏有個非常重要的知識點:

交換機只是一個"中轉的機器",它不是一個消息隊列,它沒有存儲消息的能力.這點很重要!

這意味著,當生產者把消息發送給某個交換機時,如果這時候,這個交換機沒有被任何隊列綁定,那麽這些消息將會丟失!

這種利用交換機,將消息"發送"到多個隊列的模式叫做 : 訂閱者模式.

這篇文章主要介紹訂閱者模式中的分發模式,

這種模式下,消息會被所有消費者消費.也就是說,只要是"綁定"到某個交換機的隊列,都會收到生產者發送到該交換機的消息.

生產者

    public class Producer
    {
        /// <summary>
        ///
交換機名稱 /// </summary> private const string ExchangeName = "test_exchange_fanout"; public static void Send() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); //聲明交換機,第2個參數為交換機類型
channel.ExchangeDeclare(ExchangeName, "fanout", false, false, null); for (int i = 0; i < 50; i++) { string msg = "hello world " + i; //第2個參數為路由鍵,這種模式顯然不需要路由鍵了,因為我們是把消息發送到所有綁定到該交換機的隊列. channel.BasicPublish(ExchangeName, "", null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"send {msg}"); } channel.Close(); connection.Close(); } }

消費者1

    public class Consumer1
    {
        private const string QueueName = "test_exchange1_queue";
        private const string ExchangeName = "test_exchange_fanout";

        public static void Receive()
        {
            IConnection connection = ConnectionHelper.GetConnection();
            IModel channel = connection.CreateModel();
            channel.QueueDeclare(QueueName, false, false, false, null);

            //將隊列綁定到交換機上
            channel.QueueBind(QueueName, ExchangeName, "", null);

            //添加消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //註冊事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 : " + str);
            };

            channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
        }
    }

消費者2

只有這兩句不一樣

        private const string QueueName = "test_exchange2_queue";

        Console.WriteLine("consumer2 : " + str);

運行結果就不上圖.

RabbitMQ (五) : 訂閱者模式之分發模式 ( fanout )