1. 程式人生 > >Celery框架實現非同步執行任務

Celery框架實現非同步執行任務

Celery

官方

Celery 官網:http://www.celeryproject.org/

Celery 官方文件英文版:http://docs.celeryproject.org/en/latest/index.html

Celery 官方文件中文版:http://docs.jinkan.org/docs/celery/

Celery架構

Celery的架構由三部分組成,訊息中介軟體(message broker)、任務執行單元(worker)和 任務執行結果儲存(task result store)組成。

訊息中介軟體

Celery本身不提供訊息服務,但是可以方便的和第三方提供的訊息中介軟體整合。包括,RabbitMQ, Redis等等

任務執行單元

Worker是Celery提供的任務執行的單元,worker併發的執行在分散式的系統節點中。

任務結果儲存

Task result store用來儲存Worker執行的任務的結果,Celery支援以不同方式儲存任務的結果,包括AMQP, redis等

使用場景

非同步任務:將耗時操作任務提交給Celery去非同步執行,比如傳送簡訊/郵件、訊息推送、音視訊處理等等

定時任務:定時執行某件事情,比如每天資料統計

Celery的安裝配置

pip install celery

訊息中介軟體:RabbitMQ/Redis

app=Celery('任務名', broker='xxx', backend='xxx')

Celery執行非同步任務

包架構封裝

project
    ├── celery_task     # celery包
    │   ├── __init__.py # 包檔案
    │   ├── celery.py   # celery連線和配置相關檔案,且名字必須是celery.py
    │   └── tasks.py    # 所有任務函式
    ├── add_task.py     # 新增任務
    └── get_result.py   # 獲取結果

基本使用(新增立即執行任務)

執行流程:

​ 1)建立app + 任務

​ 2)啟動celery(app)服務:

​ 非windows
​ 命令:celery worker -A celery_task -l info
​ windows:
​ pip3 install eventlet
​ celery worker -A celery_task -l info -P eventlet

​ 3)新增任務:手動新增,要自定義新增任務的指令碼,右鍵執行指令碼

​ 4)獲取結果:手動獲取,要自定義獲取任務的指令碼,右鍵執行指令碼

celery.py
from celery import Celery

# broker: 任務倉庫
broker = 'redis://127.0.0.1:6379/5'
# backend: 任務結果倉庫
backend = 'redis://127.0.0.1:6379/6'
# include: 任務(函式)所在檔案
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
tasks.py(任務檔案)
from .celery import app
import time
@app.task
def add(n, m):
    print(n)
    print(m)
    time.sleep(10)
    print('n+m的結果:%s' % (n + m))
    return n + m

@app.task
def low(n, m):
    print(n)
    print(m)
    print('n-m的結果:%s' % (n - m))
    return n - m
add_task.py(新增要執行的任務)
# 右鍵執行該檔案,下面的匯入環境是合理的
from celery_task.tasks import add, low

# 往celery的Broker中新增立即任務
# 先啟動celery: celery worker -A celery_task -l info -P eventlet ,然後右鍵執行執行
t1 = add.delay(10, 20)
t2 = low.delay(50, 10)
print(t2.id)
get_result.py(檢視任務結果)
from celery_task.celery import app

from celery.result import AsyncResult

# 任務執行的id,可從上方任務執行完獲取
id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        # 拿到任務執行完的結果
        result = async.get()
        print(result)
    elif async.failed():
        print('任務失敗')
    elif async.status == 'PENDING':
        print('任務等待中被執行')
    elif async.status == 'RETRY':
        print('任務異常後正在重試')
    elif async.status == 'STARTED':
        print('任務已經開始被執行')

高階使用(執行延遲任務)

celery.py
from celery import Celery

# broker:任務倉庫
broker = 'redis://127.0.0.1:6379/15'
# backend:任務結果倉庫
backend = 'redis://127.0.0.1:6379/15'
# include:任務(函式)所在檔案
app = Celery(broker=broker, backend=backend, include=['celery_package.tasks'])
tasks.py
from .celery import app

