1. 程式人生 > >django+celery 實現分布式任務

django+celery 實現分布式任務

交換機 觸發 本質 seconds 數據報 bit con 定時器1 mps

想用django做一個自動運維平臺,利用netsnmp來獲取交換機及服務器信息,但是snmpget任務需要在後臺實時運行,為了不影響html響應,利用celery來結合django做異步任務隊列。

一、環境準備
1.首先安裝celery
pip3 install celery
2.安裝djcelery
pip3 install django-celery
3.安裝一個broker
我們必須擁有一個broker消息隊列用於發送和接收消息。Celery官網給出了多個broker的備選方案:RabbitMQ、Redis、Database(不推薦)以及其他的消息中間件。本次我們利用redis

sudo apt-get install redis

redis-server
啟動redis服務, 端口假設為6379
redis-cli 查看redis 狀態
root@ubuntu:~/Desktop/webserver/myserver# redis-cli
127.0.0.1:6379>

二、django 配置
配置settings.py
首先,在Django工程的settings.py文件中加入如下配置代碼:

celery 配置信息 start

#############################
import djcelery

celery 配置

djcelery.setup_loader()
BROKER_URL = ‘redis://127.0.0.1:6379/1‘

CELERY_TIMEZONE = ‘Asia/Shanghai‘
CELERYBEAT_SCHEDULER = ‘djcelery.schedulers.DatabaseScheduler‘
CELERY_IMPORTS = (‘serverapp.task‘)
#############################

celery 配置信息 end

#############################

當djcelery.setup_loader()運行時,Celery便會去查看INSTALLD_APPS下包含的所有app目錄中的tasks.py文件,找到標記為task的方法,將它們註冊為celery task

? BROKER_URL:broker是代理人,它負責分發任務給worker去執行。我使用的是Redis作為broker

? 沒有設置 CELERY_RESULT_BACKEND,默認沒有配置,此時Django會使用默認的數據庫(也是你指定的orm數據庫)。

CELERY_IMPORTS:是導入目標任務文件

CELERYBEAT_SCHEDULER:使用了django-celery默認的數據庫調度模型,任務執行周期都被存在默認指定的orm數據庫中.

CELERYBEAT_SCHEDULE:設置定時的時間配置, 可以精確到秒,分鐘,小時,天,周等。

3.在主工程的配置文件settings.py 中應用註冊表INSTALLED_APPS中加入 djcelery

INSTALLED_APPS = [
‘django.contrib.admin‘,
‘django.contrib.auth‘,
‘django.contrib.contenttypes‘,
‘django.contrib.sessions‘,
‘django.contrib.messages‘,
‘django.contrib.staticfiles‘,
‘djcelery‘, #加入djcelery
]

4.(3)創建應用實例

? 在主工程目錄添加celery.py, 添加自動檢索django工程tasks任務

? vim artproject/celery.py

#目的是拒絕隱士引入,celery.py和celery沖突。
from future import absolute_import

import os

from celery import Celery, platforms
platforms.C_FORCE_ROOT = True

os.environ.setdefault(‘DJANGO_SETTINGS_MODULE‘, ‘myserver.settings‘)
#Specifying the settings here means the celery command line program will know where your Django project is.
#This statement must always appear before the app instance is created, which is what we do next:
from django.conf import settings

app = Celery(‘serverapp‘)

app.config_from_object(‘django.conf:settings‘)
#This means that you don’t have to use multiple configuration files, and instead configure Celery directly from the Django settings.
#You can pass the object directly here, but using a string is better since then the worker doesn’t have to serialize the object.

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
#With the line above Celery will automatically discover tasks in reusable apps if you define all tasks in a separate tasks.py module.
#The tasks.py should be in dir which is added to INSTALLED_APP in settings.py.
#So you do not have to manually add the individual modules to the CELERY_IMPORT in settings.py.

@app.task(bind=True)
def debug_task(self):
print(‘Request: {0!r}‘.format(self.request)) #dumps its own request information

5.(4) 創建任務 tasks

每個任務本質上就是一個函數,在tasks.py中,寫入你想要執行的函數即可。

在應用art中添加我們需要提供的異步服務和定時服務

vim art/tasks.py

#!/usr/bin/env python

encoding: utf-8

