Building a Django Front/Back-End Separated Project from Scratch, Part 3 (Hands-On: Asynchronous Task Execution)
By 阿新 • Published 2018-12-10
With the project environment set up in the previous parts, we now move on to the hands-on portion. This article walks through several of the key functional modules in the project.
Running asynchronous tasks and managing them with Celery
Celery is a distributed task queue focused on real-time processing and task scheduling. Because the data-analysis jobs in this project take a long time to run, they are executed asynchronously. The project uses Redis as the broker and Django's database as the result backend. Part of the configuration in settings.py is shown below (see the project code for the full configuration):
```python
import djcelery
djcelery.setup_loader()

BROKER_URL = 'redis://127.0.0.1:6379/5'
BROKER_POOL_LIMIT = 0
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'  # periodic tasks
CELERY_RESULT_BACKEND = 'djcelery.backends.database:DatabaseBackend'
# CELERY_RESULT_BACKEND = 'redis://10.39.211.198:6379/6'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERYD_CONCURRENCY = 10
# Recycle each worker after 10 tasks to guard against memory leaks
CELERYD_MAX_TASKS_PER_CHILD = 10
```
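Because `CELERY_TASK_SERIALIZER` and `CELERY_RESULT_SERIALIZER` are both set to `'json'`, every task argument and return value must survive a JSON round trip. A quick pure-Python check of what that rules out (no Celery needed; the `dataset_id`/`threshold` fields are made up for illustration):

```python
import json
from datetime import datetime

# Plain dicts/lists/numbers/strings round-trip cleanly through the JSON serializer.
payload = {"dataset_id": 42, "threshold": 0.5}
assert json.loads(json.dumps(payload)) == payload

# Objects such as datetime do not: pass an ISO string instead.
try:
    json.dumps({"started": datetime.now()})
except TypeError as exc:
    print("not JSON-serializable:", exc)

started = datetime(2018, 12, 10, 9, 30).isoformat()
assert json.loads(json.dumps({"started": started}))["started"] == "2018-12-10T09:30:00"
```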
The Celery events and signals the project hooks into, covering task start, success, failure, completion, and revocation, are as follows:
```python
from datetime import datetime

from celery import Task
from celery.signals import task_prerun, task_revoked


@task_prerun.connect
def pre_task_run(task_id=None, task=None, sender=None, *args, **kwargs):
    logger.info('task [{task_id}] started, taskname: {task.name}'.format(task_id=task_id, task=task))


@task_revoked.connect
def task_revoked(request=None, terminated=None, sender=None, expired=None, signal=None, signum=None, **kwargs):
    now = datetime.now()
    task_id = request.id
    logger.warning('task [{0}] was revoked.'.format(task_id))
    job = Job.objects.filter(task_id=task_id).first()
    if job:
        job.runtime = (now - job.create_date).seconds
        job.save()


class MyTask(Task):
    def on_success(self, retval, task_id, args, kwargs):
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            # Push a notification on the job's channel via Redis pub/sub
            channel = job.id
            redis_helper = RedisHelper(channel)
            redis_helper.public('task [{0}] success.'.format(task_id))
        logger.info('task [{0}] finished successfully'.format(task_id))
        return super(MyTask, self).on_success(retval, task_id, args, kwargs)

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            channel = job.id
            redis_helper = RedisHelper(channel)
            redis_helper.public('failed')
        logger.error('task [{0}] failed, reason: {1}, einfo: {2}'.format(task_id, exc, einfo))
        return super(MyTask, self).on_failure(exc, task_id, args, kwargs, einfo)

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Record the job's elapsed time once the task has returned
        now = datetime.now()
        job = Job.objects.filter(task_id=task_id).first()
        if job:
            job.runtime = (now - job.create_date).seconds
            job.save()
```
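One detail worth noting in the `after_return` handler above: `timedelta.seconds` returns only the leftover time-of-day component and silently wraps at 24 hours, so for jobs that might run longer than a day, `total_seconds()` is the safer choice. A small standalone illustration:

```python
from datetime import datetime, timedelta

create_date = datetime(2018, 12, 10, 9, 0, 0)

# A 90-second run: .seconds and .total_seconds() agree.
now = create_date + timedelta(seconds=90)
assert (now - create_date).seconds == 90

# A run of 1 day and 30 seconds: .seconds wraps past 24 h and reports 30.
now = create_date + timedelta(days=1, seconds=30)
assert (now - create_date).seconds == 30
assert (now - create_date).total_seconds() == 86430.0
```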
Fetching a task's result:
```python
from celery.result import AsyncResult

res = AsyncResult(taskid).get()
```
Terminating a task:
```python
from celery.task.control import revoke

revoke(task_id, terminate=True)
```
Starting Celery:
```shell
# Start the worker with event sending enabled (-E), autoscaling between 3 and 6 processes:
python manage.py celery -A myproject worker -l info -E --autoscale=6,3
# Start the snapshot camera:
python manage.py celerycam -F 10 -l info
```
During development we found that importing the sklearn package inside an asynchronous task raised:
AttributeError: 'Worker' object has no attribute '_config'
so the following code needs to be added to the project's task.py:
```python
from celery.signals import worker_process_init


@worker_process_init.connect
def fix_multiprocessing(**_):
    from multiprocessing import current_process
    try:
        current_process()._config
    except AttributeError:
        current_process()._config = {'semprefix': '/mp'}
```
In addition, the sklearn imports must be moved from the top of the file into the function bodies; see the project code for details.
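The deferred-import pattern looks like the sketch below. The stdlib `fractions` module stands in for sklearn so the example runs anywhere, but the idea is identical: the heavy module is loaded inside the task body, in the already-initialized worker process, rather than at module import time.

```python
def run_analysis(values):
    # Heavy import deferred into the function body (stand-in for sklearn);
    # it only executes when the task actually runs inside the worker.
    from fractions import Fraction
    return float(sum(Fraction(v) for v in values) / len(values))

print(run_analysis([1, 2, 3, 4]))  # 2.5
```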
Screenshots of the result: