1. 程式人生 > >Python任務排程模組 – APScheduler

Python任務排程模組 – APScheduler

APScheduler是一個Python定時任務框架,使用起來十分方便。提供了基於日期、固定時間間隔以及crontab型別的任務,並且可以持久化任務、並以daemon方式執行應用。目前最新版本為3.0.x。在APScheduler中有四個元件:

觸發器(trigger)包含排程邏輯,每一個作業有它自己的觸發器,用於決定接下來哪一個作業會執行。除了他們自己初始配置意外,觸發器完全是無狀態的。

作業儲存(job store)儲存被排程的作業,預設的作業儲存是簡單地把作業儲存在記憶體中,其他的作業儲存是將作業儲存在資料庫中。一個作業的資料講在儲存在持久化作業儲存時被序列化,並在載入時被反序列化。排程器不能分享同一個作業儲存。

執行器(executor)處理作業的執行,他們通常通過在作業中提交制定的可呼叫物件到一個執行緒或者進城池來進行。當作業完成時,執行器將會通知排程器。

排程器(scheduler)是其他的組成部分。你通常在應用只有一個排程器,應用的開發者通常不會直接處理作業儲存、排程器和觸發器,相反,排程器提供了處理這些的合適的介面。配置作業儲存和執行器可以在排程器中完成,例如新增、修改和移除作業。

你需要選擇合適的排程器,這取決於你的應用環境和你使用APScheduler的目的。通常最常用的兩個:

– BlockingScheduler : 當排程器是你應用中唯一要執行的東西時使用。

– BackgroundScheduler

 : 當你不執行任何其他框架時使用,並希望排程器在你應用的後臺執行。

安裝APScheduler非常簡單:

pip install apscheduler 

選擇合適的作業儲存,你需要決定是否需要作業持久化。如果你總是在應用開始時重建job,你可以直接使用預設的作業儲存(MemoryJobStore).但是如果你需要將你的作業持久化,以避免應用崩潰和排程器重啟時,你可以根據你的應用環境來選擇具體的作業儲存。例如:使用Mongo或者SQLAlchemyJobStore (用於支援大多數RDBMS)

然而,排程器的選擇通常是為你如果你使用上面的框架之一。然而,預設的ThreadPoolExecutor 通常用於大多數用途。如果你的工作負載中有較大的CPU密集型操作,你可以考慮用ProcessPoolExecutor來使用更多的CPU核。你也可以在同一時間使用兩者,將程序池排程器作為第二執行器。

配置排程器

APScheduler提供了許多不同的方式來配置排程器,你可以使用一個配置字典或者作為引數關鍵字的方式傳入。你也可以先建立排程器,再配置和新增作業,這樣你可以在不同的環境中得到更大的靈活性。下面是一個簡單使用BlockingScheduler,並使用預設記憶體儲存和預設執行器。(預設選項分別是MemoryJobStore和ThreadPoolExecutor,其中執行緒池的最大執行緒數為10)。配置完成後使用start()方法來啟動。

from apscheduler.schedulers.blocking import BlockingScheduler
def my_job():
    print 'hello world'

sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=5)
sched.start()

在執行程式5秒後,將會輸出第一個Hello world。下面進行一個更復雜的配置,使用兩個作業儲存和兩個排程器。在這個配置中,作業將使用mongo作業儲存,資訊寫入到MongoDB中。

from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor


def my_job():
    print 'hello world'
host = '127.0.0.1'
port = 27017
client = MongoClient(host, port)

