Python定時任務排程——APScheduler
簡介
APScheduler 框架可以讓使用者定時執行或者週期性執行 Python 任務。既可以新增任務也可以刪除任務,還可以將任務儲存在資料庫中。當 APScheduler 重啟之後,還會繼續執行之前設定的任務。
APScheduler 是跨平臺的,注意 APScheduler 既不是守護程序也不是服務,更不是命令列程式。APScheduler 是程序內的排程器,也就是說它的實現原理是在程序內產生內建的阻塞來建立定時服務,以便在預定的時間內執行某個任務。
安裝
可以使用 pip 進行安裝:
pip install apscheduler
基礎概念
APScheduler 由四個元件構成:
- 觸發器(triggers)
- 任務倉庫(job stores)
- 執行器(executors)
- 排程器(schedulers)
觸發器包含了所有定時任務邏輯,每個任務都有一個對應的觸發器,觸發器決定任何的何時執行,初始配置情況下,觸發器是無狀態的。
任務倉庫儲存要執行的任務,其中一個預設的任務倉庫將任務儲存在記憶體中,而另外幾個任務倉庫將任務儲存在資料庫中。在將任務儲存到任務倉庫前,會對任務執行序列化操作,當重新讀取任務時,再執行反序列化操作。除了預設的任務倉庫,其他任務倉庫都不會在記憶體中儲存任務,而是作為任務儲存、載入、更新以及搜尋的一箇中間件。任務倉庫在定時器之間不能共享。
執行器用來執行定時任務,它只是將要執行的任務放在新的執行緒或者執行緒池中執行。執行完畢之後,再通知定時器。
排程器將其它幾個元件聯絡在一起,一般在應用中只有一個排程器,程式開發者不會直接操作觸發器、任務倉庫或執行器,相反,排程器提供了這個介面。任務倉庫以及執行器的配置都是通過排程器來實現的。
選擇合適的排程器、任務倉庫、執行器和觸發器
APScheduler 支援的儲存方式有:
- MemoryStore
- SQLAlchemyJobStore,預設使用 SQLite。
- MongoDBJobStore
- ZooKeeperJobStore
- RedisJobStore
- RethinkDBJobStore
如果是非持久任務,使用預設配置的MemoryStore就可以了,如果是永續性任務,那麼久需要根據程式設計環境進行選擇了。
APScheduler 中一些常用排程器:
- BlockingScheduler:適合於只在程序中執行單個任務的情況
- BackgroundScheduler: 適合於要求任何在程式後臺執行的情況
- AsyncIOScheduler:適合於使用asyncio框架的情況
- GeventScheduler: 適合於使用gevent框架的情況
- TornadoScheduler: 適合於使用Tornado框架的應用
- TwistedScheduler: 適合使用Twisted框架的應用
- QtScheduler: 適合使用QT的情況
大多數情況下,執行器選擇ThreadPoolExecutor就可以了,但是如果涉及到比較耗CPU的任務,就可以選擇ProcessPoolExecutor,以充分利用多核CPU。,當然也可以同時使用兩個執行器。
當排程一個任務時,需要選擇一個觸發器。這個觸發器決定何時執行任務。APScheduler 支援的觸發器有三種:
- date:任務僅執行一次
- interval:任務迴圈執行
- cron:任務定時執行
trigger 對任務的控制
add_job的第二個引數是trigger,它管理著作業的排程方式。它可以為date, interval或者cron。對於不同的trigger,對應的引數也相同。
cron 定時排程
引數:
屬性 | 型別 | 舉例 |
---|---|---|
year | int、str | 4-digit year |
month | int、str | month (1-12) |
day | int、str | day of the (1-31) |
week | int、str | ISO week (1-53) |
day_of_week | int、str | number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) |
hour | int、str | hour (0-23) |
minute | int、str | minute (0-59) |
second | int、str | second (0-59) |
start_date | datetime、str | earliest possible date/time to trigger on (inclusive) |
end_date | datetime、str | latest possible date/time to trigger on (inclusive) |
timezone | datetime.tzinfo、str | time zone to use for the date/time calculations (defaults to scheduler timezone) |
和Linux的Crontab一樣,它的值格式為:
Expression | Field | Description |
---|---|---|
* | any | Fire on every value |
*/a | any | Fire every a values, starting from the minimum |
a-b | any | Fire on any value within the a-b range (a must be smaller than b) |
a-b/c | any | Fire every c values within the a-b range |
xth y | day | Fire on the x -th occurrence of weekday y within the month |
last x | day | Fire on the last occurrence of weekday x within the month |
last | day | Fire on the last day within the month |
x,y,z | any | Fire on any matching expression; can combine any number of any of the above expressions |
例如:
# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
interval 間隔排程
引數:
屬性 | 型別 | 舉例 |
---|---|---|
weeks | int | number of weeks to wait |
days | int | number of days to wait |
hours | int | number of hours to wait |
minutes | int | number of minutes to wait |
seconds | int | number of seconds to wait |
start_date | datetime、str | starting point for the interval calculation |
end_date | datetime、str | latest possible date/time to trigger on |
timezone | datetime.tzinfo、str | time zone to use for the date/time calculations |
例如:
# Schedule job_function to be called every two hours
sched.add_job(job_function, 'interval', hours=2)
date 定時排程
最基本的一種排程,作業只會執行一次。它的引數如下:
屬性 | 型別 | 舉例 |
---|---|---|
run_date | datetime、str | the date/time to run the job at |
timezone | datetime.tzinfo、str | time zone for run_date if it doesn’t have one already |
例如:
# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text
使用MemoryStore
使用 MemoryStore、BlockingScheduler 觀察 corn、interval、date 的不同。
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def alarm(type):
print '[%s Alarm] This alarm was scheduled at %s.' % (type, datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
# 定時執行
def corn_trigger():
global SCHEDULER
SCHEDULER.add_job(func=alarm, args=['cron'], trigger='cron', second='*/5', id='corn_job')
# 迴圈執行
def interval_trigger():
global SCHEDULER
SCHEDULER.add_job(func=alarm, args=['interval'], trigger='interval', seconds=5, id='interval_job')
# 一次執行
def date_trigger():
global SCHEDULER
SCHEDULER.add_job(func=alarm, args=['date'], trigger='date', run_date=datetime.now(), id='date_job')
SCHEDULER = BlockingScheduler()
if __name__ == '__main__':
corn_trigger()
interval_trigger()
date_trigger()
try:
SCHEDULER.start()
except (KeyboardInterrupt, SystemExit):
SCHEDULER.shutdown()
執行結果:
[date Alarm] This alarm was scheduled at 2017-07-22 11:12:42.
[cron Alarm] This alarm was scheduled at 2017-07-22 11:12:45.
[interval Alarm] This alarm was scheduled at 2017-07-22 11:12:47.
[cron Alarm] This alarm was scheduled at 2017-07-22 11:12:50.
[interval Alarm] This alarm was scheduled at 2017-07-22 11:12:52.
[cron Alarm] This alarm was scheduled at 2017-07-22 11:12:55.
[interval Alarm] This alarm was scheduled at 2017-07-22 11:12:57.
任務持久化(SQLAlchemyJobStore)
APScheduler 可以把任務持久化,如持久化到 MySQL 中。當 APScheduler 把任務持久好到 MySQL 時,會預設自動建立一張 apscheduler_jobs:
欄位名 | 說明 |
---|---|
id | 定義的 job_id |
next_run_time | 下次執行時間 |
job_state | job 的資訊 |
定時從 MySQL 中查詢資料
import MySQLdb
import time
import logging
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
logging.basicConfig(level=logging.INFO,
format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
filename='log.txt',
filemode='a')
def query(host, port, user, password, db):
conn = MySQLdb.connect(host=host, port=port, user=user, passwd=password, db=db)
cr = conn.cursor()
cr.execute('select * from score')
conn.commit()
res = cr.fetchall()
print res
with open(r'rs.txt', 'a') as f:
f.write(str(res) + '\n')
if __name__ == '__main__':
url = 'mysql://root:[email protected]:3306/work'
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(executors=executors, job_defaults=job_defaults)
scheduler.add_jobstore('sqlalchemy', url=url)
start = datetime.strptime('2017-07-22 11:32:00', '%Y-%m-%d %H:%M:%S')
end = datetime.strptime('2017-07-22 11:34:00', '%Y-%m-%d %H:%M:%S')
scheduler.add_job(func=query, args=('127.0.0.1', 3306, 'root', '123456', 'test'),
trigger='cron', start_date=start, end_date=end, second='*/5', id='query')
try:
scheduler.start()
while True:
time.sleep(2)
except (KeyboardInterrupt, SystemExit):
scheduler.shutdown()