1. 程式人生 > >通過集群的方式解決基於MQTT協議的RabbitMQ消息收發

通過集群的方式解決基於MQTT協議的RabbitMQ消息收發

其中 enc msg received 127.0.0.1 結束 技術 tms gpu

在完成了基於AMQP協議的RabbitMQ消息收發後,我們要繼續實現基於MQTT協議的RabbitMQ消息收發。

由於C#的RabbitMQ.Client包中只實現了基於AMQP協議的消息收發功能的封裝,所以要實現基於MQTT協議的收發,我們要下載新的包。

在NuGet的解決方案中,我們選擇了簡單實用的M2Mqtt。

關於M2Mqtt的資料,可以參考: https://m2mqtt.wordpress.com/ https://github.com/eclipse/paho.mqtt.m2mqtt

消費者代碼:

技術分享圖片
using System;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages; namespace MQTTDemo { class Client { static void Main() { // create client instance MqttClient client = new MqttClient("127.0.0.1"); // register to message received client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
string clientId = Guid.NewGuid().ToString(); client.Connect(clientId); client.Subscribe(new string[] { "test" }, new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE }); } static void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e) {
string msg = System.Text.Encoding.Default.GetString(e.Message); Console.WriteLine(msg); } } }
View Code

生產者代碼:

技術分享圖片
using System;
using System.Text;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

namespace MQTTServer
{
    class Server
    {
        static void Main()
        {
            // create client instance 
            MqttClient client = new MqttClient("127.0.0.1");

            string clientId = Guid.NewGuid().ToString();
            client.Connect(clientId);

            client.Publish("test", Encoding.UTF8.GetBytes("hello"), MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, false);

            Console.WriteLine("Publish!!!");

            Console.ReadKey();
            client.Disconnect();
        }
    }
}
View Code

消費者監聽的隊列名會基於產生的Guid進行前後封裝,“test”表示的是topic值,選擇QOS_LEVEL_AT_MOST_ONCE而不是QOS_LEVEL_EXACTLY_ONCE是因為測試發現QOS_LEVEL_EXACTLY_ONCE消息會被收到多次(我也不知道為啥)。

消費者監聽的隊列會在消費者程序結束後自動刪除,生產者不產生隊列。

在rabbitmq-plugins enable rabbitmq_mqtt之後,我們就可以愉快地通過MQTT收發消息了。

然而,我們發現只能通過127.0.0.1和localhost訪問RabbitMQ服務器,而本機IP訪問失敗。

查閱了大量資料後,我發現這是由於rabbitmq默認的config中有這麽一段文字,所以我們之能在localhost中訪問服務器。

%% The default "guest" user is only permitted to access the server
%% via a loopback interface (e.g. localhost).
%% {loopback_users, [<<"guest">>]},

所以我們取消了{loopback_users, []}的註釋

%% Uncomment the following line if you want to allow access to the
%% guest user from anywhere on the network.
%% {loopback_users, []},

值得註意的是,由於我們在config中僅僅取消了一行註釋,所以這段代碼是整個代碼塊的最後一行。於是我們應該將句末的逗號一同去掉。

然而,我發現怎麽更改默認啟動的rabbitmq對應的comfig文件,都無法成功地使用我更改後的config文件,察看了log發現用的是不存在的rabbitmq.conf文件。

修改成rabbitmq.conf後服務啟動失敗,所以我放棄了直接在默認啟動服務中更改。

由於之前配置過rabbitmq集群,所以我打算采用集群的方式解決問題。

操作可以參考https://www.cnblogs.com/lucifer1997/p/9324130.html,其中我將ClusterNode1改為了mqtt,同時在rabbitmq-mqtt.config中對{loopback_users, []}進行了更改。

如果要修改默認的mqtt用戶、密碼、虛擬用戶、交換機信息,可以參照http://www.rabbitmq.com/mqtt.html在rabbitmq-mqtt.config中進行修改。

在命令行操作之前先把原來開啟的rabbitmq_mqtt停用,避免兩個服務同時監聽1883端口導致報錯。 rabbitmq-plugins disable rabbitmq_mqtt

同時在操作了rabbitmq-plugins-mqtt enable rabbitmq_management之後執行rabbitmq-plugins-mqtt enable rabbitmq_mqtt。

如此就可以在集群後實現遠程MQTT收發,同時還可以實現AMQP與MQTT之間的收發。

通過集群的方式解決基於MQTT協議的RabbitMQ消息收發