搭建RabbitMQ簡單通用的直連方法

如果還沒有MQ環境,可以參考上一篇的部落格,在windows系統上的rabbitmq環境搭建。如果使用docker環境,可以直接百度一下,應該就一個語句就可以搞定。使用windows環境安裝mq有關的教程地址:

https://www.cnblogs.com/weskynet/p/14877932.html

接下來開始.net core操作Rabbitmq有關的內容。我打算使用比較簡單的單機的direct直連模式,來演示一下有關操作,基本套路差不多。

首先,我在我的package包專案上面,新增對RabbitMQ.Client的引用:

在Common資料夾下,新建類庫專案 Wsk.Core.RabbitMQ,並且引用package專案:

在啟動專案下的appsettings配置檔案裡面,新增一個訪問RabbitMQ的配置資訊:

配置部分程式碼:

"MQ": [
{
"Host": "127.0.0.1", // MQ安裝的實際伺服器IP地址
"Port": 5672, // 服務埠號
"User": "wesky", // 使用者名稱
"Password": "wesky123", // 密碼
"ExchangeName": "WeskyExchange", // 設定一個Exchange名稱,
"Durable": true // 是否啟用持久化
}
]

然後,在實體類專案下,新建實體類MqConfigInfo,用於把讀取的配置資訊賦值到該實體類下:

實體類程式碼:

public class MqConfigInfo
{
public string Host { get; set; }
public int Port { get; set; }
public string User { get; set; }
public string Password { get; set; }
public string ExchangeName { get; set; }
public bool Durable { get; set; }
}

在剛剛新建的RabbitMQ類庫專案下面,引用該實體類庫專案,以及APppSettings專案。然後新建一個類,叫做ReadMqConfigHelper,以及它的interface介面,並且提供一個方法,叫ReadMqConfig,用來進行讀取配置資訊使用:

讀取配置資訊類程式碼:

public class ReadMqConfigHelper:IReadMqConfigHelper
{
private readonly ILogger<ReadMqConfigHelper> _logger;
public ReadMqConfigHelper(ILogger<ReadMqConfigHelper> logger)
{
_logger = logger;
}
public List<MqConfigInfo> ReadMqConfig()
{
try
{
List<MqConfigInfo> config = AppHelper.ReadAppSettings<MqConfigInfo>(new string[] { "MQ" }); // 讀取MQ配置資訊
if (config.Any())
{
return config;
}
_logger.LogError($"獲取MQ配置資訊失敗:沒有可用資料集");
return null;
}
catch (Exception ex)
{
_logger.LogError($"獲取MQ配置資訊失敗:{ex.Message}");
return null;
}
}
}

接著,新建類MqConnectionHelper以及介面IMqConnectionHelper,用於做MQ連線、建立生產者和消費者等有關操作:

然後,新增一系列建立連線所需要的靜態變數:

然後,設定兩個消費者佇列,用來測試。以及新增生產者連線有關的配置和操作:

然後,建立消費者連線方法:

其中,StartListener下面提供了事件,用於手動確認訊息接收。如果設定為自動,有可能導致訊息丟失:

然後,新增訊息釋出方法:

在interface接口裡面,新增有關的介面,用於等下依賴注入使用:

