1. 程式人生 > >RabbitMQ入門教程

RabbitMQ入門教程

fanout color -i www 快速 單播 odi 區別 multi

1.引言

RabbitMQ——Rabbit Message Queue的簡寫,但不能僅僅理解其為消息隊列,消息代理更合適。RabbitMQ 是一個由 Erlang 語言開發的AMQP(高級消息隊列協議)的開源實現,其內部結構如下:

技術分享圖片

RabbitMQ作為一個消息代理,主要和消息打交道,負責接收並轉發消息。RabbitMQ提供了可靠的消息機制、跟蹤機制和靈活的消息路由,支持消息集群和分布式部署。適用於排隊算法、秒殺活動、消息分發、異步處理、數據同步、處理耗時任務、CQRS等應用場景。

下面我們就來學習下RabbitMQ。

2. 環境搭建

本文主要基於Windows下使用Vs Code 基於.net core進行demo演示。開始之前我們需要準備好以下環境。

  • 安裝Erlang運行環境
    下載安裝Erlang。
  • 安裝RabbitMQ
    下載安裝Windows版本的RabbitMQ。
  • 啟動RabbitMQ Server
    點擊Windows開始按鈕,輸入RabbitMQ找到RabbitMQ Comman Prompt,以管理員身份運行。
  • 依次執行以下命令啟動RabbitMQ服務

    rabbitmq-service install
    rabbitmq-service enable
    rabbitmq-service start

    執行rabbitmqlctl status檢查RabbitMQ狀態

  • 安裝管理平臺插件
    執行rabbitmq-plugins enable rabbitmq_management

    即可成功安裝,使用默認賬號密碼(guest/guest)登錄http://localhost:15672/即可。

3. Hello RabbitMQ

在開始之前我們先來了解下消息模型:
技術分享圖片
消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然後發布到隊列(queue)中,隊列再將消息發送到監聽的消費者。

下面我們我們通過demo來了解RabbitMQ的基本用法。

3.1.消息的發送和接收

創建RabbitMQ文件夾,打開命令提示符,分別創建兩個控制臺項目Send、Receive。

dotnet new console --name Send //創建發送端控制臺應用
cd Send //
進入Send目錄 dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包 dotnet restore //恢復包 dotnet new console --name Receive //創建接收端控制臺應用 cd Receive //進入Receive目錄 dotnet add package RabbitMQ.Client //添加RabbitMQ.Client包 dotnet restore //恢復包

我們先來添加消息發送端邏輯:

//Send.cs 
public static void Main(string[] args)
{
    //1.1.實例化連接工廠
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立連接
    using (var connection = factory.CreateConnection())
    {
        //3. 創建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明隊列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 構建byte消息數據包
            string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
            var body = Encoding.UTF8.GetBytes(message);
            //6. 發送數據包
            channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent {0}", message);
        }
    }
}

再來完善消息接收端邏輯:

