從零開始搭建物聯網平臺(6):訊息的持久化
遇到的問題:
查看了EMQ文件發現並不提供訊息的持久化功能,MQTT協議是按照裝置一直線上設計的,資料都是儲存在記憶體裡的,但是考慮到使用者上傳感測器資料不可能接收了就扔掉,那樣就沒法檢視歷史資料了,所以使用者上傳的訊息必須要能夠儲存下來,以便檢視歷史資料,這樣一來持久化功能就需要我們自己來實現。
另外還會出現一個問題,當兩個設備註冊的主題名一樣的時候,不能分出是哪一個裝置發出的訊息,在接收訂閱訊息的時候發現沒辦法獲取到傳送訊息的clientID,而且其他裝置也可以訂閱到任意裝置的訊息,對於敏感資訊來說存在安全性。
解決方法:
初步打算是,使用者需要在後臺註冊自己的裝置和資料流資訊,後臺會對所有註冊的資訊進行訂閱接收到訊息後,後臺會把訊息寫入到對應的表中,另外裝置釋出主題只能使用(clientID/主題名)命名方式,以便後臺能夠區分是哪一個裝置傳送過來的訊息。對於MQTT瞭解還是不夠深,只能使用這樣的笨辦法來解決了,以後若是找到其他的方法在進行改進。
解決問題:
首先需要通過python建立mqtt連線監聽所有註冊的主題資訊,這裡使用了paho-mqtt庫來實現,為了方便以後的呼叫將其封裝成一個類,最開始的時候想把一些常用的操作也封裝進去,單獨測試完全可以,但是一旦放到Django請求中處理的時候,mqtt能夠正常返回成功資訊,但是實際上並沒有正確執行,這一點始終沒有找到原因,最終只能簡化,只包含最基礎的功能。
class MqClient(object): def __init__(self, client_id, username, password): self.client = client.Client(client_id=client_id, clean_session=True) # 初始化,clean_session為false的時候EMQ會儲存訂閱狀態,可以不再次訂閱 self.client.username_pw_set(username, password) # 設定連線使用者名稱 self.client.on_connect = self.on_connect self.client.on_message = self.on_message self._client_status = False # 連線狀態 self._cloop = None self._connect() # 例項化會自動連線 def _connect(self, host="your IP ", port=1883, keepalive=60): """連線伺服器""" self.client.connect_async(host, port, keepalive) # 開啟執行緒執行 self._cloop = threading.Thread(target=self.client.loop_start()) self._cloop.start() def on_connect(self, client, userdata, flags, rc): """連線成功的回撥函式""" # 修改客戶端狀態 if rc == 0: self._client_status = True def init_sub(self): # 讀取資料庫中所有的已經註冊過的topic並且訂閱 for i in models.Device.objects.all(): for j in i.dev_stream.all(): self.client.subscribe(str(i.device_id) + '/' + j.name, j.qos) @staticmethod def on_message(client, userdata, msg): client_id = msg.topic.split('/')[0] stream = msg.topic.split('/')[1] data = msg.payload.decode() # 接收訂閱資訊寫入到資料庫中 models.DataStream.objects.filter(device__device_id=client_id).filter(name=stream).first().data.add( models.Data.objects.create(data=data))
有了封裝好的類,現在我們需要做的是:在Django專案啟動完成之後自動執行監聽任務的,最開始的時候打算放到setting或者__init__裡面,但是因為類裡面封裝了model操作,那時候專案還沒有載入完model會報錯,所以最終新建了一個app,然後放到其下的urls,這樣當專案啟動完成的時候就會自動載入了。
from utils.mqtt_client import MqClient
MQClient = MqClient(your client ID, username, password)
MQClient.init_sub()
接下來測試一下實時新增訂閱的功能,先從urls檔案匯入示例化之後的物件,呼叫client的subscribe方法
from mqtt.urls import MQClient
class Test(APIView):
def post(self, request):
topic = request.POST.get('topic')
qos = int(request.POST.get('qos'))
if topic:
MQClient.client.subscribe(topic,qos)
return HttpResponse("ok")
最後就剩下把資料存入資料庫中了,這個操作已經寫在那個類中了。簡單說明一下 ,當paho接收了mqtt請求的時候會產生一個回撥,執行下面這個函式,接收到的類容包含在msg中,msg主要有topic和payload兩個屬性,topic是訂閱的主題名,payload則是具體的訊息內容,按照之前的規定,主題名為client/stream,對topic內容拆分獲取到client_id和stream,最後就是資料庫的插入操作了,涉及到多表操作,簡單點說就是,先插入一個data資料,然後根據client_id和stream來確定stream,最後再通過add方法將兩者關聯起來,這樣就完成了訊息的儲存了。
def on_message(client, userdata, msg):
client_id = msg.topic.split('/')[0]
stream = msg.topic.split('/')[1]
data = msg.payload.decode()
# 接收訂閱資訊寫入到資料庫中
models.DataStream.objects.filter(device__device_id=client_id).filter(name=stream).first().data.add(
models.Data.objects.create(data=data))
測試一下,資料庫裡面已經準備一些client和stream資料,還是使用EMQ的websocket來測試,傳送主題為123456/hum,訊息內容為654321,在來看一下資料庫中資料是否插入成功。
更新:
最終對這一部分做了修改,沒有將MQTT相關的東西放到Django裡面,獨立出來了,這樣也方便日後的擴充套件和管理,資料庫操作改用了sqlachemy實現,其他內容基本不變