1. 程式人生 > >從零開始搭建物聯網平臺(6):訊息的持久化

從零開始搭建物聯網平臺(6):訊息的持久化

遇到的問題:

查看了EMQ文件發現並不提供訊息的持久化功能,MQTT協議是按照裝置一直線上設計的,資料都是儲存在記憶體裡的,但是考慮到使用者上傳感測器資料不可能接收了就扔掉,那樣就沒法檢視歷史資料了,所以使用者上傳的訊息必須要能夠儲存下來,以便檢視歷史資料,這樣一來持久化功能就需要我們自己來實現。

另外還會出現一個問題,當兩個設備註冊的主題名一樣的時候,不能分出是哪一個裝置發出的訊息,在接收訂閱訊息的時候發現沒辦法獲取到傳送訊息的clientID,而且其他裝置也可以訂閱到任意裝置的訊息,對於敏感資訊來說存在安全性。

解決方法:

初步打算是,使用者需要在後臺註冊自己的裝置和資料流資訊,後臺會對所有註冊的資訊進行訂閱接收到訊息後,後臺會把訊息寫入到對應的表中,另外裝置釋出主題只能使用(clientID/主題名)命名方式,以便後臺能夠區分是哪一個裝置傳送過來的訊息。對於MQTT瞭解還是不夠深,只能使用這樣的笨辦法來解決了,以後若是找到其他的方法在進行改進。

解決問題:

首先需要通過python建立mqtt連線監聽所有註冊的主題資訊,這裡使用了paho-mqtt庫來實現,為了方便以後的呼叫將其封裝成一個類,最開始的時候想把一些常用的操作也封裝進去,單獨測試完全可以,但是一旦放到Django請求中處理的時候,mqtt能夠正常返回成功資訊,但是實際上並沒有正確執行,這一點始終沒有找到原因,最終只能簡化,只包含最基礎的功能。

class MqClient(object):
    def __init__(self, client_id, username, password):
        self.client = client.Client(client_id=client_id,
                                    clean_session=True)  # 初始化,clean_session為false的時候EMQ會儲存訂閱狀態,可以不再次訂閱
        self.client.username_pw_set(username, password)  # 設定連線使用者名稱
        self.client.on_connect = self.on_connect
        self.client.on_message = self.on_message
        self._client_status = False  # 連線狀態
        self._cloop = None
        self._connect()  # 例項化會自動連線

    def _connect(self, host="your IP ", port=1883, keepalive=60):
        """連線伺服器"""
        self.client.connect_async(host, port, keepalive)
        # 開啟執行緒執行
        self._cloop = threading.Thread(target=self.client.loop_start())
        self._cloop.start()

    def on_connect(self, client, userdata, flags, rc):
        """連線成功的回撥函式"""
        # 修改客戶端狀態
        if rc == 0:
            self._client_status = True

    def init_sub(self):
        # 讀取資料庫中所有的已經註冊過的topic並且訂閱
        for i in models.Device.objects.all():
            for j in i.dev_stream.all():
                self.client.subscribe(str(i.device_id) + '/' + j.name, j.qos)

    @staticmethod
    def on_message(client, userdata, msg):
        client_id = msg.topic.split('/')[0]
        stream = msg.topic.split('/')[1]
        data = msg.payload.decode()
        # 接收訂閱資訊寫入到資料庫中
        models.DataStream.objects.filter(device__device_id=client_id).filter(name=stream).first().data.add(
            models.Data.objects.create(data=data))

有了封裝好的類,現在我們需要做的是:在Django專案啟動完成之後自動執行監聽任務的,最開始的時候打算放到setting或者__init__裡面,但是因為類裡面封裝了model操作,那時候專案還沒有載入完model會報錯,所以最終新建了一個app,然後放到其下的urls,這樣當專案啟動完成的時候就會自動載入了。

from utils.mqtt_client import MqClient

MQClient = MqClient(your client ID, username, password)
MQClient.init_sub()

接下來測試一下實時新增訂閱的功能,先從urls檔案匯入示例化之後的物件,呼叫client的subscribe方法

from mqtt.urls import MQClient
class Test(APIView):
    def post(self, request):
        topic = request.POST.get('topic')
        qos = int(request.POST.get('qos'))
        if topic:
            MQClient.client.subscribe(topic,qos)
        return HttpResponse("ok")

最後就剩下把資料存入資料庫中了,這個操作已經寫在那個類中了。簡單說明一下 ,當paho接收了mqtt請求的時候會產生一個回撥,執行下面這個函式,接收到的類容包含在msg中,msg主要有topic和payload兩個屬性,topic是訂閱的主題名,payload則是具體的訊息內容,按照之前的規定,主題名為client/stream,對topic內容拆分獲取到client_id和stream,最後就是資料庫的插入操作了,涉及到多表操作,簡單點說就是,先插入一個data資料,然後根據client_id和stream來確定stream,最後再通過add方法將兩者關聯起來,這樣就完成了訊息的儲存了。

def on_message(client, userdata, msg):
        client_id = msg.topic.split('/')[0]
        stream = msg.topic.split('/')[1]
        data = msg.payload.decode()
        # 接收訂閱資訊寫入到資料庫中
        models.DataStream.objects.filter(device__device_id=client_id).filter(name=stream).first().data.add(
            models.Data.objects.create(data=data))

測試一下,資料庫裡面已經準備一些client和stream資料,還是使用EMQ的websocket來測試,傳送主題為123456/hum,訊息內容為654321,在來看一下資料庫中資料是否插入成功。

 

data表
stream表
stream和data關聯表

 

更新:

最終對這一部分做了修改,沒有將MQTT相關的東西放到Django裡面,獨立出來了,這樣也方便日後的擴充套件和管理,資料庫操作改用了sqlachemy實現,其他內容基本不變