1. 程式人生 > >django celery的分布式異步之路(一) hello world

django celery的分布式異步之路(一) hello world

settings csrf trac 選擇 load 你在 函數 1.2 密碼

設想你遇到如下場景:

1)高並發

2)請求的執行相當消耗機器資源,流量峰值的時候可能超出單機界限

3)請求返回慢,客戶長時間等在頁面等待任務返回

4)存在耗時的定時任務

這時你就需要一個分布式異步的框架了。

celery會是一個不錯的選擇。本文將一步一步的介紹如何使用celery和django進行集成,並進行分布式異步編程。

1、安裝依賴

默認你已經有了python和pip。我使用的版本是:

python 2.7.10
pip 9.0.1
virtualenv 15.1.0

創建沙盒環境,我們生產過程中通過沙盒環境來使用各種python包的版本,各個應用的沙盒環境之間互不幹擾。

$ mkdir
kangaroo $ cd kangaroo $ virtualenv kangaroo.env # 沙盒下面有什麽,可以看到有python的bin、include和pip $ ll kangaroo.env total 8 drwxrwxr-x 2 data_monitor data_monitor 4096 Aug 7 16:00 bin drwxrwxr-x 2 data_monitor data_monitor 22 Aug 7 16:00 include drwxrwxr-x 3 data_monitor data_monitor 22 Aug 7 16:00 lib -rw-rw-r-- 1
data_monitor data_monitor 60 Aug 7 16:00 pip-selfcheck.json # 讓沙盒環境在當前session(shell)中生效 $ source kangaroo.env/bin/activate # 可以看到命令行首多了(kangaroo.env),而且python的路徑已經變了 (kangaroo.env) [[email protected] kangaroo]$ which python /data/home/data_monitor/yangfan/test/kangaroo/kangaroo.env/bin/python

下面我們開始在kangaroo環境下安裝相應版本的django和celery,以及django-celery集成包。

(kangaroo.env) [[email protected] kangaroo]$ pip install django==1.10.6
(kangaroo.env) [[email protected] kangaroo]$ pip install celery==3.1.25
(kangaroo.env) [[email protected] kangaroo]$ pip install django-celery==3.2.1

我在安裝的時候寫明了版本號,是因為這套版本號在我們的生產環境過玩轉過。

如果你換了對應的版本號的話,可能會引發沖突,出現意想不到的問題。親測還是有一些版本之間是有怪問題的。

2、創建工程

創建工程kangaroo:django-admin startproject kangaroo

# 在kangaroo.env同級目錄下創建工程kangaroo
cd /home/data_monitor/yangfan/test/kangaroo
# 創建工程kangaroo (kangaroo.
env) [[email protected] kangaroo]$ django-admin startproject kangaroo (kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ ll total 0 drwxrwxr-x 3 data_monitor data_monitor 37 Aug 7 16:45 kangaroo drwxrwxr-x 5 data_monitor data_monitor 65 Aug 7 16:00 kangaroo.env

創建APP foot:python manage.py startapp foot

# 進入工程目錄,你會看到manage.py文件
cd kangaroo
# 創建APP foot
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ python manage.py startapp foot
(kangaroo.env) [data_monitor@bigdata-arch-client10 kangaroo]$ ll
total 4
drwxrwxr-x 3 data_monitor data_monitor 116 Aug  7 16:48 foot
drwxrwxr-x 2 data_monitor data_monitor 108 Aug  7 16:48 kangaroo
-rwxrwxr-x 1 data_monitor data_monitor 806 Aug  7 16:45 manage.py
# 進入foot app目錄,看一下有什麽
(kangaroo.env) [[email protected] kangaroo]$ cd foot/
(kangaroo.env) [data_monitor@bigdata-arch-client10 foot]$ ll
total 20
-rw-rw-r-- 1 data_monitor data_monitor  63 Aug  7 16:48 admin.py
-rw-rw-r-- 1 data_monitor data_monitor 124 Aug  7 16:48 apps.py
-rw-rw-r-- 1 data_monitor data_monitor   0 Aug  7 16:48 __init__.py
drwxrwxr-x 2 data_monitor data_monitor  24 Aug  7 16:48 migrations
-rw-rw-r-- 1 data_monitor data_monitor  98 Aug  7 16:48 models.py
-rw-rw-r-- 1 data_monitor data_monitor  60 Aug  7 16:48 tests.py
-rw-rw-r-- 1 data_monitor data_monitor  63 Aug  7 16:48 views.py

至此我們創建了工程kangaroo和app foot,下面我們介紹如何集成celery。

3、django-celery的集成配置

我們這裏集成的方式是使用django-celery包。

集成配置要註意以下幾個地方就好了,配置起來還是比較簡單的。

1)修改kangaroo/settings.py文件

