1. 程式人生 > >Django的celery配置(包括定時任務、隊列)

Django的celery配置(包括定時任務、隊列)

如何 one tab 輸入 ats minute 依然 threading att

一、安裝celery

Django項目不需要安裝celery這個包,可以直接使用django-celery這個包,,先來安裝它,在終端中輸入:

pip install django-celery

二、安裝rabbitmq,建立celery隊列

我做的項目用的就是rabbitmq,按道理來說,也是可以用redis作為消息隊列的,但是rabbitmq更好,此處不做詳細解釋,有興趣的同學的可以去研究下。
ubuntu環境下,在終端中輸入:

sudo apt-get install rabbitmq-server

三、配置settings.py

首先要在INSTALLED_APPS中添加對djcelery的引用

INSTALLED_APPS = [
    ‘decelery‘,
]

再在settings.py中添加以下代碼

import djcelery
djcelery.setup_loader()

BROKER_URL = ‘amqp://guest:guest@localhost:5672/‘
CELERY_RESULT_BACKEND = ‘amqp‘
CELERY_IMPORTS = (
    "common.tasks.task1",
    "common.tasks.task2",
)

四、添加celery異步任務

在common/tasks/task1.py中添加以下代碼,就定義了celery異步任務。
因為celery任務可能會很多,為了便於管理,我們就在項目下的common/tasks文件夾中創建了許多task.py文件,如task1.py、task2.py等。
在task1.py中添加以下代碼:

from celery.task import task

@task
def function1(a, b):
    print a + b

註意,讓讓這個celery任務能執行,還必須要在settings.py中加一段配置,這個在上面已經添加了,這裏再特別提醒下,如下:

# common、tasks是文件夾,task1、task2是tasks文件夾下面的py文件
# CELERY_IMPORTS:是導入目標任務文件
CELERY_IMPORTS = (
    "common.tasks.task1",
    "common.tasks.task2",
)

要調用這個異步任務的話,就用

from common.tasks.task1 import function

function.delay(a, b)

是不是很方便?工具的好處就是把事情簡單化,但是太依賴工具,不懂的底層原理的話容易把人搞傻。
比如其實還可以用threading來創建新線程來執行異步任務,此處不再贅述,否則又是一套長篇大論,有興趣的童鞋可以去學習一下如何使用threading庫。

五、啟動celery

因為我們這裏用的是django-celery,而不是直接使用celery這個庫,所以啟動celery的命令與celery官網裏面介紹的是不一樣的,新人很可能因為這個掉進坑裏,所以這裏我特別提醒一下。
終端命令如下:

# 先啟動服務器
python manage.py runserver
# 再啟動worker(--concurrency=2是開4個worker進程,不加也可以啟動,只不過在生產環境還是應該多開幾個進程的)
python manage.py celery worker --concurrency=2 -l info

六、celery定時任務的配置

在項目中有時不僅僅使用以上的異步任務,有時候需要創建很多定時任務,這樣celery又可以大顯身手了。在settings.py中添加以下配置,就可以添加定時任務

from celery.schedules import crontab

# 下方的common和tasks依然是文件夾
# function2、function3分別是tasks文件夾中的task1.py、task2.py文件的函數的函數名
CELERYBEAT_SCHEDULE = {
    ‘function2‘: {
        ‘task‘: ‘common.tasks.task1‘,
        ‘schedule‘: crontab(minute=‘*/50‘), # 每50分鐘執行一次
    },
    ‘function3‘: {
        ‘task‘: ‘common.tasks.task2‘,
        ‘schedule‘: crontab(minute=0, hour=‘8,13‘), # 每天的8點0分和13點0分各執行一次
    },
}

下面是common/tasks/task1.py中的函數

from celery.task import task

@task
def function2():
    print ‘=‘*40
    print ‘This is function2, celery is great!‘
    print ‘=‘*40

下面是common/tasks/task2.py中的函數,跟task1.py中是一樣的使用方法。

from celery.task import task

@task
def function3():
    print ‘=‘*40
    print ‘This is function3, celery is great!‘
    print ‘Fuck celery start failure!‘
    print ‘=‘*40

七、啟動celery定時任務

得先啟動第六節中介紹的命令後,再執行這個命令,定時任務才能執行。因為beat只是分配定時任務給celery的worker,所以只有worker啟動後,定時任務才能異步執行。
顧名思義,worker就是幹苦力的民工,累活臟或都給它幹。beat可以形象的理解為工地做計劃的人,到了要幹活的時候就分配任務給民工,可能是包工頭,也可能是一般的管理工程進度的小弟。反正都是苦命的人,屌絲何必難為屌絲。 (╥╯^╰╥)
終端命令如下:

python manage.py celery beat -l info

八、celery隊列的配置

