1. 程式人生 > >RabbitMQ與.net core(四) 訊息的優先順序 與 死信佇列

RabbitMQ與.net core(四) 訊息的優先順序 與 死信佇列

1.訊息的優先順序

假如現在有個需求,我們需要讓一些優先順序最高的通知推送到客戶端,我們可以使用redis的sortedset,也可以使用我們今天要說的rabbit的訊息優先順序屬性

Producer程式碼

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    class
Program { static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password
= "root"; var exchange = "change4"; var route = "route2"; var queue9 = "queue9"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type:
"fanout", durable: true, autoDelete: false);             //x-max-priority屬性必須設定,否則訊息優先順序不生效 channel.QueueDeclare(queue9, durable: true, exclusive: false, autoDelete: false,arguments: new Dictionary<string, object> { { "x-max-priority", 50 } }); channel.QueueBind(queue9, exchange, queue9); while(true) { var messagestr = Console.ReadLine(); var messagepri = Console.ReadLine(); var props = channel.CreateBasicProperties(); props.Persistent = true; props.Priority = (byte)int.Parse(messagepri);//設定訊息優先順序 channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes(messagestr)); } } } } } }

consumer程式碼

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace RabbitMQClient
{
    class Program
    {
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };
        static void Main(string[] args)
        {
            var exchange = "change4";
            var route = "route2";
            var queue9 = "queue9";


            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "fanout", durable: true, autoDelete: false);
                channel.QueueDeclare(queue9, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> { { "x-max-priority", 50 } });
                channel.QueueBind(queue9, exchange, route);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Byte[] body = ea.Body;
                    String message = Encoding.UTF8.GetString(body);
                    Console.WriteLine( message);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queue9, autoAck: false, consumer: consumer);
                Console.ReadLine();
            }
        }
    }
}

執行producer

在執行consumer

可以看出訊息是按優先順序消費的

2.死信佇列

死信佇列可以用來做容錯機制,當我們的訊息處理異常時我們可以把訊息放入到死信佇列中,以便後期處理,死信的產生有三種

1.訊息被拒(basic.reject or basic.nack)並且沒有重新入隊(requeue=false);

2.當前佇列中的訊息數量已經超過最大長度。

3.訊息在佇列中過期,即當前訊息在佇列中的存活時間已經超過了預先設定的TTL(Time To Live)時間;

看程式碼

using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RabbitMQConsole
{
    class Program
    {
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchangeA = "changeA";
            var routeA = "routeA";
            var queueA = "queueA";

            var exchangeD = "changeD";
            var routeD = "routeD";
            var queueD = "queueD";

            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchangeD, type: "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queueD, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queueD, exchangeD, routeD);

                    channel.ExchangeDeclare(exchangeA, type: "fanout", durable: true, autoDelete: false);
                    channel.QueueDeclare(queueA, durable: true, exclusive: false, autoDelete: false, arguments: new Dictionary<string, object> {
                                         { "x-dead-letter-exchange",exchangeD}, //設定當前佇列的DLX
                                         { "x-dead-letter-routing-key",routeD}, //設定DLX的路由key,DLX會根據該值去找到死信訊息存放的佇列
                                         { "x-message-ttl",10000} //設定訊息的存活時間,即過期時間
                                         });
                    channel.QueueBind(queueA, exchangeA, routeA);


                    var properties = channel.CreateBasicProperties();
                    properties.Persistent = true;
                    //釋出訊息
                    channel.BasicPublish(exchange: exchangeA,
                                         routingKey: routeA,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes("message"));
                }
            }
        }
    }
}

這樣10秒後訊息過期,我們可以看到queueD中有了訊息