1. 程式人生 > >Tkinter開發采用串口通信的上位機軟件(3)

Tkinter開發采用串口通信的上位機軟件(3)

main daemon mon 功能 creat 使用 進行 結果 sid

本博客的所有原創文章采用創作公用版協議。要求署名、非商業用途和保持一致。要求署名必須包含我的網名(geokai)以及文章來源(選擇博客首地址或者具體博文地址)。

商業性使用須預先征得本人同意(發送Email到 [email protected]

18年下半年太忙了,直接停止軟件的開發計劃。在18年最後幾天使用python自帶的Tkinter框架簡單的先把軟件功能實現出來了。占時把這一期的標題改成Tkinter開發上位機軟件。

先說一下軟件實現的功能把

1)獲取二氧化碳傳感器探頭的數據,使用到pyserial,crcmod庫

2)使用matplolib進行實時繪圖,使用到matplotlib庫

3)定時將數據回傳到郵箱,使用到email,smtplib庫

這裏只放出最核心部分的代碼

導入關鍵的庫

#導入數值GUI框架
import tkinter as tk
from tkinter import scrolledtext 

#導入繪圖包
from matplotlib.backends.backend_tkagg import (
    FigureCanvasTkAgg, NavigationToolbar2Tk)
# Implement the default Matplotlib key bindings.
#from matplotlib.backend_bases import key_press_handler
from matplotlib.figure import Figure
import matplotlib.dates as mdates

#導入數學計算包
import pandas as pd
import numpy as np

#導入系統包
import threading
import time
from datetime import datetime,timedelta
import serial.tools.list_ports 
import crcmod

#導入網絡包,郵件發送
import smtplib  
from email.mime.multipart import MIMEMultipart 
from email.mime.text import MIMEText  
from email.mime.image import MIMEImage 
from email.header import Header 

GUI框架,其中包括繪圖部分

