1. 程式人生 > >物聯網Mqtt協議使用(1)

物聯網Mqtt協議使用(1)

 

 

MQTT是輕量級基於代理的釋出/訂閱的訊息傳輸協議,設計思想是開放、簡單、輕量、易於實現。這些特點使它適用於受限環境。特別是資源受限的嵌入式平臺,該協議的特點有:

  1. 使用釋出/訂閱訊息模式,提供一對多的訊息釋出,解除應用程式耦合。
  2. 對負載內容遮蔽的訊息傳輸。
  3. 使用 TCP/IP 提供網路連線。
  4. 小型傳輸,開銷很小(固定長度的頭部是 2 位元組),協議交換最小化,以降低網路流量。
  5. 使用 Last Will 和 Testament 特性通知有關各方客戶端異常中斷的機制。

MQTT提供了三種訊息釋出服務質量:

“至多一次”,訊息釋出完全依賴底層 TCP/IP 網路。會發生訊息丟失或重複。這一級別可用於如下情況,環境感測器資料,丟失一次讀記錄無>所謂,因為不久後還會有第二次傳送。

“至少一次”,確保訊息到達,但訊息重複可能會發生。

“只有一次”,確保訊息到達一次。這一級別可用於如下情況,在計費系統中,訊息重複或丟失會導致不正確的結果。

協議裡還有2個主要的角色:

·         client,客戶端,又包括髮布訊息客戶端和訂閱訊息客戶端

·         broker,伺服器端

即如下圖所示,訂閱者先向伺服器訂閱訊息主題,當釋出者釋出訊息給伺服器時,訂閱者就會從伺服器訂閱得到訊息值。

  • MQTT伺服器搭建

EMQTT是分散式開源物聯網MQTT訊息伺服器 ,具有標準協議,開放原始碼,高併發低延時,分佈叢集橋接,多資料庫儲存,私有云部署,專業團隊支援。各種作業系統平臺下EMQTT原始碼下載連線:http://www.emqtt.com/downloads

本次在使用阿里雲ECS伺服器搭建broker,伺服器系統為centos7,下載安裝使用流程如下所示:

從 http://emqtt.com/downloads/下載centos7下的原始碼
unzip centos7
cd cemqttd/
 ./bin/emqttd console //啟動MQTT 脫離終端不能工作
 ./bin/emqttd start  //啟動MQTT,開啟守護程序模式,脫離終端還在工作

伺服器工作後,會監聽四個埠,1883,8883,8083,18083,對應的服務內容如下:

1883    ======  MQTT協議埠 ,以後使用MQTT客服端就需要連線TCP 1883埠
8883    ======  MQTT(SSL)埠,這是MQTT加密安全埠
8083    ======  MQTT(WebSocket), HTTP API埠
18083   ======  Dashboard管理控制檯埠  

現在在瀏覽器中開啟終端,輸入 伺服器ip:18083就會進入dash終端,預設使用者名稱admin 密碼  public

使用mosquitoo體驗一下MQTT協議的使用,使用emqtt作為伺服器,mosquitto作為客戶端使用(mosquitoo安裝使用可以檢視相關文件,這裡不做 介紹)

  • 客戶端首先要訂閱主體,主題為"mqtt"
mosquitto_sub -t mqtt –h  [伺服器IP]
  • 再開啟一個終端作為釋出端,指定主題"mqtt"推送訊息,推送內容為"hello world"
mosquitto_pub -h localhost –t mqtt -m "hello world"

在訂閱了"mqtt"的訂閱端會接收到訊息可以在emqtt中檢視到相關主題,以及該主題對應的訊息內容

  • 嵌入式平臺上的MQTT客戶端移植

PAHO是一個開源的MQTT客戶端的庫,支援Java, C/C++, GoLang, Python C語言包下載,

嵌入式平臺使用的包下載:

git clone https://github.com/eclipse/paho.mqtt.embedded-c.git

主要包含了三個主要目錄,MQTTPacket 資料夾是一些MQTT包的初始化,MQTTClient資料夾是C++使用的API,MQTTClient-c資料夾是C語言的相關API。在本次移植目標是nanopi-m1開發板