連線類部分的程式碼:

  public class MqConnectionHelper:IMqConnectionHelper
{ private readonly ILogger<MqConnectionHelper> _logger;
public MqConnectionHelper(ILogger<MqConnectionHelper> logger)
{
_logger = logger; _connectionReceiveFactory = new IConnectionFactory[_costomerCount];
_connectionReceive = new IConnection[_costomerCount];
_modelReceive = new IModel[_costomerCount];
_basicConsumer = new EventingBasicConsumer[_costomerCount]; } /*
備註:使用陣列的部分,是給消費端用的。目前生產者只設置了一個,消費者可能存在多個。
當然,有條件的還可以上RabbitMQ叢集進行處理,會更好玩一點。
*/
private static IConnectionFactory _connectionSendFactory; //RabbitMQ工廠 傳送端
private static IConnectionFactory[] _connectionReceiveFactory; //RabbitMQ工廠 接收端 private static IConnection _connectionSend; //連線 傳送端
private static IConnection[] _connectionReceive; //連線 消費端 public static List<MqConfigInfo> _mqConfig; // 配置資訊 private static IModel _modelSend; //通道 傳送端
private static IModel[] _modelReceive; //通道 消費端 private static EventingBasicConsumer[] _basicConsumer; // 事件 /* 設定兩個routingKey 和 佇列名稱,用來做測試使用*/
public static int _costomerCount = 2;
public static string[] _routingKey = new string[] {"WeskyNet001","WeskyNet002" };
public static string[] _queueName = new string[] { "Queue001", "Queue002" }; /// <summary>
/// 生產者初始化連線配置
/// </summary>
public void SendFactoryConnectionInit()
{
_connectionSendFactory = new ConnectionFactory
{
HostName = _mqConfig.FirstOrDefault().Host,
Port = _mqConfig.FirstOrDefault().Port,
UserName = _mqConfig.FirstOrDefault().User,
Password = _mqConfig.FirstOrDefault().Password
};
} /// <summary>
/// 生產者連線
/// </summary>
public void SendFactoryConnection()
{ if (null != _connectionSend && _connectionSend.IsOpen)
{
return; // 已有連線
}
_connectionSend = _connectionSendFactory.CreateConnection(); // 建立生產者連線 if (null != _modelSend && _modelSend.IsOpen)
{
return; // 已有通道
}
_modelSend = _connectionSend.CreateModel(); // 建立生產者通道 _modelSend.ExchangeDeclare(_mqConfig.FirstOrDefault().ExchangeName, ExchangeType.Direct); // 定義交換機名稱和型別(direct) } /// <summary>
/// 消費者初始化連線配置
/// </summary>
public void ReceiveFactoryConnectionInit()
{
var factories = new ConnectionFactory
{
HostName = _mqConfig.FirstOrDefault().Host,
Port = _mqConfig.FirstOrDefault().Port,
UserName = _mqConfig.FirstOrDefault().User,
Password = _mqConfig.FirstOrDefault().Password
}; for (int i = 0; i < _costomerCount; i++)
{
_connectionReceiveFactory[i] = factories; // 給每個消費者繫結一個連線工廠
}
} /// <summary>
/// 消費者連線
/// </summary>
/// <param name="consumeIndex"></param>
/// <param name="exchangeName"></param>
/// <param name="routeKey"></param>
/// <param name="queueName"></param>
public void ConnectionReceive(int consumeIndex, string exchangeName, string routeKey, string queueName)
{
_logger.LogInformation($"開始連線RabbitMQ消費者:{routeKey}"); if (null != _connectionReceive[consumeIndex] && _connectionReceive[consumeIndex].IsOpen)
{
return;
}
_connectionReceive[consumeIndex] = _connectionReceiveFactory[consumeIndex].CreateConnection(); // 建立消費者連線 if (null != _modelReceive[consumeIndex] && _modelReceive[consumeIndex].IsOpen)
{
return;
}
_modelReceive[consumeIndex] = _connectionReceive[consumeIndex].CreateModel(); // 建立消費者通道 _basicConsumer[consumeIndex] = new EventingBasicConsumer(_modelReceive[consumeIndex]); _modelReceive[consumeIndex].ExchangeDeclare(exchangeName, ExchangeType.Direct); // 定義交換機名稱和型別 與生產者保持一致 _modelReceive[consumeIndex].QueueDeclare(
queue: queueName, //訊息佇列名稱
durable: _mqConfig.FirstOrDefault().Durable, // 是否可持久化,此處配置在檔案中,預設全域性持久化(true),也可以自定義更改
exclusive: false,
autoDelete: false,
arguments: null
); // 定義消費者佇列 _modelReceive[consumeIndex].QueueBind(queueName, exchangeName, routeKey); // 佇列繫結給指定的交換機 _modelReceive[consumeIndex].BasicQos(0, 1, false); // 設定消費者每次只接收一條訊息 StartListener((model, ea) =>
{
byte[] message = ea.Body.ToArray(); // 接收到的訊息 string msg = Encoding.UTF8.GetString(message); _logger.LogInformation($"佇列{queueName}接收到訊息:{msg}");
Thread.Sleep(2000); _modelReceive[consumeIndex].BasicAck(ea.DeliveryTag, true);
}, queueName, consumeIndex); } /// <summary>
/// 消費者接收訊息的確認機制
/// </summary>
/// <param name="basicDeliverEventArgs"></param>
/// <param name="queueName"></param>
/// <param name="consumeIndex"></param>
private static void StartListener(EventHandler<BasicDeliverEventArgs> basicDeliverEventArgs, string queueName, int consumeIndex)
{
_basicConsumer[consumeIndex].Received += basicDeliverEventArgs;
_modelReceive[consumeIndex].BasicConsume(queue: queueName, autoAck: false, consumer: _basicConsumer[consumeIndex]); // 設定手動確認。 } /// <summary>
/// 訊息釋出
/// </summary>
/// <param name="message"></param>
/// <param name="exchangeName"></param>
/// <param name="routingKey"></param>
public static void PublishExchange(string message, string exchangeName, string routingKey = "")
{
byte[] body = Encoding.UTF8.GetBytes(message);
_modelSend.BasicPublish(exchangeName, routingKey, null, body);
} }

現在,我把整個Wsk.Core.RabbitMQ專案進行新增到依賴注入:

然後,在啟動專案裡面的初始化服務裡面,新增對MQ連線的初始化以及連線,並且傳送兩條訊息進行測試:

啟用程式,提示傳送成功:

開啟RabbitMQ頁面客戶端,可以看見新增了一個交換機WeskyExchange:

點進去可以看見對應的流量走勢:

關閉程式,現在新增消費者的初始化和連線,然後重新發送:

可見傳送訊息成功,並且消費者也成功接收到了訊息。開啟客戶端檢視一下:

在WeskyExchange交換機下,多了兩個佇列,以及佇列歸屬的RoutingKey分別是WeskyNet001和WeskyNet002。以及在Queue目錄下,多了兩個佇列的監控資訊:

為了看出點效果,我們批量發訊息試一下:

然後啟動專案,我們看一下監控效果。先是交換機頁面的監控:

然後是佇列1的監控:

現在換一種寫法,在消費者那邊加個延遲:

並且生產者的延遲解除:

再啟動一下看看效果:

會發現佇列訊息被堵塞,必須在執行完成以後,才可以解鎖。而且生產者這邊並不需要等待,可以看見訊息一次性全發出去了,可以繼續執行後續操作:

以上就是關於使用Direct模式進行RabbitMQ收發訊息的內容,傳送訊息可以在其他類裡面或者方法裡面,直接通過靜態方法進行傳送;接收訊息,啟動了監聽,就可以一直存活。如果有興趣,也可以自己嘗試Fanout、Topic等不同的模式進行測試,以及可以根據不同的機器,進行配置成收發到不同伺服器上面進行通訊。