1. 程式人生 > >django與celery結合實現非同步任務

django與celery結合實現非同步任務

celery 基本概念

Celery 是一個 基於python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery

celery的優點

      

  1. 簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的
  2. 高可用:當任務執行失敗或執行過程中發生連線中斷,celery 會自動嘗試重新執行任務
  3. 快速:一個單程序的celery每分鐘可處理上百萬個任務
  4. 靈活: 幾乎celery的各個元件都可以被擴充套件及自定製

實現非同步的請求,配置如下

1.在工程目錄下建立一個celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
 
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
# proj 是專案的名稱需要更改 
app = Celery('proj')
 
# Using a string here means the worker don't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
 
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
 
 
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

2.在專案工程目錄下的__init__.py中新增

from __future__ import absolute_import, unicode_literals
 
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
 
__all__ = ['celery_app']

3.在自己的app專案下新增task.py檔案

# Create your tasks here
from __future__ import absolute_import, unicode_literals
from celery import shared_task
 
 
@shared_task
def add(x, y):
    return x + y
 
 
@shared_task
def mul(x, y):
    return x * y
 
 
@shared_task
def xsum(numbers):
    return sum(numbers)

3.在setting.py中新增設定redis

CELERY_BROKER_URL = 'redis://:密碼@ip'
CELERY_RESULT_BACKEND = 'redis://:密碼@ip'

4.在videx.py中進行寫任務

from celery.result import AsyncResult
from django.shortcuts import render,HttpResponse
import time
# Create your views here.
from myapp import tasks
def index(request):
    res = tasks.add.delay(5,98)
    print(res)
    return HttpResponse(res.task_id)

def task_res(request):
    result = AsyncResult(id='5fea266e-18ad-4659-aa8f-c5d16783defd')

    return HttpResponse(result.get())

5.路由的配置urls.py

   url(r'^index/$', index),
   url(r'^task_res/$', task_res),

這些配置好之後,講程式碼上傳到liunx環境下

進入i專案路徑使用命令啟動非同步任務

執行下面的命令

celery -A CeleryTest worker -l info

這樣服務端的任務已經啟動成功了

在此啟動自己的django專案就可以使用了,在測試中返回結果的id用id可以獲取非同步任務的結果狀態等資訊