1. 程式人生 > >使用Scheduler做Python任務排程

使用Scheduler做Python任務排程

在工作中多少都會涉及到一些定時任務,比如定時郵件提醒等。 本文通過開源專案schedule來學習定時。

任務排程是如何工作的,以及基於此實現一個web版本的提醒工具。

schedule簡介

既然schedule說是給人類使用的作業排程器, 先來看看作者給提供的例子:

import schedule

import time

def job():

print(“I’m working…”)

schedule.every(10).minutes.do(job) (每隔10分鐘執行一次任務)

schedule.every().hour.do(job) (每隔一小時執行一次任務)

schedule.every().day.at(“10:30”).do(job) (每天10:30執行一次任務)

schedule.every().monday.do(job) (每週一的這個時候執行一次任務)

schedule.every().wednesday.at(“13:15”).do(job) (每週三13:15執行一次任務)

while True:

schedule.run_pending()

time.sleep(1)

程式碼確實簡潔明瞭,一讀就懂。 那麼接下來就來學習一下原始碼。

一般情況下分為三個類,原始碼分析就圍繞這三個類:

CancelJob

class CancelJob(object): pass 可以看到就是一個空類, 這個類的作用就是當你的job執行函式返回一個CancelJob型別的物件,那麼執行完後就會被Scheduler移除. 簡單說就是隻會執行一次.

Scheduler

為了使程式碼緊湊,這裡刪除了註釋,剩下也就34行程式碼.

class Scheduler(object): def init(self): self.jobs = [] def run_pending(self): runnable_jobs = (job for job in self.jobs if job.should_run) for job in sorted(runnable_jobs): self._run_job(job) def run_all(self, delay_seconds=0): for job in self.jobs: self._run_job(job) time.sleep(delay_seconds) def clear(self): del self.jobs[:] def cancel_job(self, job): try: self.jobs.remove(job) except ValueError: pass def every(self, interval=1): job = Job(interval) self.jobs.append(job) return job def _run_job(self, job): ret = job.run() if isinstance(ret, CancelJob) or ret is CancelJob: self.cancel_job(job) @property def next_run(self): if not self.jobs: return None return min(self.jobs).next_run @property def idle_seconds(self): return (self.next_run - datetime.datetime.now()).total_seconds() Scheduler作用就是在job可以執行的時候執行它. 這裡的函式也都比較簡單:

run_pending:執行所有可以執行的任務 run_all:執行所有任務,不管是否應該執行 clear:刪除所有排程的任務 cancel_job:刪除一個任務 every: 建立一個排程任務, 返回的是一個job _run_job:執行一個job next_run:獲取下一個要執行任務的時間, 這裡使用的是min去得到最近將執行的job, 之所以這樣使用,是Job過載了_lt方法,這樣寫起來確實很簡潔. idle_seconds:還有多少秒即將開始執行任務. Job

Job是整個定時任務的核心. 主要功能就是根據建立Job時的引數,得到下一次執行的時間. 程式碼如下,稍微有點長(會省略部分程式碼,可以看原始碼):

class Job(object): def init(self, interval): self.interval = interval # pause interval * unit between runs self.job_func = None # the job job_func to run self.unit = None # time units, e.g. ‘minutes’, ‘hours’, … self.at_time = None # optional time at which this job runs self.last_run = None # datetime of the last run self.next_run = None # datetime of the next run self.period = None # timedelta between runs, only valid for self.start_day = None # Specific day of the week to start on def lt(self, other): return self.next_run < other.next_run def minute(self): assert self.interval == 1, ‘Use minutes instead of minute’ return self.minutes @property def minutes(self): self.unit = ‘minutes’ return self @property def hour(self): assert self.interval == 1, ‘Use hours instead of hour’ return self.hours @property def hours(self): self.unit = ‘hours’ return self @property def day(self): assert self.interval == 1, ‘Use days instead of day’ return self.days @property def days(self): self.unit = ‘days’ return self @property def week(self): assert self.interval == 1, ‘Use weeks instead of week’ return self.weeks @property def weeks(self): self.unit = ‘weeks’ return self @property def monday(self): assert self.interval == 1, ‘Use mondays instead of monday’ self.start_day = ‘monday’ return self.weeks def at(self, time_str): assert self.unit in (‘days’, ‘hours’) or self.start_day hour, minute = time_str.split(‘:’) minute = int(minute) if self.unit == ‘days’ or self.start_day: hour = int(hour) assert 0 <= hour <= 23 elif self.unit == ‘hours’: hour = 0 assert 0 <= minute <= 59 self.at_time = datetime.time(hour, minute) return self def do(self, job_func, *args, **kwargs): self.job_func = functools.partial(job_func, *args, **kwargs) try: functools.update_wrapper(self.job_func, job_func) except AttributeError: # job_funcs already wrapped by functools.partial won’t have # name, module or doc and the update_wrapper() # call will fail. pass self._schedule_next_run() return self @property def should_run(self): return datetime.datetime.now() >= self.next_run def run(self): logger.info(‘Running job %s’, self) ret = self.job_func() self.last_run = datetime.datetime.now() self._schedule_next_run() return ret def _schedule_next_run(self): assert self.unit in (‘seconds’, ‘minutes’, ‘hours’, ‘days’, ‘weeks’) self.period = datetime.timedelta(**{self.unit: self.interval}) self.next_run = datetime.datetime.now() + self.period # 太長,後面講一下邏輯或者看原始碼. 這個方法也不是很多,有很多邏輯是一樣的. 簡單介紹一下:

