Celery是一個用Python開發的非同步的分散式任務排程模組

Celery有以下優點:

  1. 簡單:一但熟悉了celery的工作流程後,配置和使用還是比較簡單的
  2. 高可用:當任務執行失敗或執行過程中發生連線中斷,celery 會自動嘗試重新執行任務
  3. 快速:一個單程序的celery每分鐘可處理上百萬個任務
  4. 靈活: 幾乎celery的各個元件都可以被擴充套件及自定製

應用:

建立tasks.py

from celery import Celery

app = Celery('tasks', broker='amqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

Celery 的第一個引數是當前模組的名稱,這個引數是必須的,這樣的話名稱可以自動生成。第二個引數是中間人關鍵字引數,指定你所使用的訊息中間人的 URL,此處使用了 RabbitMQ,也是預設的選項  

呼叫任務:

你可以用 delay()方法來呼叫任務

這是 apply_async() 方法的快捷方式,該方法允許你更好地控制任務執行

from tasks import add
add.delay(4, 4)

儲存結果:

如果你想要保持追蹤任務的狀態,Celery 需要在某個地方儲存或傳送這些狀態。可以從內建的幾個結果後端選擇:SQLAlchemy/Django ORM、 Memcached 、 Redis 、 AMQP( RabbitMQ )或 MongoDB , 或者你可以自制。

下例中你將會使用 amqp 結果後端來發送狀態訊息。後端通過 Celery 的 backend 引數來指定。如果你選擇使用配置模組,則通過 CELERY_RESULT_BACKEND 選項來設定:

app = Celery('tasks', backend='amqp', broker='amqp://')

配置:

Celery,如同家用電器一般,並不需要太多的操作。它有一個輸入和一個輸出, 你必須把輸入連線到中間人上,如果想則把輸出連線到結果後端上。但如果你仔細觀察後蓋,有一個蓋子露出許多滑塊、轉盤和按鈕:這就是配置。

預設配置對大多數使用案例已經足夠好了,但有許多事情需要微調來讓 Celery 如你所願地工作。

配置可以直接在應用上設定,也可以使用一個獨立的配置模組。

例如你可以通過修改 CELERY_TASK_SERIALIZER 選項來配置序列化任務載荷的預設的序列化方式:

app.conf.CELERY_TASK_SERIALIZER = 'json'

如果你一次性設定多個選項,你可以使用update:

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Oslo',
    CELERY_ENABLE_UTC=True,
)

對於大型專案,採用獨立配置模組更為有效,事實上你會為硬編碼週期任務間隔和任務路由選項感到沮喪,因為中心化儲存配置更合適。尤其是對於庫而言,這使得使用者控制任務行為成為可能,你也可以想象系統管理員在遇到系統故障時對配置做出簡單修改。

你可以呼叫 config_from_object() 來讓 Celery 例項載入配置模組:

app.config_from_object('celeryconfig')  

配置檔案:celeryconfig.py

from kombu import Queue, Exchange

BROKER_URL = 'redis://127.0.0.1:6379/7'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/8'

CELERY_IMPORTS = (
    'celery_app.task'
)

CELERY_QUEUES = (
    Queue('default', exchange=Exchange('default'), routing_key='default'),
    Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'),
    Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'),
)

CELERY_ROUTES = {
    'celery_app.task.task1': {'queue': 'app_task1', 'routing_key': 'app_task1'},
    'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'}
}

定義routes用來決定不同的任務去哪一個queue

在啟動worker時指定該worker執行哪一個queue中的任務

celery -A celery_app worker -l info -Q app_task1 -P eventlet
celery -A celery_app worker -l info -Q app_task2 -P eventlet

RabbitMQ 是預設的中間人,所以除了需要你要使用的中間人例項的 URL 位置, 它並不需要任何額外的依賴或起始配置: 

BROKER_URL = 'amqp://guest:guest@localhost:5672//'

如果想獲取每個任務的執行結果,還需要配置一下把任務結果存在哪

