1. 程式人生 > >基於無線傳輸的溫溼度採集系統上位機開發

基於無線傳輸的溫溼度採集系統上位機開發

執行環境
Linux系統,Python語言
實現功能
從微控制器串列埠接收採集到的溫度和溼度,將資料存到資料庫中,並實時顯示在折線圖上。
使用軟體
Pycharm
使用python庫
資料庫層:pymysql
資料視覺化層:matplotlib
串列埠通訊層:pyserial

實現流程
在這裡插入圖片描述

串列埠通訊模組

串列埠所在位置:"/dev/ttyUSB0"
波特率:9600
超時時間:timeout=None,由於是實時監測,所以將timeout設定為None,只有資料傳輸過來的時候才執行後面程式。
傳輸位元組:bytesize=8

import random
import serial
import time
from Humiture1.WriteData import Mysqlclass


class Serailset:

    def __init__(self):
        self.mc=Mysqlclass()
        self.ser=serial.Serial("/dev/ttyUSB0",9600,timeout=None,bytesize=8)//初始化配置
	
	#關閉串列埠
    def close(self):
        self.ser.close()
	
	#讀取串列埠資料,每次讀取一行
    def read(self):
        return self.ser.readline()


if __name__ == '__main__':
    ser=Serailset()
    print(ser.insertMysql())
ser.close()

資料庫層
使用pymysql模組與資料庫進行互動,將資料庫操作封裝成一個類,方便操作。

import pymysql as mysql
import time
import random

#連線資料庫
db_config = {
    'host':'localhost',
    'user':'root',
    'passwd':'',
    'db':'Data',
    'port':3306,
    'charset':'utf8'
}

class Mysqlclass:
	
	#初始化,連線資料庫,建立遊標
    def __init__(self):
        try:
            self.db_config=db_config
            self.conn=mysql.connect(**self.db_config)
            self.cur=self.conn.cursor()
        except Exception as e:
            print("連線資料庫失敗")
        else:
            print("連線資料庫成功")
	
	#關閉連線,關閉遊標
    def close(self):
        self.cur.close()
        self.conn.close()



    #往A資料表存入資料
    def insertsql_A(self,list):
        li=[]
        li.append(tuple(list))
        insert_sql='INSERT INTO humitures_A(measuring_time,temperature,humidity) VALUES (%s,%s,%s)'
        self.cur.executemany(insert_sql,li)
        last_id_A=int(self.cur.lastrowid)
        self.conn.commit()
        return last_id_A

#往B資料表存入資料
    def insertsql_B(self, list):
        li = []
        li.append(tuple(list))
        insert_sql = 'INSERT INTO humitures_B(measuring_time,temperature,humidity) VALUES (%s,%s,%s)'
        self.cur.executemany(insert_sql, li)
        last_id_B=int(self.cur.lastrowid)
        self.conn.commit()
        return last_id_B

	#讀取資料表中最大行,方便訪問最新插入的資料
    def read_maxcount(self,site):
        self.cur.execute('select count(*) from humitures_%s' %(site))
        return self.cur.fetchall()[0][0]


    #讀取資料
    def readData_A(self,after):
        time.sleep(0.5)
        self.cur.execute('select * from humitures_A where id between %s and %s' %(after-10,after))
        self.conn.commit()
        return self.cur.fetchall()


    def readData_B(self,after):
        time.sleep(0.5)
        self.cur.execute('select * from humitures_B where id between %s and %s' %(after-10,after))
        self.conn.commit()
        return self.cur.fetchall()

資料視覺化

import matplotlib.pyplot as plt
from Humiture1.WriteData import Mysqlclass
import numpy as np
import time

class PictureShow:
    
    def __init__(self):
        self.mc = Mysqlclass()
        self.fig = plt.figure()
        plt.ion()

    def showA(self,last_id_A):
        ax1=plt.subplot(211)   #實現多幅圖建立
        plt.grid(True)
        after=last_id_A
        tuple=self.mc.readData_A(after)
        x1_datelist = [str(i[1])[11:] for i in tuple]
        y1_tmplist=[i[2] for i in tuple]
        y2_humlist=[i[3] for i in tuple]
    

        ax1.plot(x1_datelist,y1_tmplist,c='red')
        ax1.set_title("Site_A_Temperature")
        ax1.set_ylabel('Temperature(C)', fontsize=16)
        ax2=ax1.twinx()       #繫結ax1和ax2實現雙Y軸折線圖
        ax2.plot(x1_datelist, y2_humlist, c='blue')
        ax2.set_xlabel('Time(s)', fontsize=16)
        ax2.set_ylabel('humiture(%)', fontsize=16)


    def showB(self,last_id_B):
        ax3=plt.subplot(212)
        plt.grid(True)
        after = last_id_B
        tuple=self.mc.readData_B(after)

        x2_datelist = [str(i[1])[11:] for i in tuple]
        y3_tmplist = [i[2] for i in tuple]
        y4_humlist = [i[3] for i in tuple]

        ax3.plot(x2_datelist, y3_tmplist, c='red')
        ax3.set_title("Site_B_Temperature")
        ax3.set_ylabel('Temperature(C)', fontsize=16)

        ax4 = ax3.twinx()
        ax4.plot(x2_datelist, y4_humlist, c='blue')
        ax4.set_xlabel('Time(s)', fontsize=16)
        ax4.set_ylabel('humiture(%)', fontsize=16)
        self.fig.autofmt_xdate()


    def showWindow(self,last_id_A,last_id_B):
        plt.clf()  # 清除畫布
        self.showA(last_id_A)
        self.showB(last_id_B)
        plt.pause(0.001)


主操作
從串列埠中讀取資料,判斷A、B兩地的資料,A地的資訊存入資料庫中的A資料表,B地的資訊存入資料庫中B地的資料表。每插入一條資訊折線圖模組從資料庫中讀取一條最新插入的資料,在折線圖上進行顯式。

import time
from Humiture1.WriteData import Mysqlclass
from Humiture1.showpicture import PictureShow
from Humiture1.Serailset import Serailset

class Manager:
    def __init__(self):
        self.mc=Mysqlclass()
        self.ps=PictureShow()
        self.ser=Serailset()
	

    def insertMysqlandshow(self):
        while True:
            infotuple = []
            info = str(self.ser.read())
            date = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            infotuple.append(date)
            infotuple.append(info[8:10])
            infotuple.append(info[15:17])
            last_id_A = int(self.mc.read_maxcount("A"))
            last_id_B = int(self.mc.read_maxcount("B"))
            if info[2] == "A":
                self.mc.insertsql_A(infotuple)
                self.ps.showWindow(last_id_A, last_id_B)
                print(info)
            elif info[2] == "B":
                self.mc.insertsql_B(infotuple)
                self.ps.showWindow(last_id_A, last_id_B)
                print(info)


if __name__ == '__main__':
    mg=Manager()
    mg.insertMysqlandshow()