1. 程式人生 > >RabbitMQ與.net core(三) fanout型別Exchange 與 訊息的過期時間 與 佇列的存活時間

RabbitMQ與.net core(三) fanout型別Exchange 與 訊息的過期時間 與 佇列的存活時間

原文: RabbitMQ與.net core(三) fanout型別Exchange 與 訊息的過期時間 與 佇列的存活時間

上一篇我們講了關於direct型別的Exchange,這一片我們來了解一下fanout型別的Exchange。

1.Exchange的fanout型別

fanout型別的Exchange的特點是會把訊息傳送給與之繫結的所有Queue中,我們來測試一下。程式碼如下

using RabbitMQ.Client;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole { class Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName
= "root"; factory.Password = "root"; var exchange = "change3"; var route = "route2"; var queue3 = "queue3"; var queue4 = "queue4"; var queue5 = "queue5"; using (var connection = factory.CreateConnection()) {
using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false); channel.QueueDeclare(queue3, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue3, exchange, queue3); channel.QueueDeclare(queue4, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue4, exchange, queue4); channel.QueueDeclare(queue5, durable: true, exclusive: false, autoDelete: false); channel.QueueBind(queue5, exchange, queue5); var props = channel.CreateBasicProperties(); props.Persistent = true; channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit")); } } } } }

執行程式碼,去視覺化工具中檢視一下

消費其中的一個

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQClient
{
    class Program
    {
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };
        static void Main(string[] args)
        {
            var exchange = "change3";
            var route = "route2";
            var queue = "queue3";


            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "fanout", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Byte[] body = ea.Body;
                    String message = Encoding.UTF8.GetString(body);
                    Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                Console.ReadLine();
            }
        }
    }
}

結果如下

大家可以依次消費其他兩個Queue,這裡就不演示了

2.訊息的過期時間

我們在傳送一些訊息的時候,有時希望給訊息設定一下過期時間,我們可以通過兩種方式來設定

2.1設定佇列的過期時間

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue7 = "queue7";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);
            //佇列過期時間,單位毫秒
                    channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false,arguments:new Dictionary<string, object> { { "x-message-ttl", 8000 } });
                    channel.QueueBind(queue7, exchange, queue7);

                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));

                }
            }
        }
    }
}

這樣過8秒去Queue就看不到該訊息了

2.2設定message的過期時間

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue7 = "queue7";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false,arguments:new Dictionary<string, object> { { "x-message-ttl", 8000 } });
                    channel.QueueBind(queue7, exchange, queue7);

                    var props = channel.CreateBasicProperties();
            //message過期時間,單位毫秒
                    props.Expiration = "30000";
                    props.Persistent = true;
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));

                }
            }
        }
    }
}

我們發現還是8秒就過期了,說明如果同時設定了佇列與訊息的過期時間,則按照佇列的時間過期。我們把佇列的過期時間去掉重新試一下。

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue7 = "queue7";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue7, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue7, exchange, queue7);

                    var props = channel.CreateBasicProperties();
                    props.Expiration = "30000";
                    props.Persistent = true;
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));

                }
            }
        }
    }
}

3.佇列生存時間

我們還可以設定一個佇列的生存時間

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchange = "change4";
            var route = "route2";
            var queue8 = "queue8";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue8, durable: true, exclusive: false, autoDelete: false,arguments: new Dictionary<string, object> {
                        { "x-expires",10000} //設定當前佇列的過期時間為10000毫秒
                    });
                    channel.QueueBind(queue8, exchange, queue8);

                    var props = channel.CreateBasicProperties();
                    props.Persistent = true;
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));

                }
            }
        }
    }
}

這樣10秒後佇列就消失了