1. 程式人生 > >Python定時任務排程——APScheduler

Python定時任務排程——APScheduler

簡介

APScheduler 框架可以讓使用者定時執行或者週期性執行 Python 任務。既可以新增任務也可以刪除任務,還可以將任務儲存在資料庫中。當 APScheduler 重啟之後,還會繼續執行之前設定的任務。
APScheduler 是跨平臺的,注意 APScheduler 既不是守護程序也不是服務,更不是命令列程式。APScheduler 是程序內的排程器,也就是說它的實現原理是在程序內產生內建的阻塞來建立定時服務,以便在預定的時間內執行某個任務。

安裝

可以使用 pip 進行安裝:

pip install apscheduler

基礎概念

APScheduler 由四個元件構成:
- 觸發器(triggers)
- 任務倉庫(job stores)
- 執行器(executors)
- 排程器(schedulers)

觸發器包含了所有定時任務邏輯,每個任務都有一個對應的觸發器,觸發器決定任何的何時執行,初始配置情況下,觸發器是無狀態的。

任務倉庫儲存要執行的任務,其中一個預設的任務倉庫將任務儲存在記憶體中,而另外幾個任務倉庫將任務儲存在資料庫中。在將任務儲存到任務倉庫前,會對任務執行序列化操作,當重新讀取任務時,再執行反序列化操作。除了預設的任務倉庫,其他任務倉庫都不會在記憶體中儲存任務,而是作為任務儲存、載入、更新以及搜尋的一箇中間件。任務倉庫在定時器之間不能共享。

執行器用來執行定時任務,它只是將要執行的任務放在新的執行緒或者執行緒池中執行。執行完畢之後,再通知定時器。

排程器將其它幾個元件聯絡在一起,一般在應用中只有一個排程器,程式開發者不會直接操作觸發器、任務倉庫或執行器,相反,排程器提供了這個介面。任務倉庫以及執行器的配置都是通過排程器來實現的。

選擇合適的排程器、任務倉庫、執行器和觸發器

APScheduler 支援的儲存方式有:
- MemoryStore
- SQLAlchemyJobStore,預設使用 SQLite。
- MongoDBJobStore
- ZooKeeperJobStore
- RedisJobStore
- RethinkDBJobStore

如果是非持久任務,使用預設配置的MemoryStore就可以了,如果是永續性任務,那麼久需要根據程式設計環境進行選擇了。

APScheduler 中一些常用排程器:
- BlockingScheduler:適合於只在程序中執行單個任務的情況
- BackgroundScheduler: 適合於要求任何在程式後臺執行的情況
- AsyncIOScheduler:適合於使用asyncio框架的情況
- GeventScheduler: 適合於使用gevent框架的情況
- TornadoScheduler: 適合於使用Tornado框架的應用
- TwistedScheduler: 適合使用Twisted框架的應用
- QtScheduler: 適合使用QT的情況

大多數情況下,執行器選擇ThreadPoolExecutor就可以了,但是如果涉及到比較耗CPU的任務,就可以選擇ProcessPoolExecutor,以充分利用多核CPU。,當然也可以同時使用兩個執行器。

當排程一個任務時,需要選擇一個觸發器。這個觸發器決定何時執行任務。APScheduler 支援的觸發器有三種:
- date:任務僅執行一次
- interval:任務迴圈執行
- cron:任務定時執行

trigger 對任務的控制

add_job的第二個引數是trigger,它管理著作業的排程方式。它可以為date, interval或者cron。對於不同的trigger,對應的引數也相同。

cron 定時排程

引數:

屬性 型別 舉例
year int、str 4-digit year
month int、str month (1-12)
day int、str day of the (1-31)
week int、str ISO week (1-53)
day_of_week int、str number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour int、str hour (0-23)
minute int、str minute (0-59)
second int、str second (0-59)
start_date datetime、str earliest possible date/time to trigger on (inclusive)
end_date datetime、str latest possible date/time to trigger on (inclusive)
timezone datetime.tzinfo、str time zone to use for the date/time calculations (defaults to scheduler timezone)

和Linux的Crontab一樣,它的值格式為:

Expression Field Description
* any Fire on every value
*/a any Fire every a values, starting from the minimum
a-b any Fire on any value within the a-b range (a must be smaller than b)
a-b/c any Fire every c values within the a-b range
xth y day Fire on the x -th occurrence of weekday y within the month
last x day Fire on the last occurrence of weekday x within the month
last day Fire on the last day within the month
x,y,z any Fire on any matching expression; can combine any number of any of the above expressions

例如:

# Schedules job_function to be run on the third Friday
# of June, July, August, November and December at 00:00, 01:00, 02:00 and 03:00
sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# Runs from Monday to Friday at 5:30 (am) until 2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

interval 間隔排程

引數:

屬性 型別 舉例
weeks int number of weeks to wait
days int number of days to wait
hours int number of hours to wait
minutes int number of minutes to wait
seconds int number of seconds to wait
start_date datetime、str starting point for the interval calculation
end_date datetime、str latest possible date/time to trigger on
timezone datetime.tzinfo、str time zone to use for the date/time calculations

例如:

# Schedule job_function to be called every two hours
sched.add_job(job_function, 'interval', hours=2)

date 定時排程

最基本的一種排程,作業只會執行一次。它的引數如下:

屬性 型別 舉例
run_date datetime、str the date/time to run the job at
timezone datetime.tzinfo、str time zone for run_date if it doesn’t have one already

例如:

# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text

使用MemoryStore

使用 MemoryStore、BlockingScheduler 觀察 corn、interval、date 的不同。

from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler


def alarm(type):
    print '[%s Alarm] This alarm was scheduled at %s.' % (type, datetime.now().strftime('%Y-%m-%d %H:%M:%S'))


# 定時執行
def corn_trigger():
    global SCHEDULER
    SCHEDULER.add_job(func=alarm, args=['cron'], trigger='cron', second='*/5', id='corn_job')


# 迴圈執行
def interval_trigger():
    global SCHEDULER
    SCHEDULER.add_job(func=alarm, args=['interval'], trigger='interval', seconds=5, id='interval_job')


# 一次執行
def date_trigger():
    global SCHEDULER
    SCHEDULER.add_job(func=alarm, args=['date'], trigger='date', run_date=datetime.now(), id='date_job')


SCHEDULER = BlockingScheduler()
if __name__ == '__main__':
    corn_trigger()
    interval_trigger()
    date_trigger()

    try:
        SCHEDULER.start()
    except (KeyboardInterrupt, SystemExit):
        SCHEDULER.shutdown()

執行結果:

[date Alarm] This alarm was scheduled at 2017-07-22 11:12:42.
[cron Alarm] This alarm was scheduled at 2017-07-22 11:12:45.
[interval Alarm] This alarm was scheduled at 2017-07-22 11:12:47.
[cron Alarm] This alarm was scheduled at 2017-07-22 11:12:50.
[interval Alarm] This alarm was scheduled at 2017-07-22 11:12:52.
[cron Alarm] This alarm was scheduled at 2017-07-22 11:12:55.
[interval Alarm] This alarm was scheduled at 2017-07-22 11:12:57.

任務持久化(SQLAlchemyJobStore)

APScheduler 可以把任務持久化,如持久化到 MySQL 中。當 APScheduler 把任務持久好到 MySQL 時,會預設自動建立一張 apscheduler_jobs

欄位名 說明
id 定義的 job_id
next_run_time 下次執行時間
job_state job 的資訊

定時從 MySQL 中查詢資料

import MySQLdb
import time
import logging

from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s',
                    datefmt='%Y-%m-%d %H:%M:%S',
                    filename='log.txt',
                    filemode='a')


def query(host, port, user, password, db):
    conn = MySQLdb.connect(host=host, port=port, user=user, passwd=password, db=db)
    cr = conn.cursor()
    cr.execute('select * from score')
    conn.commit()
    res = cr.fetchall()
    print res
    with open(r'rs.txt', 'a') as f:
        f.write(str(res) + '\n')


if __name__ == '__main__':
    url = 'mysql://root:[email protected]:3306/work'
    executors = {
        'default': ThreadPoolExecutor(20),
        'processpool': ProcessPoolExecutor(5)
    }
    job_defaults = {
        'coalesce': False,
        'max_instances': 3
    }
    scheduler = BackgroundScheduler(executors=executors, job_defaults=job_defaults)
    scheduler.add_jobstore('sqlalchemy', url=url)

    start = datetime.strptime('2017-07-22 11:32:00', '%Y-%m-%d %H:%M:%S')
    end = datetime.strptime('2017-07-22 11:34:00', '%Y-%m-%d %H:%M:%S')
    scheduler.add_job(func=query, args=('127.0.0.1', 3306, 'root', '123456', 'test'),
                      trigger='cron', start_date=start, end_date=end, second='*/5', id='query')

    try:
        scheduler.start()
        while True:
            time.sleep(2)
    except (KeyboardInterrupt, SystemExit):
        scheduler.shutdown()

參考資料