編譯安裝流程 ,指定CC指定交叉編譯器,編譯後在原始碼目錄下的 build/output/下生成共享庫和相關測試程式、

在開發板上掛接網路檔案系統,將此目錄下的共享庫拷貝到/lib目錄下,其中的sample程式中有兩個可執行檔案,pub0sub1 qos0pub。拷貝庫之後發現可以執行。使用find 命令搜尋整個原始碼找到原始檔,發現在MQTTPacket/samples/pub0sub1.c MQTTPacket/samples/qos0pub.c。經過多次測試,發現後面直接修改這兩個原始檔,然後實現我們的應用直接簡明多了,一開始準備編譯,安裝,然後自己寫程式交叉編譯就連線libpaho這個庫,可是編譯老是出現問題。pub0sub1.c裡面是先訂閱一個主題 ,然後推送這個主題,訂閱部分就會收到這個主題的內容

make CC=/samba_share/nanopi_4.14/4.9.3/bin/arm-linux-gcc     #交叉編譯,CC指定交叉編譯器
mount -t nfs  192.168.1.145:/samba_share/nanopi_4.14/ /mnt   #掛載NFS

[email protected]:/mnt/paho.mqtt.embedded-c/build/output# ls           #build目錄下生成相關庫和可執行文檔案
libpaho-embed-mqtt3c.so      samples
libpaho-embed-mqtt3c.so.1    test
libpaho-embed-mqtt3c.so.1.0
[email protected]:/mnt/paho.mqtt.embedded-c/build/output#cp *.so.*  -rfd   /lib #將庫拷貝到目標檔案系統的lib目錄

   修改pub0sub1.c中的伺服器地址,改成之前搭建的MQTT伺服器域名,重新使用上面的命令編譯,執行時出現,sockerr 懷疑是閘道器和dns沒設定。設定nanopi-m1的閘道器和DNS伺服器

echo nameserver 61.128.128.68 > /etc/resolv.conf
route add default gw 192.168.1.1

先為了方便測試,在nanopi上推送訊息。python paho是一個符合MQTT v3.1協議的客戶端,paho-python可連線MQTT代理伺服器、釋出訊息、訂閱訊息和獲得推送訊息。

在ubuntu主機上搭建python paho來進行訂閱測試,ubuntu上搭建python paho過程如下所示:

 git clone https://github.com/eclipse/paho.mqtt.python.git   //下載python  paho原始碼包
 python setup.py install   //安裝 python  paho到ubuntu系統目錄        

python使用程式設計簡單,非常方便我們的整個系統的測試,整個過程是連線到我們之前搭建的MQTT伺服器後,就訂閱主題為"test",訂閱後就不斷等待發布端釋出"test"主題,使用在開發板上sample下的qos0pub來進行推送訊息

#coding=utf-8
import paho.mqtt.client as mqtt
import time
HOST = "47.106.117.13"
PORT = 1883

def client_loop():
    client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
    client = mqtt.Client(client_id)    # ClientId不能重複,所以使用當前時間
    #client.username_pw_set("admin", "123456")  # 必須設定,否則會返回「Connected with result code 4」
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(HOST, PORT, 60) 
    client.loop_forever()

def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    client.subscribe("test")

def on_message(client, userdata, msg):
    print(msg.topic+" "+msg.payload.decode("utf-8"))

if __name__ == '__main__':
    client_loop()

分別執行訂閱客戶端和釋出客戶端的程式,當使用nanopi開發板每釋出一次訊息時,ubuntu主機端就會接受到這條訊息。

釋出:


[email protected]:/mnt/paho.mqtt.embedded-c/build/output/samples# ./qos0pub
Sending to hostname 47.106.117.13 port 1883
Successfully published

訂閱:

[email protected]:~/MQTTclient/mqtt-python$ python mqtt_recv.py 
Connected with result code 0
test mqtt_payload
test mqtt_payload

先使用nanopi上訂閱訊息,主題為"chat",ubuntu主機發布主題為"chat"的訊息

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include "MQTTPacket.h"
#include "transport.h"

/* This is in order to get an asynchronous signal to stop the sample,
as the code loops waiting for msgs on the subscribed topic.
Your actual code will depend on your hw and approach*/
#include <signal.h>

