1. 程式人生 > >Django項目中使用celery做異步任務

Django項目中使用celery做異步任務

led swagger 讀取 celery messages RoCE 個人 none 後臺

異步任務介紹

在寫項目過程中經常會遇到一些耗時的任務, 比如:發送郵件、發送短信等等~。這些操作如果都同步執行耗時長對用戶體驗不友好,在這種情況下就可以把任務放在後臺異步執行
celery就是用於處理異步任務的框架,celery能完成的功能遠不止異步任務,還有一個很常用的功能定時任務

架構圖

技術分享圖片

Celery包含如下組件:

  • Celery Beat:任務調度器,Beat進程會讀取配置文件的內容,周期性地將配置中到期需要執行的任務發送給任務隊列。

  • Celery Worker:執行任務的消費者,通常會在多臺服務器運行多個消費者來提高執行效率。

  • Broker:消息代理,或者叫作消息中間件,接受任務生產者發送過來的任務消息,存進隊列再按序分發給任務消費方(通常是消息隊列或者數據庫)。

  • Producer:調用了Celery提供的API、函數或者裝飾器而產生任務並交給任務隊列處理的都是任務生產者。

  • Result Backend:任務處理完後保存狀態信息和結果,以供查詢。Celery默認已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。

Celery 安裝

pip install django-celery celery-with-redis

項目結構

[vagrant@reboot test_drf]$ tree opsweb/
opsweb/
├── apps
│?? ├── account
│?? │?? ├── admin.py
│?? │?? ├── apps.py
│?? │?? ├── __init__.py
│?? │?? ├── migrations
│?? │?? ├── models.py
│?? │?? ├── __pycache__
│?? │?? ├── tasks.py
│?? │?? ├── tests.py
│?? │?? └── views.py
├── celerybeat.pid
├── logs
│?? ├── celery
│?? │?? ├── beat.log
│?? │?? └── celery.log
├── manage.py
├── opsweb
│?? ├── celery.py
│?? ├── __init__.py
│?? ├── __pycache__
│?? ├── settings.py
│?? ├── urls.py
│?? └── wsgi.py

加載celery app(settings文件中)

INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'corsheaders',
    'rest_framework',
    'rest_framework_swagger',
    'account',
    'djcelery' # celery app 很強大
]

添加Celery全局配置(settings文件中)

# Celery
import djcelery
djcelery.setup_loader()  # 加載djcelery

CELERY_TIMEZONE = TIME_ZONE
CELERY_ENABLE_UTC = True

# 允許的格式
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'yaml']

BROKER_URL = 'redis://127.0.0.1:6379/0'     # redis作為中間件
BROKER_TRANSPORT = 'redis'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'     # Backend數據庫

# CELERYD_LOG_FILE = BASE_DIR + "/logs/celery/celery.log"         # log路徑
# CELERYBEAT_LOG_FILE = BASE_DIR + "/logs/celery/beat.log"     # beat log路徑

同步Celery表到數據庫

python manage.py migrate

創建celery.py文件(與settings同級)

import os
import django
from celery import Celery
from django.conf import settings

# set the default Django settings module for the 'celery' program.  
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'opsweb.settings')
django.setup()
app = Celery('opsweb')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

創建任務文件

在需要使用異步任務的app中創建tasks.py,寫入對應的任務函數,博主喜歡把tasks放在對應的app下,其實放在其他目錄下也可以的,看個人習慣

from opsweb.celery import app
from celery.schedules import crontab
import traceback
from django.contrib.auth.models import User
import os
@app.task(name="create_user")
def useradd(username):
   try:
        print(username)
   except:
        print('fail')
        traceback.print_exc()

觸發任務

在對應的視圖中導入tasks中的任務函數調用即可

from account.tasks import useradd
# 調用異步任務函數
useradd.delay('username')

啟動Celery

進入opsweb工程下,啟動Celery

[vagrant@reboot opsweb]$celery -A opsweb worker -B -l info
或:
[vagrant@reboot opsweb]$python manage.py celery worker -B -l info

備註:
本場景用戶訪問觸發任務,流程如下:
用戶頁面上點擊事件->調用任務/定時計劃任務->任務進入redis隊列
->如果celery啟動則依次執行任務->如果celery沒啟動,則會存到redis

隊列裏,一旦啟動就依次執行

啟動Django

[vagrant@reboot opsweb]$python manage.py runserver 0:8000

測試

頁面上觸發了異步任務就會在celery日誌裏看到任務信息,我這裏只是寫了簡單的任務例子

[2018-09-01 23:56:59,704: WARNING/Worker-2] hello
[2018-09-01 23:56:59,707: INFO/MainProcess] Received task: create_user[c9724e23-b9ba-44fc-b195-6b1153d2c161]
[2018-09-01 23:56:59,708: INFO/MainProcess] Task create_user[f3b3e644-b8aa-4679-8a42-0efc2574abf6] succeeded in 0.0038937819999773637s: None

計劃任務 - djcelery

djcelery app提供了定時任務的功能,註冊並同步到數據庫之後,會生產五個表,結構如下:

MariaDB [test002]> show tables
    -> ;
+---------------------------+
| Tables_in_test002         |
+---------------------------+
...
| djcelery_crontabschedule  |
| djcelery_intervalschedule |
| djcelery_periodictask     |
| djcelery_periodictasks    |
| djcelery_taskstate        |
| djcelery_workerstate      |
+---------------------------+

在django後臺可以看到註冊的表
技術分享圖片

每個任務可以接受參數

技術分享圖片

定時任務函數

from opsweb.celery import app
@app.task(name="create_user"  )
def create_user(*args,**kwargs): # 分別接收後臺傳入的列表和字典參數
    print (args,kwargs)

啟動Celery beat

[vagrant@reboot opsweb]$ python manage.py celery beat # 啟動定時任務

Celery會通過celery beat進程來完成. Celerybeat會保持運行, 一旦到了某一定期任務需要執行時, Celery beat便將其加入到queue中

supervisor管理Celery任務

配置如下

  • 主動觸發任務
celery_worker.conf
[program:celery_worker]
# 進入工作目錄
directory=/vagrant/test_drf/opsweb
# 執行celery指令
command=python manage.py celery worker -B -l info
autorestart=true
loglevel=info
redirect_stderr=true
stdout_logfile=/var/log/supervisor/celery_worker.log
  • 定時任務觸發
celery_beat.conf 同上, 區別如下:
[program:celery_beat]
command=python manage.py celery beat
stdout_logfile=/var/log/supervisor/celery_beat.log

一些不錯的文章
http://www.cnblogs.com/znicy/p/5626040.html Django中使用celery,非常經典
https://www.cnblogs.com/huangxiaoxue/p/7266253.html 基於celery開發的平臺,非常棒
http://python.jobbole.com/81953/ 劉天斯大神 經典案例
http://www.imooc.com/article/16164 慕課網老師博客
http://blog.csdn.net/michael_lbs/article/details/74923367 python+django+djcelery
http://blog.csdn.net/lizhihua0925/article/details/53842110 model api

http://www.jianshu.com/p/7085dcc70d0e 挺好
http://blog.csdn.net/crb912/article/details/78344659 很詳細

Django項目中使用celery做異步任務