1. 程式人生 > >Esp8266 進階之路26【高階篇】RTOS移植分析 MQTT 實現過程,實現移植 MQTT協議在 esp8266 rtos實時系統,可斷線重連。(附帶Demo)

Esp8266 進階之路26【高階篇】RTOS移植分析 MQTT 實現過程,實現移植 MQTT協議在 esp8266 rtos實時系統,可斷線重連。(附帶Demo)

一、前言;

  • esp8266的實時系統rtos是後面才出來支援的,其最後的呼叫也是呼叫樂鑫提供的API介面,所以,如果你已經玩轉了NONOS下的程式設計,那麼移植rtos程式碼是非常迅捷的,因為你已經對其的API介面非常熟悉,當然了,熟透一款晶片開發,當然不是一天半天的事情,需要長時間的積累。

  • 那麼本博文是基於rtosMQTT協議的實現,優化了官方的程式碼示範,而且帶你走一走MQTT協議的世界。

二、MQTT的常識;

眾所周知,MQTT是一種輕捷快速的協議,基於TCP之上,所以為長連線的一種協議,非常適合那些短小訊息傳送的資料互動的用途,比如APP的推送新聞用途,最常見的用在我們現在物聯網領域;畢竟是小且快;

  • 在進行彼此通訊時候,必須確保底層提供了有序、可靠、雙向連線的網路連線。比如可以建立TCP/TLS連線。所以基本的通訊如下:

這裡寫圖片描述

  • 那麼裝置之間怎麼樣通訊呢?這就涉及到一些術語;要想指定某一個裝置收到此條訊息,那麼就必須根據topic主題來識別,這個是伺服器的事情了;下面列下一些常見的專用名詞:

①:ClientID

客戶端唯一標識,服務端用於關聯一個Session。
只能包含這些 大寫字母,小寫字母 和 數字(0-9a-zA-Z),23個字元以內,同一時間內 Server 和同一個 ClientID 只能保持一個 TCP 連線,再次連線會踢掉前一個連線的客戶端。

②:Keep Alive

顧名思義,目的是保持長連線的可靠性,以及雙方對彼此是否線上的確認。
客戶端在Connect的時候設定 Keep Alive 時長。如果服務端在 1.5 * KeepAlive 時間內沒有收到客戶端的報文,它必須斷開客戶端的網路連線。

③:Will

遺囑,遺願;遺囑訊息(Will Message)儲存在服務端,當網路連線關閉時,服務端必須釋出這個遺囑訊息,所以被形象地稱之為遺囑,可用於通知異常斷線。

④:retain

0: 服務端不能儲存這個訊息,也不能移除或替換任何 現存的保留訊息 。
1: 服務端必須儲存這個應用訊息和它的QoS等級,以便它可以被分發給未來的訂閱者,所以如果後面未來有客戶端訂閱了這個主題,那麼這個客戶端一上線就會收到此訊息。

⑤:qos

0: 【最多一次】 沒有回覆,不需要儲存。有可能丟失(網路異常斷開,業務層繁忙或者錯誤) 。
1: 【至少一次】確保訊息到達,但訊息重複可能會發生。
2: 【只有一次】確保訊息到達一次;

⑥:poyload

用來傳輸使用者的資料,最大允許 256MB ,釋出的訊息的 Payload允許為空。在很多場合下,代表將持久訊息(或者遺囑訊息)清空;格式為UTF-8編碼;

三、官方核心程式碼;

  • 樂鑫已經針對rtos移植了eclipse的標準的paho mqtt,在官方的GitHub已經看到了原始碼:點我檢視,這個庫非常出名,很多嵌入式的晶片都是移植這個庫。

#define MQTT_CLIENT_THREAD_NAME         "mqtt_client_thread"
#define MQTT_CLIENT_THREAD_STACK_WORDS  2048
#define MQTT_CLIENT_THREAD_PRIO         8

LOCAL xTaskHandle mqttc_client_handle;

static void messageArrived(MessageData* data)
{
    printf("Message arrived: %s\n", data->message->payload);
}