app.conf.result_backend = 'redis://localhost:6379/0'  

完整使用Celery流程

開始使用Celery啦  

安裝celery模組

pip install celery

建立一個celery application 用來定義你的任務列表

建立一個任務檔案就叫tasks.py吧

from celery import Celery

app = Celery('tasks',
             broker='redis://localhost',
             backend='redis://localhost')

@app.task
def add(x,y):
    print("running...",x,y)
    return x+y

啟動Celery Worker來開始監聽並執行任務

celery -A tasks worker --loglevel=info

呼叫任務

再開啟一個終端, 進行命令列模式,呼叫任務

from tasks import add
add.delay(4, 4)

看你的worker終端會顯示收到 一個任務,此時你想看任務結果的話,需要在呼叫 任務時 賦值個變數

result = add.delay(4, 4)  

結果

result.ready()  # False or True

在專案中如何使用celery

可以把celery配置成一個應用

目錄格式如下

proj/__init__.py
    /celery.py
    /tasks.py

proj/celery.py內容

from __future__ import absolute_import, unicode_literals
from celery import Celery

app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])

# Optional configuration, see the application user guide.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    app.start()

proj/tasks.py中的內容

from __future__ import absolute_import, unicode_literals
from .celery import app

@app.task
def add(x, y):
    return x + y

@app.task
def mul(x, y):
    return x * y

@app.task
def xsum(numbers):
    return sum(numbers)

啟動worker 

celery -A proj worker -l info

Celery 定時任務

celery支援定時任務,設定好任務的執行時間,celery就會定時自動幫你執行, 這個定時任務模組叫celery beat

寫一個指令碼 叫periodic_task.py

from celery import Celery
from celery.schedules import crontab

app = Celery()

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

add_periodic_task 會新增一條定時任務

上面是通過呼叫函式新增定時任務,也可以像寫配置檔案 一樣的形式新增, 下面是每30s執行的任務

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'

任務新增好了,需要讓celery單獨啟動一個程序來定時發起這些任務, 注意, 這裡是發起任務,不是執行,這個程序只會不斷的去檢查你的任務計劃, 每發現有任務需要執行了,就發起一個任務呼叫訊息,交給celery worker去執行

啟動任務排程器 celery beat

celery -A periodic_task beat

此時還差一步,就是還需要啟動一個worker,負責執行celery beat發起的任務

啟動celery worker來執行任務

celery -A periodic_task worker

此時觀察worker的輸出,是不是每隔一小會,就會執行一次定時任務呢!

celery -A periodic_task beat -s /home/celery/var/run/celerybeat-schedule

更復雜的定時配置

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}

最佳實踐之與django結合

Django專案目錄:

- proj/
  - proj/__init__.py
  - proj/settings.py
  - proj/urls.py
- manage.py  

建立檔案proj/proj/celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

配置proj/proj/__init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ['celery_app']  

為celery設定環境變數

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')  

建立Celery應用

app = Celery('proj')  

配置settings

app.config_from_object('django.conf:settings', namespace='CELERY')  

配置應用:

app.conf.update(
    # 配置broker, 這裡我們用redis作為broker
    BROKER_URL='redis://:[email protected]:6379/1',
)

設定app自動載入任務:

app.autodiscover_tasks(settings.INSTALLED_APPS)  # 從已經安裝的app中查詢任務

然後在具體的app裡的tasks.py裡寫你的任務

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task

@shared_task
def add(x, y):
    return x + y

@shared_task
def mul(x, y):
    return x * y

@shared_task
def xsum(numbers):
    return sum(numbers)

在你的django views裡呼叫celery task

from django.shortcuts import render,HttpResponse

# Create your views here.

from  bernard import tasks

def task_test(request):

    res = tasks.add.delay(228,24)
    print("start running task")
    print("async task res",res.get() )

    return HttpResponse('res %s'%res.get())

注意,經測試,每新增或修改一個任務,celery beat都需要重啟一次,要不然新的配置不會被celery beat程序讀到