讓djcelery模塊生效

import os
import djcelery
djcelery.setup_loader()
...
INSTALLED_APPS = (
   ...
    djcelery,
    kombu.transport.django,
    ...
)

配置broker和backend

# Celery settings
# redis做broker, 第二個":"前後是redis的用戶名密碼,後面的2是db
# BROKER_URL = ‘redis://:[email protected]:6379/2‘
# rabbitMQ做broker,第二個":"前後是rabbitMQ的用戶名密碼
BROKER_URL = amqp://admin:[email protected]:5672//
# Celery的backend記錄地址,這裏只給出redis的配置
CELERY_RESULT_BACKEND = redis://:[email protected]:6379/3

4、第一個task

其實應該在創建app的時候就將appName添加到settings.py的INSTALLED_APPS中,我們沒有這樣做事留到現在好說明問題。

我們修改settings.py加入foot。

import os
import djcelery
djcelery.setup_loader()
...
INSTALLED_APPS = (
   ...
    djcelery,
    kombu.transport.django,
   ...
    foot,
    ...
)

當settings.py中的djcelery.setup_loader()運行時, Celery便會查看所有INSTALLED_APPS中app目錄中的tasks.py文件, [email protected], 並將它們註冊為celery task。

所以我們在foot包下創建tasks.py文件,並且添加我們的task。

foot/tasks.py

# -*- coding:utf-8 -*-
import time
import logging

from celery import task
logger = logging.getLogger(__name__)

@task()
def foot_task(param_dict):
    logger.info(foot task start! param_dict:%s‘ % param_dict)
    time.sleep(10)
    logger.info(foot task finished!)
    return

這個task等待接收一個參數字典,只是簡單的打印參數,然後sleep10s就退出了。

讓task在後臺worker中註冊,當有任務分發下來的時候就開始執行。只需執行

python manage.py celery worker --loglevel=info

5、分發任務dispatch

任務觸發的兩種方式:

1)定時調度

2)請求執行

我這裏寫了一個通用的函數,這個函數用於分發任務

def dispatch(task, param_dict):
    param_json = json.dumps(param_dict)
    try:
        task.apply_async(
            [param_json],
            retry=True,
            retry_policy={
                max_retries: 1,
                interval_start: 0,
                interval_step: 0.2,
                interval_max: 0.2,
            },
        )
    except Exception, ex:
        logger.info(traceback.format_exc())
        raise

如何觸發foot.tasks中的任務呢,只需要

import logging
import traceback
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt

from foot.tasks import foot_task
from common.dispatcher import dispatch

logger = logging.getLogger(__name__)

@csrf_exempt
def hello(request):
    if request.method == GET:
        try:
            user = request.GET.get(username)
            dispatch(test_task, {hello: user})
            return JsonResponse({code: 0, msg:success})
        except Exception, ex:
            return JsonResponse({code: -1, msg: traceback.format_exc()})

1)當你在客戶端發送請求:hello?username=‘kangaroo‘時

2)服務瞬間返回:{‘code‘: 0, ‘msg‘:‘success‘}

3)後端sleep10秒後執行成功,打印hello:kangaroo

這就是異步的效果。

django celery的分布式異步之路(一) hello world