int toStop = 0;

void cfinish(int sig)
{
	signal(SIGINT, NULL);
	toStop = 1;
}

void stop_init(void)
{
	signal(SIGINT, cfinish);
	signal(SIGTERM, cfinish);
}
/* */

int main(int argc, char *argv[])
{
	MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
	int rc = 0;
	int mysock = 0;
	unsigned char buf[200];
	int buflen = sizeof(buf);
	int msgid = 1;
	MQTTString topicString = MQTTString_initializer;
	int req_qos = 0;
	char* payload = "mypayload";
	int payloadlen = strlen(payload);
	int len = 0;
	char *host = "47.106.117.13";
	int port = 1883;

	stop_init();
	mysock = transport_open(host, port);
	if(mysock < 0)
	{	
		//return mysock;
		printf("mysock err!\n");
		return mysock;
	}
	printf("Sending to hostname %s port %d\n", host, port);

	data.clientID.cstring = "me";
	data.keepAliveInterval = 20;
	data.cleansession = 1;

	len = MQTTSerialize_connect(buf, buflen, &data);
	rc = transport_sendPacketBuffer(mysock, buf, len);

	/* wait for connack */
	if (MQTTPacket_read(buf, buflen, transport_getdata) == CONNACK)
	{
		unsigned char sessionPresent, connack_rc;

		if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0)
		{
			printf("Unable to connect, return code %d\n", connack_rc);
			goto exit;
		}
	}
	else
		goto exit;

	/* subscribe */
	topicString.cstring = "chat";
	len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos);

	rc = transport_sendPacketBuffer(mysock, buf, len);
	if (MQTTPacket_read(buf, buflen, transport_getdata) == SUBACK) 	/* wait for suback */
	{
		unsigned short submsgid;
		int subcount;
		int granted_qos;

		rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen);
		if (granted_qos != 0)
		{
			printf("granted qos != 0, %d\n", granted_qos);
			goto exit;
		}
	}
	else
		goto exit;

	/* loop getting msgs on subscribed topic */
	topicString.cstring = "chat";
	while (!toStop)
	{
		/* transport_getdata() has a built-in 1 second timeout,
		your mileage will vary */
		if (MQTTPacket_read(buf, buflen, transport_getdata) == PUBLISH)
		{
			unsigned char dup;
			int qos;
			unsigned char retained;
			unsigned short msgid;
			int payloadlen_in;
			unsigned char* payload_in;
			int rc;
			MQTTString receivedTopic;

			rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
					&payload_in, &payloadlen_in, buf, buflen);
			printf("message arrived %.*s\n", payloadlen_in, payload_in);
		}
	}

	printf("disconnecting\n");
	len = MQTTSerialize_disconnect(buf, buflen);
	rc = transport_sendPacketBuffer(mysock, buf, len);

exit:
	transport_close(mysock);
	return 0;
}

pub.py:

#coding=utf-8
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
import time

HOST = "47.106.117.13"
PORT = 1883
def on_connect(client, userdata, flags, rc):
    print("Connected with result code "+str(rc))
    #client.subscribe("test")

def on_message(client, userdata, msg):
    print(msg.topic+" "+msg.payload.decode("utf-8"))

if __name__ == '__main__':
    client_id = time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
    # client = mqtt.Client(client_id)    # ClientId不能重複,所以使用當前時間
    # client.username_pw_set("admin", "123456")  # 必須設定,否則會返回「Connected with result code 4」
    # client.on_connect = on_connect
    # client.on_message = on_message
    # client.connect(HOST, PORT, 60)
    # client.publish("test", "你好 MQTT", qos=0, retain=False)  # 釋出訊息

    publish.single("chat", "你好 MQTT", qos = 1,hostname=HOST,port=PORT, client_id=client_id)

每次在ubuntu主機上釋出chat主題的訊息時,nanopi就會接收這個訊息

[email protected]:/mnt/paho.mqtt.embedded-c/build/output/samples# ./pub0sub1
Sending to hostname 47.106.117.13 port 1883
message arrived 你好 MQTT

下一講,將深入paho相關釋出訂閱相關的API。。。