開發中關鍵的Class和Interface有Channel、Connection、ConnectionFactory、Consumer等,與RabbitMQ相關的開發工作,基本上是圍繞Connection和Channel這兩個類展開的。

連線RabbitMQ

一個Connection可以建立多個Channel例項,但Channel例項不能線上程間共享,應用程式應該為每一個執行緒開闢一個Channel。

Channel或者Connection中有個isOpen方法可以用來檢測其是否已處於開啟狀態。但並不推薦使用,這個方法的返回值依賴於shutdownCause的存在,有可能會產生競爭。更多的是捕獲ShutdownSignalException,IOException或SocketException等異常判斷RabbitMq的連線狀態。

實際操作過程中遇到BrokerUnreachableException異常

因為我使用的賬號是guest,guest賬號預設是不支援遠端連線,需要在http://localhost:15672(前提是安裝了web外掛)的Admin選項卡中新增一個新使用者(或者使用命令列新增)。

安裝web外掛

rabbitmq-plugins enable rabbitmq_management

新增新使用者:

sudo rabbitmqctl add_user  user_admin  passwd_admin

如上圖所示,新新增的使用者沒有任何許可權,需要點選使用者名稱設定許可權。

示例程式碼:

var factory = new ConnectionFactory {
HostName = "localhost", //主機名
UserName = "mymq", //預設使用者名稱
Password = "123456", //預設密碼
RequestedHeartbeat = TimeSpan.FromSeconds(30)
}; using (var connection = factory.CreateConnection())//連線伺服器
{
//建立一個通道
using (var channel = connection.CreateModel())
{
channel.QueueDeclare("stacking", false, false, false, null);//建立訊息佇列
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 1;
string message = "RabbitMQ Test"; //傳遞的訊息內容
channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生產訊息
Console.WriteLine($"Send:{message}");
}
}

在管理介面處看到訊息插入成功

使用新加的賬號連結MQ還會提示BrokerUnreachableException異常,很納悶。折騰了半天把WSL升級到WSL2就連結成功。

交換器和佇列

交換器和佇列是應用層面的構建模組,使用前應對其進行聲明確保其存在。

 var exchangeName = "exchange_name";
channel.ExchangeDeclare(exchangeName, "direct", true);//建立一個持久化的、非自動刪除的、繫結型別為direct的交換器
var queueName = channel.QueueDeclare().QueueName; //建立一個非持久化的、排他的、自動刪除的佇列(佇列名由RabbitMQ自動生成)
channel.QueueBind(queueName, exchangeName, "routing_key"); //使用路由鍵(routing_key)將佇列和交換器繫結 channel.QueueDeclare("queue_name", true); // QueueDeclare擁有多個過載

ExchangeDeclare方法詳解

各個引數詳細說明如下:

exchange:交換器的名稱。

type:交換器的型別,常見的如fanout、direct、topic。

durable:設定是否持久化。durable設定為true表示持久化,反之是非持久化。持久化可以將交換器存檔,在伺服器重啟的時候不會丟失相關資訊。

autoDelete:設定是否自動刪除。autoDelete設定為true則表示自動刪除。自動刪除的前提是至少有一個佇列或者交換器與這個交換器繫結,之後所有與這個交換器繫結的佇列或者交換器都與此解綁才會刪除。

internal:設定是否是內建的。如果設定為true,則表示是內建的交換器,客戶端程式無法直接傳送訊息到這個交換器中,只能通過交換器路由到交換器這種方式。

argument:其他一些結構化引數

QueueDeclareNoWait方法實現設定了一個nowait引數(AMQP中Exchange.Declare命令的引數),意思是不需要等待服務區返回結果。

ExchangeDeclarePassive方法用來檢測相應的交換器是否存在。如果存在則正常返回;如果不存在則丟擲異常。

QueueDeclare方法詳解

方法的引數詳細說明如下:

queue:佇列的名稱。

durable:設定是否持久化。為true則設定佇列為持久化。持久化的佇列會存檔,在伺服器重啟的時候可以保證不丟失相關資訊。

exclusive:設定是否排他。為true則設定佇列為排他的。

autoDelete:設定是否自動刪除。為true則設定佇列為自動刪除。自動刪除的前提是:至少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除。

arguments:設定佇列的其他一些引數,如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority等。

如果一個佇列被宣告為排他佇列,則該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除。需要注意:排他佇列是基於連線(Connection)可見的,同一個連線的不同通道(Channel)可以訪問同一連線建立的排他佇列;“首次”是指如果一個連線已經聲明瞭一個排他佇列,其他連線不允許再建立同名的排他佇列;即使該佇列是持久化的,一旦連線關閉或客戶端退出,該排他佇列都會被自動刪除,這種佇列適用於一個客戶端同時傳送和讀取訊息的應用場景。

QueueDeclareNoWait方法實現設定了一個nowait引數,意思是不需要等待服務區返回結果。

QueueDeclarePassive方法用來檢測相應的佇列是否存在。如果存在則正常返回;如果不存在則丟擲異常。

QueueBind方法詳解

方法中涉及的引數:

queue:佇列名稱;

exchange:交換器的名稱;

routingKey:用來繫結佇列和交換器的路由鍵;

argument:定義繫結的一些引數。

ExchangeBind方法詳解

不僅可以將交換器與佇列繫結,也可以將交換器與交換器繫結。繫結之後,訊息從source交換器轉發到destination交換器

方法中涉及的引數:

destination:目的交換器名;

source:源交換器的名稱;

routingKey:用來繫結佇列和交換器的路由鍵;

argument:定義繫結的一些引數。

交換器的使用並不會真正耗費伺服器的效能,而佇列會。要衡量RabbitMQ當前的QPS只需看佇列的即可。

傳送訊息

BasicPublish方法用來發送一條訊息到。為了更好地控制傳送,可以使用mandatory這個引數

對應的具體引數解釋如下:

exchange:交換器的名稱,指明訊息需要傳送到哪個交換器中。如果設定為空字串,則訊息會被髮送到RabbitMQ預設的交換器中。

routingKey:路由鍵,交換器根據路由鍵將訊息儲存到相應的佇列之中。

basicProperties:訊息的基本屬性集,其包含14個屬性成員,分別有contentType、contentEncoding、headers(Map<String,Object>)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。

byte[] body:訊息體(payload),真正需要傳送的訊息。

mandatory:設定為true時,如果exchange根據自身型別和訊息routeKey無法找到一個符合條件的queue,會呼叫basic.return方法將訊息返還給生產者;設為false時,出現上述情形broker會直接將訊息扔掉。

豐富了一下第一部分的程式碼:

var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 1;
properties.Priority = 2;
properties.ContentType = "text/plain";
properties.Expiration = "60000";
string message = "RabbitMQ Test"; //傳遞的訊息內容
channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生產訊息

消費訊息

RabbitMQ的消費模式分兩種:推(Push)模式和拉(Pull)模式。推模式採用Basic.Consume進行消費,而拉模式則是呼叫Basic.Get進行消費。

推模式

推模式接收訊息需要例項化一個EventingBasicConsumer類,訂閱Received事件來接收訊息。EventingBasicConsumer實現了DefaultBasicConsumer類,實際使用中如果不滿足需求可以繼承該類。

示例程式碼:

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
var body = ea.Body.ToArray();
Console.WriteLine($"Received:{Encoding.UTF8.GetString(body)}");
channel.BasicAck(ea.DeliveryTag, false);
};
var consumerTag = channel.BasicConsume("stacking", false, consumer);

BasicConsume方法對應的引數說明如下:

queue:佇列的名稱;

autoAck:設定是否自動確認。建議設成false,即不自動確認;

consumerTag:消費者標籤,用來區分多個消費者;

arguments:設定消費者的其他引數;

callback:設定消費者的回撥函式。

BasicConsume返回字串型別consumerTag,可以通過呼叫channel.BasicCancel(consumerTag)顯式地取消一個消費者的訂閱。BasicCancel方法會首先觸發HandleConsumerOk方法,之後觸發HandleDelivery方法,最後才觸發HandleCancelOk方法.

