1. 程式人生 > >任務排程schedule和celery

任務排程schedule和celery

1. schedule

如果要實現一個小的定時任務指令碼,可以採用schedule這個輕量級定時任務排程庫。

import schedule
import time

def job(name):
    print(name,'do something...')

# 每十分鐘執行任務
schedule.every(10).minutes.do(job, name)
# 每小時執行任務
schedule.every().hour.do(job, name)
# 每天定點執行任務
schedule.every().day.at('10:30').do(job, name)
# 每週一執行任務
schedule.every().monday.do(job, name)
# 每週一定點任務
schedule.every().monday.at('10:30').do(job, name)

while True:
    # 保持任務執行
    schedule.run_pending()
    time.sleep(1)

值得注意的是,如果是多個任務執行,實際上他們是序列執行的。如果上面的任務耗時,會影響下面任務的執行。

對於這種情況,可以使用多執行緒/多程序來解決。

import datetime
import schedule
import threading
import time


def job1():
    print('this is job1')
    time.sleep(2)
    print('job1:',datetime.datetime.now())

def job2():
    print('this is job2')
    time.sleep(2)
    print('job2:',datetime.datetime.now())

def task1():
    threading.Thread(target=job1).start()

def task2():
    threading.Thread(target=job2).start()

def run():
    schedule.every(10).seconds.do(task1)
    schedule.every(10).seconds.do(task2)

    while True:
        schedule.run_pending()
        time.sleep(1)

schedule的使用比較簡單,就是一個死迴圈執行任務,因此定時任務job不應該是死迴圈型別的,這個任務執行緒需要有一個執行完畢的出口,否則會導致無限迴圈問題。另外一點是,定時任務的執行時間如果比schedule的間隔時間長,同樣會造成執行緒堆積問題,引發異常。

2. celery

celery是一個強大的分散式任務佇列,相比schedule更加完備而強大,同時也更加“重”。它可以讓任務的執行完全脫離主程式,甚至是分配到其他主機上執行。

通常使用celery來實現非同步任務和定時任務。其結構組成如下:

Celery_framework

可以看到,celery主要包含以下幾個模組:

  • 任務模組

包含非同步任務與定時任務。非同步任務通常在業務邏輯中被觸發,並被髮往任務佇列;定時任務由celery beat程序週期性地將任務發往任務佇列。

  • 訊息中介軟體broker

broker,即為任務排程佇列,接收任務生產者發來的任務訊息,將任務存入佇列。celery本身不提供任務佇列,推薦使用RabbitMQ和Redis。

  • 任務執行單元worker

worker實時監控訊息佇列,獲取排程的任務,並執行。

  • 任務結果儲存backend

backend用於儲存任務的執行結果,通訊息中介軟體一樣,儲存可使用RabbitMQ,Redis,MongoDB等。

非同步任務

使用celery實現非同步任務主要包括三個步驟:

  • 建立一個celery例項
  • 啟動celery worker
  • 程式呼叫非同步任務

以下做具體介紹:

  1. 1.建立celery例項

這裡使用Redis作為broker和backend。

建立檔案task.py

import time
from celery import Celery

# 指定訊息中介軟體用redis
broker = 'redis://127.0.0.1:6379'
# 指定儲存用redis
backend = 'redis://127.0.0.1:6379/0'
# 建立一個celery例項app,名稱為my_task
app = Celery('my_task', broker=broker, backend=backend)

# 建立一個celery任務add,被@app.task裝飾後,成為可被排程的任務
@app.task
def add(x, y):
    time.sleep(5) # 模擬耗時操作
    return x+y
  1. 2.啟動celery worker

在當前目錄下,使用如下方式啟動celery worker

celery worker -A task --loglevel=info

其中:

  • 引數-A指定了celery例項的位置,這裡是task.py中,celery會自動在該檔案中尋找celery例項物件,當然也可以直接指定為-A task.app;
  • 引數--loglevel指定了日誌級別,預設為warning,也可以使用-l info來表示;

  1. 3.呼叫任務

現在可以使用delay()或者apply_async()方法來呼叫任務。

在當前目錄下,開啟Python控制檯,輸入如下:

我們從task.py中匯入了add任務物件,然後使用delay()方法傳送任務到broker,worker程序監測到該任務後就執行,

這時發現報錯,原因是在Windows系統下使用celery4版本,解決方法是安裝一個eventlet包,然後啟動worker時加一個引數

celery worker -A task -l info -P eventlet

然後就可以正常使用了。

另外如果想獲取執行後的結果,可以這樣做:

上面是在互動環境中呼叫任務,實際上通常在程式用呼叫,建立client.py如下:

from task import add
import time

print('開始時間', time.ctime())
add.delay(2,5)
print('完成時間', time.ctime())

然後執行檔案,結果如下:

可以看出,雖然任務函式需要等待5秒才返回結果,但是由於是一個非同步任務,不會阻塞當前主程式,所以立刻執行了列印完成的語句。

相比直接把broker和backend配置寫入程式程式碼中,更好的方式是增加一個配置檔案,通常命名為`celeryconfig.py`。

__init__.py程式碼如下:

from celery import Celery

# 建立celery例項
app = Celery('demo')

# 通過celery例項載入配置模組
app.config_from_object('celery_app.celeryconfig')

celeryconfig.py程式碼如下:

# 指定broker
BROKER_URL = 'redis://127.0.0.1:6379'
# 指定backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'

# 指定時區
CELERY_TIMEZONE = 'Asia/Shanghai'

# 指定匯入的任務模組
CELERY_IMPORTS = (
    'celery_app.task1',
    'celery_app.task2'
)

task1.py程式碼如下:

import time
from celery_app import app

@app.task
def add(x,y):
    time.sleep(2)
    return x+y

task2.py程式碼如下:

import time
from celery_app import app

@app.task
def multiply(x,y):
    time.sleep(2)
    return x*y

client.py程式碼如下:

import time
from celery_app import task1
from celery_app import task2


print('開始', time.ctime())
task1.add.delay(2,3) # delay是apply_async的快捷方式
task2.multiply.apply_async(args=[2,3])

print('完成', time.ctime())

現在可以啟動worker程序

然後執行python命令執行client.py檔案。

在worker視窗,我們可以看到任務的執行

定時任務

celery beat程序通過讀取配置檔案的內容,週期性地將定時任務發往任務佇列。

除了celerconfig.py內容增加了定時排程內容,其他模組和非同步任務相同。

celerconfig.py程式碼如下:

from datetime import timedelta
from celery.schedules import crontab

# 指定broker
BROKER_URL = 'redis://127.0.0.1:6379'
# 指定backend
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'

# 指定時區
CELERY_TIMEZONE = 'Asia/Shanghai'

# 指定匯入的任務模組
CELERY_IMPORTS = (
    'celery_app.task1',
    'celery_app.task2'
)

# 定時排程schedules
CELERYBEAT_SCHEDULE={
    'add-every-30-seconds':{
        'task':'celery_app.task1.add',
        'schedule':timedelta(seconds=30),  # 每30秒執行一次
        'args':(2,3)                        # 任務函式引數
    },

    'multiply-every-30-seconds':{
        'task':'celery_app.task2.multiply',
        'schedule':crontab(hour=14,minute=30), # 每天下午2點30分執行一次
        'args':(2,3)                            # 任務函式引數
    }

}

現在,啟動worker程序,然後啟動beat程序,定時任務將被髮送到broker

之後在worker視窗,可以看到task1每30秒執行一次,task2則定點執行一次。

為了簡化,也可以將啟動worker程序和beat程序放在一條命令中:

celery -B -A celery_app worker -l info -P eventlet