#定義GUI界面及功能
class Application(tk.Tk):

    def __init__(self):
        ‘‘‘初始化‘‘‘
     
        self.createWidgets()

    def createWidgets(self):
        ‘‘‘設置繪圖區‘‘‘
        self.fig = Figure(figsize=(10,5), dpi=100)
        self.ax_co2 = self.fig.add_subplot(1,1,1)
        self.canvas = FigureCanvasTkAgg(self.fig, master=self)
        self.canvas.get_tk_widget()
        self.canvas._tkcanvas.place(x=0, y=0, width=1024, height=350)#pack(side=tk.TOP, fill=tk.BOTH, expand=1)



        ‘‘‘設置二氧化碳的數據接收區‘‘‘
        self.log_co2 = scrolledtext.ScrolledText(self, font=("Calibri", 8), background=‘#ffffff‘)
        self.log_co2.place(x=720, y=450, width=300, height=60)   
        self.log_co2.insert(tk.END,‘Strat\r\n‘)

        self.log_co2_neat = scrolledtext.ScrolledText(self, font=("Calibri", 10), background=‘#ffffff‘)
        self.log_co2_neat.place(x=720, y=530, width=300, height=100)   
        self.log_co2_neat.insert(tk.END,‘Strat\r\n‘)

        self.log_co2_col_name=tk.Label(bg=‘gray‘, font=("Calibri", 10),  justify=tk.LEFT, anchor=tk.W, 
            text=‘Time\t\tCO2(ppm)‘)
        self.log_co2_col_name.place(x=720, y=510, width=300, height=20)



        
        ‘‘‘設置按鈕區‘‘‘    
        self.bt_connect_str = tk.StringVar()
        if self.trans_data_status==False:
            self.bt_connect_str.set(‘開始傳輸數據‘)
        else:
            self.bt_connect_str.set(‘停止傳輸數據‘)
        self.bt_connect = tk.Button(self, textvariable=self.bt_connect_str, command=self.ActivateTrans)
        self.bt_connect.pack(side=tk.LEFT , anchor=tk.S)  
        self.Draw() # 繪圖


        ‘‘‘設置狀態區‘‘‘
        self.lb_port_co2_status=tk.Label(bg=‘red‘, width=10, height=1, text=‘CO2 Status‘)
        self.lb_port_co2_status.pack(side=tk.RIGHT , anchor=tk.S)
        

        

    def ActivateTrans(self):
        ‘‘‘
        點擊數據傳輸按鈕後激活數據傳輸
        1)激活一個從串口獲取二氧化碳數據的線程
        3)激活郵件發送線程
        4)運行以上三個線程,並判斷是否正確連接串口,並顯示串口連接狀態
        ***註意以上三個線程的功能較為復雜,使用了單獨的Thread類進行了繼承,因此停止線程
            采用定義在類裏面的Stop()方法
        5)激活文本數據刷新線程
        6)激活繪圖區刷新線程
        ***以上兩個線程僅有單獨的函數並且封裝在窗體類下,直接采用Threading類進行定義,所
            以需要註意停止需采用threading.Event()方法進行停止
        ‘‘‘
        if self.trans_data_status==False:
            self.trans_thread_co2 = Thread_CO2(0, "Thread_CO2_1")
            self.trans_thread_co2.setDaemon(True)
            self.trans_thread_co2.start()


            if self.trans_thread_co2.port_available==True:
                self.lb_port_co2_status.config(bg=‘green‘)
                print(‘CO2 Port Failed to Connect‘)


            
            self.email_thread_event = threading.Event()
            self.email_thread = threading.Thread(target = SendEmail, args=(EMAIL_RESEND_INTERVAL, self.email_thread_event))
            self.email_thread.start()

            self.refresh_thread_event = threading.Event()
            self.refresh_thread = threading.Thread(target = self.RefreshThread, args=(1, self.refresh_thread_event))
            self.refresh_thread.start()

            self.redraw_thread_event = threading.Event()
            self.redraw_thread = threading.Thread(target = self.ReDrawThread, args=(10, self.redraw_thread_event))
            self.redraw_thread.start()

            self.trans_data_status=True
            self.bt_connect_str.set(‘停止傳輸數據‘)
        else:
            self.trans_thread_co2.Stop()
            self.lb_port_co2_status.config(bg=‘red‘)
            self.lb_port_hg_status.config(bg=‘red‘)
            self.trans_data_status=False
            self.bt_connect_str.set(‘開始傳輸數據‘)

            self.email_thread_event.set()
            self.refresh_thread_event.set()
            self.redraw_thread_event.set()
            # self.email_thread.join(0)

    def RefreshThread(self, time_interval, stop_event):
        ‘‘‘
        原始數據刷新程序
        ‘‘‘
        while(not stop_event.is_set() ):
            print(‘refresh‘)
            self.RefreshDataText()
            #pinrt(time_interval)
            time.sleep(time_interval)
    
    def RefreshDataText(self):
        ‘‘‘
        判斷是否有新的數據並顯示在文本框中
        
        ‘‘‘
        text = self.log_co2.get(0.0, tk.END).splitlines()
        # print(raw_trans_data_co2)
        if len(raw_trans_data_co2)>0 :
            if raw_trans_data_co2[-1]==text[-2]:
                pass
            else:
                self.log_co2.insert(tk.END, raw_trans_data_co2[-1]+‘\r\n‘)
        self.log_co2.see(tk.END)
        #print(np.array(compiled_data_co2.iloc[-1]))

        if len(compiled_data_co2)>0:
            one_data = np.array(compiled_data_co2.iloc[-1])
            #print(one_data)
            one_data = str(one_data[0]) + ‘\t\t‘ + str(one_data[1])       
            text = self.log_co2_neat.get(0.0, tk.END).splitlines()
            if one_data==text[-2]:
                print(‘same‘)
            else:
                self.log_co2_neat.insert(tk.END, one_data + ‘\r\n‘)
            self.log_co2_neat.see(tk.END)



    def AdjustScale(self,_):
        ‘‘‘
        調整繪圖區坐標軸範圍
        ‘‘‘

    def ReDrawThread(self, time_interval, stop_event):
        ‘‘‘
        繪圖區刷新程序
        ‘‘‘
        while(not stop_event.is_set()):
            try:
                self.Draw()
            except:
                pass
            time.sleep(time_interval)


    def Draw(self):
        ‘‘‘
        實時繪圖程序
        TODO:
        1)個人認為使用matplotlib的這種繪圖方式效率有些底下,是否采用諸如Animation的
            動態繪圖功能改善繪圖性能有待檢驗
        ‘‘‘
        #判斷是否有有效數據
        if len(compiled_data_co2)>0 or len(compiled_data_hg)>0:               
            #由於二氧化碳數據量太大,選擇最後16000條數據,足夠保證最大3天的顯示量,降低繪圖負擔
            #註意原始數據中時間數據最好經過to_datetime函數規整一遍,以免造成數據錯誤
            co2_x_data = pd.to_datetime(compiled_data_co2.iloc[-16000:,0])
            co2_y_data = compiled_data_co2.iloc[-16000:,1]
            # co2_xlim_min = datetime.strptime(co2_x_data.iloc[-1], ‘%Y-%m-%d %H:%M:%S‘) - self.fig_xlim_delta
            co2_xlim_min = co2_x_data.iloc[-1] - self.fig_xlim_delta
            print( co2_y_data.min())

            self.ax_co2.clear()
            self.ax_co2.xaxis.set_major_formatter(mdates.DateFormatter(‘%m-%d\n%H:%M‘))
            self.ax_co2.xaxis.set_major_locator(mdates.AutoDateLocator())
            self.ax_co2.scatter(co2_x_data.values,co2_y_data.values, s = 1, c=‘green‘)
            self.ax_co2.set_xlim(co2_xlim_min,co2_x_data.iloc[-1]+ self.fig_xlim_delta/9)
            self.ax_co2.set_ylim(co2_y_data.min(), co2_y_data.min()+self.fig_co2_ylim_delta)
            self.ax_co2.set_ylabel(‘$CO_2(ppm)$‘)
            self.ax_co2.grid(linestyle=‘--‘)
            
          
            self.fig.savefig(‘D:/figure.png‘)


            self.canvas.draw()

    
    def _quit(self):
        ‘‘‘退出‘‘‘
        if self.trans_data_status==True:
            self.ActivateTrans()    
            
        self.quit()     # 停止 mainloop
        self.destroy()  # 銷毀所有部件

  

