程式人生 > python學習之模塊:xlsxwriter (二)

python學習之模塊:xlsxwriter (二)

代碼 add odin 列表 readlines 生成圖表 excel文件 sel title

實戰訓練: 讀取日誌文件中的時間與CPU利用率,寫入Excel文件並生成折線圖

輸入: ********.log

代碼如下:

import gzip
import os
import re
import sys
import time
import xlsxwriter


# Shared container for the samples parsed from one log file.  It is filled by
# parse_vmstat(), read by write_excel_grpah() and emptied again by
# parse_log_sysmonitor_perf() before the next file is processed.
data_set = {
    "time": [],             # x-axis time labels (text cut out of the log line)
    "cpu_usage": [],        # CPU usage
    "memory_usage": [],     # physical memory usage
    "virtual_memory": [],   # virtual memory usage
}

# Menu number -> chart title.
items_set = {
    1: "CPU Usage",
    2: "Memory Usage",
    3: "Virtual Memory",
}

# Menu number -> (data_set key, zero-based worksheet column, x_scale of the
# inserted chart, y-axis caption).  Replaces three copy-pasted chart branches.
_CHART_LAYOUT = {
    1: ("cpu_usage", 1, 5, "Value"),
    2: ("memory_usage", 2, 2, "value"),
    3: ("virtual_memory", 3, 3, "value"),
}

# Name of the workbook written into the log directory.
_OUTPUT_WORKBOOK = "sys_perf_graph.xlsx"

# Prefix of the log files that carry vmstat records.
_PERF_LOG_PREFIX = "iMAP.imapsysmonitor_perf"

# Greedy match of the "[...]" payload that follows "vmstat" in a log line.
_VMSTAT_PAYLOAD = re.compile(r"\[.*\]")


def _log(message):
    """Print *message* prefixed with the current local date and time."""
    print(time.strftime("%Y-%m-%d %H:%M:%S: ") + message)


def _reset_data_set():
    """Empty every series in the shared ``data_set`` container."""
    for key in data_set:
        data_set[key] = []


def parse_vmstat(content):
    """Extract vmstat samples from log lines into the global ``data_set``.

    A record looks like::

        I060:9-10 01:49:48.149(32499|32991)vmstat[SZX1000331725, 3, 76, 31587, 7714, 70, 35681, 10791]

    The bracketed payload is ``[host, CPU usage, physical memory usage,
    physical memory size (MB), free physical memory (MB), virtual memory
    usage, virtual memory size (MB), free virtual memory (MB)]``.  The virtual
    memory usage (index 5) is only read when the payload has more than seven
    fields; otherwise 0 is recorded.

    The time label is the text between the first ``:`` and the first ``.`` of
    the line (``"9-10 01:49:48"`` in the example).

    Lines without ``vmstat``, without a ``[...]`` payload, or with
    non-numeric/missing fields are skipped; for the latter an error is printed
    and parsing continues with the next line.

    :param content: iterable of log lines (strings)
    :return: None; the samples are appended to ``data_set``
    """
    for line in content:
        if "vmstat" not in line:
            continue
        payload = _VMSTAT_PAYLOAD.search(line)
        if payload is None:
            continue
        fields = [field.strip() for field in payload.group(0)[1:-1].split(",")]
        try:
            cpu = int(fields[1])
            memory = int(fields[2])
            virtual_memory = int(fields[5]) if len(fields) > 7 else 0
        except (IndexError, ValueError) as ex:
            # Convert everything before appending so a bad line can never
            # leave the four series with different lengths.
            print("parse_vmstat report error : %s" % ex)
            continue

        timestamp = line[line.find(":") + 1:line.find(".")]
        data_set["time"].append(timestamp)
        data_set["cpu_usage"].append(cpu)
        data_set["memory_usage"].append(memory)
        data_set["virtual_memory"].append(virtual_memory)


def write_excel_grpah(workbook, filename, item):
    """Write ``data_set`` to a new worksheet and add the requested line chart.

    Columns A-D receive the time, CPU, memory and virtual memory series (with
    a header row), cell G1 receives the covered time range and the chart is
    anchored at G3.

    :param workbook: open ``xlsxwriter.Workbook`` to add the sheet to
    :param filename: log file name; used to derive the worksheet name
    :param item: 1 (CPU), 2 (memory) or 3 (virtual memory); any other value
                 writes the data but adds no chart
    :return: None
    """
    if ".trace." in filename:
        sheet_name = "sysmonitor_perf_" + filename[-14:]
    else:
        # Excel rejects worksheet names longer than 31 characters.
        sheet_name = filename[:31]

    try:
        worksheet = workbook.add_worksheet(sheet_name)
    except Exception as ex:
        print("add worksheet failed. file name is %s, error is : %s" % (sheet_name, ex))
        return  # nothing to write into

    # Column headers.
    worksheet.write("A1", "Time")
    worksheet.write("B1", "CPU usage")
    worksheet.write("C1", "Memory usage")
    worksheet.write("D1", "Virtual Memory")

    times = data_set["time"]
    if times:
        time_range = "Time range: " + times[0] + " to " + times[-1]
        worksheet.write("G1", time_range)

    worksheet.write_column("A2", times)
    worksheet.write_column("B2", data_set["cpu_usage"])
    worksheet.write_column("C2", data_set["memory_usage"])
    worksheet.write_column("D2", data_set["virtual_memory"])

    layout = _CHART_LAYOUT.get(item)
    if layout is None or not times:
        return  # unknown item or no samples: leave the sheet without a chart
    key, column, x_scale, y_caption = layout

    try:
        chart = workbook.add_chart({"type": "line"})
        # Ranges use the list form [sheet, first_row, first_col, last_row,
        # last_col] with zero-based indices.  Data starts in row index 1
        # (Excel row 2), so n samples end at row index n.  The list form also
        # lets xlsxwriter quote the sheet name where required.
        chart.add_series({
            "name": "",
            "categories": [sheet_name, 1, 0, len(times), 0],
            "values": [sheet_name, 1, column, len(data_set[key]), column],
            "line": {"color": "#1874CD", "width": 1.5},
        })
        chart.set_x_axis({"name": "Time"})
        chart.set_y_axis({"name": y_caption})
        chart.set_title({"name": items_set[item]})
        worksheet.insert_chart(
            "G3",
            chart,
            {"x_offset": 0, "y_offset": 0, "x_scale": x_scale, "y_scale": 2},
        )
    except Exception as ex:
        print("hand chart report error : %s" % ex)


