1. 程式人生 > >使用Python編寫指令碼將MQTT資料轉存至InfluxDB

使用Python編寫指令碼將MQTT資料轉存至InfluxDB

前言

之前使用Rabbitmq部署了一個簡單的MQTT伺服器,暫未做使用者隔離,也部署了InfluxDB時序資料庫,但是並不能直接通過配置將MQTT伺服器的資料轉存至時序資料庫中,於是我決定自己寫指令碼實現下.

準備

開啟shell使用
pip install influxdb安裝InluxDB所需模組
pip install paho-mqtt安裝Rabbmq所需模組

原始碼

# coding=utf-8
import json
import random
import threading
import os
import paho.mqtt.client as mqtt
import
time from influxdb import InfluxDBClient from my_lib.code_handle.code_handle import auto_code class Mqtt_handle: topic_sub='$dp' topic_pub='$info' counts = 0 clientID = '' for i in range(0, 2): clientID = clientID.join(str(random.uniform(0, 1))) mqtt_client = mqtt.Client(clientID) DB_client = InfluxDBClient(self._host, 8086
, '', '', 'mydb') # 初始化 def __init__(self, host, port): self._host = host self._port = port self.mqtt_client.on_connect = self._on_connect # 設定連線上伺服器回撥函式 self.mqtt_client.on_message = self._on_message # 設定接收到伺服器訊息回撥函式 def connect(self, username=None, password=None)
:
self.mqtt_client.username_pw_set(username, password) self.mqtt_client.connect(self._host, self._port, 60) # 連線伺服器,埠為1883,維持心跳為60秒 def publish(self, data): self.mqtt_client.publish(self.topic_pub, data) def loop(self, timeout=None): thread = threading.Thread(target=self._loop, args=(timeout,)) thread.start() def _loop(self, timeout=None): if not timeout: self.mqtt_client.loop_forever() else: self.mqtt_client.loop(timeout) def _on_connect(self, client, userdata, flags, rc): local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) with open('./run.log', 'a+')as f: f.write('@Run ' + local_time + ' Connected with result code :' + str(rc)) client.subscribe(self.topic_sub) def _on_message(self, client, userdata, msg): # 從伺服器接受到訊息後回撥此函式 data_json = auto_code(str(msg.payload)) if self._is_json(data_json): data_list = [json.loads(data_json)] #如果符合InfluxDB格式就轉存至資料庫 if 'measurement' in data_list[0] and 'tags' in data_list[0] and 'fields' in data_list[0]: try: DB_client.write_points(data_list) self.counts += 1 local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) #//**********//記錄一個上傳日誌 with open('./upload.log', 'a+')as f: f.write('Success,counts:' + str(self.counts) + ' Time:' + local_time + '\n') except Exception as e: with open('./upload.log', 'a+')as f: f.write(e.message) f.write('\nTopic:' + auto_code(str(msg.topic)) + " Msg:" + data_json + '\n\n') #//**********// #如果接受到停止指令就停止程式並記錄一個停止日誌 elif data_list[0].has_key('cmd') and data_list[0]['cmd'] == 'exit': print '\[email protected]_handle Exit\n' with open('./run.log', 'a')as f: local_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())) f.write( '@Stop ' + local_time + ' Topic:' + auto_code(str(msg.topic)) + " Msg:" + data_json + '\n') os._exit(0)#停止程式 #解析JSON前先判斷資料 是否是JSON格式,避免程式崩潰 def _is_json(self, data): try: json.loads(data) except ValueError: return False return True if __name__ == '__main__': local_host = '127.0.0.1' DB_client = InfluxDBClient(local_host, 8086, '', '', 'mydb') # 初始化 mqtt_client = Mqtt_handle(local_host, 1883) mqtt_client.connect('influxdb', 'influxdb') mqtt_client.loop()

寫一個簡單的Shell執行一下指令碼:

#!/bin/bash
nohup python /home/ubuntu/app/py/mqttDB/mqtt_handle.py &

相關推薦

使用Python編寫指令碼MQTT資料InfluxDB

前言 之前使用Rabbitmq部署了一個簡單的MQTT伺服器,暫未做使用者隔離,也部署了InfluxDB時序資料庫,但是並不能直接通過配置將MQTT伺服器的資料轉存至時序資料庫中,於是我決定自己寫指令碼實現下. 準備 開啟shell使用 pip in

Python指令碼Redis資料到Mysql列表中

目錄 一、思路 三、總結 一、思路        連線指定的redis和mysql資料庫,從redis中取出資料,然後存到mysql中,中間會遇到幾個問題,在下面的程式碼片段中指出 二、程式碼實現 # coding=utf-8 import js

Flume kafka 中的資料到 HDFS 中

flume1.8 kafka Channel + HDFS sink(without sources) 將 kafka 中的資料轉存到 HDFS 中, 用作離線計算, flume 已經幫我們實現了, 新增配置檔案, 直接啟動 flume-ng 即可. The Kafka channel can be

Oracle用定時任務儲存過程資料到歷史表,提高查詢速度

