1. 程式人生 > >使用Surging Mqtt 開發基於WS的MqttClient客戶端

使用Surging Mqtt 開發基於WS的MqttClient客戶端

ges model directory 一個 protoc zookeepe mat dot data-

原文:使用Surging Mqtt 開發基於WS的MqttClient客戶端

最近一段時間由於要做一套智能設備系統,而有幸了解到Surging中的Mqtt broker,學習了很多東西本篇文章基於Surging開源的.netcore項目有興趣的朋友可點擊此處進行了解。話不多說我們來基於Surging 中的WS與MqttClient結合來開發服務端MqttClient的使用。

準備工作

開發環境: Visual Studio 2017 15.9.5

.netCore版本:2.2.102(目前Surging已經升級至netcore 2.2版本)

surging項目下載地址 https://github.com/dotnetcore/surging

開始工作

接口部分

新建類庫Surging.IModuleServices.MqttWithWS

添加引用Surging.Core.Protocol.WS

新建文件夾Model 並創建類MqttClientOption.cs 此類為讀取surgingSettings.json中配置的MqttClient的相關參數MqttClientOption的代碼如下

public class MqttClientOption
    {
        public string ClientID { get; set; }
        public string MqttClientConnection { get
; set; } = ""; public string MqttClientUserName { get; set; } public string MqttClientPassword { get; set; } public int Port { get; set; } public int KeepAlivedTime { get; set; } public List<string> Topics { get; set; } = new List<string>();
public bool CleanSession { get; set; } }

surgingSettings.json的MqttClient配置代碼參見底部Surging.MqttClientWithWsServices.Server中的配置

根據Surging作者創建的例子來創建一個接口IChatService.cs

using Surging.Core.CPlatform.Ioc;
using Surging.Core.CPlatform.Runtime.Client.Address.Resolvers.Implementation.Selectors.Implementation;
using Surging.Core.CPlatform.Runtime.Server.Implementation.ServiceDiscovery.Attributes;
using Surging.Core.CPlatform.Support.Attributes;
using Surging.Core.Protocol.WS.Attributes;
using Surging.IModuleServices.MqttWithWS.Models;
using System;
using System.Threading.Tasks;

namespace Surging.IModuleServices.MqttWithWS
{
    [ServiceBundle("Api/{Service}")]
    [BehaviorContract(IgnoreExtensions = true)]
    public interface IChatService: IServiceKey
    {
        [Command(ShuntStrategy = AddressSelectorMode.HashAlgorithm)]
        Task RunMqttClient(MqttClientOption mqttClientOption);
    }
}

結構如圖所示

技術分享圖片

接口實現部分

新建類庫Surging.Modules.WSWithMqtt

添加引用Surging.IModuleServices.MqttWithWS

添加依賴項:MQTTnet

由於要獲取surgingSettings.json中的配置項,因此需要創建一個文件夾Configurations並在文件夾下創建SurgingMqttBuilderExtendsions.cs類代碼如下:

using Autofac;
using Microsoft.Extensions.Configuration;
using Surging.Core.CPlatform;
using Surging.Core.ServiceHosting.Internal;
using Surging.IModuleServices.MqttWithWS;
using Surging.IModuleServices.MqttWithWS.Models;
using System;
using System.Collections.Generic;
using System.Text;

namespace Surging.Modules.WSWithMqtt.Configurations
{
  public static class SurgingMqttBuilderExtendsions
    {
        public static IServiceHostBuilder UseMqttClient(this IServiceHostBuilder builder)
        {
            builder.MapServices(collection =>
            {
                MqttClientOption mqttClientOption = new MqttClientOption();
                var section = AppConfig.GetSection("MqttClient");
                if (section.Exists())
                    mqttClientOption = section.Get<MqttClientOption>();
                collection.Resolve<IChatService>().RunMqttClient(mqttClientOption);
            });
            return builder;
        }
    }
}

由於此服務基於WSServiceBase 因此我創建了MqttWSServieBase繼承自WSServiceBase用於創建MqttClient客戶端,代碼如下:

