1. 程式人生 > >RabbitMQ與.net core(二)Producer與Exchange

RabbitMQ與.net core(二)Producer與Exchange

+= bytes 綁定 rabbit 版本 bubuko received wid fanout

原文:RabbitMQ與.net core(二)Producer與Exchange

Producer:消息的生產者,也就是創建消息的對象

Exchange:消息的接受者,也就是用來接收消息的對象,Exchange接收到消息後將消息按照規則發送到與他綁定的Queue中。下面我們來定義一個Producer與Exchange。

1.新建.netcore console項目,並引入RabbitMQ.Client的Nuget包

技術分享圖片

2.創建Exchange

using RabbitMQ.Client;

namespace RabbitMQConsole
{
    class Program
    {
        
static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "39.**.**.**"; factory.Port = 5672; factory.VirtualHost = "/"; factory.UserName = "root"; factory.Password = "
root"; var exchange = "change2"; var route = "route2"; var queue = "queue2"; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange, type:
"direct", durable: true, autoDelete: false); //創建Exchange } } } } }

可以看到Echange的參數有:

type:可選項為,fanout,direct,topic,headers。區別如下:

    fanout:發送到所有與當前Exchange綁定的Queue中

    direct:發送到與消息的routeKey相同的Rueue中

    topic:fanout的模糊版本

    headers:發送到與消息的header屬性相同的Queue中

durable:持久化

autoDelete:當最後一個綁定(隊列或者exchange)被unbind之後,該exchange自動被刪除。

運行程序,可以在可視化界面看到change2

技術分享圖片

接下來我們可以創建與change2綁定的queue

3.創建Queue

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);  #創建queue2
                    channel.QueueBind(queue, exchange, route);  #將queue2綁定到exchange2
                }

可以看到Echange的參數有:

durable:持久化

exclusive:如果為true,則queue只在channel存在時存在,channel關閉則queue消失

autoDelete:當最後一個綁定(隊列或者exchange)被unbind之後,該exchange自動被刪除。

去可視化界面看Queue

技術分享圖片

4.發送消息

                using (var channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange, type: "direct", durable: true, autoDelete: false);
                    channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                    channel.QueueBind(queue, exchange, route);
                    var props = channel.CreateBasicProperties();
                    props.Persistent = true; #持久化
                    channel.BasicPublish(exchange, route, true, props, Encoding.UTF8.GetBytes("hello rabbit"));
                }

技術分享圖片

5.消費消息

using RabbitMQ.Client;
using System;
using System.Text;

namespace RabbitMQClient
{
    class Program
    {
        private static readonly ConnectionFactory rabbitMqFactory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };
        static void Main(string[] args)
        {
            var exchange = "change2";
            var route = "route2";
            var queue = "queue2";


            using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);
                while (true)
                {
                    var message = channel.BasicGet(queue, true);  #第二個參數說明自動釋放消息,如為false需手動釋放消息
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }
            }
        }
    }
}

運行查看結果

技術分享圖片

查看可視化界面

技術分享圖片

6.手動釋放消息

                while (true)
                {
                    var message = channel.BasicGet(queue, false);#設置為手動釋放
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                    }
                    channel.BasicAck(message.DeliveryTag, false); #手動釋放
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

我們再發一條消息,然後開始消費,加個斷點調試一下

技術分享圖片

查看一下Queue中消息狀態

技術分享圖片

然後直接取消調試,不讓程序走到釋放的那一步,再查看一下消息狀態

技術分享圖片

這麽說來只要不走到 channel.BasicAck(message.DeliveryTag, false);這一行,消息就不會被釋放掉,我們讓程序直接走到這一行代碼,查看一下消息的狀態

技術分享圖片

如圖已經被釋放了

7.讓失敗的消息回到隊列中

                while (true)
                {
                    var message = channel.BasicGet(queue, false);
                    if(message!=null)
                    {
                        var msgBody = Encoding.UTF8.GetString(message.Body);
                        Console.WriteLine(string.Format("***接收時間:{0},消息內容:{1}", DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss"), msgBody));
                        Console.WriteLine(message.DeliveryTag);    #當前消息被處理的次序數
                        if (1==1)
                            channel.BasicReject(message.DeliveryTag, true);
                    }
                    
                    System.Threading.Thread.Sleep(TimeSpan.FromSeconds(1));
                }

重新發送4條消息

技術分享圖片

開始消費

技術分享圖片

我們可以看到消息一直沒有沒消費,因為消息被處理之後又放到了隊尾

8.監聽消息

 using (IConnection conn = rabbitMqFactory.CreateConnection())
            using (IModel channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange, "direct", durable: true, autoDelete: false);
                channel.QueueDeclare(queue, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue, exchange, route);

                channel.BasicQos(prefetchSize: 0, prefetchCount: 10, global: false);  #一次接受10條消息,否則rabbit會把所有的消息一次性推到client,會增大client的負荷
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                {
                    Byte[] body = ea.Body;
                    String message = Encoding.UTF8.GetString(body);
                    Console.WriteLine( message+Thread.CurrentThread.ManagedThreadId);
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: queue, autoAck: false, consumer: consumer);
                Console.ReadLine();
            }

RabbitMQ與.net core(二)Producer與Exchange