Esp8266 進階之路26【高階篇】RTOS移植分析 MQTT 實現過程,實現移植 MQTT協議在 esp8266 rtos實時系統,可斷線重連。(附帶Demo)
一、前言;
esp8266
的實時系統rtos
是後面才出來支援的,其最後的呼叫也是呼叫樂鑫提供的API
介面,所以,如果你已經玩轉了NONOS
下的程式設計,那麼移植rtos
程式碼是非常迅捷的,因為你已經對其的API
介面非常熟悉,當然了,熟透一款晶片開發,當然不是一天半天的事情,需要長時間的積累。那麼本博文是基於
rtos
的MQTT
協議的實現,優化了官方的程式碼示範,而且帶你走一走MQTT
協議的世界。
二、MQTT
的常識;
眾所周知,
MQTT
是一種輕捷快速的協議,基於TCP
之上,所以為長連線
的一種協議,非常適合那些短小訊息傳送的資料互動的用途,比如APP
的推送新聞用途,最常見的用在我們現在物聯網領域;畢竟是小且快;
- 在進行彼此通訊時候,必須確保底層提供了有序、可靠、雙向連線的網路連線。比如可以建立
TCP/TLS
連線。所以基本的通訊如下:
- 那麼裝置之間怎麼樣通訊呢?這就涉及到一些術語;要想指定某一個裝置收到此條訊息,那麼就必須根據
topic
主題來識別,這個是伺服器的事情了;下面列下一些常見的專用名詞:
①:
ClientID
客戶端唯一標識,服務端用於關聯一個Session。
只能包含這些 大寫字母,小寫字母 和 數字(0-9a-zA-Z),23個字元以內,同一時間內 Server 和同一個 ClientID 只能保持一個 TCP 連線,再次連線會踢掉前一個連線的客戶端。②:
Keep Alive
顧名思義,目的是保持長連線的可靠性,以及雙方對彼此是否線上的確認。
客戶端在Connect的時候設定 Keep Alive 時長。如果服務端在 1.5 * KeepAlive 時間內沒有收到客戶端的報文,它必須斷開客戶端的網路連線。③:
Will
遺囑,遺願;遺囑訊息(Will Message)儲存在服務端,當網路連線關閉時,服務端必須釋出這個遺囑訊息,所以被形象地稱之為遺囑,可用於通知異常斷線。
④:
retain
0: 服務端不能儲存這個訊息,也不能移除或替換任何 現存的保留訊息 。
1: 服務端必須儲存這個應用訊息和它的QoS等級,以便它可以被分發給未來的訂閱者,所以如果後面未來有客戶端訂閱了這個主題,那麼這個客戶端一上線就會收到此訊息。⑤:
qos
0: 【最多一次】 沒有回覆,不需要儲存。有可能丟失(網路異常斷開,業務層繁忙或者錯誤) 。
1: 【至少一次】確保訊息到達,但訊息重複可能會發生。
2: 【只有一次】確保訊息到達一次;⑥:
poyload
用來傳輸使用者的資料,最大允許 256MB ,釋出的訊息的
Payload
允許為空。在很多場合下,代表將持久訊息(或者遺囑訊息)清空;格式為UTF-8
編碼;
三、官方核心程式碼;
- 樂鑫已經針對
rtos
移植了eclipse
的標準的paho mqtt
,在官方的GitHub
已經看到了原始碼:點我檢視,這個庫非常出名,很多嵌入式的晶片都是移植這個庫。
#define MQTT_CLIENT_THREAD_NAME "mqtt_client_thread"
#define MQTT_CLIENT_THREAD_STACK_WORDS 2048
#define MQTT_CLIENT_THREAD_PRIO 8
LOCAL xTaskHandle mqttc_client_handle;
static void messageArrived(MessageData* data)
{
printf("Message arrived: %s\n", data->message->payload);
}
static void mqtt_client_thread(void* pvParameters)
{
printf("mqtt client thread starts\n");
MQTTClient client;
Network network;
//指定快取區的大小
unsigned char sendbuf[80], readbuf[80] = {0};
int rc = 0, count = 1;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
pvParameters = 0;
//初始化TCP連線
NetworkInit(&network);
//初始化客戶端,注意後面都是傳送和接收資料的快取區,一定要加大這個快取區的大小;否則後面傳送不成功!
MQTTClientInit(&client, &network, 30000, sendbuf, sizeof(sendbuf), readbuf, sizeof(readbuf));
char* address = MQTT_BROKER;
//底層的TCP開始連線
if ((rc = NetworkConnect(&network, address, MQTT_PORT)) != 0) {
printf("Return code from network connect is %d\n", rc);
}
//後天任務:如果這個不成功執行,就不會自動進去回撥方法:messageArrived;
#if defined(MQTT_TASK)
if ((rc = MQTTStartTask(&client)) != pdPASS) {
printf("Return code from start tasks is %d\n", rc);
} else {
printf("Use MQTTStartTask\n");
}
#endif
//定義mqtt版本: 3 = 3.1 , 4 = 3.1.1
connectData.MQTTVersion = 3;
//定義客戶端ID(必須唯一): 大夥們可以定義mac地址作為ID
connectData.clientID.cstring = "ESP8266_sample";
//定義連線的賬戶名,這個根據伺服器的選型來弄;【可有可無】
connectData.username.cstring= "admin";
//定義連線的賬戶名密碼,這個根據伺服器的選型來弄;【可有可無】
connectData.password.cstring="admin123456";
//定義連線心跳;
connectData.keepAliveInterval = 40;
//清楚會話
connectData.cleansession = true;
//連線MQTT伺服器
if ((rc = MQTTConnect(&client, &connectData)) != 0) {
printf("Return code from MQTT connect is %d\n", rc);
} else {
printf("MQTT Connected\n");
}
//訂閱主題 MQTTSubscribe --->【ESP8266/sample/pub】
if ((rc = MQTTSubscribe(&client, "ESP8266/sample/pub", 2, messageArrived)) != 0) {
printf("Return code from MQTT subscribe is %d\n", rc);
} else {
printf("fuck MQTT subscribe to topic \"ESP8266/sample/pub\"\n");
}
//死迴圈,時隔一秒傳送一則訊息
while (count++) {
//初始化一則訊息結構體
MQTTMessage message;
char payload[80];
message.qos = QOS2;
message.retained = 0;
message.payload = payload;
sprintf(payload,
"{\"uuid\":\"dsaasdad22\",\"token\":\"saddsa412\",\"ver\":1.0,\"statusCode\":0,\"skill\":%d}",
count);
message.payloadlen = strlen(payload);
printf("MQTT publish to payloadlen :%s\n", message.payload);
if ((rc = MQTTPublish(&client, "ESP8266/sample/pub", &message)) != 0) {
printf("Return code from MQTT publish is %d\n", rc);
} else {
printf(
"MQTT publish topic \"ESP8266/sample/pub\", message number is %d\n",
count);
}
vTaskDelay(1000 / portTICK_RATE_MS); //send every 1 seconds
}
printf("mqtt_client_thread going to be deleted\n");
vTaskDelete(NULL);
return;
}
四、二次修改完善斷開連線;
- 這個庫和樂鑫自己做的那份
NONOS
程式碼不一樣,這個是不會自動重連伺服器的,假如你的路由器突然沒了外網,導致這個連線不成功,那麼就會永遠釋出不了訊息;所以優化如下,程式碼略多,主要原理:
1、通過判斷是否釋出訊息成功的標誌,是否重新連線伺服器和訂閱主題;
2、 如果把釋出訊息的任務獨立開來,就相當於開啟了新的執行緒,我看了一些高質量的程式碼,都是建立一則訊息佇列,處於阻塞等待,直到有訊息要釋出,則在此死迴圈內釋出,如果不釋出,那麼重新連線則釋出;
3、連線和訂閱主題的程式碼都是在死迴圈內的,但是初始化客戶端的程式碼千萬別在死迴圈內,因為這個初始化相當於開闢了記憶體,會佔據記憶體,多次了連線了 ,就相當於開闢多個記憶體了!
static void Task_MqttClient_Connect(void* pvParameters) {
bool isNeedQueue = true;
Network network;
unsigned char sendbuf[2048], readbuf[2048] = { 0 };
int rc = 0, count = 0;
MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
pvParameters = 0;
NetworkInit(&network);
MQTTClientInit(&client, &network, 30000, sendbuf, sizeof(sendbuf), readbuf,
sizeof(readbuf));
//!!!!!不要把初始化放在裡面
for (;;) {
//判斷是否已經獲取了路由器分配的IP
while (wifi_station_get_connect_status() != STATION_GOT_IP) {
vTaskDelay(1000 / portTICK_RATE_MS);
}
char* address = MQTT_SERVICE;
connectData.MQTTVersion = 3;
connectData.clientID.cstring = checkTopic;
connectData.username.cstring = MQTT_USER_NAME;
connectData.password.cstring = MQTT_USER_PAW;
connectData.keepAliveInterval = 40;
connectData.cleansession = true;
if ((rc = NetworkConnect(&network, address, MQTT_PORT)) != 0) {
printf("MClouds NetworkConnect connect is %d\n", rc);
}
if ((rc = MQTTStartTask(&client)) != pdPASS) {
printf("Return code from start tasks is %d\n", rc);
} else {
printf("Use MQTTStartTask\n");
}
if ((rc = MQTTConnect(&client, &connectData)) != 0) {
printf("[SY] MClouds connect is %d\n", rc);
network.disconnect(&network);
vTaskDelay(1000 / portTICK_RATE_MS);
}
if ((rc = MQTTSubscribe(&client, subTopic, QOS0, MessageArrived))
!= 0) {
printf("[SY] MClouds sub fail is %d\n", rc);
network.disconnect(&network);
vTaskDelay(1000 / portTICK_RATE_MS);
}
printf("MQTT subscribe to topic -> %s\n", subTopic);
xQueueReset(MqttMessageQueueHandler);
while (1) {
char payload[2048];
struct esp_mqtt_msg_type *pMsg;
printf("MqttMessageQueueHandler waitting ..\n");
//阻塞等待
xQueueReceive(MqttMessageQueueHandler, &pMsg, portMAX_DELAY);
sprintf(payload, "%s", pMsg->allData);
//printf("MQTT publish payload: %s\n", payload);
os_printf(" [SY] 1 MQTT get freeHeap: %d\n",system_get_free_heap_size());
MQTTMessage message;
message.qos = QOS0;
message.retained = false;
message.payload = (void*) payload;
message.payloadlen = strlen(payload) + 1;
if ((rc = MQTTPublish(&client, pubTopic, &message)) != 0) {
printf("Return code from MQTT publish is %d\n", rc);
} else {
printf("MQTT publish succeed ..\n");
}
if (rc != 0) {
break;
}
}
network.disconnect(&network);
}
printf("mqtt_client_thread going to be deleted\n");
vTaskDelete(NULL);
return;
}
(注意要填寫伺服器地址,還要熟悉
rtos
的訊息佇列。)
1.一定要用最新版的SDK
包的工程,而且要看博文前面的刨坑的連線裡面的庫檔案是否更新到您的工程。
2. 由於下面的硬體程式碼連結不可以修改了,大家下載之後,修改下靜態庫檔案和上面的Task_MqttClient_Connect
方法即可。之後通過不斷輪詢伺服器是否斷開,如果是則傳送訊息重連即可。
3.目前2018.8.27
為止,v2.0.0的SDK的MQTT還是蠻穩定的。斷開連線的可能性較低。