using MQTTnet;
using MQTTnet.Client;
using Surging.Core.Protocol.WS;
using Surging.IModuleServices.MqttWithWS.Models;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace Surging.Modules.WSWithMqtt
{
    public abstract class MqttWSServieBase : WSServiceBase
    {
        protected IMqttClientOptions _mqttClientOptions;
        protected IMqttClient _mqttClient;
        protected IEnumerable<TopicFilter> _topicFilters;
        public Task RunMqttClientBase(MqttClientOption mqttClientOption)
        {
            var clientOptions = new MqttClientOptionsBuilder();
            if (mqttClientOption != null)
            {
                clientOptions.WithClientId(mqttClientOption.ClientID + Guid.NewGuid().ToString("N"))
                .WithCleanSession(mqttClientOption.CleanSession);
                clientOptions.WithTcpServer(mqttClientOption.MqttClientConnection, mqttClientOption.Port);
                if (!string.IsNullOrWhiteSpace(mqttClientOption.MqttClientUserName))
                    clientOptions.WithCredentials(mqttClientOption.MqttClientUserName, mqttClientOption.MqttClientPassword);
                clientOptions.WithKeepAlivePeriod(TimeSpan.FromSeconds(mqttClientOption.KeepAlivedTime));
            }
            _mqttClientOptions = clientOptions.Build();
            IList<TopicFilter> filters = new List<TopicFilter>();
            if (mqttClientOption != null)
            {
                foreach (var item in mqttClientOption.Topics)
                {
                    var topicFilerbuilder = new TopicFilterBuilder();
                    topicFilerbuilder.WithTopic(item);
                    filters.Add(topicFilerbuilder.Build());
                }
            }
            _topicFilters = filters;
            _mqttClient = new MqttFactory().CreateMqttClient();
            return Task.CompletedTask;
        }
    }
}

接口繼承類ChatService,此類用於連接WS,並通過此連接對Surging的 Mqtt Broker進行發布訂閱。 代碼如下:

using MQTTnet;
using MQTTnet.Client;
using Surging.IModuleServices.MqttWithWS;
using Surging.IModuleServices.MqttWithWS.Models;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using WebSocketCore;

namespace Surging.Modules.WSWithMqtt
{
    public class ChatService : MqttWSServieBase, IChatService
    {
        private static readonly ConcurrentDictionary<string, string> _users = new ConcurrentDictionary<string, string>();
        private static readonly ConcurrentDictionary<string, string> _clients = new ConcurrentDictionary<string, string>();
        private string _name;
        private string _to;


        protected override void OnMessage(MessageEventArgs e)
        {
            if (_clients.ContainsKey(ID))
            {
                Dictionary<string, object> model = new Dictionary<string, object>();
                model.Add("name", _to);
                model.Add("data", e.Data);
                //var result = ServiceLocator.GetService<IServiceProxyProvider>()
                //     .Invoke<object>(model, "api/chat/SendMessage").Result;

            }
        }

        protected override void OnOpen()
        {
            _name = Context.QueryString["name"];
            _to = Context.QueryString["to"];
            if (!string.IsNullOrEmpty(_name))
            {
                _clients[ID] = _name;
                _users[_name] = ID;
            }
        }
        public Task SendMessage(string name, string data)
        {
            if (_users.ContainsKey(name))
            {
                this.GetClient().SendTo($"hello,{name},{data}", _users[name]);
            }
            return Task.CompletedTask;
        }
        public async Task RunMqttClient(MqttClientOption mqttClientOption)
        {
           await base.RunMqttClientBase(mqttClientOption);
            _mqttClient.ApplicationMessageReceived += _mqttClient_ApplicationMessageReceived;
            _mqttClient.Connected += _mqttClient_Connected;
            _mqttClient.Disconnected += _mqttClient_Disconnected;
            await _mqttClient.ConnectAsync(_mqttClientOptions);
        }
        #region mqttClient
        /// <summary>
        /// 接收消息
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void _mqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            if (e.ApplicationMessage != null)
            {

            }
        }
        /// <summary>
        /// 斷開連接
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private async void _mqttClient_Disconnected(object sender, MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine("mqtt客戶端與服務端斷開連接!");
            await Task.Delay(TimeSpan.FromSeconds(5));
            try
            {
                await _mqttClient.ConnectAsync(_mqttClientOptions);
            }
            catch
            {
                Console.WriteLine("mqtt客戶端與服務端嘗試連接失敗!");
            }
        }
        /// <summary>
        /// 連接成功
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="e"></param>
        private void _mqttClient_Connected(object sender, MqttClientConnectedEventArgs e)
        {
            _mqttClient.SubscribeAsync(_topicFilters);
        }
        #endregion
    }
}

結構如圖

技術分享圖片

創建控制臺應用程序,用於項目啟動

新建控制臺程序Surging.MqttClientWithWsServices.Server將Surging.Services.Server中的配置文件cacheSettings.json、eventBusSettings.json、log4net.config、NLog.config、以及surgingSettings.json文件都拷貝到新建的這個控制臺程序中。

添加依賴項:Autofac Autofac.Extensions.DependencyInjection System.Text.Encoding.CodePages Microsoft.Extensions.Logging Microsoft.Extensions.Logging.Console

