1. 程式人生 > >celery在python中的應用

celery在python中的應用

deep ins out 發送短信 getting 輸入驗證 end delay odi

這裏不解釋celery,如果不清楚可以參考下面鏈接:

http://docs.celeryproject.org/en/latest/getting-started/introduction.html

這裏來演示一下在Django項目中如何使用celery:

1. 首先我們需要使用到兩個庫,用pip安裝:

  pip install celery

  pip install django-celery

2. 在celery建議使用rabbitmq作為消息代理,當然也支持redis作代理,abbitmq提供的隊列和消息持久化機制確實更加穩定,所以對於追求穩定性的任務更適合適配rabbitmq作為中間件, 這裏用rabbitmq作為消息代理,用redis作為存儲後端

  我的環境是deepin,安裝rabbitmq和redis

  sudo apt-get install rabbitmq-server
  sudo apt-gei install redis

3. 在django中使用celery的方式和普通py文件中的方式略有不同,下面是一個向通過秒滴平臺發送短信驗證碼的demo:

  • 普通py文件用法:
# tasks.py

import os
from celery import Celery

app = Celery(tasks, backend=amqp://guest@localhost//, broker=
redis://localhost:6379/1) @app.task(name="send_verification_code") def _send_verification_code(phone_number, verification_code): """ :param phone_number: 目標手機號 :param verification_code: 驗證碼 :return: True:發送成功 False:發送失敗 """ api = getConfig(MiaoDi, api
) accountSid = getConfig(MiaoDi, accountSid) templateid = getConfig(MiaoDi, templateid) timeout_s = getConfig(MiaoDi, timeout) param = {},{}.format(verification_code, timeout_s) timestamp = datetime.datetime.now().strftime(%Y%m%d%H%M%S) sign = hash_sign(timestamp) data = { accountSid: accountSid, templateid: templateid, param: param, to: phone_number, timestamp: timestamp, sig: sign } response = requests.post(url=api, data=data) ret_json = response.text ret_dict = eval(ret_json) if ret_dict.get(respCode) != 00000: return False else: return True
# view.py
from tasks import _send_verification_code

def send_verification_code(phone_number, verification_code):
  task = _send_verification_code.delay(phone_number, verification_code)
if __name__ == __main__:
  phone_number = input(請輸入手機號:)
  verification_code = input(請輸入驗證碼:)
  send_verification_code(phone_number, verification_code)

celery在python中的應用