首先看一下幾個引數的含義:

interval:間隔多久,每interval秒或分等. job_func:job執行函式 unit : 間隔單元,比如minutes, hours at_time :job具體執行時間點,比如10:30等 last_run:job上一次執行時間 next_run :job下一次即將執行時間 period: 距離下次執行間隔時間 start_day: 周的特殊天,也就是monday等的含義 再來看一下各個方法:

lt: 比較哪個job最先即將執行, Scheduler中next_run方法裡使用min會用到, 有時合適的使用python這些特殊方法可以簡化程式碼,看起來更pythonic. second、seconds的區別就是second時預設interval ==1,即schedule.every().second和schedule.every(1).seconds是等價的,作用就是設定unit為seconds. minute和minutes、hour和hours、day和days、week和weeks也類似. monday: 設定start_day 為monday, unit 為weeks,interval為1. 含義就是每週一執行job. 類似 tuesday、wednesday、thursday、friday、saturday、sunday一樣. at: 表示某天的某個時間點,所以不適合minutes、weeks且start_day 為空(即單純的周)這些unit. 對於unit為hours時,time_str中小時部分為0. do: 設定job對應的函式以及引數, 這裡使用functools.update_wrapper去更新函式名等資訊.主要是functools.partial返回的函式和原函式名稱不一樣.具體可以看看官網文件. 然後呼叫_schedule_next_run去計算job下一次執行時間. should_run: 判斷job是否可以運行了.依據是當前時間點大於等於job的next_run _schedule_next_run: 這是整個job的定時的邏輯部分是計算job下次執行的時間點的.描述一下流程: 計算下一次執行時間: 這裡根據unit和interval計算出下一次執行時間. 舉個例子,比如schedule.every().hour.do(job, message=’things’)下一次執行時間就是當前時間加上一小時的間隔. 但是當start_day不為空時,即表示某個星期. 這時period就不能直接加在當前時間了. 看程式碼: 其中days_ahead表示job表示的星期幾與當表示的星期幾差幾天. 比如今天是星期三,job表示的是星期五,那麼days_ahead就為2,最終self.next_run效果就是在now基礎上加了2天. 當at_time不為空時, 需要更新執行的時間點,具體就是計算時、分、秒然後呼叫replace進行更新. 這裡對unit為days或hours進行特殊處理: 當已經過了執行時間的話的話,unit為days的話減去一天, unit為hours的話減去一小時. 這樣可以保證任務今天執行. 後面還有一句程式碼: 這句的含義時對於像monday這些定時任務特殊情況的處理. 舉個例子, 今天是星期四12:00,建立的job是星期四13:00, days_ahead <=7 這個條件滿足,最終next_run實際加了7,這樣的話這個任務就不會運行了. 所以這一步實際就是把7減掉. 看上去有點繞, 實際只要把days_ahead <= 0改為days_ahead < 0這句程式碼就不用了.

學習總結

通過學習schedule,可以看到實現一個基礎的任務定時排程就是根據job的配置計算執行時間和執行job. 程式碼裡我認為比較好的地方有:

lt的使用,這樣min函式直接應用在job上. @property是程式碼更簡潔 返回self支援連綴操作,像schedule.every(10).minutes.do(job)看起來很直接. 時間部分完全是根據datetime實現的,有很多很好用的函式

這裡寫圖片描述