1. 程式人生 > >使用python監控HDFS檔案的增量【優化中】

使用python監控HDFS檔案的增量【優化中】

目錄

1、需求和步驟

2、專案結構

3、專案程式碼

       3.1建表語句 hdfs_Ctreate_table

       3.2刪除檔案記錄 hdfs_delete_file_record.py

       3.3檔案路徑的小時監控 hdfs_path_Monitor.py

       3.4檔案路徑的天監控 hdfs_path_Monitor_day.py

       3.5檔案大小記錄 hdfs_size.py

       3.6mysql連線資訊

       3.7mysql工具類 mysqlHelper.py

       3.8工具類utils.py

4、結果展示

————————————————————————————-

1、需求和步驟

需求:
1、獲取HDFS每個資料夾每小時和每天的增量
2、定時刪除HDFS任務歷史記錄

需求1步驟:
1、獲取到HDFS所有檔名稱,參考:hdfs_size.py
2、採用遞迴獲取檔案大小,參考:hdfs_path_Monitor.py
3、每天計算檔案大小,參考:hdfs_path_Monitor_day.py

需求2步驟:
1、查一個月前的所有檔案儲存到hdfs_delete_file_record中,標記狀態為0
2、通過mysql查詢標記為0的資料,獲取對應路徑
3、通過hadoop fs -rm -r -skipTrash + path 進行刪除
4、刪除之後標記為1

2、專案結構

這裡寫圖片描述

3、專案程式碼

3.1建表語句 hdfs_Ctreate_table


 CREATE TABLE hdfs_delete_file_record(
     date VARCHAR(25) not null,
     p_name VARCHAR(255) not null,
     status VARCHAR(5) not null,
     primary key (date,p_name)
 )ENGINE=InnoDB DEFAULT CHARSET=utf8;

 CREATE TABLE hdfs_path_Monitor(
     id INT
not NULL, p_id INT not NULL, p_name VARCHAR(255) not null, date VARCHAR(25) not null, dir_kb float not null, hour_increase float not null, day_increase float not null, primary key (id,p_id,p_name,date) )ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE hdfs_path_Monitor_temp( id INT not NULL, p_id INT not NULL, p_name VARCHAR(255) not null, date VARCHAR(25) not null, dir_kb float not null, hour_increase float not null, day_increase float not null, primary key (id,p_id,p_name,date) )ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE hdfs_size( id INT not NULL, p_name VARCHAR(255) not null, date VARCHAR(25) not null, dir_byte float not null, primary key (id,p_name,date) )ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.2刪除檔案記錄 hdfs_delete_file_record.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-

'''
1、查一個月前的所有檔案儲存到hdfs_delete_file_record中,標記狀態為0
2、通過mysql查詢標記為0的資料,獲取對應路徑
3、通過hadoop fs -rm -r -skipTrash + path 進行刪除
4、刪除之後標記為1
'''

import os
import time
import utils
import mysqlHelper

'''30天前的時間'''
monthAgo = str(utils.getBeforeDay(30))[0:16]

'''歷史路徑的列表'''
L = []

# hadoop fs -rm -r -skipTrash   不進入回收站
def cleanOldLog():
    values=mysqlHelper.mysqlSearch('select * from hdfs_delete_file_record where STATUS=0')
    for line in values:
        os.popen("hadoop fs -rm -r -skipTrash " + line[1])
        print line[1]
        mysqlHelper.mysqlInsert("update hdfs_delete_file_record set STATUS=1 where p_name = '%s'  " % (line[1]))

def cleanApplicationHistory():
    logs = os.popen('hdfs dfs -ls /user/spark/applicationHistory')
    args = []

    mysqlHelper.mysqlInsert("DELETE FROM hdfs_delete_file_record where status = '0'")

    for line in logs:
        lines = line.replace('\n', "").split(" ")
        length = len(lines)

        '''檔案時間'''
        date = str(lines[length - 3]) + " " + str(lines[length - 2])

        if date > monthAgo:
            continue

        '''檔案路徑'''
        filePath = lines[length - 1]
        L.append(filePath)

        sql = "insert into hdfs_delete_file_record (`date`, p_name , status) values (%s, %s, %s)"
        args.append((date, filePath, "0"))

    mysqlHelper.mysqlBatchInset(sql , args)