@app.task
def jump(n1, n2):
    res = n1 * n2
    print('n1 * n2 = %s' % res)
    return res
add_task.py(新增延遲任務)

注:
args是jump任務需要的引數,沒有就設定為空()

​ eta是該任務執行的UTC格式的時間

from celery_package.tasks import jump

# # 直接執行函式
# jump(10, 20)

# 新增celery立即任務
# jump.delay(10, 20)

from datetime import datetime, timedelta
# 以秒為單位新增延遲時間
def eta_second(second):
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(seconds=second)
    return utc_ctime + time_delay

# 以天為單位新增延遲時間
def eta_days(days):
    ctime = datetime.now()
    utc_ctime = datetime.utcfromtimestamp(ctime.timestamp())
    time_delay = timedelta(days=days)
    return utc_ctime + time_delay

# apply_async就是新增延遲任務
jump.apply_async(args=(200, 50), eta=eta_second(10))

高階使用(自動任務)

執行流程:

​ 1)建立app + 任務

​ 2)啟動celery(app)服務:
​ 非windows
​ 命令:celery worker -A celery_task -l info
​ windows:
​ pip3 install eventlet
​ celery worker -A celery_task -l info -P eventlet

​ 3)新增任務:自動新增任務,所以要啟動一個新增任務的服務
​ 命令:celery beat -A celery_task -l info

​ 4)獲取結果:手動獲取,要自定義獲取任務的指令碼,右鍵執行指令碼

celery.py
from celery import Celery

broker = 'redis://127.0.0.1:6379/15'
backend = 'redis://127.0.0.1:6379/15'
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 自動任務的定時配置
from celery.schedules import crontab
from datetime import timedelta
app.conf.beat_schedule = {
    # 定時任務:任務名自定義
    'fall_task': {
        'task': 'celery_task.tasks.fall',  # 任務源
        'args': (30, 10),  # 任務引數
        'schedule': timedelta(seconds=3), # 定時新增任務的時間
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每週一早八點
    }
}
tasks.py
from .celery import app

@app.task
def fall(n1, n2):
    res = n1 / n2
    print('n1 / n2 = %s' % res)
    return res
get_result.py
from celery_task.celery import app

from celery.result import AsyncResult

id = '21325a40-9d32-44b5-a701-9a31cc3c74b5'
if __name__ == '__main__':
    async = AsyncResult(id=id, app=app)
    if async.successful():
        result = async.get()
        print(result)
    elif async.failed():
        print('任務失敗')
    elif async.status == 'PENDING':
        print('任務等待中被執行')
    elif async.status == 'RETRY':
        print('任務異常後正在重試')
    elif async.status == 'STARTED':
        print('任務已經開始被執行')

django中使用

注意點:

新增自動任務時,需要另外啟動一個新增任務的服務,就是再起一個服務端執行下面的命令。
命令:celery beat -A celery_task -l info

celery.py
# 載入django環境
import os, django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "luffyapi.settings.dev")
django.setup()


from celery import Celery
# 任務倉庫
broker = 'redis://127.0.0.1:6379/15'
# 任務結果倉庫
backend = 'redis://127.0.0.1:6379/15'
# include任務函式檔案的位置
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

# 時區
app.conf.timezone = 'Asia/Shanghai'
# 是否使用UTC
app.conf.enable_utc = False

# 自動任務的定時配置
from celery.schedules import crontab
from datetime import timedelta
app.conf.beat_schedule = {
    # 定時任務:任務名自定義
    'update_banner_cache': {
        'task': 'celery_task.tasks.update_banner_cache',  # 任務源
        'args': (),  # 任務引數
        'schedule': timedelta(seconds=10), # 定時新增任務的時間
        # 'schedule': crontab(hour=8, day_of_week=1),  # 每週一早八點
    }
}
tasks.py
from .celery import app
# 獲取專案中的模型類
from api.models import Banner

@app.task
def test_django_celery():
    banner_query = Banner.objects.filter(is_delete=False).all()
    print(banner_query)