Django 中使用 Celery
起步
在 ofollow,noindex" target="_blank">《分散式任務佇列Celery使用說明》 中介紹了在 Python 中使用 Celery 來實驗非同步任務和定時任務功能。本文介紹如何在 Django 中使用 Celery。
安裝
pip install django-celery
這個命令使用的依賴是 Celery 3.x 的版本,所以會把我之前安裝的 4.x 解除安裝,不過對功能上並沒有什麼影響。我們也完全可以僅用Celery在django中使用,但使用 django-celery
模組能更好的管理 celery。
使用
可以把有關 Celery 的配置放到 settings.py
裡去,但我比較習慣單獨一個檔案來放,然後在 settings.py
引入進來:
# celery_config.py import djcelery import os os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1') djcelery.setup_loader() BROKER_URL = 'redis://127.0.0.1:6379/1' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2' # UTC CELERY_ENABLE_UTC = True CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_IMPORTS = ( 'app.tasks', ) # 有些情況可以防止死鎖 CELERY_FORCE_EXECV = True # 設定併發的worker數量 CELERYD_CONCURRENCY = 4 # 任務傳送完成是否需要確認,這一項對效能有一點影響 CELERY_ACKS_LATE = True # 每個worker執行了多少任務就會銷燬,防止記憶體洩露,預設是無限的 CELERYD_MAX_TASKS_PER_CHILD = 40 # 規定完成任務的時間 CELERYD_TASK_TIME_LIMIT = 15 * 60# 在15分鐘內完成任務,否則執行該任務的worker將被殺死,任務移交給父程序 # 設定預設的佇列名稱,如果一個訊息不符合其他的佇列就會放在預設佇列裡面,如果什麼都不設定的話,資料都會發送到預設的佇列中 CELERY_DEFAULT_QUEUE = "default" # 設定詳細的佇列 CELERY_QUEUES = { "default": { # 這是上面指定的預設佇列 "exchange": "default", "exchange_type": "direct", "routing_key": "default" }, "beat_queue": { "exchange": "beat_queue", "exchange_type": "direct", "routing_key": "beat_queue" } }
配置檔案中設定了 CELERY_IMPORTS
匯入的任務,所以在django app中建立相應的任務檔案:
# app/tasks.py from celery.task import Task import time class TestTask(Task): name = 'test-task'# 給任務設定個自定義名稱 def run(self, *args, **kwargs): print('start test task') time.sleep(4) print('args={}, kwargs={}'.format(args, kwargs)) print('end test task')
在 settings.py
新增:
INSTALLED_APPS = [ # ... 'djcelery', ] # Celery from learn_django.celery_config import *
觸發任務或提交任務可以在view中來呼叫:
# views.py from django.http import HttpResponse from app.tasks import TestTask def test_task(request): # 執行非同步任務 print('start do request') t = TestTask() t.delay() print('end do request') return HttpResponse('ok')
啟動 woker 的命令是:
python manage.py celery worker -l info
再啟動django,訪問該view,可以看到任務在worker中被消費了。
定時任務
在celery的配置檔案 celery_config.py
檔案中新增:
CELERYBEAT_SCHEDULE = { 'task1-every-1-min': {# 自定義名稱 'task': 'test-task', # 與任務中name名稱一致 'schedule': datetime.timedelta(seconds=5), 'args': (2, 15), 'options': { 'queue': 'beat_queue',# 指定要使用的佇列 } }, }
通過 options
的 queque
來指定要使用的佇列,這裡需要單獨的佇列是因為,如果所有任務都使用同一佇列,對於定時任務來說,任務提交後會位於佇列尾部,任務的執行時間會靠後,所以對於定時任務來說,使用單獨的佇列。
啟動 beat:
python manage.py celery beat -l info
監控工具 flower
如果celery中的任務執行失敗了,有些場景是需要對這些任務進行監控, flower
是基於 Tornado 開發的web應用。安裝用 pip install flower
;啟動它可以是:
python manage.py celery flower # python manage.py celery flower --basic_auth=admin:admin
用瀏覽器訪問 http://localhost:5555
即可檢視: