1. 程式人生 > >Celery異步的分布式任務調度理解

Celery異步的分布式任務調度理解

生產 業務邏輯 應用 text 發送 mongo 實現 delay 手機號

什麽是Celery呢?

Celery是一個用Python開發的異步的分布式任務調度模塊。

Celery本身不包含消息服務,使用第三方消息服務,也就是Broker,來傳遞任務,目前支持的有Rebbimq,Redis,數據庫以及其他的一些比如Amazon SQS,Monogdb和IronMQ 。

Celery支持同步和異步執行兩種模式。同步模式為任務調用方等待任務執行完成,這種方式等同於RPC(Remote Procedure Call), 異步方式為任務在後臺執行,調用方調用後就去做其他工作,之後再根據需要來查看任務結果。Celery自己沒有實現消息隊列,而是直接已存在的消息隊列作為Broker角色。官方推薦的Broker為 RabbitMQ ,除此之外,Redis、Beanstalkd、MongoDB等也都支持,具體可參考

官方文檔

Celery整體架構可以理解為下圖:

技術分享圖片

整體上包括三個角色:

  • Celery client: 這是任務生產者,它負責將任務發送到Broker中。
  • Broker: Broker負責將任務分發給相應的celery worker。
  • Celery worker: 這是任務的執行者,完成相應的業務邏輯,在具體實現上體現為Python函數。

下面說一下Celery實例:

使用Celery完成發送短信:

這裏使用redis作為消息隊列(任務隊列)

  關於Redis和Rebbimq 消息隊列的比較:https://www.cnblogs.com/lzc978/articles/10291597.html

整個celery服務器實時監聽任務隊列邏輯

技術分享圖片

0. 安裝環境

0.1 pip安裝Celery框架

pip install celery

0.2 啟動Celery

celery -A 應用的包路路徑 worker -l info

1. 代碼結構

技術分享圖片

1.1 config.py配置文件

# celery配置文件
# 指定任務隊列列的位置 使用redis做為broker任務隊列
broker_url = "redis://192.168.103.210/7"

1.2 main.py celery分布式異步任務隊列啟動文件

# celery啟動文件
from celery import Celery

# 創建celery實例例 celery_app = Celery(file) # 加載celery配置 celery_app.config_from_object(celery_tasks.config) # 自動註冊celery任務 celery_app.autodiscover_tasks([celery_tasks.sms])

2. Celery異步任務的定義

2.1 sms.tasks.py 異步任務/將被放在redis的任務隊列(broker)中等待觸發執行

# 發送短信的異步任務
from .yuntongxun.sms import CCP
from . import constants
from celery_tasks.main import celery_app

# 裝飾器?將send_sms_code裝飾為異步任務,並設置別名
@celery_app.task(name=send_sms_code)
def send_sms_code(mobile, sms_code):
"""
發送短信異步任務
:param mobile: 手機號
:param sms_code: 短信驗證碼
:return: None
"""
CCP().send_template_sms(mobile, [sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], constants.SEND_SMS_TEMPLATE_ID)

3. 開啟Celery

celery -A celery_tasks.main worker -l info

4. 執行Celery異步任務(觸發worker)

4.1 views.py 視圖邏輯,即觸發執行worker的業務邏輯

# 生成和發送短信驗證碼
sms_code = %06d % random.randint(0,999999)
# CCP().send_template_sms(mobile,[sms_code, constants.SMS_CODE_REDIS_EXPIRES // 60], 1)
# celery異步發送短信
send_sms_code.delay(mobile,sms_code)

至此,一個簡單的異步分布式Celery(異步任務)服務器搭建完成

Celery異步的分布式任務調度理解