1. 程式人生 > >C# Mqtt 斷線重連

C# Mqtt 斷線重連

在通過 MqttClient 客戶端連線之後,在服務端服務重啟時,客戶端如果沒有重連機制,則無法再接收到訂閱的訊息。

使用的 Mqtt 元件為:M2Mqtt.Net.dll

一些特性發現

(1)如果提供的服務端地址是不可解析的,會引發異常無法例項化 MqttClient 物件。
(2)Connect 無法連線時會引發異常,IsConnected 為 false。
(3)服務端斷開會觸發客戶端的 ConnectionClosed 事件,IsConnected 為 false。
(4)重新 Connect 需要重新 Subscribe 訂閱主題。
(5)MqttClient.Subscribe 第一個引數為訂閱主題陣列,第二個為相應的 qosLevel,兩個陣列長度必須一致,否則會異常。

重連流程控制

主要程式碼實現

(1)執行緒主體

// 自動重連主體
private void _TryContinueConnect()
{
    if (IsConnected) return;

    Thread retryThread = new Thread(new ThreadStart(delegate
    {
        while (_MqttClient == null || !_MqttClient.IsConnected)
        {
            if (_ToClose) break;

            if (_MqttClient == null)
            {
                _BuildClient();
                Thread.Sleep(3000);
                continue;
            }

            try
            {
                _TryCount++;
                _Connect(); 
            }
            catch (Exception ce)
            {
                Debug.WriteLine("re connect exception:" + ce.Message);
            }

            // 如果還沒連線不符合結束條件則睡2秒
            if (!_MqttClient.IsConnected)
            {
                Thread.Sleep(2000);
            }
        }
    }));
            
    retryThread.Start();
}

(2)例項化部分

// 例項化客戶端
private void _BuildClient()
{
    try
    {
        _MqttClient = new MqttClient(_MqttServer);
    }
    catch (Exception e)
    {
        Debug.WriteLine("build client error:" + e.Message);
        return;
    }

    // 訊息到達事件繫結
    _MqttClient.MqttMsgPublishReceived += client_MqttMsgPublishReceived;

    // 連線斷開事件繫結
    _MqttClient.ConnectionClosed += (sender, e) =>
    {
        if (!_ToClose)
        {
            // 嘗試重連
            _TryContinueConnect();
        }
    };
}

(3)嘗試連線部分

// 發起一次連線,連線成功則訂閱相關主題 
private void _Connect()
{
    if (String.IsNullOrEmpty(_MqttUsername))
    {
        var b = _MqttClient.Connect(_MqttClientId);
    }
    else
    {
        var b = _MqttClient.Connect(_MqttClientId, _MqttUsername, _MqttUserpass);
    } 

    if (_MqttClient.IsConnected)
    {
        _MqttClient.Subscribe(new string[] { "topic1", "topic2" },
            new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE, MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
    }
}

實測效果不錯,其中延時時間可以適當調整