jobstores = {
    'mongo': MongoDBJobStore(collection='job', database='test', client=client),
    'default': MemoryJobStore()
}
executors = {
    'default': ThreadPoolExecutor(10),
    'processpool': ProcessPoolExecutor(3)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
scheduler = BlockingScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
scheduler.add_job(my_job, 'interval', seconds=5)

try:
    scheduler.start()
except SystemExit:
    client.close()

查詢MongoDB可以看到作業的執行情況如下:

{ 
  "_id" : "55ca54ee4bb744f8a5ab08cc4319bc24",
  "next_run_time" : 1434017278.797,
  "job_state" : new BinData(0, "gAJ9cQEoVQRhcmdzcQIpVQhleGVjdXRvcnEDVQdkZWZhdWx0cQRVDW1heF9pbnN0YW5jZXNxBUsDVQRmdW5jcQZVD19fbWFpbl9fOm15X2pvYnEHVQJpZHEIVSA1NWNhNTRlZTRiYjc0NGY4YTVhYjA4Y2M0MzE5YmMyNHEJVQ1uZXh0X3J1bl90aW1lcQpjZGF0ZXRpbWUKZGF0ZXRpbWUKcQtVCgffBgsSBzoMKUhjcHl0egpfcApxDChVDUFzaWEvU2hhbmdoYWlxDU2AcEsAVQNDU1RxDnRScQ+GUnEQVQRuYW1lcRFVBm15X2pvYnESVRJtaXNmaXJlX2dyYWNlX3RpbWVxE0sBVQd0cmlnZ2VycRRjYXBzY2hlZHVsZXIudHJpZ2dlcnMuaW50ZXJ2YWwKSW50ZXJ2YWxUcmlnZ2VyCnEVKYFxFn1xF1UPaW50ZXJ2YWxfbGVuZ3RocRhHQBQAAAAAAABzfXEZKFUIdGltZXpvbmVxGmgMKGgNTehxSwBVA0xNVHEbdFJxHFUIaW50ZXJ2YWxxHWNkYXRldGltZQp0aW1lZGVsdGEKcR5LAEsFSwCHUnEfVQpzdGFydF9kYXRlcSBoC1UKB98GCxIHIQwpSGgPhlJxIVUIZW5kX2RhdGVxIk51hmJVCGNvYWxlc2NlcSOJVQd2ZXJzaW9ucSRLAVUGa3dhcmdzcSV9cSZ1Lg==")
}

操作作業

1. 新增作業

上面是通過add_job()來新增作業,另外還有一種方式是通過scheduled_job()修飾器來修飾函式。

@sched.scheduled_job('cron', id='my_job_id', day='last sun')
 
def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")

2. 移除作業

job = scheduler.add_job(myfunc, 'interval', minutes=2)
job.remove()
Same, using an explicit job ID:

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

3. 暫停和恢復作業

暫停作業:

– apscheduler.job.Job.pause() 
– apscheduler.schedulers.base.BaseScheduler.pause_job() 

恢復作業:

– apscheduler.job.Job.resume() 

– apscheduler.schedulers.base.BaseScheduler.resume_job()

4. 獲得job列表

獲得排程作業的列表,可以使用 get_jobs() 來完成,它會返回所有的job例項。或者使用 print_jobs() 來輸出所有格式化的作業列表。

5. 修改作業

def some_decorated_task():
    print("I am printed at 00:00:00 on the last Sunday of every month!")</pre>

6. 關閉排程器

預設情況下排程器會等待所有正在執行的作業完成後,關閉所有的排程器和作業儲存。如果你不想等待,可以將wait選項設定為False。

scheduler.shutdown()
scheduler.shutdown(wait=False)

作業執行的控制

add_job的第二個引數是trigger,它管理著作業的排程方式。它可以為date, interval或者cron。對於不同的trigger,對應的引數也相同。

(1). cron定時排程

year(int|str) – 4-digit year

month(int|str) – month (1-12)

day(int|str) – day of the (1-31)

week(int|str) – ISO week (1-53)

day_of_week(int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)

hour(int|str) – hour (0-23)

minute(int|str) – minute (0-59)

second(int|str) – second (0-59)

start_date(datetime|str) – earliest possible date/time to trigger on (inclusive)

end_date(datetime|str) – latest possible date/time to trigger on (inclusive)

timezone(datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone)

和Linux的Crontab一樣,它的值格式為:

Expression Field Description
* any Fire on every value
*/a any Fire every values, starting from the minimum
a-b any Fire on any value within the a-b range (a must be smaller than b)
a-b/c any Fire every values within the a-b range
xth y day Fire on the -th occurrence of weekday within the month
last x day Fire on the last occurrence of weekday within the month
last day Fire on the last day within the month
x,y,z any Fire on any matching expression; can combine any number of any of the above expressions

幾個例子如下:

# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

(2). interval 間隔排程

它的引數如下:

weeks(int) – number of weeks to wait

days(int) – number of days to wait

hours(int) – number of hours to wait

minutes(int) – number of minutes to wait

seconds(int) – number of seconds to wait

start_date(datetime|str) – starting point for the interval calculation

end_date(datetime|str) – latest possible date/time to trigger on

timezone(datetime.tzinfo|str) – time zone to use for the date/time calculations

例子:

# Schedule job_function to be called every two hours
sched.add_job(job_function, 'interval', hours=2)

(3). date 定時排程

最基本的一種排程,作業只會執行一次。它的引數如下:

run_date(datetime|str) – the date/time to run the job at

timezone(datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already

例子:

# The job will be executed on November 6th, 2009
sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
# The job will be executed on November 6th, 2009 at 16:30:05
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])

^^