1. 程式人生 > >python中的輕量級定時任務調度庫:schedule

python中的輕量級定時任務調度庫:schedule

min nbsp 就會 threading 簡單 多線程 保持 一是 設置

提到定時任務調度的時候,相信很多人會想到celery,要麽就寫個腳本塞到crontab中。不過,一個小的定時腳本,要用celery的話太“重”了。所以,我找到了一個輕量級的定時任務調度的庫:schedule。 任務調度,輕量級的定時任務調度的庫:schedule。 庫的安裝還是最簡單的pip install schedule,使用起來也是很容易理解的。
 1 import schedule
 2 import time
 3  
 4 def job():
 5     print("I‘m working...")
 6  
 7 schedule.every(10).minutes.do(job)
8 schedule.every().hour.do(job) 9 schedule.every().day.at("10:30").do(job) 10 schedule.every(5).to(10).days.do(job) 11 schedule.every().monday.do(job) 12 schedule.every().wednesday.at("13:15").do(job) 13 14 while True: 15 schedule.run_pending() 16 time.sleep(1)
這是在pypi上面給出的示例。這個栗子簡單到我不需要怎麽解釋。而且,通過這個栗子,我們也可以知道,schedule其實就只是個定時器。在while True死循環中,schedule.run_pending()是保持schedule一直運行,去查詢上面那一堆的任務,在任務中,就可以設置不同的時間去運行。跟crontab是類似的。 但是,如果是多個任務運行的話,實際上它們是按照順序從上往下挨個執行的。如果上面的任務比較復雜,會影響到下面任務的運行時間。比如我們這樣:
 1
import datetime 2 import schedule 3 import time 4 5 def job1(): 6 print("I‘m working for job1") 7 time.sleep(2) 8 print("job1:", datetime.datetime.now()) 9 10 def job2(): 11 print("I‘m working for job2") 12 time.sleep(2) 13 print("job2:", datetime.datetime.now())
14 15 def run(): 16 schedule.every(10).seconds.do(job1) 17 schedule.every(10).seconds.do(job2) 18 19 while True: 20 schedule.run_pending() 21 time.sleep(1)
接下來你就會發現,兩個定時任務並不是10秒運行一次,而是12秒。是的。由於job1和job2本身的執行時間,導致任務延遲了。 其實解決方法也很簡單:用多線程/多進程。不要幼稚地問我“python中的多線程不是沒有用嗎?”這是兩碼事。開了一條線程,就把job獨立出去運行了,不會占主進程的cpu時間,schedule並沒有花掉執行一個任務的時間,它的開銷只是開啟一條線程的時間,所以,下一次執行就變成了10秒後而不是12秒後。
 1 import datetime
 2 import schedule
 3 import threading
 4 import time
 5  
 6 def job1():
 7     print("I‘m working for job1")
 8     time.sleep(2)
 9     print("job1:", datetime.datetime.now())
10  
11 def job2():
12     print("I‘m working for job2")
13     time.sleep(2)
14     print("job2:", datetime.datetime.now())
15  
16 def job1_task():
17     threading.Thread(target=job1).start()
18  
19 def job2_task():
20     threading.Thread(target=job2).start()
21  
22 def run():
23     schedule.every(10).seconds.do(job1_task)
24     schedule.every(10).seconds.do(job2_task)
25  
26     while True:
27         schedule.run_pending()
28         time.sleep(1)
就是這麽簡單。 唯一要註意的是,這裏面job不應當是死循環類型的,也就是說,這個線程應該有一個執行完畢的出口。一是因為線程萬一僵死,會是非常棘手的問題;二是下一次定時任務還會開啟一個新的線程,執行次數多了就會演變成災難。如果schedule的時間間隔設置得比job執行的時間短,一樣會線程堆積形成災難,所以,還是需要註意一下的。 schedule這個庫使用起來比較簡單,內容不是很多。我這裏介紹的大概用法基本上夠用了,還想了解其他特性的話,可以參考官網:https://schedule.readthedocs.io/en/stable/ 例子:
 1 import datetime
 2 import schedule
 3 import time
 4  
 5 def job1():
 6     print("I‘m working for job1")
 7     time.sleep(2)
 8     print("job1:", datetime.datetime.now())
 9  
10 def job2():
11     print("I‘m working for job2")
12     time.sleep(2)
13     print("job2:", datetime.datetime.now())
14  
15 def run():
16     schedule.every().day.at("22:19").do(job1)
17     schedule.every().day.at("22:19").do(job2)
18     #schedule.every(3).seconds.do(job1)
19     #schedule.every(3).seconds.do(job2)
20  
21     while True:
22         schedule.run_pending()
23         time.sleep(1)
24 
25 
26 if __name__ == "__main__":
27 
28     run()

多線程例子
 1 import datetime
 2 import schedule
 3 import threading
 4 import time
 5  
 6 def job1():
 7     print("I‘m working for job1")
 8     time.sleep(2)
 9     print("job1:", datetime.datetime.now())
10  
11 def job2():
12     print("I‘m working for job2")
13     time.sleep(2)
14     print("job2:", datetime.datetime.now())
15  
16 def job1_task():
17     threading.Thread(target=job1).start()
18  
19 def job2_task():
20     threading.Thread(target=job2).start()
21 
22 
23 def run():
24     schedule.every().day.at("22:23").do(job1_task)
25     schedule.every().day.at("22:23").do(job2_task)
26     #schedule.every(3).seconds.do(job1_task)
27     #schedule.every(3).seconds.do(job2_task)
28  
29     while True:
30         schedule.run_pending()
31         time.sleep(1)
32 
33 
34 if __name__ == "__main__":
35 
36     run()
37  

python中的輕量級定時任務調度庫:schedule