1. 程式人生 > >Python模擬Linux的Crontab, 寫個任務計劃需求

Python模擬Linux的Crontab, 寫個任務計劃需求

cront tst 擴展 sin bec gre etime aps ron

Python模擬Linux的Crontab, 寫個任務計劃需求

來具體點

  

需求:
    執行一個程序, 程序一直是運行狀態, 這裏假設是一個函數

    當程序運行30s的時候, 需要終止程序, 可以用python, c, c++語言實現

    擴展需求:
        當1分鐘以後, 需要重新啟動程序

def process_2():
# 時間幾點執行
# 執行process_1
# 開始監聽--計時
# 當時間超過多少s以後, 強制結束
#(註意進程裏面的任務執行進程是否還存在, 舉個例子, 進程的任務是在創建一個進程, 那強制結束的是任務的進程, 而你創建的進程呢?)
pass

技術分享圖片
#!/usr/bin/env python
# -*- coding=utf-8 -*-


import sys, time, multiprocessing, datetime, os

# -----<自定義Error>----- #
class MyCustomError(Exception):
    def __init__(self, msg=None, retcode=4999):
        self.retcode = int(retcode)
        try:
            if not
msg: msg = "Setting time is less than the current time Error. " except: msg = "Unknown Error!!!" Exception.__init__(self, self.retcode, msg) # -----<格式化時間變成時間戳>----- # class FormatDatetime(): ‘‘‘ Use method: FormatDatetime.become_timestamp("2018-08-21 13:19:00")
‘‘‘ @staticmethod def become_timestamp(dtdt): # 將時間類型轉換成時間戳 if isinstance(dtdt, datetime.datetime): timestamp = time.mktime(dtdt.timetuple()) return timestamp elif isinstance(dtdt, str): if dtdt.split(" ")[1:]: a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d %H:%M:%S") timestamp = time.mktime(a_datetime.timetuple()) else: a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d") timestamp = time.mktime(a_datetime.timetuple()) return timestamp elif isinstance(dtdt, float): return dtdt # -----<計時器>----- # def timer(arg): ‘‘‘ use method: timer(14) :param arg: 倒計時時間 :return: True ‘‘‘ # 倒計時時間 back_time = arg while back_time != 0: sys.stdout.write(\r) ##意思是打印在清空 back_time -= 1 sys.stdout.write("%s" % (int(back_time))) sys.stdout.flush() time.sleep(1) return True # -----<自定義任務函數>----- # class Tasks: @staticmethod def start(): ip1 = "42.93.48.16" ip2 = "192.168.2.141" adjure = """hping3 -c 100000 -d 120 -S -w 64 -p 57511 -a 10.10.10.10 --flood --rand-source {}""".format(ip2) os.system(adjure) @staticmethod def stop(): adjure = """netstat -anpo | grep hping3 | awk -F "[ /]+" ‘{print $7}‘ | xargs -i -t kill -9 {}""" os.system(adjure) # -----<任務函數>----- # def slow_worker(): ‘‘‘ 你的自定義任務, 需要執行的代碼 :return: ‘‘‘ print(Starting worker) time.sleep(0.1) print(Finished worker) Tasks.start() # -----<任務計劃執行時間>----- # def crontab_time(custom_time): # custom_time = "2018-08-21 13:44:00" date_time = FormatDatetime.become_timestamp(custom_time) now_time = time.time() # 余數, 有小數 remainder_data = int(date_time - now_time) if remainder_data < 0: raise MyCustomError else: while remainder_data > 0: remainder_data -= 1 time.sleep(1) return True # -----<執行函數>----- # def my_crontab(custom_time=‘‘, frequency=1, countdown=0): ‘‘‘ 幾點幾分執行任務 此函數屬於業務邏輯, 可以考慮再次封裝 custom_time = "2018-08-21 13:19:00" 時間格式必須是這樣 frequency = 1 # 頻次 countdown = 0 # 倒計時多少s :return: ‘‘‘ if custom_time: crontab_time_status = crontab_time(custom_time) if crontab_time_status: for i in range(frequency): p = multiprocessing.Process(target=slow_worker) p.start() if countdown: status = timer(countdown) if status: p.terminate() Tasks.stop() else: for i in range(frequency): p = multiprocessing.Process(target=slow_worker) p.start() if countdown: status = timer(countdown) if status: p.terminate() Tasks.stop() if __name__ == __main__: my_crontab(frequency=1, countdown=50)
View Code

Python模擬Linux的Crontab, 寫個任務計劃需求