
Building a Django Front-End/Back-End Separated Project from Scratch, Part 3 (In Practice: Asynchronous Task Execution)

With the project environment set up in the previous parts, we now move on to the hands-on part. Here we pick a few of the project's important functional modules and walk through them.

Running asynchronous tasks and managing them with Celery

Celery is a distributed task queue focused on real-time processing and task scheduling. Because the data analysis in this project takes quite a long time, tasks are executed asynchronously. The project uses Redis as the broker and Django's database as the result backend. Part of the configuration in settings.py is shown below (see the project code for the full configuration):

import djcelery
djcelery.setup_loader()

BROKER_URL = 'redis://127.0.0.1:6379/5'
BROKER_POOL_LIMIT = 0
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  # periodic tasks
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# CELERY_RESULT_BACKEND = 'redis://10.39.211.198:6379/6'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERYD_CONCURRENCY = 10
CELERYD_MAX_TASKS_PER_CHILD = 10  # recycle each worker after 10 tasks to guard against memory leaks
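Since both the beat scheduler and the result backend live in Django's database, djcelery's tables have to exist before the first run. Assuming a standard django-celery setup (the exact command depends on your Django version; older releases used syncdb), something like:

python manage.py migrate djcelery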

The events and signals the project hooks into for task start, revocation, success, failure, and completion are as follows:

from datetime import datetime

from celery import Task
from celery.signals import task_prerun, task_revoked

# logger, Job and RedisHelper are the project's own logger, job model
# and Redis pub/sub helper (see the project code)

@task_prerun.connect
def pre_task_run(task_id, task, sender, *args, **kwargs):
    logger.info('task [{task_id}] started, taskname: {task.name}'.format(task_id=task_id, task=task))

@task_revoked.connect
def task_revoked(request, terminated, sender, expired, signal, signum):
    now = datetime.now()
    task_id = request.id
    logger.warn('task [{0}] revoked.'.format(task_id))
    job = Job.objects.filter(task_id=task_id).first()
    if job:
        job.runtime = (now - job.create_date).seconds
        job.save()

class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            channel = job.id
            redis_helper = RedisHelper(channel)
            redis_helper.public('task [{0}] success.'.format(task_id))
        logger.info('task [{0}] succeeded'.format(task_id))
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            channel = job.id
            redis_helper = RedisHelper(channel)
            redis_helper.public('failed')
        logger.error('task [{0}] failed, reason: {1}, einfo: {2}'.format(task_id, exc, einfo))
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        now = datetime.now()
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            job.runtime = (now - job.create_date).seconds
            job.save()
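With MyTask in place, a task opts into these callbacks by declaring it as its base class. A minimal sketch, assuming a tasks.py in a Django app (run_analysis and its body are hypothetical, not from the project):

from celery import shared_task

@shared_task(base=MyTask, bind=True)
def run_analysis(self, job_id):
    # self.request.id is the task_id the signal handlers above look up
    job = Job.objects.get(id=job_id)
    # ... the long-running data analysis goes here ...
    return {'job': job.id, 'status': 'done'}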

Retrieving a task's result:

from celery.result import AsyncResult
res = AsyncResult(task_id).get()
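Note that .get() blocks until the task finishes, so inside a web view it is usually better to poll the state first. A minimal sketch:

from celery.result import AsyncResult

res = AsyncResult(task_id)
if res.ready():        # task has finished, successfully or not
    value = res.get()  # re-raises the task's exception on failure
else:
    print(res.state)   # e.g. PENDING / STARTED / SUCCESS / FAILURE / REVOKED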

Revoking (terminating) a task:

from celery.task.control import revoke
revoke(task_id, terminate=True)
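In this project a stop request only knows the Job, so the revoke call is typically wrapped in a small helper. A hypothetical sketch (stop_job is not from the project; the Job lookup mirrors the signal handlers above):

def stop_job(job_id):
    job = Job.objects.filter(id=job_id).first()
    if job:
        # terminate=True sends SIGTERM to the worker child,
        # which in turn fires the task_revoked signal above
        revoke(job.task_id, terminate=True)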

Starting Celery:

Start a worker with event sending enabled (the -E flag):
python manage.py celery -A myproject worker -l info -E --autoscale=6,3
Start the snapshot camera, which writes worker events into the database:
python manage.py celerycam -F 10 -l info
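With the database result backend and celerycam both writing to Django's database, task results and worker events can then be queried like any other model. A quick sketch using django-celery's own models:

from djcelery.models import TaskMeta, TaskState

# results written by the database result backend
for meta in TaskMeta.objects.order_by('-date_done')[:10]:
    print(meta.task_id, meta.status, meta.result)

# worker events captured by the celerycam snapshots
for state in TaskState.objects.filter(state='STARTED'):
    print(state.task_id, state.name, state.tstamp)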

During development we found that importing the sklearn package inside an asynchronous task raises the following error:

AttributeError: 'Worker' object has no attribute '_config'

So the following code needs to be added to the project's task.py:

from celery.signals import worker_process_init

@worker_process_init.connect
def fix_multiprocessing(**_):
    # Forked worker processes can lack the _config attribute that
    # multiprocessing (used by sklearn/joblib) expects; restore it manually
    from multiprocessing import current_process
    try:
        current_process()._config
    except AttributeError:
        current_process()._config = {'semprefix': '/mp'}

In addition, the sklearn-related imports need to be moved from the top of the file into the function bodies (a sketch follows; see the project code for details).
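A minimal sketch of such a deferred import (cluster_points and its body are hypothetical examples, not the project's actual tasks):

from celery import shared_task

@shared_task(base=MyTask)
def cluster_points(points, n_clusters=3):
    # imported here, inside the worker process, instead of at module top
    from sklearn.cluster import KMeans
    model = KMeans(n_clusters=n_clusters)
    model.fit(points)
    return model.labels_.tolist()  # JSON-serializable, per CELERY_TASK_SERIALIZER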

Screenshots of the result: