Celery是一個用Python開發的非同步的分散式任務排程模組
Celery有以下優點:
- 簡單:一但熟悉了celery的工作流程後,配置和使用還是比較簡單的
- 高可用:當任務執行失敗或執行過程中發生連線中斷,celery 會自動嘗試重新執行任務
- 快速:一個單程序的celery每分鐘可處理上百萬個任務
- 靈活: 幾乎celery的各個元件都可以被擴充套件及自定製
應用:
建立tasks.py
from celery import Celery app = Celery('tasks', broker='amqp://guest@localhost//') @app.task def add(x, y): return x + y
Celery 的第一個引數是當前模組的名稱,這個引數是必須的,這樣的話名稱可以自動生成。第二個引數是中間人關鍵字引數,指定你所使用的訊息中間人的 URL,此處使用了 RabbitMQ,也是預設的選項
呼叫任務:
你可以用 delay()方法來呼叫任務
這是 apply_async() 方法的快捷方式,該方法允許你更好地控制任務執行
from tasks import add add.delay(4, 4)
儲存結果:
如果你想要保持追蹤任務的狀態,Celery 需要在某個地方儲存或傳送這些狀態。可以從內建的幾個結果後端選擇:SQLAlchemy/Django ORM、 Memcached 、 Redis 、 AMQP( RabbitMQ )或 MongoDB , 或者你可以自制。
下例中你將會使用 amqp 結果後端來發送狀態訊息。後端通過 Celery 的 backend 引數來指定。如果你選擇使用配置模組,則通過 CELERY_RESULT_BACKEND 選項來設定:
app = Celery('tasks', backend='amqp', broker='amqp://')
配置:
Celery,如同家用電器一般,並不需要太多的操作。它有一個輸入和一個輸出, 你必須把輸入連線到中間人上,如果想則把輸出連線到結果後端上。但如果你仔細觀察後蓋,有一個蓋子露出許多滑塊、轉盤和按鈕:這就是配置。
預設配置對大多數使用案例已經足夠好了,但有許多事情需要微調來讓 Celery 如你所願地工作。
配置可以直接在應用上設定,也可以使用一個獨立的配置模組。
例如你可以通過修改 CELERY_TASK_SERIALIZER 選項來配置序列化任務載荷的預設的序列化方式:
app.conf.CELERY_TASK_SERIALIZER = 'json'
如果你一次性設定多個選項,你可以使用update:
app.conf.update( CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], # Ignore other content CELERY_RESULT_SERIALIZER='json', CELERY_TIMEZONE='Europe/Oslo', CELERY_ENABLE_UTC=True, )
對於大型專案,採用獨立配置模組更為有效,事實上你會為硬編碼週期任務間隔和任務路由選項感到沮喪,因為中心化儲存配置更合適。尤其是對於庫而言,這使得使用者控制任務行為成為可能,你也可以想象系統管理員在遇到系統故障時對配置做出簡單修改。
你可以呼叫 config_from_object() 來讓 Celery 例項載入配置模組:
app.config_from_object('celeryconfig')
配置檔案:celeryconfig.py
from kombu import Queue, Exchange BROKER_URL = 'redis://127.0.0.1:6379/7' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/8' CELERY_IMPORTS = ( 'celery_app.task' ) CELERY_QUEUES = ( Queue('default', exchange=Exchange('default'), routing_key='default'), Queue('app_task1', exchange=Exchange('app_task1'), routing_key='app_task1'), Queue('app_task2', exchange=Exchange('app_task2'), routing_key='app_task2'), ) CELERY_ROUTES = { 'celery_app.task.task1': {'queue': 'app_task1', 'routing_key': 'app_task1'}, 'celery_app.task.task2': {'queue': 'app_task2', 'routing_key': 'app_task2'} }
定義routes用來決定不同的任務去哪一個queue
在啟動worker時指定該worker執行哪一個queue中的任務
celery -A celery_app worker -l info -Q app_task1 -P eventlet celery -A celery_app worker -l info -Q app_task2 -P eventlet
RabbitMQ 是預設的中間人,所以除了需要你要使用的中間人例項的 URL 位置, 它並不需要任何額外的依賴或起始配置:
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
如果想獲取每個任務的執行結果,還需要配置一下把任務結果存在哪
app.conf.result_backend = 'redis://localhost:6379/0'
完整使用Celery流程
開始使用Celery啦
安裝celery模組
pip install celery
建立一個celery application 用來定義你的任務列表
建立一個任務檔案就叫tasks.py吧
from celery import Celery app = Celery('tasks', broker='redis://localhost', backend='redis://localhost') @app.task def add(x,y): print("running...",x,y) return x+y
啟動Celery Worker來開始監聽並執行任務
celery -A tasks worker --loglevel=info
呼叫任務
再開啟一個終端, 進行命令列模式,呼叫任務
from tasks import add add.delay(4, 4)
看你的worker終端會顯示收到 一個任務,此時你想看任務結果的話,需要在呼叫 任務時 賦值個變數
result = add.delay(4, 4)
結果
result.ready() # False or True
在專案中如何使用celery
可以把celery配置成一個應用
目錄格式如下
proj/__init__.py /celery.py /tasks.py
proj/celery.py內容
from __future__ import absolute_import, unicode_literals from celery import Celery app = Celery('proj', broker='amqp://', backend='amqp://', include=['proj.tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
proj/tasks.py中的內容
from __future__ import absolute_import, unicode_literals from .celery import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
啟動worker
celery -A proj worker -l info
Celery 定時任務
celery支援定時任務,設定好任務的執行時間,celery就會定時自動幫你執行, 這個定時任務模組叫celery beat
寫一個指令碼 叫periodic_task.py
from celery import Celery from celery.schedules import crontab app = Celery() @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # Calls test('world') every 30 seconds sender.add_periodic_task(30.0, test.s('world'), expires=10) # Executes every Monday morning at 7:30 a.m. sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg)
add_periodic_task 會新增一條定時任務
上面是通過呼叫函式新增定時任務,也可以像寫配置檔案 一樣的形式新增, 下面是每30s執行的任務
app.conf.beat_schedule = { 'add-every-30-seconds': { 'task': 'tasks.add', 'schedule': 30.0, 'args': (16, 16) }, } app.conf.timezone = 'UTC'
任務新增好了,需要讓celery單獨啟動一個程序來定時發起這些任務, 注意, 這裡是發起任務,不是執行,這個程序只會不斷的去檢查你的任務計劃, 每發現有任務需要執行了,就發起一個任務呼叫訊息,交給celery worker去執行
啟動任務排程器 celery beat
celery -A periodic_task beat
此時還差一步,就是還需要啟動一個worker,負責執行celery beat發起的任務
啟動celery worker來執行任務
celery -A periodic_task worker
此時觀察worker的輸出,是不是每隔一小會,就會執行一次定時任務呢!
celery -A periodic_task beat -s /home/celery/var/run/celerybeat-schedule
更復雜的定時配置
from celery.schedules import crontab app.conf.beat_schedule = { # Executes every Monday morning at 7:30 a.m. 'add-every-monday-morning': { 'task': 'tasks.add', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), }, }
最佳實踐之與django結合
Django專案目錄:
- proj/ - proj/__init__.py - proj/settings.py - proj/urls.py - manage.py
建立檔案proj/proj/celery.py
from __future__ import absolute_import, unicode_literals import os from celery import Celery # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings') app = Celery('proj') # Using a string here means the worker don't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django app configs. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print('Request: {0!r}'.format(self.request))
配置proj/proj/__init__.py:
from __future__ import absolute_import, unicode_literals # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app __all__ = ['celery_app']
為celery設定環境變數
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
建立Celery應用
app = Celery('proj')
配置settings
app.config_from_object('django.conf:settings', namespace='CELERY')
配置應用:
app.conf.update( # 配置broker, 這裡我們用redis作為broker BROKER_URL='redis://:[email protected]:6379/1', )
設定app自動載入任務:
app.autodiscover_tasks(settings.INSTALLED_APPS) # 從已經安裝的app中查詢任務
然後在具體的app裡的tasks.py裡寫你的任務
# Create your tasks here from __future__ import absolute_import, unicode_literals from celery import shared_task @shared_task def add(x, y): return x + y @shared_task def mul(x, y): return x * y @shared_task def xsum(numbers): return sum(numbers)
在你的django views裡呼叫celery task
from django.shortcuts import render,HttpResponse # Create your views here. from bernard import tasks def task_test(request): res = tasks.add.delay(228,24) print("start running task") print("async task res",res.get() ) return HttpResponse('res %s'%res.get())
注意,經測試,每新增或修改一個任務,celery beat都需要重啟一次,要不然新的配置不會被celery beat程序讀到