Using Celery in Django

Module versions:

Python 3.6.6
PyMySQL==0.8.1
Django==2.1.3
redis==3.0.1
celery==4.1.1
django-celery-beat==1.1.1
django-celery-results==1.0.1

Directory structure

emgc
├── front
│   ├── __init__.py
│   ├── apps.py
│   ├── migrations
│   │   └── __init__.py
│   ├── models.py
│   ├── tasks.py
│   └── views.py
├── manage.py
├── emgc
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
└── templates

Configuration and usage

emgc/emgc/celery.py:

	from __future__ import absolute_import, unicode_literals
	import os
	from celery import Celery
	
	# Set the default Django settings module for the 'celery' program.
	os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'emgc.settings')
	
	app = Celery('emgc')
	
	# Using a string here means the worker doesn't have to serialize
	# the configuration object to child processes.
	# - namespace='CELERY' means all celery-related configuration keys
	#   should have a `CELERY_` prefix.
	app.config_from_object('django.conf:settings', namespace='CELERY')
	
	# Load task modules (tasks.py) from all registered Django app configs.
	app.autodiscover_tasks()
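
To illustrate the `namespace='CELERY'` rule mentioned in the comments above, the sketch below mimics how prefixed Django settings names correspond to Celery's lowercase setting names (for example `CELERY_BROKER_URL` corresponds to `broker_url`). The helper `strip_namespace` is hypothetical and is not Celery's actual implementation; it only demonstrates the naming convention.

```python
def strip_namespace(settings, namespace="CELERY"):
    """Return {lowercase_name: value} for keys that start with '<namespace>_'."""
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


# A plain dict standing in for the Django settings module.
django_settings = {
    "CELERY_BROKER_URL": "redis://localhost:6379",
    "CELERY_RESULT_BACKEND": "redis://localhost:6379",
    "CELERY_TIMEZONE": "UTC",
    "TIME_ZONE": "Asia/Shanghai",  # no CELERY_ prefix, so not a Celery key
}

celery_config = strip_namespace(django_settings)
print(celery_config)
```

Only the three prefixed keys end up in `celery_config`; Django's own `TIME_ZONE` is left out.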
	
		
emgc/emgc/__init__.py:

	from __future__ import absolute_import, unicode_literals
	
	# This will make sure the app is always imported when
	# Django starts so that shared_task will use this app.
	
	from .celery import app as celery_app
	__all__ = ['celery_app']


emgc/emgc/settings.py:

	import pymysql
	pymysql.install_as_MySQLdb()
	INSTALLED_APPS = [
		...
	    'django_celery_results',
	    'django_celery_beat',
	    'front',
	]
	LANGUAGE_CODE = 'zh-Hans'
	TIME_ZONE = 'Asia/Shanghai'
	USE_I18N = True
	USE_L10N = True
	# With USE_TZ = True, Django stores datetimes in UTC in the database;
	# TIME_ZONE is then the default time zone used for display in templates and forms.
	USE_TZ = True
	
	# Celery with redis
	# celery 4.1.0 has a time zone bug, so Celery must be kept on UTC.
	CELERY_RESULT_BACKEND = 'redis://localhost:6379'  # result backend; redis is used here
	# CELERY_RESULT_BACKEND = 'django-db'  # use the Django ORM as the result store
	CELERY_BROKER_URL = 'redis://localhost:6379'
	CELERY_TIMEZONE = 'UTC'
	CELERY_ENABLE_UTC = True
	# Store periodic task schedules in the database.
	CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
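
Because the settings above keep Celery on UTC while the project's `TIME_ZONE` is `Asia/Shanghai`, timestamps produced by Celery may need converting before they are shown to users. A minimal standard-library sketch, assuming Asia/Shanghai can be modelled as a fixed UTC+8 offset and using a made-up timestamp:

```python
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Shanghai is modelled as a fixed UTC+8 offset.
SHANGHAI = timezone(timedelta(hours=8), "Asia/Shanghai")

# Hypothetical UTC timestamp, e.g. the moment a task finished.
finished_utc = datetime(2018, 11, 20, 16, 30, tzinfo=timezone.utc)

# Same instant, expressed in local time.
finished_local = finished_utc.astimezone(SHANGHAI)
print(finished_local.isoformat())  # 2018-11-21T00:30:00+08:00
```

Inside a Django project, `django.utils.timezone.localtime()` would normally do this conversion using `TIME_ZONE`.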


From the project root (the directory containing manage.py), start the worker:
	celery -A emgc worker --pool=solo -l info
(On Windows, omitting --pool=solo fails with ValueError: not enough values to unpack (expected 3, got 0).)

Defining tasks

emgc/front/tasks.py:

	from __future__ import absolute_import, unicode_literals
	from celery import shared_task
	
	@shared_task
	def add(x, y):
	    return x + y
	    
	@shared_task
	def mul(x, y):
	    return x * y

Triggering tasks

emgc/front/views.py:

	from django.http import JsonResponse
	from front import tasks
	
	def index(request):
	    # Read the two operands from the query string, e.g. ?x=2&y=3
	    x = int(request.GET.get("x"))
	    y = int(request.GET.get("y"))
	    # .delay() queues the task and returns an AsyncResult immediately
	    res1 = tasks.add.delay(x, y)
	    res2 = tasks.mul.delay(x, y)
	    print("add:", x, y, res1.task_id)
	    print("mul:", x, y, res2.task_id)
	    return JsonResponse("success", safe=False)
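
The view above calls `int(request.GET.get("x"))`, which raises `TypeError` when the parameter is missing and `ValueError` when it is not a number. One possible way to guard against that is sketched below, with a plain dict standing in for `request.GET`; the helper name `parse_int` is hypothetical.

```python
def parse_int(params, name, default=None):
    """Return params[name] as an int, or `default` if missing or not numeric."""
    raw = params.get(name)
    try:
        return int(raw)
    except (TypeError, ValueError):
        return default


query = {"x": "3", "y": "abc"}  # stand-in for request.GET

x = parse_int(query, "x")             # valid number
y = parse_int(query, "y", default=0)  # not numeric, falls back to 0
z = parse_int(query, "z")             # missing, falls back to None
print(x, y, z)  # 3 0 None
```

In the view, a `None` result could be turned into an error response before any task is queued.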

Fetching the result from redis (517829d8-ebe5-4be2-95d9-d750df717cc0 is the task_id):

	127.0.0.1:6379> get celery-task-meta-517829d8-ebe5-4be2-95d9-d750df717cc0
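
The lookup above works because the redis backend stores each result under the key `celery-task-meta-<task_id>`. The sketch below builds that key and decodes a stored value, assuming the default JSON result serializer. The payload is an illustrative sample with an assumed shape, not output captured from this project, and `result_key` is a hypothetical helper.

```python
import json

KEY_PREFIX = "celery-task-meta-"


def result_key(task_id):
    """Build the redis key under which the result for `task_id` is stored."""
    return KEY_PREFIX + task_id


task_id = "517829d8-ebe5-4be2-95d9-d750df717cc0"

# Illustrative payload (assumed shape), as it might come back from `GET <key>`.
raw = json.dumps({
    "status": "SUCCESS",
    "result": 5,
    "traceback": None,
    "children": [],
    "task_id": task_id,
})

meta = json.loads(raw)
print(result_key(task_id))
print(meta["status"], meta["result"])
```

In application code, `celery.result.AsyncResult(task_id)` is the usual way to read a result without touching redis directly.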

Using the Django ORM as the result store:

In emgc/emgc/settings.py, set:

	CELERY_RESULT_BACKEND = 'django-db'

Then create the table (django_celery_results_taskresult):

	python3 manage.py migrate django_celery_results

model (TaskResult, from django_celery_results):

	class TaskResult(models.Model):
	    """Task result/status."""
	
	    task_id = models.CharField(_('task id'), max_length=255, unique=True)
	    task_name = models.CharField(_('task name'), null=True, max_length=255)
	    task_args = models.TextField(_('task arguments'), null=True)
	    task_kwargs = models.TextField(_('task kwargs'), null=True)
	    status = models.CharField(_('state'), max_length=50,
	                              default=states.PENDING,
	                              choices=TASK_STATE_CHOICES)
	    content_type = models.CharField(_('content type'), max_length=128)
	    content_encoding = models.CharField(_('content encoding'), max_length=64)
	    result = models.TextField(null=True, default=None, editable=False)
	    date_done = models.DateTimeField(_('done at'), auto_now=True)
	    traceback = models.TextField(_('traceback'), blank=True, null=True)
	    hidden = models.BooleanField(editable=False, default=False, db_index=True)
	    meta = models.TextField(null=True, default=None, editable=False)
	
	    objects = managers.TaskResultManager()
	
	    class Meta:
	        """Table information."""
	
	        ordering = ['-date_done']
	
	        verbose_name = _('task result')
	        verbose_name_plural = _('task results')
	
	    def as_dict(self):
	        return {
	            'task_id': self.task_id,
	            'task_name': self.task_name,
	            'task_args': self.task_args,
	            'task_kwargs': self.task_kwargs,
	            'status': self.status,
	            'result': self.result,
	            'date_done': self.date_done,
	            'traceback': self.traceback,
	            'meta': self.meta,
	        }
	
	    def __str__(self):
	        return '<Task: {0.task_id} ({0.status})>'.format(self)