//Receive.cs  省略部分代碼
public static void Main()
{
    //1.實例化連接工廠
    var factory = new ConnectionFactory() { HostName = "localhost" };
    //2. 建立連接
    using (var connection = factory.CreateConnection())
    {
        //3. 創建信道
        using (var channel = connection.CreateModel())
        {
            //4. 申明隊列
            channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
            //5. 構造消費者實例
            var consumer = new EventingBasicConsumer(channel);
            //6. 綁定消息接收後的事件委托
            consumer.Received += (model, ea) =>
            {
                var message = Encoding.UTF8.GetString(ea.Body);
                Console.WriteLine(" [x] Received {0}", message);
                Thread.Sleep(6000);//模擬耗時
                Console.WriteLine (" [x] Done");
            };
            //7. 啟動消費者
            channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer);
            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

先運行消息接收端,再運行消息發送端,結果如下圖。

技術分享圖片

從上面的代碼中可以看出,發送端和消費端的代碼前4步都是一樣的。主要的區別在於發送端調用channel.BasicPublish方法發送消息;而接收端需要實例化一個EventingBasicConsumer實例來進行消息處理邏輯。另外一點需要註意的是:消息接收端和發送端的隊列名稱(queue)必須保持一致,這裏指定的隊列名稱為hello。

3.2. 循環調度

使用工作隊列的好處就是它能夠並行的處理隊列。如果堆積了很多任務,我們只需要添加更多的工作者(workers)就可以了。我們先啟動兩個接收端,等待消息接收,再啟動一個發送端進行消息發送。

技術分享圖片

我們增加運行一個消費端後的運行結果:

技術分享圖片

從圖中可知,我們循環發送4條信息,兩個消息接收端按順序被循環分配。
默認情況下,RabbitMQ將按順序將每條消息發送給下一個消費者。平均每個消費者將獲得相同數量的消息。這種分發消息的方式叫做循環(round-robin)。

3.3. 消息確認

按照我們上面的demo,一旦RabbitMQ將消息發送到消費端,消息就會立即從內存中移出,無論消費端是否處理完成。在這種情況下,消息就會丟失。

為了確保一個消息永遠不會丟失,RabbitMQ支持消息確認(message acknowledgments)。當消費端接收消息並且處理完成後,會發送一個ack(消息確認)信號到RabbitMQ,RabbitMQ接收到這個信號後,就可以刪除掉這條已經處理的消息任務。但如果消費端掛掉了(比如,通道關閉、連接丟失等)沒有發送ack信號。RabbitMQ就會明白某個消息沒有正常處理,RabbitMQ將會重新將消息入隊,如果有另外一個消費端在線,就會快速的重新發送到另外一個消費端。

RabbitMQ中沒有消息超時的概念,只有當消費端關閉或奔潰時,RabbitMQ才會重新分發消息。

微調下Receive中的代碼邏輯:

//5. 構造消費者實例
 var consumer = new EventingBasicConsumer(channel);
 //6. 綁定消息接收後的事件委托
 consumer.Received += (model, ea) =>
 {
     var message = Encoding.UTF8.GetString(ea.Body);
     Console.WriteLine(" [x] Received {0}", message);
     Thread.Sleep(6000);//模擬耗時
     Console.WriteLine(" [x] Done");
     // 7. 發送消息確認信號(手動消息確認)
     channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
 };
 //8. 啟動消費者
 //autoAck:true;自動進行消息確認,當消費端接收到消息後,就自動發送ack信號,不管消息是否正確處理完畢
 //autoAck:false;關閉自動消息確認,通過調用BasicAck方法手動進行消息確認
 channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer);

主要改動的是將 autoAck:true修改為autoAck:fasle,以及在消息處理完畢後手動調用BasicAck方法進行手動消息確認。

技術分享圖片

從圖中可知,消息發送端連續發送4條消息,其中消費端1先被分配處理第一條消息,消費端2被循環分配第二條消息,第三條消息由於沒有空閑消費者仍然在隊列中。
在消費端2未處理完第一條消息之前,手動中斷(ctrl+c)。我們可以發現RabbitMQ在下一次分發時,會優先將被中斷的消息分發給消費端1處理。

3.4. 消息持久化

消息確認確保了即使消費端異常,消息也不會丟失能夠被重新分發處理。但是如果RabbitMQ服務端異常,消息依然會丟失。除非我們指定durable:true,否則當RabbitMQ退出或奔潰時,消息將依然會丟失。通過指定durable:true,並指定Persistent=true,來告知RabbitMQ將消息持久化。

//send.cs
//4. 申明隊列(指定durable:true,告知rabbitmq對消息進行持久化)
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments
//將消息標記為持久性 - 將IBasicProperties.SetPersistent設置為true
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
//5. 構建byte消息數據包
string message = args.Length > 0 ? args[0] : "Hello RabbitMQ!";
var body = Encoding.UTF8.GetBytes(message);
//6. 發送數據包(指定basicProperties)
channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: properties, body: body);

將消息標記為持久性不能完全保證消息不會丟失。雖然它告訴RabbitMQ將消息保存到磁盤,但是當RabbitMQ接受消息並且還沒有保存時??,仍然有一個很短的時間窗口。RabbitMQ 可能只是將消息保存到了緩存中,並沒有將其寫入到磁盤上。持久化是不能夠一定保證的,但是對於一個簡單任務隊列來說已經足夠。如果需要確保消息隊列的持久化,可以使用publisher confirms.

3.5. 公平分發

RabbitMQ的消息分發默認按照消費端的數量,按順序循環分發。這樣僅是確保了消費端被平均分發消息的數量,但卻忽略了消費端的閑忙情況。這就可能出現某個消費端一直處理耗時任務處於阻塞狀態,某個消費端一直處理一般任務處於空置狀態,而只是它們分配的任務數量一樣。

