Python程式設計:定時任務apscheduler框架
阿新 • • 發佈:2018-12-13
APScheduler : Advanced Python Scheduler
官方文件:https://apscheduler.readthedocs.io/en/3.3.1/userguide.html
本文只做簡單總結,具體示例參考文章底部連結
體系結構
schedulers(排程器)
- BlockingScheduler : 主執行緒中執行,阻塞執行緒
- BackgroundScheduler : 後臺執行緒中執行,不會阻塞執行緒
- AsyncIOScheduler
- GeventScheduler
- TornadoScheduler
- TwistedScheduler
- QtScheduler
triggers(觸發器)
- date 一次性任務
- run_date (datetime 或 str) 作業的執行日期或時間
- timezone (datetime.tzinfo 或 str) 指定時區
- interval 迴圈任務
- weeks (int) 間隔幾周
- days (int) 間隔幾天
- hours (int) 間隔幾小時
- minutes (int) 間隔幾分鐘
- seconds (int) 間隔多少秒
- start_date (datetime 或 str) 開始日期
- end_date (datetime 或 str) 結束日期
- timezone (datetime.tzinfo 或str) 時區
- job stores(作業儲存器)
- cron 定時任務
- year (int 或 str) 年,4位數字
- month (int 或 str) 月 (範圍1-12)
- day (int 或 str ) 日 (範圍1-31
- week (int 或 str) 周 (範圍1-53)
- day_of_week (int 或 str) 周內第幾天或者星期幾 (範圍0-6 或者 mon,tue,wed,thu,fri,sat,sun)
- hour (int 或 str) 時 (範圍0-23)
- minute (int 或 str) 分 (範圍0-59)
- second (int 或 str) 秒 (範圍0-59)
- start_date (datetime 或 str) 最早開始日期(包含)
- end_date (datetime 或 str) 最晚結束時間(包含)
- timezone (datetime.tzinfo 或str) 指定時區
job stores(作業儲存器)
- 新增 job
- add_job()
- scheduled_job()
- 移除 job
- remove_job()
- job.remove()
- 修改 job
- modify_job()
- 獲取 job 列表
- get_jobs()
- 關閉 job
- scheduler.shutdown(wait=false)
executors(執行器)
- ProcessPoolExecutor
- ThreadPoolExecutor
簡單示例
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
def jobA():
print("{}: {}".format("job A", datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
def jobC():
print("{}: {}".format("job C", datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 新增作業, 方式一,可指定job_id
scheduler.add_job(jobA, 'interval', seconds=5)
scheduler.add_job(jobC, 'interval', minutes=2, id="job_c")
# 新增作業, 方式二
@scheduler.scheduled_job('interval', seconds=5)
def jobB():
print("{}: {}".format("job B", datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# 移除作業
scheduler.remove_job(job_id="job_c")
scheduler.start()
參考