1. 程式人生 > >RabbitMQ (八) : 消息確認機制之事務機制

RabbitMQ (八) : 消息確認機制之事務機制

private 消息 lar except publish help bsp 消費者 consumer

實在沒啥好說的.

生產者

    public class Producer
    {
        private const string QueueName = "test_work_queue";
        public static void Send()
        {
            //獲取一個連接
            IConnection connection = ConnectionHelper.GetConnection();

            //從連接中獲取一個通道
            IModel channel = connection.CreateModel();

            
//聲明隊列 channel.QueueDeclare(QueueName, false, false, false, null); //創建一個消息 string msg = "hello world "; try { //開啟事務機制 //事務機制性能不好,不建議使用.因為需要和服務器發生額外的通信,降低了 RabbitMQ 的吞吐量 channel.TxSelect();
//發送消息 channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); //提交 channel.TxCommit(); Console.WriteLine($"send {msg}"); } catch (Exception e) { //回滾 channel.TxRollback();
Console.WriteLine(e); } channel.Close(); connection.Close(); } }

消費者

    public class Consumer
    {
        private const string QueueName = "test_work_queue";
        public static void Receive()
        {
            //獲取連接
            RabbitMQ.Client.IConnection connection = ConnectionHelper.GetConnection();

            //創建通道
            RabbitMQ.Client.IModel channel = connection.CreateModel();

            //聲明隊列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //添加消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //註冊事件
            consumer.Received += Consumer_Received;

            //監聽隊列
            channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
        }

        private static void Consumer_Received(object sender, BasicDeliverEventArgs e)
        {
            byte[] bytes = e.Body;
            string str = Encoding.Default.GetString(bytes);
            Console.WriteLine("consumer : " + str);
        }
    }

RabbitMQ (八) : 消息確認機制之事務機制