def compress_gz(file_name):
    """Decompress a ``.gz`` file next to itself (despite the historic name).

    ``foo.log.gz`` is expanded to ``foo.log`` in the same directory; an
    existing target is overwritten.

    :param file_name: path of the gzip file; must end with ``.gz``
    :return: path of the decompressed file
    :raises ValueError: if *file_name* does not end with ``.gz`` (the output
                        would otherwise overwrite the input)
    """
    if not file_name.endswith(".gz"):
        raise ValueError("not a .gz file: %s" % file_name)
    log_file_name = file_name[:-len(".gz")]

    with gzip.open(file_name, "rb") as g_file, open(log_file_name, "wb") as out:
        # Copy in chunks so large logs are not held in memory at once.
        for chunk in iter(lambda: g_file.read(1 << 16), b""):
            out.write(chunk)
    return log_file_name


def get_all_logfile_list(path):
    """List the regular files in *path*, expanding ``.gz`` archives first.

    Every ``*.gz`` file is decompressed in place and reported under its name
    without the ``.gz`` suffix.  Sub-directories are ignored and each name is
    reported once.

    :param path: directory holding the logs (.log / .txt / .trace ...)
    :return: list of file names (not full paths) found in *path*
    """
    # Normalise backslashes so Windows-style input needs no escaping.
    new_path = path.replace("\\", "/")

    log_list = []
    seen = set()
    for name in os.listdir(new_path):
        full_path = os.path.join(new_path, name)
        if not os.path.isfile(full_path):
            continue
        if name.endswith(".gz"):
            compress_gz(full_path)
            name = name[:-len(".gz")]
        if name not in seen:
            seen.add(name)
            log_list.append(name)
    return log_list


def parse_log_sysmonitor_perf(path, item):
    """Chart every ``iMAP.imapsysmonitor_perf*`` log found in *path*.

    Changes the working directory to *path*, recreates ``sys_perf_graph.xlsx``
    there and adds one worksheet (data plus chart) per matching log file.

    :param path: directory containing the log files
    :param item: chart selector passed on to ``write_excel_grpah`` (1, 2 or 3)
    :return: None
    """
    # Resolve before chdir so a relative path keeps meaning the same directory.
    path = os.path.abspath(path)
    os.chdir(path)

    sysmonitor_perf_list = [
        name for name in get_all_logfile_list(path)
        if name.startswith(_PERF_LOG_PREFIX)
    ]

    if os.path.isfile(_OUTPUT_WORKBOOK):
        os.remove(_OUTPUT_WORKBOOK)
    workbook = xlsxwriter.Workbook(_OUTPUT_WORKBOOK)

    _log("************begin :")
    try:
        for file_name in sysmonitor_perf_list:
            try:
                with open(file_name, "r", encoding="utf-8") as f_log:
                    _log("begin to parse log file : %s" % file_name)
                    content = f_log.readlines()
            except (OSError, UnicodeDecodeError) as result:
                print(result)
                continue  # unreadable file: move on to the next one

            # Start from a clean container even if an earlier file failed.
            _reset_data_set()
            parse_vmstat(content)

            _log("begin to write excel file...")
            write_excel_grpah(workbook, file_name, item)
            _reset_data_set()
            _log("write data to excel file finish.")
    finally:
        # The workbook is only written to disk on close().
        workbook.close()
    _log("************ finish")


def main():
    """Prompt for the log directory and the chart type, then build the workbook."""
    path = input("please input log path: ")
    if not os.path.isdir(path):
        print("the directory path does not exist")
        sys.exit()
    path = os.path.abspath(path)
    os.chdir(path)
    print("the directory path is normal")

    print("1.cpu_usage")
    print("2.memory_usage")
    print("3.virtual_memory")
    print("4.exit")

    # input() returns text; convert it before comparing with the menu numbers.
    try:
        number = int(input("please select : ").strip())
    except ValueError:
        number = None

    if number not in (1, 2, 3, 4):
        print("input wrong, exit")
        sys.exit()
    if number == 4:
        sys.exit()
    parse_log_sysmonitor_perf(path, number)


if __name__ == "__main__":
    main()

程序運行效果:

技術分享圖片

技術分享圖片

python學習之模塊:xlsxwriter (二)