二氧化碳數據的傳輸

class Thread_CO2 (threading.Thread):
    ‘‘‘
    接收CO2數據的線程
    該CO2探頭的購買鏈接https://m.tb.cn/h.3phrPcr

    ‘‘‘
    def __init__(self, threadID, name):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.port_available = False

        com_list = serial.tools.list_ports.comports()
        for port in com_list:
            print(port.device) #返回端口號如COM3
            print(port.description) #返回設備名字
            print(port.pid) #返回設備在計算機上的位置         
            if port.pid==29987:
                port_num_co2=port.device
                self.port_available = True

        self.ser_co2=serial.Serial()
        if self.port_available==True:
            self.ser_co2.port=port_num_co2
            self.ser_co2.baudrate=19200
            self.ser_co2.parity=serial.PARITY_EVEN
            self.ser_co2.timeout=0.5

        self.__runing_flag=True

    def run(self):
        print ("開始線程:" + self.name)
        if self.port_available==False:
            if DEBUG_MODE==True:
                while(self.__runing_flag):
                    self.FakeData()
                    time.sleep(1)
            return 1
        if not self.ser_co2.is_open:
            self.ser_co2.open()
        print(self.ser_co2)

        while(self.__runing_flag):
            self.GetData()
            time.sleep(CO2_REFRESH_INTERVAL)
        
    def GetData(self):
        ‘‘‘
        獲取CO2數據需要註意進制的轉換,以及最終的CRC16校驗
        CRC的校驗使用CRCMOD庫,不同類型的CRC均可以采用此庫進行計算
        其中特別要關註poly這個參數,參考http://www.ip33.com/crc.html
        在該網站查詢CRC多項式,並在開頭補1
        ‘‘‘
        retry_time= 10
        while(retry_time):
            request_code_co2=[]
            #首先配置需要發送的信息,Serial庫接收直接以0-255的int值
            #因此需要將16進制字符串轉換為10進制整數
            for i in ‘15 04 13 8B 00 01 46 70 ‘.split():
                request_code_co2.append(int(i,16))
            # print(request_code_co2)
            if not self.ser_co2.is_open:
                self.ser_co2.open()
            self.ser_co2.write(request_code_co2)
            #獲取的格式為b‘‘,byte型
            temp =self.ser_co2.readline()
            #print(temp)

            #定義CRC,並計算CRC
            crc16_func = crcmod.Crc(poly=0x18005, rev=True, initCrc=0xFFFF, xorOut=0x0000)
            crc16_func.update(temp[:-2])
            
            co2_conc=[]

            #CRC的計算結果為hex型,采用bytes.fromhex()轉換為byte再與傳輸的最後兩位byte對比,註意順序
            #如果獲取成功就退出,沒有成功則重復,最多10次
            #TODO:如果多詞未獲取成功,未來需要加入一個錯誤信息日誌
            if bytes.fromhex(crc16_func.hexdigest()) == temp[-2:][::-1]:
                print(temp[-4:-2])
                co2_conc = (int.from_bytes(temp[-4:-2], byteorder=‘big‘, signed=False))
                break
            retry_time-=1
        
        #raw_trans_data_co2用來顯示文本信息,需要將DateTime和獲取的16進制值轉換成str類型,否則文本框無法顯示
        #TODO:這個語句應該可以優化
        co2_one_data_raw = str([temp.hex()[x*2:x*2+2] for x in range(len(temp.hex())//2) ])
        co2_one_data_time = (datetime.now().strftime(‘%Y-%m-%d %H:%M:%S‘))
        raw_trans_data_co2.append(str(co2_one_data_time) + co2_one_data_raw)
        try:
            if co2_conc>=0:
                compiled_data_co2.loc[len(compiled_data_co2)] = [co2_one_data_time, co2_conc]
        except:
            pass
        with open(‘D:/raw_data_CO2.txt‘, mode=‘a‘) as f:
            f.write(str(co2_one_data_time))
            f.write(‘\t‘)
            f.write(str(co2_conc))
            f.write(‘\r\n‘)
            
        #print(compiled_data_co2)
    
    def FakeData(self):
        #產生偽數據


    def Stop(self):
        if self.ser_co2.is_open and self.port_available==True:
            self.ser_co2.close()
        print(self.ser_co2)
        self.__runing_flag=False

  

Tkinter開發采用串口通信的上位機軟件(3)