添加引用

技術分享圖片

修改Program.cs

using Autofac;
using Microsoft.Extensions.Logging;
using Surging.Core.Caching.Configurations;
using Surging.Core.CPlatform;
using Surging.Core.CPlatform.Configurations;
using Surging.Core.CPlatform.Utilities;
using Surging.Core.Nlog;
using Surging.Core.ProxyGenerator;
using Surging.Core.ServiceHosting;
using Surging.Core.ServiceHosting.Internal.Implementation;
using Surging.Modules.WSWithMqtt.Configurations;
using System;
using System.Text;

namespace Surging.MqttClientWithWsServices.Server
{
    class Program
    {
        static void Main(string[] args)
        {
            Encoding.RegisterProvider(CodePagesEncodingProvider.Instance);
            var host = new ServiceHostBuilder()
                .RegisterServices(builder =>
                {
                    builder.AddMicroService(option =>
                    {
                        option.AddServiceRuntime()
                        .AddClientProxy()
                            .AddRelateService()
                            .AddConfigurationWatch();
                        builder.Register(p => new CPlatformContainer(ServiceLocator.Current));
                    });
                })
                .ConfigureLogging(logger =>
                {
                    logger.AddConfiguration(
                        Surging.Core.CPlatform.AppConfig.GetSection("Logging"));
                })
                .UseNLog(LogLevel.Debug, "NLog.config")
                .UseServer(options => { })
                .UseConsoleLifetime()
                .UseMqttClient()
                .UseProxy()
                .Configure(build =>
                    build.AddCacheFile("${cachepath}|cacheSettings.json", optional: false, reloadOnChange: true))
                .Configure(build =>
                    build.AddCPlatformFile("${surgingpath}|surgingSettings.json", optional: false, reloadOnChange: true))
                .UseStartup<Startup>()
                .Build();

            using (host.Run())
            {
                Console.WriteLine($"服務端啟動成功,{DateTime.Now}。");
            }
        }
    }
}

註意添加是需要加一段代碼.UseMqttClient()用於啟動MqttClient。

添加Startup.cs 此類與Surging.Services.Server中的Startup.cs一致。

using Autofac;
using Autofac.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Surging.Core.Caching.Configurations;
using Surging.Core.CPlatform.Utilities;
using System.IO;
namespace Surging.MqttClientWithWsServices.Server
{
    public class Startup
    {

        public Startup(IConfigurationBuilder config)
        {
            config.SetBasePath(Directory.GetCurrentDirectory());
            ConfigureCache(config);
        }

        public IContainer ConfigureServices(ContainerBuilder builder)
        {
            var services = new ServiceCollection();
            ConfigureLogging(services);
            builder.Populate(services);
            ServiceLocator.Current = builder.Build();
            return ServiceLocator.Current;
        }

        public void Configure(IContainer app)
        {

        }

        #region 私有方法
        /// <summary>
        /// 配置日誌服務
        /// </summary>
        /// <param name="services"></param>
        private void ConfigureLogging(IServiceCollection services)
        {
            services.AddLogging();
        }


        /// <summary>
        /// 配置緩存服務
        /// </summary>
        private void ConfigureCache(IConfigurationBuilder build)
        {
            build
                .AddCacheFile("cacheSettings.json", optional: false);
        }
        #endregion
    }
}

現在基本上已經完事了,下面再說下surgingSettings.json配置信息為了避免與Surging.Services.Server的端口重復因此配置如下:

