1. 程式人生 > >Rabbitmq原始碼示例(生產者/消費者,非持久化/持久化)

Rabbitmq原始碼示例(生產者/消費者,非持久化/持久化)

//////////////// 非持久化生產者


        static void TestThreadFun()
        {
            string queueName = "hello";
            string message = "abcdefghijklmnopqrstuvwxyz一二三四五六七八九十0123456789";
            var body = Encoding.UTF8.GetBytes(message);
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "10.5.8.108"; // Rabbitmq伺服器地址
            factory.UserName = "vcyber";// Rabbitmq服務中預先分配的賬號(賬號的許可權決定後面的操作是否合法)
            factory.Password = "123456";// Rabbitmq服務賬號的密碼
            bool durable = false;    // 是否持久化


            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
   // 宣告一個佇列,並且指定佇列名稱和屬性
   // queueName:佇列名稱
   // durable:非 持久化
   // false1:非 所有生產者斷開連線之後,自動銷燬訊息佇列
   // false2:非 所有消費者斷開連線之後,自動銷燬訊息佇列
   // null: 
   // 備註:如果同名的訊息佇列已經存在,且新指定的屬性與已存在的不同,則丟擲異常
                    channel.QueueDeclare(queueName, durable, false, false, null);


                    for (int i = 0; i < nRequestCount; ++i)
                    {
// 傳送一個訊息
// "":交換機名稱
// queueName:路由鍵
// null:
// body:傳送資料
                        channel.BasicPublish("", queueName, null, body);  // 訊息為非持久化
                    }
                }
            }
        }




//////////////// 持久化生產者


        static void TestThreadFun()
        {
            string queueName = "hello";
            string message = "abcdefghijklmnopqrstuvwxyz一二三四五六七八九十0123456789";
            var body = Encoding.UTF8.GetBytes(message);
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "10.5.8.108"; // Rabbitmq伺服器地址
            factory.UserName = "vcyber";// Rabbitmq服務中預先分配的賬號(賬號的許可權決定後面的操作是否合法)
            factory.Password = "123456";// Rabbitmq服務賬號的密碼
            bool durable = true;    // 是否持久化




            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
   // 宣告一個佇列,並且指定佇列名稱和屬性
   // queueName:佇列名稱
   // durable: 持久化
   // false1: 非 所有生產者斷開連線之後,自動銷燬訊息佇列
   // false2: 非 所有消費者斷開連線之後,自動銷燬訊息佇列
   // null: 
   // 備註:如果同名的訊息佇列已經存在,且新指定的屬性與已存在的不同,則丟擲異常
                    channel.QueueDeclare(queueName, durable, false, false, null);


   // 建立一個屬性物件,設定持久化為true
                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.SetPersistent(true);


                    for (int i = 0; i < nRequestCount; ++i)
                    {
// 傳送一個訊息
// "":交換機名稱
// queueName:路由鍵
// properties:持久化為true的屬性物件
// body:傳送資料
                        channel.BasicPublish("", queueName, properties, body);  // 訊息為非持久化
                    }
                }
            }
        }








//////////////// 非持久化消費者


        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "10.5.8.108"; // Rabbitmq伺服器地址
            factory.UserName = "vcyber";// Rabbitmq服務中預先分配的賬號(賬號的許可權決定後面的操作是否合法)
            factory.Password = "123456";// Rabbitmq服務賬號的密碼


            string queueName = "hello";// 訊息佇列名字
            bool durable = false;    // 是否持久化




            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
   // 宣告一個佇列,並且指定佇列名稱和屬性
   // queueName:佇列名稱
   // durable:非 持久化
   // false1:非 所有生產者斷開連線之後,自動銷燬訊息佇列
   // false2:非 所有消費者斷開連線之後,自動銷燬訊息佇列
   // null: 
   // 備註:如果同名的訊息佇列已經存在,且新指定的屬性與已存在的不同,則丟擲異常
                    channel.QueueDeclare(queueName, durable, false, false, null);


   // 建立一個消費者
                    var consumer = new QueueingBasicConsumer(channel);


   // 啟動一個消費者
   // queueName:佇列名稱
   // true:不需要回應。server將msg返回給消費者後,自動將本地訊息刪除。
該引數的使用與佇列的持久化屬性配合使用。非持久化佇列,設定true;持久化佇列,設定為false。
   // consumer:消費者物件
                    channel.BasicConsume(queueName, true, consumer);




                    while (true)
                    {
                        BasicDeliverEventArgs ea = null;


// 嘗試獲取訊息
// 1000:超時時間,ms
// ea:返回引數,函式成功時攜帶返回的訊息
                        if (!consumer.Queue.Dequeue(1000, out ea))
                        {
                            break;
                        }
                    }
                }


            }
        }




//////////////// 持久化消費者


        static void Main(string[] args)
        {
            var factory = new ConnectionFactory();
            factory.HostName = "10.5.8.108"; // Rabbitmq伺服器地址
            factory.UserName = "vcyber";// Rabbitmq服務中預先分配的賬號(賬號的許可權決定後面的操作是否合法)
            factory.Password = "123456";// Rabbitmq服務賬號的密碼


            string queueName = "hello";// 訊息佇列名字
            bool durable = true;    // 持久化


            using (var connection = factory.CreateConnection())
            {
                using (var channel = connection.CreateModel())
                {
   // 宣告一個佇列,並且指定佇列名稱和屬性
   // queueName:佇列名稱
   // durable:持久化
   // false1:非 所有生產者斷開連線之後,自動銷燬訊息佇列
   // false2:非 所有消費者斷開連線之後,自動銷燬訊息佇列
   // null: 
   // 備註:如果同名的訊息佇列已經存在,且新指定的屬性與已存在的不同,則丟擲異常
                    channel.QueueDeclare(queueName, durable, false, false, null);


   // 建立一個消費者
                    var consumer = new QueueingBasicConsumer(channel);


   // 啟動一個消費者
   // queueName:佇列名稱
   // true:需要回應。server將msg返回給消費者後,需要等到消費者返回確認之後,才會將本地訊息刪除。
該引數的使用與佇列的持久化屬性配合使用。非持久化佇列,設定true;持久化佇列,設定為false。
   // consumer:消費者物件
                    channel.BasicConsume(queueName, false, consumer);


                    while (true)
                    {
                        BasicDeliverEventArgs ea = null;
// 嘗試獲取訊息
// 1000:超時時間,ms
// ea:返回引數,函式成功時攜帶返回的訊息
                        if (!consumer.Queue.Dequeue(1000, out ea))
                        {
                            break;
                        }


// 向server傳送確認
                        channel.BasicAck(ea.DeliveryTag, false);
                    }


                }


            }
        }