from future import absolute_import
import time
from django.core.mail import send_mail
from celery.utils.log import get_task_logger
from artproject.celery import app
?
from art.utils.send_mail import pack_html, send_email?
@app.task
br/>?
@app.task
url = "http://1000phone.com"
receiver = ‘[email protected]
content = pack_html(receiver, url)

content = ‘this is email content.‘

send_email(receiver, content)
print(‘send email ok!‘)?
?
@app.task
br/>?
?
@app.task
return x+y
6.遷移生成celery需要的數據表

python manage.py migrate
此時數據庫表結構多出了幾個

7.
8.
9.啟動服務,測試
我們可以采用 python manage.py help 發現多出了 celery 相關選項。

(1)啟動django celery 服務

啟動服務:

python manage.py celery worker --loglevel=info

此時異步處理和定時處理服務都已經啟動了

(2)web端接口觸發異步任務處理

我們在web端加入一個入口,觸發異步任務處理add函數

在應用art的urls.py 中加入如下對應關系

from art.views import add_handler
?
?
url(r‘^add‘, add_handler),

art/views.py 中加入處理邏輯

def add_handler(request):
x = request.GET.get(‘x‘, ‘1‘)
y = request.GET.get(‘y‘, ‘1‘)
from .tasks import add
add.delay(int(x), int(y))
res = {‘code‘:200, ‘message‘:‘ok‘, ‘data‘:[{‘x‘:x, ‘y‘:y}]}
return HttpResponse(json.dumps(res))

啟動web服務,通過url傳入的參數,通過handler的add.delay(x, y)計算並存入mysql

http://127.0.0.1:8000/art/add?x=188&y=22

(4) 測試定時器,發送郵件

在終端輸入 python manage.py celerybeat -l info

會自動觸發每隔30s執行一次tsend_email定時器函數,發送郵件:

CELERYBEAT_SCHEDULE = { #定時器策略
#定時任務一: 每隔30s運行一次
u‘測試定時器1‘: {
"task": "art.tasks.tsend_email",
#"schedule": crontab(minute=‘*/2‘), # or ‘schedule‘: timedelta(seconds=3),
"schedule":timedelta(seconds=30),
"args": (),
},
}
具體發送郵件服務程序見下面的第4節

4 郵件發送服務
項目中經常會有定時發送郵件的情形,比如發送數據報告,發送異常服務報告等。

可以編輯文件 art/utils/send_mail.py, 內容編輯如下:

#!/usr/bin/env python
#-- coding:utf-8 --
#written by zhouguangyou
#發送郵件(wd_email_check123賬號用於內部測試使用,不要用於其他用途)
?
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.image import MIMEImage
from email.header import Header
import time
?
sender = ‘[email protected]
subject = u‘api開放平臺郵箱驗證‘
smtpserver = ‘smtp.163.com‘
username = ‘wwwwww‘
password = ‘wwwww1234‘
mail_postfix="163.com"
?
def send_email(receiver, content):
try:
me = username+"<"+username+"@"+mail_postfix+">"
msg = MIMEText(content, ‘html‘, ‘utf-8‘)
msg[‘Subject‘] = subject
msg[‘From‘] = sender
msg[‘To‘] = receiver
smtp = smtplib.SMTP()
smtp.connect(smtpserver)
smtp.login(username, password)
smtp.sendmail(sender, receiver, msg.as_string())
smtp.quit()
return True
except Exception as e:
print(‘send_email has error with : ‘ + str(e))
return False
?
?
def pack_html(receiver, url):
html_content = u"<html><div>尊敬的用戶<font color=‘#0066FF‘>%s</font> 您好!</div><br>" \
"<div>感謝您關註我們的平臺 ,我們將為您提供最貼心的服務,祝您購物愉快。</div><br>" \
"<div>點擊以下鏈接,即可完成郵箱安全驗證:</div><br>" \
"<div><a href=‘%s‘>%s</a></div><br>" \
"<div>為保障您的帳號安全,請在24小時內點擊該鏈接; </div><br>" \
"<div>若您沒有申請過驗證郵箱 ,請您忽略此郵件,由此給您帶來的不便請諒解。</div>" \
"</html>" % (receiver, url, url)
html_content = html_content
return html_content
?
?
if name == "main":
url = "http://xxxx.com"
receiver = ‘[email protected]
#content = pack_html(receiver, url)
content = ‘this is email content. at %s.‘%int(time.time())
send_email(receiver, content)

至此,在celery ui界面可以看到兩類,定時器處理和異步處理。

django+celery 實現分布式任務