def main():
    StartTime = time.time()
    cleanApplicationHistory()
    cleanOldLog()
    mysqlHelper.mysqlClose()
    EndTime = time.time()
    print "所需要的時間為  : " + str(EndTime - StartTime)

if __name__ == "__main__":
    main()

3.3檔案路徑的小時監控 hdfs_path_Monitor.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-


'''
備註:使用【樹】的結構記錄父子節點
1、使用遞迴查出每層目錄結構
2、使用字典記錄父節點和子節點
   2.1、使用getHDFSSize()方法,去除路徑下面的檔案避免多次查詢
   2.2、規定層級避免多次查詢
   2.3、符合條件的路徑批量插入到mysql中
   2.4、符合條件的路徑進入List中,通過遞迴的方法繼續獲取路徑
3、每小時資料進入mysql後,通過hourComputer()方法進行計算
'''

import os
import time
import mysqlHelper
import utils

'''全域性變數系統時間 LocalTime 2017-07-28 14:37:47'''
LocalTime = str(utils.getnowTime())[0:16]

BeforeOneHour = str(utils.getBeforeHour(1))[0:16]

FileNames = set()

'''獲取目錄的曾層級'''
hierarchy = 4

'''p_id的字典'''
data_dict = {}

'''堆方法'''


def digui(list):
    if (len(list) == 0) == True:
        return 1
    else:
        gethdfs(list)


'''
有一個細節,當path下面沒有檔案或者path是一個檔案,
則只會放回的引數為本身路徑,所有過濾掉這一層
'''


def gethdfs(pathSet):
    print pathSet
    for path in pathSet:
        L = set()
        lines = os.popen('hdfs dfs -du -h %s ' % path)
        args = []
        for line in lines:
            if (line.replace('\n', "").split(' ').pop() == path) == False:
                num = line.split(' ')[0]
                tags = line.split(' ')[1]
                # 檔案大小
                fileSize = convertSize(num, tags)
                # 檔案路徑
                filePath = line.replace('\n', "").split(' ').pop()

                # 判斷是否大於層級
                if len(filePath.split("/")) > hierarchy:
                    continue

                A = set()
                A.add(filePath)
                # 判斷是否是檔案
                if len((A.difference(FileNames))) == 0:
                    continue

                # print A
                # 節點自增ID
                Node_ID = add()
                mysqlTup = (Node_ID, filePath)

                # 通過字典獲取父節點路徑
                data_dict[mysqlTup[1]] = Node_ID
                p_list = mysqlTup[1].split('/')
                # print "p_list ----->", type(p_list), p_list
                length = len(p_list)
                p_str = '/'.join(p_list[:length - 1]) if isinstance(p_list, list) and len(p_list) > 0 else None
                # print "p_str ----->", p_str
                p_id = data_dict.get(p_str, 1)

                global sql
                # 批量插入語句
                sql = "insert into hdfs_path_Monitor (id, p_id, p_name, `date`, dir_kb, hour_increase, day_increase) values (%s,%s, %s, %s, %s, %s, %s)"
                args.append((mysqlTup[0], p_id, mysqlTup[1], LocalTime, fileSize, '0', '0'))
                L.add(line.replace('\n', "").split(' ').pop())
            else:
                return 1

        # 批量語句的執行
        mysqlHelper.mysqlBatchInset(sql, args)

        if digui(L) == 1:
            continue


'''自增函式'''


def add(x=1):
    try:
        add.sum += x
    except AttributeError:
        add.sum = x

    return add.sum