{
  "Surging": {
    "Ip": "${Surging_Server_IP}",
    "WatchInterval": 30,
    "Port": "${Surging_Server_Port}|100",
    "MappingIp": "${Mapping_ip}",
    "MappingPort": "${Mapping_Port}",
    "Token": "true",
    "WanIp": "${Mapping_WanIp}",
    "Libuv": true,
    "MaxConcurrentRequests": 20,
    "ExecutionTimeoutInMilliseconds": 30000,
    "Protocol": "${Protocol}|None", //Http、Tcp、None
    "RootPath": "${RootPath}|",
    "Ports": {
      "HttpPort": "${HttpPort}|2801",
      "MQTTPort": "${MQTTPort}|971",
      "WSPort": "${WSPort}|961"
    },
    "RequestCacheEnabled": false,
    "Packages": [
      {
        "TypeName": "EnginePartModule",
        "Using": "${UseEngineParts}|DotNettyModule;NLogModule;MessagePackModule;ServiceProxyModule;ConsulModule;EventBusRabbitMQModule;CachingModule;"
      }
    ]
  }, //如果引用多個同類型的組件,需要配置Packages,如果是自定義按需引用,無需配置Packages
  "Consul": {
    "ConnectionString": "${Register_Conn}|127.0.0.1:8500", // "127.0.0.1:8500",
    "SessionTimeout": "${Register_SessionTimeout}|50",
    "RoutePath": "${Register_RoutePath}",
    "ReloadOnChange": true,
    "EnableChildrenMonitor": false
  },
  "Swagger": {
    "Version": "${SwaggerVersion}|V1", // "127.0.0.1:8500",
    "Title": "${SwaggerTitle}|Surging Demo",
    "Description": "${SwaggerDes}|surging demo",
    "Contact": {
      "Name": "API Support",
      "Url": "https://github.com/dotnetcore/surging",
      "Email": "[email protected]"
    },
    "License": {
      "Name": "MIT",
      "Url": "https://github.com/dotnetcore/surging/blob/master/LICENSE"
    }
  },
  "EventBus_Kafka": {
    "Servers": "${EventBusConnection}|localhost:9092",
    "MaxQueueBuffering": "${MaxQueueBuffering}|10",
    "MaxSocketBlocking": "${MaxSocketBlocking}|10",
    "EnableAutoCommit": "${EnableAutoCommit}|false",
    "LogConnectionClose": "${LogConnectionClose}|false",
    "OffsetReset": "${OffsetReset}|earliest",
    "GroupID": "${EventBusGroupID}|surgingdemo"
  },
  "WebSocket": {
    "WaitTime": 2,
    "KeepClean": false,
    "Behavior": {
      "IgnoreExtensions": true,
      "EmitOnPing": false
    }
  },
  "EventBus": {
    "EventBusConnection": "${EventBusConnection}|192.168.1.127",
    "EventBusUserName": "${EventBusUserName}|guest",
    "EventBusPassword": "${EventBusPassword}|guest",
    "VirtualHost": "${VirtualHost}|/",
    "MessageTTL": "${MessageTTL}|30000",
    "RetryCount": "${RetryCount}|1",
    "FailCount": "${FailCount}|3",
    "prefetchCount": "${PrefetchCount}|0",
    "BrokerName": "${BrokerName}|surging_demo",
    "Port": "${EventBusPort}|32671"
  },
  "Zookeeper": {
    "ConnectionString": "${Zookeeper_ConnectionString}|127.0.0.1:2181",
    "SessionTimeout": 50,
    "ReloadOnChange": true
  },
  "Logging": {
    "Debug": {
      "LogLevel": {
        "Default": "Information"
      }
    },
    "Console": {
      "IncludeScopes": true,
      "LogLevel": {
        "Default": "${LogLevel}|Debug"
      }
    },
    "LogLevel": {
      "Default": "${LogLevel}|Debug"
    }
  },
  "MqttClient": {
    "ClientID": "${MqttClientID}|serverclientid",
    "MqttClientConnection": "${MqttClientConnection}|127.0.0.1",
    "MqttClientUserName": "admin",
    "MqttClientPassword": "123456",
    "Port": 97,
    "KeepAlivedTime": 60,
    "CleanSession": true,
    "Topics": [ "test1","test2" ]
  }
}

 總的Server代碼如圖所示:

技術分享圖片

此時代碼開發階段結束。我們可以設置多項目啟動,啟動項目前確定你的 Consul能夠正常使用。本地啟動Consul 可以通過控制臺來做測試就OK

1、Surging.Services.Server 2、Surging.MqttClientWithWsServices.Server即(紅框標註內容):

技術分享圖片

說到此處,想必大家都知道怎麽使用SurgingWS、MqttClient 與MqttBroker進行連接了。寫出這段代碼主要是針對於Surging不了解,或者攝入不深的人,能直接快速的使用本代碼讓用戶與設備間可以正常通訊。

註意如果使用Docker編排,或者Rancher編排Surging Broker 或者 WSMqttClient 時如果涉及到多個編排我們需要進行相應的邏輯判斷。

近期由於個人需求,需要把設備在線狀態通知給各個DBMqttClient端,用於保存現有設備狀態。在不處理設備連接時,我們就可以知道設備是否在線,是否有異常。異常時長等。如有此需求可在下方留言。此代碼近期可能會貢獻給Surging,讓Surging更加強大。寫這篇文章呢,主要目的是再沒有看懂作者代碼的情況下,可以盡情使用MqttBroker的功能。次處只是引用了WS與MqttClient,其實可以以此為參考部署更多的MqttClient,比如數據保存與服務通訊分開等。在此感謝Surging作者在業余時間為我們做了這麽好的開源項目,願Surging越來越好。

使用Surging Mqtt 開發基於WS的MqttClient客戶端