一、RabbitMQ簡介
是一個開源的訊息代理和佇列伺服器,用來通過普通協議在完全不同的應用之間共享資料,RabbitMQ是使用Erlang(高併發語言)語言來編寫的,並且RabbitMQ是基於AMQP協議的。
1.1 AMQP協議
Advanced Message Queuing Protocol(高階訊息佇列協議)
1.2 AMQP專業術語:(多路複用->在同一個執行緒中開啟多個通道進行操作)
- Server:又稱broker,接受客戶端的連結,實現AMQP實體服務
- Connection:連線,應用程式與broker的網路連線
- Channel:網路通道,幾乎所有的操作都在channel中進行,Channel是進行訊息讀寫的通道。客戶端可以建立多個channel,每個channel代表一個會話任務。
- Message:訊息,伺服器與應用程式之間傳送的資料,由Properties和Body組成.Properties可以對訊息進行修飾,必須訊息的優先順序、延遲等高階特性;Body則是訊息體內容。
- virtualhost: 虛擬地址,用於進行邏輯隔離,最上層的訊息路由。一個virtual host裡面可以有若干個Exchange和Queue,同一個Virtual Host 裡面不能有相同名稱的Exchange 或 Queue。
- Exchange:交換機,接收訊息,根據路由鍵轉單訊息到繫結佇列
- Binding: Exchange和Queue之間的虛擬連結,binding中可以包換routing key
- Routing key: 一個路由規則,虛擬機器可用它來確定如何路由一個特定訊息。(如負載均衡)
1.3 RabbitMQ整體架構
ClientA(生產者)傳送訊息到Exchange1(交換機),同時帶上RouteKey(路由Key),Exchange1找到繫結交換機為它和繫結傳入的RouteKey的佇列,把訊息轉發到對應的佇列,消費者Client1,Client2,Client3只需要指定對應的佇列名既可以消費佇列資料。
交換機和佇列多對多關係,實際開發中一般是一個交換機對多個佇列,防止設計複雜化。
二、安裝RabbitMQ
安裝方式不影響下面的使用,這裡用Docker安裝
#15672埠為web管理端的埠,5672為RabbitMQ服務的埠
docker run -d --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 15672:15672 -p 5672:5672 rabbitmq:3-management
輸入:ip:5672訪問驗證。
建一個名為develop的Virtual host(虛擬主機)使用,專案中一般是一個專案建一個Virtual host用,能夠隔離佇列。
切換Virtual host
三、RabbitMQ六種佇列模式在.NetCore中使用
(1)簡單佇列
最簡單的工作佇列,其中一個訊息生產者,一個訊息消費者,一個佇列。也稱為點對點模式
描述:一個生產者 P 傳送訊息到佇列 Q,一個消費者 C 接收
建一個RabbitMQHelper.cs類
/// <summary>
/// RabbitMQ幫助類
/// </summary>
public class RabbitMQHelper
{
private static ConnectionFactory factory;
private static object lockObj = new object();
/// <summary>
/// 獲取單個RabbitMQ連線
/// </summary>
/// <returns></returns>
public static IConnection GetConnection()
{
if (factory == null)
{
lock (lockObj)
{
if (factory == null)
{
factory = new ConnectionFactory
{
HostName = "172.16.2.84",//ip
Port = 5672,//埠
UserName = "admin",//賬號
Password = "123456",//密碼
VirtualHost = "develop" //虛擬主機
};
}
}
}
return factory.CreateConnection();
}
}
生產者程式碼:
新建傳送類Send.cs
public static void SimpleSendMsg()
{
string queueName = "simple_order";//佇列名
//建立連線
using (var connection = RabbitMQHelper.GetConnection())
{
//建立通道
using (var channel = connection.CreateModel())
{//建立佇列
channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
for (var i = 0; i < 10; i++)
{
string message = $"Hello RabbitMQ MessageHello,{i + 1}";
var body = Encoding.UTF8.GetBytes(message);//傳送訊息
channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);
Console.WriteLine($"傳送訊息到佇列:{queueName},內容:{message}");
}
}
}
}
建立佇列引數解析:
durable:是否持久化。
exclusive:排他佇列,只有建立它的連線(connection)能連,建立它的連線關閉,會自動刪除佇列。
autoDelete:被消費後,消費者數量都斷開時自動刪除佇列。
arguments:建立佇列的引數。
傳送訊息引數解析:
exchange:交換機,為什麼能傳空呢,因為RabbitMQ內建有一個預設的交換機,如果傳空時,就會用預設交換機。
routingKey:路由名稱,這裡用佇列名稱做路由key。
mandatory:true告訴伺服器至少將訊息route到一個佇列種,否則就將訊息return給傳送者;false:沒有找到路由則訊息丟棄。
執行效果:
佇列產生10條訊息。
消費者程式碼:
新建Recevie.cs類
public static void SimpleConsumer()
{
string queueName = "simple_order";
var connection = RabbitMQHelper.GetConnection();
{
//建立通道
var channel = connection.CreateModel();
{
//建立佇列
channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
int i = 0;
consumer.Received += (model, ea) =>
{
//消費者業務處理
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"{i},執行緒id:{Thread.GetCurrentProcessorId()},佇列{queueName}消費訊息長度:{message.Length}");
i++;
};
channel.BasicConsume(queueName, true, consumer);
}
}
}
消費者只需要知道佇列名就可以消費了,不需要Exchange和routingKey。
注:消費者這裡有一個建立佇列,它本身不需要,是預防消費端程式先執行,沒有佇列會報錯。
執行效果:
訊息已經被消費完。
(2)工作佇列模式
一個訊息生產者,一個交換器,一個訊息佇列,多個消費者。同樣也稱為點對點模式
生產者P傳送訊息到佇列,多個消費者C消費佇列的資料。
工作佇列也稱為公平性佇列模式,迴圈分發,RabbitMQ 將按順序將每條訊息傳送給下一個消費者,每個消費者將獲得相同數量的訊息。
生產者:
Send.cs程式碼:
/// <summary>
/// 工作佇列模式
/// </summary>
public static void WorkerSendMsg()
{
string queueName = "worker_order";//佇列名
//建立連線
using (var connection = RabbitMQHelper.GetConnection())
{
//建立通道
using (var channel = connection.CreateModel())
{
//建立佇列
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
for ( var i=0;i<10;i++)
{
string message = $"Hello RabbitMQ MessageHello,{i+1}";
var body = Encoding.UTF8.GetBytes(message);
//傳送訊息到rabbitmq
channel.BasicPublish(exchange: "", routingKey: queueName, mandatory: false, basicProperties: null, body);
Console.WriteLine($"傳送訊息到佇列:{queueName},內容:{message}");
}
}
}
}
引數durable:true,需要持久化,實際專案中肯定需要持久化的,不然重啟RabbitMQ資料就會丟失了。
執行效果:
寫入10條資料,有持久化標識D。
消費端:
Recevie程式碼:
public static void WorkerConsumer()
{
string queueName = "worker_order";
var connection = RabbitMQHelper.GetConnection();
{
//建立通道
var channel = connection.CreateModel();
{
//建立佇列
channel.QueueDeclare(queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
//prefetchCount:1來告知RabbitMQ,不要同時給一個消費者推送多於 N 個訊息,也確保了消費速度和效能
channel.BasicQos(prefetchSize: 0, prefetchCount:1, global: false);
int i = 1;
int index = new Random().Next(10);
consumer.Received += (model, ea) =>
{
//處理業務
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"{i},消費者:{index},佇列{queueName}消費訊息長度:{message.Length}");
channel.BasicAck(ea.DeliveryTag, false); //訊息ack確認,告訴mq這條佇列處理完,可以從mq刪除了
Thread.Sleep(1000);
i++;
};
channel.BasicConsume(queueName,autoAck:false, consumer);
}
}
}
BasicQos引數解析:
prefetchSize:每條訊息大小,一般設為0,表示不限制。
prefetchCount:1,作用限流,告訴RabbitMQ不要同時給一個消費者推送多於N個訊息,消費者會把N條訊息快取到本地一條條消費,如果不設,RabbitMQ會進可能快的把訊息推到客戶端,導致客戶端記憶體升高。設定合理可以不用頻繁從RabbitMQ 獲取能提升消費速度和效能,設的太多的話則會增大本地記憶體,需要根據機器效能合理設定,官方建議設為30。
global:是否為全域性設定。
這些限流設定針對消費者autoAck:false時才有效,如果是自動Ack的,限流不生效。
執行兩個消費者,效果:
可以看到消費者號的標識,8,2,8,2是平均的,一個消費者5個,RabbitMQ上也能看到有2個消費者,Unacked數是2,因為每個客戶端的限流數是1。
工作佇列模式也是很常用的佇列模式。
(3)釋出訂閱模式
Pulish/Subscribe,無選擇接收訊息,一個訊息生產者,一個交換機(交換機型別為fanout),多個訊息佇列,多個消費者。稱為釋出/訂閱模式
在應用中,只需要簡單的將佇列繫結到交換機上。一個傳送到交換機的訊息都會被轉發到與該交換機繫結的所有佇列上。很像子網廣播,每臺子網內的主機都獲得了一份複製的訊息。
生產者P只需把訊息傳送到交換機X,繫結這個交換機的佇列都會獲得一份一樣的資料。
應用場景:適合於用同一份資料來源做不同的業務。
生產者程式碼:
/// <summary>
/// 釋出訂閱, 扇形佇列
/// </summary>
public static void SendMessageFanout()
{
//建立連線
using (var connection = RabbitMQHelper.GetConnection())
{
//建立通道
using (var channel = connection.CreateModel())
{
string exchangeName = "fanout_exchange";
//建立交換機,fanout型別
channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout);
string queueName1 = "fanout_queue1";
string queueName2 = "fanout_queue2";
string queueName3 = "fanout_queue3";
//建立佇列
channel.QueueDeclare(queueName1, false, false, false);
channel.QueueDeclare(queueName2, false, false, false);
channel.QueueDeclare(queueName3, false, false, false); //把建立的佇列繫結交換機,routingKey不用給值,給了也沒意義的
channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "");
channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "");
channel.QueueBind(queue: queueName3, exchange: exchangeName, routingKey: "");
//向交換機寫10條訊息
for (int i = 0; i < 10; i++)
{
string message = $"RabbitMQ Fanout {i + 1} Message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey: "", null, body);
Console.WriteLine($"傳送Fanout訊息:{message}");
}
}
}
}
執行程式碼:
向交換機發送10條訊息,則繫結這個交換機的3個佇列都會有10條訊息。
消費端的程式碼和工作佇列的一樣,只需知道佇列名即可消費,宣告時要和生產者的宣告一樣。
(4)路由模式(推薦使用)
在釋出/訂閱模式的基礎上,有選擇的接收訊息,也就是通過 routing 路由進行匹配條件是否滿足接收訊息。
上圖是一個結合日誌消費級別的配圖,在路由模式它會把訊息路由到那些 binding key 與 routing key 完全匹配的 Queue 中,此模式也就是 Exchange 模式中的direct
模式。
生產者P傳送資料是要指定交換機(X)和routing傳送訊息 ,指定的routingKey=error,則佇列Q1和佇列Q2都會有一份資料,如果指定routingKey=into,或=warning,交換機(X)只會把訊息發到Q2佇列。
生產者程式碼:
/// <summary>
/// 路由模式,點到點直連佇列
/// </summary>
public static void SendMessageDirect()
{
//建立連線
using (var connection = RabbitMQHelper.GetConnection())
{
//建立通道
using (var channel = connection.CreateModel())
{
//宣告交換機物件,fanout型別
string exchangeName = "direct_exchange";
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct);
//建立佇列
string queueName1 = "direct_errorlog";
string queueName2 = "direct_alllog";
channel.QueueDeclare(queueName1, true, false, false);
channel.QueueDeclare(queueName2, true, false, false); //把建立的佇列繫結交換機,direct_errorlog佇列只繫結routingKey:error
channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: "error");
//direct_alllog佇列繫結routingKey:error,info
channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "info");
channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: "error");
//向交換機寫10條錯誤日誌和10條Info日誌
for (int i = 0; i < 10; i++)
{
string message = $"RabbitMQ Direct {i + 1} error Message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey: "error", null, body);
Console.WriteLine($"傳送Direct訊息error:{message}"); string message2 = $"RabbitMQ Direct {i + 1} info Message";
var body2 = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey: "info", null, body2);
Console.WriteLine($"info:{message2}"); }
}
}
}
這裡建立一個direct型別的交換機,兩個路由key,一個error,一個info,兩個佇列,一個佇列只繫結error,一個佇列繫結error和info,向error和info各發10條訊息。
執行程式碼:
檢視RabbitMQ管理介面,direct_errorlog佇列10條,而direct_alllog有20條,因為direct_alllog佇列兩個routingKey的訊息都進去了。
點進去看下兩個佇列繫結的交換機和routingKey
消費者程式碼:
消費者和工作佇列一樣,只需根據佇列名消費即可,這裡只消費direct_errorlog佇列作示例
public static void DirectConsumer()
{
string queueName = "direct_errorlog";
var connection = RabbitMQHelper.GetConnection();
{
//建立通道
var channel = connection.CreateModel();
{
//建立佇列
channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);
var consumer = new EventingBasicConsumer(channel);
///prefetchCount:1來告知RabbitMQ,不要同時給一個消費者推送多於 N 個訊息,也確保了消費速度和效能
///global:是否設為全域性的
///prefetchSize:單條訊息大小,通常設0,表示不做限制
//是autoAck=false才會有效
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: true);
int i = 1;
consumer.Received += (model, ea) =>
{
//處理業務
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
Console.WriteLine($"{i},佇列{queueName}消費訊息長度:{message.Length}");
channel.BasicAck(ea.DeliveryTag, false); //訊息ack確認,告訴mq這條佇列處理完,可以從mq刪除了
i++;
};
channel.BasicConsume(queueName, autoAck: false, consumer);
}
}
}
普通場景中推薦使用路由模式,因為路由模式有交換機,有路由key,能夠更好的拓展各種應用場景。
(5)主題模式
topics(主題)模式跟routing路由模式類似,只不過路由模式是指定固定的路由鍵 routingKey,而主題模式是可以模糊匹配路由鍵 routingKey,類似於SQL中 = 和 like 的關係。
P 表示為生產者、 X 表示交換機、C1C2 表示為消費者,紅色表示佇列。
topics 模式與 routing 模式比較相近,topics 模式不能具有任意的 routingKey,必須由一個英文句點號“.”分隔的字串(我們將被句點號“.”分隔開的每一段獨立的字串稱為一個單詞),比如 "lazy.orange.a"。topics routingKey 中可以存在兩種特殊字元"*"與“#”,用於做模糊匹配,其中“*”用於匹配一個單詞,“#”用於匹配多個單詞(可以是零個)。
以上圖為例:
如果傳送訊息的routingKey設定為:
aaa.orange.rabbit,那麼訊息會路由到Q1與Q2,
routingKey=aaa.orange.bb的訊息會路由到Q1,
routingKey=lazy.aa.bb.cc的訊息會路由到Q2;
routingKey=lazy.aa.rabbit的訊息會路由到 Q2(只會投遞給Q2一次,雖然這個routingKey 與 Q2 的兩個 bindingKey 都匹配);
沒匹配routingKey的訊息將會被丟棄。
生產者程式碼:
public static void SendMessageTopic()
{
//建立連線
using (var connection = RabbitMQHelper.GetConnection())
{
//建立通道
using (var channel = connection.CreateModel())
{
//宣告交換機物件,fanout型別
string exchangeName = "topic_exchange";
channel.ExchangeDeclare(exchangeName, ExchangeType.Topic);
//佇列名
string queueName1 = "topic_queue1";
string queueName2 = "topic_queue2";
//路由名
string routingKey1 = "*.orange.*";
string routingKey2 = "*.*.rabbit";
string routingKey3 = "lazy.#";
channel.QueueDeclare(queueName1, true, false, false);
channel.QueueDeclare(queueName2, true, false, false); //把建立的佇列繫結交換機,routingKey指定routingKey
channel.QueueBind(queue: queueName1, exchange: exchangeName, routingKey: routingKey1);
channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey2);
channel.QueueBind(queue: queueName2, exchange: exchangeName, routingKey: routingKey3);
//向交換機寫10條訊息
for (int i = 0; i < 10; i++)
{
string message = $"RabbitMQ Direct {i + 1} Message";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchangeName, routingKey: "aaa.orange.rabbit", null, body);
channel.BasicPublish(exchangeName, routingKey: "lazy.aa.rabbit", null, body);
Console.WriteLine($"傳送Topic訊息:{message}");
}
}
}
}
這裡演示了 routingKey為aaa.orange.rabbit,和lazy.aa.rabbit的情況,第一個匹配到Q1和Q2,第二個匹配到Q2,所以應該Q1是10條,Q2有20條,
執行後看rabbitMQ介面:
(6)RPC模式
與上面其他5種所不同之處,該模式是擁有請求/回覆的。也就是有響應的,上面5種都沒有。
RPC是指遠端過程呼叫,也就是說兩臺伺服器A,B,一個應用部署在A伺服器上,想要呼叫B伺服器上應用提供的處理業務,處理完後然後在A伺服器繼續執行下去,把非同步的訊息以同步的方式執行。
客戶端(C)宣告一個排他佇列自己訂閱,然後傳送訊息到RPC佇列同時也把這個排他佇列名也在訊息裡傳進去,服務端監聽RPC佇列,處理完業務後把處理結果傳送到這個排他佇列,然後客戶端收到結果,繼續處理自己的邏輯。
RPC的處理流程:
- 當客戶端啟動時,建立一個匿名的回撥佇列。
- 客戶端為RPC請求設定2個屬性:replyTo:設定回撥佇列名字;correlationId:標記request。
- 請求被髮送到rpc_queue佇列中。
- RPC伺服器端監聽rpc_queue佇列中的請求,當請求到來時,伺服器端會處理並且把帶有結果的訊息傳送給客戶端。接收的佇列就是replyTo設定的回撥佇列。
- 客戶端監聽回撥佇列,當有訊息時,檢查correlationId屬性,如果與request中匹配,那就是結果了。
服務端程式碼:
public class RPCServer
{
public static void RpcHandle()
{ var connection = RabbitMQHelper.GetConnection();
{
var channel = connection.CreateModel();
{
string queueName = "rpc_queue";
channel.QueueDeclare(queue: queueName, durable: false,
exclusive: false, autoDelete: false, arguments: null);
channel.BasicQos(0, 1, false);
var consumer = new EventingBasicConsumer(channel);
channel.BasicConsume(queue: queueName,
autoAck: false, consumer: consumer);
Console.WriteLine("【服務端】等待RPC請求..."); consumer.Received += (model, ea) =>
{
string response = null; var body = ea.Body.ToArray();
var props = ea.BasicProperties;
var replyProps = channel.CreateBasicProperties();
replyProps.CorrelationId = props.CorrelationId; try
{
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"【服務端】接收到資料:{ message},開始處理");
response = $"訊息:{message},處理完成";
}
catch (Exception e)
{
Console.WriteLine("錯誤:" + e.Message);
response = "";
}
finally
{
var responseBytes = Encoding.UTF8.GetBytes(response);
channel.BasicPublish(exchange: "", routingKey: props.ReplyTo,
basicProperties: replyProps, body: responseBytes);
channel.BasicAck(deliveryTag: ea.DeliveryTag,
multiple: false);
}
};
}
}
} }
客戶端:
public class RPCClient
{
private readonly IConnection connection;
private readonly IModel channel;
private readonly string replyQueueName;
private readonly EventingBasicConsumer consumer;
private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>();
private readonly IBasicProperties props; public RPCClient()
{
connection = RabbitMQHelper.GetConnection(); channel = connection.CreateModel();
replyQueueName = channel.QueueDeclare().QueueName;
consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties();
var correlationId = Guid.NewGuid().ToString();
props.CorrelationId = correlationId; //給訊息id
props.ReplyTo = replyQueueName;//回撥的佇列名,Client關閉後會自動刪除 consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
//監聽的訊息Id和定義的訊息Id相同代表這條訊息服務端處理完成
if (ea.BasicProperties.CorrelationId == correlationId)
{
respQueue.Add(response);
}
}; channel.BasicConsume(
consumer: consumer,
queue: replyQueueName,
autoAck: true);
} public string Call(string message)
{
var messageBytes = Encoding.UTF8.GetBytes(message);
//傳送訊息
channel.BasicPublish(
exchange: "",
routingKey: "rpc_queue",
basicProperties: props,
body: messageBytes);
//等待回覆
return respQueue.Take();
} public void Close()
{
connection.Close();
}
}
執行程式碼:
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
//啟動服務端,正常邏輯是在另一個程式
RPCServer.RpcHandle();
//例項化客戶端
var rpcClient = new RPCClient();
string message = $"訊息id:{new Random().Next(1, 1000)}";
Console.WriteLine($"【客服端】RPC請求中,{message}");
//向服務端傳送訊息,等待回覆
var response = rpcClient.Call(message);
Console.WriteLine("【客服端】收到回覆響應:{0}", response);
rpcClient.Close();
Console.ReadKey();
}
測試效果:
z執行完,客服端close後,可以接著自己的下一步業務處理。
總結:
以上便是RabbitMQ的6中模式在.net core中實際使用,其中(1)簡單佇列,(2)工作佇列,(4)路由模式,(6)RPC模式的交換機型別都是direct,(3)釋出訂閱的交換機是fanout,(5)topics的交換機是topic。正常場景用的是direct,預設交換機也是direct型別的,推薦用(4)路由模式,因為指定交換機名比起預設的交換機會容易擴充套件場景,其他的交換機看業務場景所需使用。
下面位置可以看到交換機型別,amq.開頭那幾個是內建的,避免交換機過多可以直接使用。