備份交換器
備份交換器,英文名稱為Alternate Exchange,簡稱AE。通過在宣告交換器(呼叫channel.ExchangeDeclare方法)時新增alternate-exchange引數來實現。
備份交換器工作流程如下:
using (var channel = connection.CreateModel())
{
//設定備胎交換器引數
var arguments = new Dictionary<string, object>();
arguments.Add("alternate-exchange","myAe");
channel.ExchangeDeclare("normalExchange", "direct",true,false, arguments);
channel.ExchangeDeclare("myAe", "fanout", true, false);
channel.QueueDeclare("nromalQueue",true,false,false);
channel.QueueBind("nromalQueue", "normalExchange", "normalKey");
channel.QueueDeclare("unroutedQueue", true, false, false);
channel.QueueBind("unroutedQueue", "myAe","ae"); var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 2;
string message = "RabbitMQ Test"; //傳遞的訊息內容
channel.BasicPublish("normalExchange", "normalKey", properties, Encoding.UTF8.GetBytes(message)); //生產訊息
channel.BasicPublish("normalExchange", "un-routkey", properties, Encoding.UTF8.GetBytes(message)); //生產訊息 Console.WriteLine($"Send:{message}");
}
程式碼中聲明瞭兩個交換器normalExchange和myAe,分別綁定了normalQueue和unroutedQueue這兩個佇列,同時將myAe設定為normalExchange的備份交換器。myAe的交換器型別為fanout。同時生成兩條訊息,其中“un-routkey”並沒有定義對應路由。
執行效果:
切換到queues檢視,可以看到兩個佇列分別有一條需要消費的訊息。
備份交換器與普通的交換器沒有太大的區別,為了方便使用,建議設定為fanout型別
過期時間(TTL)
TTL,Time to Live的簡稱,即過期時間。RabbitMQ可以對佇列和訊息設定TTL。
設定佇列的TTL
channel.QueueDeclare方法中的x-expires引數可以設定佇列被自動刪除前處於未使用狀態的時間。未使用是指佇列上沒有任何的消費者,佇列也沒有被重新宣告過,並且其間內也未呼叫過Basic.Get命令。
var arguments = new Dictionary<string, object>();
arguments.Add("x-expires", 10000); //單位毫秒
channel.QueueDeclare(arguments: arguments);
RabbitMQ重啟後,持久化的佇列的過期時間會被重新計算。
設定訊息的TTL
訊息的過期時間有兩種設定方式。第一種是通過佇列屬性設定,佇列中所有訊息都有相同的過期時間。第二種方法是對訊息本身進行設定。
如果兩種方法一起使用,則訊息的TTL以兩者之間較小的那個數值為準。訊息在佇列中的生存時間一旦超過設定的TTL值,就會變成“死信”(Dead Message)
通過佇列屬性設定訊息TTL的方法是在channel.QueueDeclare方法中加入x-message-ttl引數實現的,這個引數的單位是毫秒。
arguments.Add("x-message-ttl", 6000); //單位毫秒
channel.QueueDeclare(arguments: arguments);
對訊息本身進行過期時間設定前面程式碼有涉及過,是通過BasicPublish方法的basicProperties引數指定。
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = 1;
properties.Priority = 2;
properties.Expiration = "6000";
var message = "RabbitMQ Test"; //傳遞的訊息內容
channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生產訊息
死信佇列
DLX,全稱為Dead-Letter-Exchange,即死信交換器。當一個訊息在佇列中變成死信(dead message),它就會被重新被髮送到DLX(死信交換器),繫結DLX的佇列就稱之為死信佇列。
通過在channel.QueueDeclare方法中設定x-dead-letter-exchange引數來為這個佇列新增DLX.
示例程式碼:
var arguments = new Dictionary<string, object>();
arguments.Add("x-expires", 10000); //單位毫秒
arguments.Add("x-message-ttl", 6000); //單位毫秒
channel.QueueDeclare(arguments: arguments); channel.ExchangeDeclare("dlx_exchange", "direct");
var argumentsDlx = new Dictionary<string, object>();
argumentsDlx.Add("x-dead-letter-exchange", "dlx_exchange");
argumentsDlx.Add("x-dead-letter-routing-key", "dlx-routing-key"); //為DLX指定路由鍵,如果沒有特殊指定,則使用原佇列的路由鍵
channel.QueueDeclare("dlx_queue",false,false,false,argumentsDlx);
可以看到dlx_queue有標記了DLX和DLK(Dead-Letter-Key)。
DLX是一個非常有用的特性。異常情況下,訊息不能夠被消費者正確消費(消費者呼叫了Basic.Nack或者Basic.Reject)而被置入死信佇列中,後續分析程式可以通過死信佇列中的內容來分析當時所遇到的異常情況,從而改善和優化系統。
延遲佇列
延遲佇列儲存的是對應的延遲訊息,“延遲訊息”並不想讓消費者立刻訊息,而是等待特定時間後,消費者才能拿到這個訊息進行消費。如一個訂單系統中,使用者下單30分鐘內沒有完成支付那麼就對該訂單進行異常處理,就可以使用延遲佇列來處理這些訂單。
RabbitMQ並支援延遲佇列的功能,但是我們可以通過DLX和TTL模擬出延遲佇列的功能。如下圖所示:
例項中設定了5秒、10秒、30秒、1分鐘四個延時等級。根據應用需求的不同,生產者在傳送訊息的時候設定不同的路由鍵,從而將訊息傳送到與交換器繫結的對應佇列中。這些佇列分別設定了過期時間為5秒、10秒、30秒、1分鐘,同時也分別配置了DLX和相應的死信佇列。佇列中訊息過期時,就會轉存到相應的死信佇列(即延遲佇列)中,在根據業務自身的情況,分別選擇不同延遲等級的延遲佇列進行消費。
優先順序佇列
優先順序佇列,有高優先順序的佇列具有高的優先權,優先順序高的訊息有優先被消費的特權。
可以通過設定佇列的x-max-priority引數來實現。並在傳送訊息時通過Priority設定訊息優先順序。
using (var channel = connection.CreateModel())
{ channel.ExchangeDeclare("priority_exchange", "direct");
var arguments = new Dictionary<string, object>();
arguments.Add("x-max-priority", 10); //
channel.QueueDeclare("priority_queue", false, false, false, arguments);
channel.QueueBind("priority_queue", "priority_exchange", "priority_key"); var properties = channel.CreateBasicProperties();
properties.Priority = 2;
string message = "Priority 2"; //傳遞的訊息內容
channel.BasicPublish("priority_exchange", "priority_key", properties, Encoding.UTF8.GetBytes(message)); //生產訊息
properties.Priority = 5;
message = "Priority 5"; //傳遞的訊息內容
channel.BasicPublish("priority_exchange", "priority_key", properties, Encoding.UTF8.GetBytes(message)); //生產訊息
var result = channel.BasicGet("priority_queue", false);
channel.BasicAck(result.DeliveryTag, true);
Console.WriteLine($"Received:{Encoding.UTF8.GetString(result.Body.ToArray())}"); }
執行上面的程式碼,不管我們如何調整生成訊息的順序,channel.BasicGet取出來的始終是Priority為5的那條訊息。
Github
示例程式碼地址:https://github.com/MayueCif/RabbitMQ