一、定義儲存過程 CREATE OR REPLACE PROCEDURE Sync_INFO_HISTORY IS BEGIN insert into depart_passenger_info

python指令碼——同一個資料夾下的相同檔名的不同檔案分開

需求:一個資料夾下有相同檔名的兩種格式的檔案,且數量相等,我的兩種檔案格式是:jpg和tif.rbox.txt,想要把這兩種檔案分別放到兩個資料夾裡面 例如:將789資料夾下的兩種檔案分別放到456資料夾和000資料夾下(原來的456資料夾和000資料夾是空的) 程式

mongoDB資料化為json---Python實現

前提背景 我們知道,mongoDB資料庫表中的一條資料(document)在呈現的時候,很像json。在平時的使用中,有時候會有這樣的需求:我們需要將資料庫中的資料讀出來,並將其傳送(例如ajax請求)到前端頁面去解析呈現。顯然此時,為了更容易解析,我們需要將

慢日誌到數據庫

execute ins com mys spl 轉存 open for mes import re import MySQLdb host=‘10.76.45.7‘ port=3306 user=‘test‘ password=‘test‘ dbName=‘test‘

Java 輸入一行以空格分隔字元作為輸入資料為陣列形式並輸出

用java寫一些演算法題目的時候需要輸入一些資料,像C或者CPP都可以有專用的輸入函式進行輸入,在Java裡需要稍微麻煩一些,具體程式碼如下: import java.util.Scanner; public class Main{ public static void main(

cifar10資料成圖片

#將cifar10轉成圖片 import numpy as np import matplotlib.image as plimg from PIL import Image import pickle as p def load_CIFAR_batch(filename):

mnist資料成圖片

from tensorflow.examples.tutorials.mnist import input_data import scipy.misc import os # 注意儲存的路徑 mnist = input_data.read_data_sets("MNIST_DATA", o

Android Gson json資料double 數值為0.0的問題

今天上午改需求 遇到一個奇葩的Bug   返回的為double 型別  於是在實體類裡寫private double space; get  set略。。。 但是顯示的時候為0.0  於是將實體類的double 改為String 

BIGEMPA如何高程資料成南方CASS的DAT格式

需要的工具 1. BIGEMPA地圖下載器(全能版已授權)   2. Global Mapper 14   第一步:將下載好的高程資料DEM直接拖到global mapper中(如何下載高程DEM?),如下圖所示:  

如何高程資料成南方CASS的DAT格式

釋出時間:2018-01-17 版權: 需要的工具 第一步:將下載好的高程資料DEM直接拖到global mapper中(如何下載高程DEM?),如下圖所示: 第二步:將DEM資料儲存為高程點的文字檔案,如下圖: 點選後,出現下圖:

FFmpeg YUV資料RGB

只要開始初始化一次,結束後釋放就好,中間可以迴圈轉碼  AVFrame *m_pFrameRGB,*m_pFrameYUV; uint8_t *m_rgbBuffer,*m_yuvBuffer; struct SwsContext *m_img_convert_ctx; void i

BIGEMAP 如何高程資料成南方CASS的DAT格式

需要的工具 1. BIGEMPA地圖下載器(全能版已授權)  2. Global Mapper 14   第一步:將下載好的高程資料DEM直接拖到global mapper中如下圖所示:   第二步:將DEM資料儲存為高程點的文字

batch指令碼proto檔案化為js

要進入proto的資料夾 開啟cmd 輸入dir *.proto > aj.text 開啟aj.text 刪除多餘的空行和沒有的行(不含檔名的行) 新建一個demo.bat @Echo Off Setlocal Enabledelayedexpansi

【深度學習框架Caffe學習與應用】第三課 圖片資料化為LMDB資料``

1.將圖片資料轉化為LMDB資料 第一步:建立圖片檔案列表清單,一般為一個txt檔案,一行一張圖片 我在caffe/data/目錄下新建一個test_data的資料夾,裡面放訓練集及資料集

BIGEMAP如何高程資料成南方CASS的DAT格式

1、添加了ArcGis切片快取的conf.xml檔案中的各級比例引數; 2、修改了天地圖四川請求地址; 3、針對向量匯出成csv格式(地理座標)時,可以設定座標格式(度分秒等) 4、增加了通過離線掃碼方式使用下載器,同時可以新增一鍵離線地圖服務; 5、優化了7引數計算的準確性; 6、修改了從工具箱中

Access資料庫資料到MySql資料庫中

目錄 一、Navicat自帶匯入Access(*.mdb)資料的方式 二、藉助ODBC當然Access資料 1. 建立ODBC資料來源 2. 通過Navicat匯入資料 3. 新增鍵等 使用Navicat 8 for MySql來匯入資料,Access是2003版本的

oracle 海量資料插入分割槽表

某普通表T,由於前期設計不當沒有分割槽,如今幾年來的資料量已達9億+, 空間佔用大約350G,線上重定義為分割槽表不現實,故採取申請時間視窗停此表應用,改造為分割槽表。1.建立分割槽表-- Create table 建立分割槽表T_PART,分割槽從14年6月開始。creat