static void mqtt_client_thread(void* pvParameters)
{
    printf("mqtt client thread starts\n");
    MQTTClient client;
    Network network;
    //指定快取區的大小
    unsigned char sendbuf[80], readbuf[80] = {0};
    int rc = 0, count = 1;
    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
    pvParameters = 0;

    //初始化TCP連線
    NetworkInit(&network);
    //初始化客戶端,注意後面都是傳送和接收資料的快取區,一定要加大這個快取區的大小;否則後面傳送不成功!
    MQTTClientInit(&client, &network, 30000, sendbuf, sizeof(sendbuf), readbuf, sizeof(readbuf));

    char* address = MQTT_BROKER;
    //底層的TCP開始連線
    if ((rc = NetworkConnect(&network, address, MQTT_PORT)) != 0) {
        printf("Return code from network connect is %d\n", rc);
    }

//後天任務:如果這個不成功執行,就不會自動進去回撥方法:messageArrived;
#if defined(MQTT_TASK)

    if ((rc = MQTTStartTask(&client)) != pdPASS) {
        printf("Return code from start tasks is %d\n", rc);
    } else {
        printf("Use MQTTStartTask\n");
    }

#endif

    //定義mqtt版本:  3 = 3.1 , 4 = 3.1.1
    connectData.MQTTVersion = 3;
    //定義客戶端ID(必須唯一): 大夥們可以定義mac地址作為ID
    connectData.clientID.cstring = "ESP8266_sample";
    //定義連線的賬戶名,這個根據伺服器的選型來弄;【可有可無】
    connectData.username.cstring= "admin";
    //定義連線的賬戶名密碼,這個根據伺服器的選型來弄;【可有可無】
    connectData.password.cstring="admin123456";
    //定義連線心跳;
    connectData.keepAliveInterval = 40;
    //清楚會話
    connectData.cleansession = true;

    //連線MQTT伺服器
    if ((rc = MQTTConnect(&client, &connectData)) != 0) {
        printf("Return code from MQTT connect is %d\n", rc);
    } else {
        printf("MQTT Connected\n");
    }

    //訂閱主題 MQTTSubscribe --->【ESP8266/sample/pub】
    if ((rc = MQTTSubscribe(&client, "ESP8266/sample/pub", 2, messageArrived)) != 0) {
        printf("Return code from MQTT subscribe is %d\n", rc);
    } else {
        printf("fuck MQTT subscribe to topic \"ESP8266/sample/pub\"\n");
    }

    //死迴圈,時隔一秒傳送一則訊息
    while (count++) {
        //初始化一則訊息結構體
        MQTTMessage message;
        char payload[80];
        message.qos = QOS2;
        message.retained = 0;
        message.payload = payload;
        sprintf(payload,
                "{\"uuid\":\"dsaasdad22\",\"token\":\"saddsa412\",\"ver\":1.0,\"statusCode\":0,\"skill\":%d}",
                count);
        message.payloadlen = strlen(payload);

        printf("MQTT publish to payloadlen :%s\n",  message.payload);

        if ((rc = MQTTPublish(&client, "ESP8266/sample/pub", &message)) != 0) {
            printf("Return code from MQTT publish is %d\n", rc);
        } else {
            printf(
                    "MQTT publish topic \"ESP8266/sample/pub\", message number is %d\n",
                    count);
        }

        vTaskDelay(1000 / portTICK_RATE_MS);  //send every 1 seconds
    }

    printf("mqtt_client_thread going to be deleted\n");
    vTaskDelete(NULL);
    return;
}

四、二次修改完善斷開連線;

  • 這個庫和樂鑫自己做的那份NONOS程式碼不一樣,這個是不會自動重連伺服器的,假如你的路由器突然沒了外網,導致這個連線不成功,那麼就會永遠釋出不了訊息;所以優化如下,程式碼略多,主要原理:

1、通過判斷是否釋出訊息成功的標誌,是否重新連線伺服器和訂閱主題;

2、 如果把釋出訊息的任務獨立開來,就相當於開啟了新的執行緒,我看了一些高質量的程式碼,都是建立一則訊息佇列,處於阻塞等待,直到有訊息要釋出,則在此死迴圈內釋出,如果不釋出,那麼重新連線則釋出;

