1. 程式人生 > >RabbitMQ (九) : 消息確認機制之 confirm 機制

RabbitMQ (九) : 消息確認機制之 confirm 機制

sel 異步 confirm lin creat 生產 不能 消息 tip

confirm 機制分串行和並行兩種.

串行

生產者

    public class Producer
    {
        private const string QueueName = "test_confirm_queue";
        public static void Send()
        {
            IConnection connection = ConnectionHelper.GetConnection();
            IModel channel = connection.CreateModel();
            channel.QueueDeclare(QueueName, 
false, false, false, null); //開啟confirm機制.註意 : 一個隊列如果之前已經設置成了事務機制,那麽不能再設置為 confirm 機制.反之亦然 channel.ConfirmSelect(); string msg = "hello world "; //發送消息 for (int i = 0; i < 10; i++) { channel.BasicPublish("", QueueName, null
, Encoding.Default.GetBytes(msg + i)); } //設置confirm機制的串行機制,如果消息采用的是持久化,那麽確認消息會在持久化後發出.
       //可以發一批後,調用該方法;也可以每發一條調用一次.
       //當然是發一批消息後調用好些!
            if (channel.WaitForConfirms())
            {
                Console.WriteLine("send is success");
            }
            
else { Console.WriteLine("send is failed"); } channel.Close(); connection.Close(); } }

並行(異步)

生產者

    public class Producer
    {
        private const string QueueName = "test_confirm_queue";

        //這裏一定要用 SortedSet
        private static readonly SortedSet<ulong> ConfirmSort = new SortedSet<ulong>();
        public static void Send()
        {
            using (IConnection connection = ConnectionHelper.GetConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.QueueDeclare(QueueName, false, false, false, null);
                    channel.ConfirmSelect();

                    //成功
                    channel.BasicAcks += (s, e) =>
                    {
                        //多條
                        if (e.Multiple)
                        {
                            Console.WriteLine("最後成功的一條是 : " + e.DeliveryTag);
                            ConfirmSort.RemoveWhere(r => r <= e.DeliveryTag);
                        }
                        //單條
                        else
                        {
                            Console.WriteLine(e.DeliveryTag + " 成功發送 ");
                            ConfirmSort.Remove(e.DeliveryTag);
                        }
                    };


                    //失敗
                    channel.BasicNacks += (s, e) =>
                    {
                        //多條
                        if (e.Multiple)
                        {
                            Console.WriteLine("最後失敗的一條是 : " + e.DeliveryTag);
                        }
                        //單條
                        else
                        {
                            Console.WriteLine(e.DeliveryTag + " 發送失敗 ");
                        }
                    };


                    //發送消息
                    string msg = "hello world ";
                    int i = 0;
                    while (true)
                    {
                        ulong seqNo = channel.NextPublishSeqNo;
                        channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg + i));
                        ConfirmSort.Add(seqNo);//把編號加入到集合中
                        i++;
                    }
                }
            }
        }
    }

RabbitMQ (九) : 消息確認機制之 confirm 機制