技術分享圖片

但我們可以通過channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,不再分發消息,也就確保了當消費端處於忙碌狀態時,不再分配任務。

//Receive.cs
//4. 申明隊列
channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
//設置prefetchCount : 1來告知RabbitMQ,在未收到消費端的消息確認時,不再分發消息,也就確保了當消費端處於忙碌狀態時
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

這時你需要註意的是如果所有的消費端都處於忙碌狀態,你的隊列可能會被塞滿。你需要註意這一點,要麽添加更多的消費端,要麽采取其他策略。

4. Exchange

細心的你也許發現上面的demo,生產者和消費者直接是通過相同隊列名稱進行匹配銜接的。消費者訂閱某個隊列,生產者創建消息發布到隊列中,隊列再將消息轉發到訂閱的消費者。這樣就會有一個局限性,即消費者一次只能發送消息到某一個隊列。

那消費者如何才能發送消息到多個消息隊列呢?
RabbitMQ提供了Exchange,它類似於路由器的功能,它用於對消息進行路由,將消息發送到多個隊列上。Exchange一方面從生產者接收消息,另一方面將消息推送到隊列。但exchange必須知道如何處理接收到的消息,是將其附加到特定隊列還是附加到多個隊列,還是直接忽略。而這些規則由exchange type定義,exchange的原理如下圖所示。
技術分享圖片

常見的exchange type 有以下幾種:

  • direct(明確的路由規則:消費端綁定的隊列名稱必須和消息發布時指定的路由名稱一致)
  • topic (模式匹配的路由規則:支持通配符)
  • fanout (消息廣播,將消息分發到exchange上綁定的所有隊列上)

下面我們就來一一這介紹它們的用法。

4.1 fanout

本著先易後難的思想,我們先來了解下fanout的廣播路由機制。fanout的路由機制如下圖,即發送到 fanout 類型exchange的消息都會分發到所有綁定該exchange的隊列上去。

技術分享圖片

生產者示例代碼:

// 生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//使用fanout exchange type,指定exchange名稱
channel.ExchangeDeclare(exchange: "fanoutEC", type: "fanout");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//發布到指定exchange,fanout類型無需指定routingKey
channel.BasicPublish(exchange: "fanoutEC", routingKey: "", basicProperties: null, body: body);

消費者示例代碼:

//申明fanout類型exchange
channel.ExchangeDeclare (exchange: "fanoutEC", type: "fanout");
//申明隨機隊列名稱
var queuename = channel.QueueDeclare ().QueueName;
//綁定隊列到指定fanout類型exchange,無需指定路由鍵
channel.QueueBind (queue : queuename, exchange: "fanoutEC", routingKey: "");

4.2. direct

direct相對於fanout就屬於完全匹配、單播的模式,路由機制如下圖,即隊列名稱和消息發送時指定的路由完全匹配時,消息才會發送到指定隊列上。
技術分享圖片

生產者示例代碼:

// 生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//使用direct exchange type,指定exchange名稱
channel.ExchangeDeclare(exchange: "directEC", type: "direct");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//發布到direct類型exchange,必須指定routingKey
channel.BasicPublish(exchange: "directEC", routingKey: "green", basicProperties: null, body: body);

消費者示例代碼:

//申明direct類型exchange
channel.ExchangeDeclare (exchange: "directEC", type: "direct");
//綁定隊列到direct類型exchange,需指定路由鍵routingKey
channel.QueueBind (queue : green, exchange: "directEC", routingKey: "green");

4.3. topic

topic是direct的升級版,是一種模式匹配的路由機制。它支持使用兩種通配符來進行模式匹配:符號#和符號*。其中*匹配一個單詞, #則表示匹配0個或多個單詞,單詞之間用.分割。如下圖所示。
技術分享圖片

生產者示例代碼:

// 生成隨機隊列名稱
var queueName = channel.QueueDeclare().QueueName;
//使用topic exchange type,指定exchange名稱
channel.ExchangeDeclare(exchange: "topicEC", type: "topic");
var message = "Hello Rabbit!";
var body = Encoding.UTF8.GetBytes(message);
//發布到topic類型exchange,必須指定routingKey
channel.BasicPublish(exchange: "topicEC", routingKey: "first.green.fast", basicProperties: null, body: body);