3、連線和訂閱主題的程式碼都是在死迴圈內的,但是初始化客戶端的程式碼千萬別在死迴圈內,因為這個初始化相當於開闢了記憶體,會佔據記憶體,多次了連線了 ,就相當於開闢多個記憶體了!


static void Task_MqttClient_Connect(void* pvParameters) {

    bool isNeedQueue = true;

    Network network;
    unsigned char sendbuf[2048], readbuf[2048] = { 0 };
    int rc = 0, count = 0;
    MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
    pvParameters = 0;
    NetworkInit(&network);
    MQTTClientInit(&client, &network, 30000, sendbuf, sizeof(sendbuf), readbuf,
            sizeof(readbuf));

    //!!!!!不要把初始化放在裡面
    for (;;) {

        //判斷是否已經獲取了路由器分配的IP
        while (wifi_station_get_connect_status() != STATION_GOT_IP) {
            vTaskDelay(1000 / portTICK_RATE_MS);
        }

        char* address = MQTT_SERVICE;
        connectData.MQTTVersion = 3;
        connectData.clientID.cstring = checkTopic;
        connectData.username.cstring = MQTT_USER_NAME;
        connectData.password.cstring = MQTT_USER_PAW;
        connectData.keepAliveInterval = 40;
        connectData.cleansession = true;

        if ((rc = NetworkConnect(&network, address, MQTT_PORT)) != 0) {
            printf("MClouds NetworkConnect connect is %d\n", rc);
        }

        if ((rc = MQTTStartTask(&client)) != pdPASS) {
            printf("Return code from start tasks is %d\n", rc);
        } else {
            printf("Use MQTTStartTask\n");
        }

        if ((rc = MQTTConnect(&client, &connectData)) != 0) {
            printf("[SY] MClouds connect is %d\n", rc);
            network.disconnect(&network);
            vTaskDelay(1000 / portTICK_RATE_MS);
        }

        if ((rc = MQTTSubscribe(&client, subTopic, QOS0, MessageArrived))
                != 0) {
            printf("[SY] MClouds sub fail is %d\n", rc);
            network.disconnect(&network);
            vTaskDelay(1000 / portTICK_RATE_MS);
        }

        printf("MQTT subscribe to topic -> %s\n", subTopic);
        xQueueReset(MqttMessageQueueHandler);

        while (1) {

            char payload[2048];

            struct esp_mqtt_msg_type *pMsg;
            printf("MqttMessageQueueHandler waitting ..\n");

            //阻塞等待
            xQueueReceive(MqttMessageQueueHandler, &pMsg, portMAX_DELAY);
            sprintf(payload, "%s", pMsg->allData);
            //printf("MQTT  publish payload: %s\n", payload);
            os_printf(" [SY] 1 MQTT get freeHeap: %d\n",system_get_free_heap_size());


            MQTTMessage message;
            message.qos = QOS0;
            message.retained = false;
            message.payload = (void*) payload;
            message.payloadlen = strlen(payload) + 1;

            if ((rc = MQTTPublish(&client, pubTopic, &message)) != 0) {
                printf("Return code from MQTT publish is %d\n", rc);
            } else {
                printf("MQTT publish succeed ..\n");
            }

            if (rc != 0) {
                break;
            }

        }
        network.disconnect(&network);
    }

    printf("mqtt_client_thread going to be deleted\n");
    vTaskDelete(NULL);
    return;

}

(注意要填寫伺服器地址,還要熟悉rtos的訊息佇列。)
1.一定要用最新版的SDK包的工程,而且要看博文前面的刨坑的連線裡面的庫檔案是否更新到您的工程。
2. 由於下面的硬體程式碼連結不可以修改了,大家下載之後,修改下靜態庫檔案和上面的Task_MqttClient_Connect方法即可。之後通過不斷輪詢伺服器是否斷開,如果是則傳送訊息重連即可。
3.目前2018.8.27為止,v2.0.0的SDK的MQTT還是蠻穩定的。斷開連線的可能性較低。