拉模式

拉模式通過channel.BasicGet方法可以單條地獲取訊息。

示例程式碼:

var result = channel.BasicGet("stacking",false);
Console.WriteLine($"Received:{Encoding.UTF8.GetString(result.Body.ToArray())}");
channel.BasicAck(result.DeliveryTag, false);

Basic.Consume將通道(Channel)置為接收模式,直到取消佇列的訂閱,RabbitMQ會不斷地推送訊息給消費者,當然推送訊息的個數還是會受到Basic.Qos的限制。如果只想從佇列獲得單條訊息而不是持續訂閱,建議使用Basic.Get進行消費。但是不能將Basic.Get放在一個迴圈裡來代替Basic.Consume,這樣做會嚴重影響RabbitMQ的效能。

消費端的確認與拒絕

為了保證訊息從佇列可靠地達到消費者,RabbitMQ提供了訊息確認機制(message acknowledgement)。消費者在訂閱佇列時指定autoAck引數,當autoAck為false時,RabbitMQ會等待消費者顯式地回覆確認訊號後才從記憶體(或者磁碟)中移去訊息(實質上是先打上刪除標記,之後再刪除)。當autoAck為true時,RabbitMQ會自動把傳送出去的訊息置為確認,然後從記憶體(或者磁碟)中刪除,而不管消費者是否真正地消費到了這些訊息。

當autoAck引數設定為false,消費者就有足夠的時間處理訊息(任務),不用擔心處理訊息過程中消費者程序掛掉後訊息丟失的問題。

當autoAck引數設定為false,對於RabbitMQ服務端而言,佇列中的訊息分為兩部分:一部分是等待投遞給消費者的訊息;一部分是已投遞給消費者,但是還沒有收到消費者確認訊號的訊息。如果RabbitMQ一直沒有收到消費者的確認訊號,並且此訊息的消費者斷開連線,則RabbitMQ會安排該訊息重新進入佇列,等待投遞給下一個消費者(可能還是原來的那個消費者 ),並且RabbitMQ不會為未確認的訊息設定過期時間。

消費訊息時autoAck引數設定為false需要主動呼叫channel.BasicAck對訊息進行確認,以便RabbitMQ刪除訊息,對應的也可以呼叫channel.BasicReject方法拒絕訊息,由其他消費端處理或者丟棄。

deliveryTag可以看作訊息的編號。如果requeue引數設定為true,則RabbitMQ會重新將這條訊息存入佇列;如果requeue引數設定為false,則RabbitMQ立即會把訊息從佇列中移除,不會把它傳送給新的消費者。

Basic.Reject命令一次只能拒絕一條訊息,如果想要批量拒絕訊息,則可以使用Basic.Nack這個命令,對應的實現方法為channel.BasicNack.

其中deliveryTag和requeue的含義可以參考BasicReject方法。multiple引數設定為false則表示僅拒絕編號為deliveryTag的單條訊息;multiple引數設定為true則表示拒絕deliveryTag編號之前所有未被當前消費者確認的訊息。

channel.BasicReject或者channel.BasicNack中的requeue設定為false,可以啟用“死信佇列”的功能。死信佇列可以通過檢測被拒絕或者未送達的訊息來追蹤問題。

關閉連線

可以顯示的呼叫Connection和Channel的Close方法來關閉連線,也可以藉助using來管理連線。

Connection和Channel所具備的生命週期如下:

Open:開啟狀態,代表當前物件可以使用。

Closing:正在關閉狀態。當前物件被顯式地通知呼叫關閉方法(shutdown),這樣就產生了一個關閉請求讓其內部物件進行相應的操作,並等待這些關閉操作的完成。

Closed:已經關閉狀態。當前物件已經接收到所有的內部物件已完成關閉動作的通知,並且其也關閉了自身。

在Connection和Channel中都定義了對應實現監聽狀態的改變。

Connection

Channel

Github

示例程式碼地址:https://github.com/MayueCif/RabbitMQ