在項目中celery的異步任務很多的時候,這個時候我們就需要將不同的任務分配到不同的隊列(queue)中去執行,如果只有一個默認隊列的話,所有異步任務都會在這個隊列中執行(是需要排隊的,先來的先執行),任務很多的時候,就沒法同時執行很多任務了,甚至造成任務的擁堵。將不同的任務分配到不同的隊列就可以保證同一時刻可以同時運行不同隊列中的任務,互補幹擾,並且每個隊列可以單獨開好幾個進程。
進程數最好不要超過CPU的核數,因為CPU只有4個核的話,你開5個進程,同一時間還是只能執行4個進程。

我們可以在項目中設置三個隊列(default, frontend, backend),隊列的名字可以自己任意取。以下是在settings.py中添加隊列的配置:

from kombu import Exchange, Queue

# 默認隊列是default
CELERY_DEFAULT_QUEUE = ‘default‘
CELERY_DEFAULT_EXCHANGE = ‘default‘
CELERY_DEFAULT_ROUTING_KEY = ‘default‘

# x-priority是任務的優先級
# 優先級就是哪個隊列優先執行,比較緊迫的需要馬上執行的任務優先級可以設置為最高
CELERY_QUEUES = (
    Queue(‘default‘, Exchange(‘default‘), routing_key=‘default‘, consumer_arguments={‘x-priority‘: 5}),
    Queue(‘frontend‘, Exchange(‘frontend‘), routing_key=‘frontend‘, consumer_arguments={‘x-priority‘: 10}),
    Queue(‘backend‘, Exchange(‘backend‘), routing_key=‘backend‘, consumer_arguments={‘x-priority‘: 8}),
)

# 特別需要註意的是,異步任務的路徑必須精確到函數名(比如下方的common、tasks是文件夾,task1、task2是py文件,function1就是task1.py中的定義的異步任務的函數名),不然的話異步任務就沒法執行
CELERY_ROUTES = {
    "common.tasks.task1.function1": {‘queue‘: "frontend", ‘routing_key‘: ‘frontend‘},
    "common.tasks.task1.function2": {‘queue‘: "backend", ‘routing_key‘: ‘backend‘},
    "common.tasks.task2.function3": {‘queue‘: "default", ‘routing_key‘: ‘default‘},
}

九、啟動celery隊列

配置了隊列的話,如果執行python manage.py celery worker --concurrency=2 -l info的話就只會創建一個默認的隊列,而我們需要創建多個隊列,這樣我們就不需要運行這個命令了,我們需要在終端中分別運行以下命令:

# -Q 後面加的是配置的隊列名,concurrency(進程數)設置為幾就由自己定了,只要不超過CPU核數就行了
python manage.py celery worker -l info -Q default --concurrency=1
python manage.py celery worker -l info -Q frontend --concurrency=2
python manage.py celery worker -l info -Q backend --concurrency=4

十、補充

Django下要查看其他celery的命令,包括參數配置、啟動多worker進程的方式都可以通過python manage.py celery --help來查看,一下是終端輸入命令後出來的提示信息:

Usage: manage.py celery <command> [options] 

Show help screen and exit.

Options:
  -A APP, --app=APP     app instance to use (e.g. module.attr_name)
  -b BROKER, --broker=BROKER
                        url to broker.  default is ‘amqp://guest@localhost//‘
  --loader=LOADER       name of custom loader class to use.
  --config=CONFIG       Name of the configuration module
  --workdir=WORKING_DIRECTORY
                        Optional directory to change to after detaching.
  -C, --no-color        
  -q, --quiet           
  --version             show program‘s version number and exit
  -h, --help            show this help message and exit

---- -- - - ---- Commands- -------------- --- ------------

+ Main: 
|    celery worker
|    celery events
|    celery beat
|    celery shell
|    celery multi
|    celery amqp

+ Remote Control: 
|    celery status
 
|    celery inspect --help
|    celery inspect active
|    celery inspect active_queues
|    celery inspect clock
|    celery inspect conf None
|    celery inspect memdump
|    celery inspect memsample
|    celery inspect objgraph None
|    celery inspect ping
|    celery inspect registered
|    celery inspect report
|    celery inspect reserved
|    celery inspect revoked
|    celery inspect scheduled
|    celery inspect stats
 
|    celery control --help
|    celery control add_consumer <queue> [exchange [type [routing_key]]]
|    celery control autoscale [max] [min]
|    celery control cancel_consumer <queue>
|    celery control disable_events
|    celery control enable_events
|    celery control pool_grow [N=1]
|    celery control pool_shrink [N=1]
|    celery control rate_limit <task_name> <rate_limit> (e.g. 5/s | 5/m | 5/h)>
|    celery control time_limit <task_name> <soft_secs> [hard_secs]

+ Utils: 
|    celery purge
|    celery list
|    celery migrate
|    celery call
|    celery result
|    celery report
---- -- - - --------- -- - -------------- --- ------------

Type ‘celery <command> --help‘ for help using a specific command.

Django的celery配置(包括定時任務、隊列)