def convertSize(num, tags):
    totals = 0
    if tags == "G":
        totals = float(str(num)) * 1024 * 1024
    elif tags == "M":
        totals = float(num) * 1024
    elif tags == "K":
        totals = float(num)
    return totals


'''獲取所有檔名稱列表'''


def getHDFSSize():
    results = mysqlHelper.mysqlSearch("select * from hdfs_size")
    for row in results:
        p_name = row[1]
        # 列印結果
        FileNames.add(p_name)


'''每小時計算'''


def hourComputer():
    ''' 每個小時增長計算方法,結果資料插入到臨時表 '''
    sql = '''
               INSERT INTO hdfs_path_Monitor_temp(id,p_id,p_name,date,dir_kb,hour_increase,day_increase)
               select h3.id as id, h3.p_id as p_id, h3.p_name as p_name,h3.date as date, h3.dir_kb as dir_kb,ABS((h3.dir_kb - h2.dir_kb)) as
               hour_increase,h3.day_increase as day_increase
               from hdfs_path_Monitor h2
               RIGHT JOIN hdfs_path_Monitor h3 on h3.p_name = h2.p_name
               where h2.date=\'%s\' and h3.date=\'%s\'
          ''' % (BeforeOneHour, LocalTime)

    mysqlHelper.mysqlInsert(sql)

    '''刪除hdfs_path_Monitor中為當前時間的資料'''
    delete_Monitor_table = "DELETE from hdfs_path_Monitor where date=\'%s\'" % (LocalTime)

    mysqlHelper.mysqlInsert(delete_Monitor_table)

    '''將hdfs_path_Monitor_temp表中的資料插入到hdfs_path_Monitor'''
    insert_Monitor_table = '''
               INSERT INTO hdfs_path_Monitor(id,p_id,p_name,date,dir_kb,hour_increase,day_increase)
               SELECT *
               FROM hdfs_path_Monitor_temp
    '''
    mysqlHelper.mysqlInsert(insert_Monitor_table)

    '''刪除臨時表的資料'''
    insert_Temp_table = "DELETE from hdfs_path_Monitor_temp where date=\'%s\'" % (LocalTime)

    mysqlHelper.mysqlInsert(insert_Temp_table)


def main():
    if LocalTime[11:16] == "00:00":
        exit()

    StartTime = time.time()
    path = "/"
    pathSet = set()
    pathSet.add(path)

    mysqlHelper.mysqlInsert("delete from hdfs_path_Monitor where DATE =\'%s\';" % (LocalTime))
    mysqlHelper.mysqlInsert("insert into hdfs_path_Monitor values(%s, %s, \'%s\', \'%s\', %s, %s, %s)" % (
        add(), 0, path, LocalTime, 0.00, 0.00, 0.00))
    getHDFSSize()
    gethdfs(pathSet)
    hourComputer()
    mysqlHelper.mysqlClose()
    EndTime = time.time()

    print "所需要的時間為  : " + str(EndTime - StartTime) + "插入的數量為 :" + str(add() - 1)


if __name__ == "__main__":
    main()

3.4檔案路徑的天監控 hdfs_path_Monitor_day.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-


import os
import time
import mysqlHelper
import utils

'''全域性變數系統時間 LocalTime 2017-07-28 14:37:47'''
LocalTime = str(utils.getnowTime())[0:16]

BeforeOneDay = str(utils.getBeforeHour(24))[0:16]

def hourComputer():

    sql = '''
               INSERT INTO hdfs_path_Monitor_temp(id,p_id,p_name,date,dir_kb,hour_increase,day_increase)
               select h3.id as id, h3.p_id as p_id, h3.p_name as p_name,h3.date as date, h3.dir_kb as dir_kb,h3.hour_increase as
               hour_increase,ABS((h3.hour_increase - h2.hour_increase)) as day_increase
               from hdfs_path_Monitor h2
               RIGHT JOIN hdfs_path_Monitor h3 on h3.p_name = h2.p_name
               where h2.date=\'%s\' and h3.date=\'%s\'
          ''' %( BeforeOneDay , LocalTime )

    mysqlHelper.mysqlInsert(sql)

    delete_Monitor_table = "DELETE from hdfs_path_Monitor where date=\'%s\'" %(LocalTime)

    mysqlHelper.mysqlInsert(delete_Monitor_table)

    insert_Monitor_table = '''
               INSERT INTO hdfs_path_Monitor(id,p_id,p_name,date,dir_kb,hour_increase,day_increase)
               SELECT *
               FROM hdfs_path_Monitor_temp
    '''
    mysqlHelper.mysqlInsert(insert_Monitor_table)

    insert_Temp_table = "DELETE from hdfs_path_Monitor_temp where date=\'%s\'" %(LocalTime)

    mysqlHelper.mysqlInsert(insert_Temp_table)

def main():
    StartTime = time.time()
    hourComputer()
    mysqlHelper.mysqlClose()
    EndTime = time.time()
    print "所需要的時間為  : " + str(EndTime - StartTime)

if __name__ == "__main__":
    main()

3.5檔案大小記錄 hdfs_size.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import os
import time
import mysqlHelper
import utils

'''
1、通過命令(hdfs dfs -ls -R /)獲取到所有檔案的全部內容
2、將檔案按條處理,空格切分,第一個欄位的第一個字元為橫杆‘-’,則為檔案
3、通過mysql批量插入
'''

LocalTime = utils.getnowTime()

def gethdfsSize():
    lines = os.popen('hdfs dfs -ls -R / ')
    args = []

    mysqlHelper.mysqlInsert("delete from hdfs_size")
    for line in lines:
        str = line.replace('\n', "").split(' ')
        filePath = str.pop()
        dir_byte = str[len(str) - 3]

        sql = "insert into hdfs_size (id, p_name, `date`, dir_byte) values (%s, %s, %s, %s)"
        if ((str[0][0] == "-") == True):
            args.append((add(), filePath, LocalTime, dir_byte))

    mysqlHelper.mysqlBatchInset(sql, args)


def add(x=1):
    try:
        add.sum += x
    except AttributeError:
        add.sum = x

    return add.sum


def main():
    StartTime = time.time()
    gethdfsSize()
    mysqlHelper.mysqlClose()
    EndTime = time.time()
    print "所需要的時間為  : " + str(EndTime - StartTime) + "插入的數量為 :" + str(add() - 1)


if __name__ == "__main__":
    main()

3.6mysql連線資訊

# database source
[downdb]
host = xxx.xxx.xxx.xxx
port = 3306
user = funnel
pass = [email protected]<2wsx
dbName = user_privileges


[ondb]
host = xxx.xxx.xxx.xxx
port = 3306
user = funnel
pass = [email protected]<2wsx
dbName = bi_data

3.7mysql工具類 mysqlHelper.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-

import ConfigParser
import codecs
import pymysql

DBSource = "ondb"

cp = ConfigParser.SafeConfigParser()
with codecs.open('myapp.conf', 'r', encoding='utf-8') as f:
    cp.readfp(f)
'''mysql的連線'''
conn = pymysql.connect(host=cp.get(DBSource, 'host'), user=cp.get(DBSource, 'user'), password=cp.get(DBSource, 'pass'),
                       database=cp.get(DBSource, 'dbName'), use_unicode=True)
'''mysql的遊標'''
cursor = conn.cursor()


def mysqlInsert(sql):
    print "遊標 ---> ", type(cursor), cursor
    print "sql ---> ", type(sql), sql
    cursor.execute(sql)
    conn.commit()


def mysqlSearch(sql):
    cursor.execute(sql)
    return cursor.fetchall()


def mysqlBatchInset(sql, args):
    cursor.executemany(sql, args)
    conn.commit()


def mysqlClose():
    conn.close()


if __name__ == "__main__":

    sql = "select * from hdfs_path_Monitor limit 10 "

    # 使用 fetchone() 方法獲取一條資料庫。
    values = mysqlSearch(sql)

    for line in values:
        print line

    mysqlClose()

3.8 工具類utils.py

#!/usr/bin/python
# -*- coding: UTF-8 -*-
import time
from datetime import datetime, timedelta


def getBeforeDay(num):
    return str(datetime.now() + timedelta(days=-num))[0:19]


def getLastDay(num):
    return str(datetime.now() + timedelta(days=num))[0:19]


def getBeforeHour(num):
    return str(datetime.now() + timedelta(hours=-num))[0:19]


def getLastHour(num):
    return str(datetime.now() + timedelta(hours=num))[0:19]


def getnowTime():
    return str(datetime.now())[0:19]


if __name__ == "__main__":
    print str(getBeforeHour(1))[0:16]

4、結果展示

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

       如果您喜歡我寫的博文,讀後覺得收穫很大,不妨小額贊助我一下,讓我有動力繼續寫出高質量的博文,感謝您的讚賞!!!

相關推薦

使用python監控HDFS檔案增量優化

目錄 1、需求和步驟 2、專案結構 3、專案程式碼        3.1建表語句 hdfs_Ctreate_table        3.2刪除檔案記錄 hdfs_delete_file_record.py        3.3檔案路徑的小時監控

python 模擬casio復數計算器施工

定義 cas 處理 施工 取模 你會 運算 問題 數字 1. 前期準備 對於括號有這樣的規則: ——只有( 可以多於) 的個數,此時在行尾補) 。 ——取模|...|不能嵌套。(在casio中沒有這個問題,因為每按一次取模鍵,你會得到兩個|,所以可以定義他們的大小) 計

優化Java檔案上傳資料庫(並儲存本地)、word轉pdf並進行頁面預覽

上一篇檔案上傳【點選跳轉】,是將路徑等檔案資訊存進log_file臨時表,內容二進位制存入資料庫Test表,這種邏輯是在呼叫資料庫表Test內容展示時,判斷檔案為word(轉換成pdf)還是pdf(直接展示)。 上一篇連結:連結地址。 下面進一步優化: 具體邏輯

spark streaming監控HDFS檔案目錄

叢集環境:CDH5.8.0 / spark1.6.0 / scala2.10.4基於Scala的基本使用方式如下:package com.egridcloud.sparkstreaming import org.apache.hadoop.conf.Configuratio

python讀取HDFS檔案

浪費了“黃金五年”的Java程式設計師,還有救嗎? >>>   

斯坦福tensorflow課程 課後作業代碼更新

solution lob cor exc ons build ops 代碼 eth Assignment1   Problem1:     Ops excercises   Problem2:     Task1:Improving the accuracy of our

20171025alert(1) to win 腳本渲染自建

rip doc function 最簡 turn cti fire func cape 遊戲誤人生,一下午玩了將近四個小時的三國殺,後悔不已,然後重新拾起xss challenge,突發奇想,自己構建渲染後的html。 從最簡單的開始。 自動檢測htm

CSS - 移動端 常見小bug整理與解決方法總結更新

mic ros class clas 問題 像素 css strong 常見問題 常見問題總結與整理系列~ 1. border一像素在手機上看著有點粗的問題: 原理是因為:1px在手機上是使用2dp進行渲染的 換成 border: 0.5像素?是不行的!

20171115nmap 使用腳本爆破telnet密碼

pts style logs images lan clas ip add bbbb 提升 今天老黑走出了低谷,設置了懲罰機制後效率提升了很多,現在寫一個使用nmap檢測目標主機漏洞和利用漏洞的文章,話不多說,直接開始! 0x01:環境介紹   主機A:系統 - k

《Linux運維架構師課程 - 門徒班》招生

linux運維課程簡介 阿良的課程內容主要以企業核心技術為講解對象,避免過多在企業中很少用的技術,從而減少學習負擔,這樣就可以把精力主要花費在更重要的技術上, 而不像其他培訓機構那樣,講很多高大上的技術名詞,其中可能50%的知識在工作中都用不到,學員抓不住重點,時間長了就忘了。 所以,阿良的教學模

《從Docker到Kubernetes企業應用實戰課程 - 集訓班》招生

docker課程簡介 本課程是一個Docker技術集訓班,實戰為主,幫助你快速掌握這門主流的技術,能勝任Docker相關工作,同時為簡歷添上靚麗一筆。 Docker是一個開源的應用容器引擎,將項目及依賴打包到一個容器中,然後發布到任意Linux服務器上。 Docker主要特點:開箱即用,快速

圖()小白專場: 哈利·波特的考試

題目: 這門課是用魔咒將一種動物變成另一種動物。例如將貓變成老鼠的魔咒是haha,將老鼠變成魚的魔咒是hehe,把貓變成魚,魔咒lalala。反方向變化的魔咒就是簡單地將原來的魔咒倒過來念,例如ahah可以將老鼠變成貓。 只允許帶一隻動物,考察把這隻動物變成任意一隻指定動物的本事。於是

圖()最短路徑問題

1、最短路徑問題的抽象 在網路中,求兩個不同頂點之間的所有路徑中,邊的權值之和最小的那一條路徑 這條路徑就是兩點之間的最短路徑(Shortest Path) 第一個頂點為源點(Source) 最後一個頂點為終點(Destination)

Python 基本資料型別之編碼問題

content 字符集 常見的字符集舉例 位元組和字串之間的轉換 編碼問題 1. 字符集 a="A" a=b"A" # 計算機中儲存只是0101二進位制程式碼 # 字符集: 一堆字元的集合,用來制定當前的字元對映成計算機中儲存的ascii規則

Python 基本資料型別之格式化問題

content **%**進行格式化 format進行格式化 f常量進行格式化 1. %進行格式化 # 有幾個%佔位,後面就跟著幾個變數或者值,按照順序對應 # 在python2的時候使用的比較多 # %s-----str(變數) # %d %f

android全面屏顯示不全解決方案更新...

一、宣告最大螢幕縱橫比(官方適配方案) Android官方提供了適配方案,即提高App所支援的最大螢幕縱橫比,實現很簡單,在AndroidManifest.xml中可做如下配置: <meta-data android:name="android.max_aspect"

AJAX和from-上傳檔案示例django專案

專案簡述 本Django專案為測試例項專案,用於學習測試。 分別用三種Django檔案上傳方式(form方式、jQuery+jQuery.ajax方式、原生JS+原生ajax方式)做上傳功能示例 檔案檔案釋義 form_upload.htmlform上傳檔案靜態頁面 jquery_ajax_upl

Python&Selenium 資料驅動unittest+ddt

一、摘要 本博文將介紹Python和Selenium做自動化測試的時候,基於unittest框架,藉助ddt實現資料驅動 二、測試程式碼 # encoding = utf-8 """ __title__ = '' __author__ = 'davieyang' __mtime__ = '2018

人臉識別,解析MS-Celeb-1M人臉資料集及FaceImageCroppedWithAlignment.tsv檔案提取 人臉識別解析MS-Celeb-1M人臉資料集及FaceImageCroppedWithAlignment.tsv檔案提取

原 【人臉識別】解析MS-Celeb-1M人臉資料集及FaceImageCroppedWithAlignment.tsv檔案提取 2018年09月19日 13:11:54

ctf sql注入關鍵詞繞過積累

                寫在前面:這個部落格知識點來源於個人ctf練習比賽中積累的知識點及網路中各個部落格的總結點,這裡做測試和記錄0x00 sql注入理解    SQL注入能使攻擊者繞過認證機制,完全控制遠端伺服器上的資料庫。 SQL是結構化查詢語言的簡稱,它是訪問資料庫的事實標準。目前,大多數We