消費者示例代碼:

//申明topic類型exchange
channel.ExchangeDeclare (exchange: "topicEC", type: "topic");
//申明隨機隊列名稱
var queuename = channel.QueueDeclare ().QueueName;
//綁定隊列到topic類型exchange,需指定路由鍵routingKey
channel.QueueBind (queue : queuename, exchange: "topicEC", routingKey: "#.*.fast");

5. RPC

RPC——Remote Procedure Call,遠程過程調用。
那RabbitMQ如何進行遠程調用呢?示意圖如下:
技術分享圖片
第一步,主要是進行遠程調用的客戶端需要指定接收遠程回調的隊列,並申明消費者監聽此隊列。
第二步,遠程調用的服務端除了要申明消費端接收遠程調用請求外,還要將結果發送到客戶端用來監聽的結果的隊列中去。

遠程調用客戶端:

//申明唯一guid用來標識此次發送的遠程調用請求
 var correlationId = Guid.NewGuid().ToString();
 //申明需要監聽的回調隊列
 var replyQueue = channel.QueueDeclare().QueueName;
 var properties = channel.CreateBasicProperties();
 properties.ReplyTo = replyQueue;//指定回調隊列
 properties.CorrelationId = correlationId;//指定消息唯一標識
 string number = args.Length > 0 ? args[0] : "30";
 var body = Encoding.UTF8.GetBytes(number);
 //發布消息
 channel.BasicPublish(exchange: "", routingKey: "rpc_queue", basicProperties: properties, body: body);
 Console.WriteLine($"[*] Request fib({number})");
 // //創建消費者用於處理消息回調(遠程調用返回結果)
 var callbackConsumer = new EventingBasicConsumer(channel);
 channel.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
 callbackConsumer.Received += (model, ea) =>
 {
      //僅當消息回調的ID與發送的ID一致時,說明遠程調用結果正確返回。
     if (ea.BasicProperties.CorrelationId == correlationId)
     {
         var responseMsg = $"Get Response: {Encoding.UTF8.GetString(ea.Body)}";
         Console.WriteLine($"[x]: {responseMsg}");
     }
 };

遠程調用服務端:

//申明隊列接收遠程調用請求
channel.QueueDeclare(queue: "rpc_queue", durable: false,
    exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
Console.WriteLine("[*] Waiting for message.");
//請求處理邏輯
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body);
    int n = int.Parse(message);
    Console.WriteLine($"Receive request of Fib({n})");
    int result = Fib(n);
    //從請求的參數中獲取請求的唯一標識,在消息回傳時同樣綁定
    var properties = ea.BasicProperties;
    var replyProerties = channel.CreateBasicProperties();
    replyProerties.CorrelationId = properties.CorrelationId;
    //將遠程調用結果發送到客戶端監聽的隊列上
    channel.BasicPublish(exchange: "", routingKey: properties.ReplyTo,
        basicProperties: replyProerties, body: Encoding.UTF8.GetBytes(result.ToString()));
    //手動發回消息確認
    channel.BasicAck(ea.DeliveryTag, false);
    Console.WriteLine($"Return result: Fib({n})= {result}");
};
channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer);

6. 總結

基於上面的demo和對幾種不同exchange路由機制的學習,我們發現RabbitMQ主要是涉及到以下幾個核心概念:

  1. Publisher:生產者,消息的發送方。
  2. Connection:網絡連接。
  3. Channel:信道,多路復用連接中的一條獨立的雙向數據流通道。
  4. Exchange:交換器(路由器),負責消息的路由到相應隊列。
  5. Binding:隊列與交換器間的關聯綁定。消費者將關註的隊列綁定到指定交換器上,以便Exchange能準確分發消息到指定隊列。
  6. Queue:隊列,消息的緩沖存儲區。
  7. Virtual Host:虛擬主機,虛擬主機提供資源的邏輯分組和分離。包含連接,交換,隊列,綁定,用戶權限,策略等。
  8. Broker:消息隊列的服務器實體。
  9. Consumer:消費者,消息的接收方。

文章來源:https://www.cnblogs.com/sheng-jie/p/7192690.html

RabbitMQ入門教程