1. 程式人生 > >Python定時任務 Celery+Redis

Python定時任務 Celery+Redis

引言:

  當需要用python建立和完成定時任務時,第一個會想到的就是使用crontab庫,

  但是crontab不支援Windows系統,於是我們可以使用一個支援Windows且功能強大的庫:Celery。

Celery-分散式任務佇列:

  Celery是一個簡單,靈活,可靠的分散式系統,用於處理大量訊息,同時為操作提供維護此類系統所需的工具。

  它是一個任務佇列,專注於實時處理,同時還支援任務排程。

  Celery是開源的,並根據BSD許可授權

  Celery由Python語言實現。

Celery安裝:

  celery的版本非常令人頭疼,它的版本無法做到完全上下相容,且有些版本會與一些庫衝突,所以對於celery版本的選擇非常重要:

    1、celery 4.0.0~4.2.0(latest) 目前最新的celery版本是官方的4.2.0 連結:http://docs.celeryproject.org/en/latest/

     (1) celery 4.0.0及以上版本目前不相容python3.7 由於在python3.7中 async變成了關鍵字 在執行celery worker的時候會報錯,

         

         from . import async,base 此處出錯。

     (2) django-celery是一個便於在django工程中管理celery的庫,但是django-celery不支援celery 4.0.0以上版本,如若要在django工程中

      使用celery,請降低版本,比如celery 3.1.26

    2、celery 3~4的版本之間 相對穩定 本人用的就是celery 3.1.26版本 推薦 pip install celery==3.1.26 

Celery介紹:

上圖為celery工作簡要流程,原諒我盜的圖...原作者po上:https://www.qikqiak.com/

1、application(task producer)"任務的生產者",我們通過程式碼層面,產生任務然後傳送給broker(task queue)

2、celery beat(task scheduler)"任務排程器",常見為生成定時任務

3、broker(task queue)"任務佇列",我們需要將任務送往任務佇列去,官網強烈推薦的broker有:redis和rabbitmq

4、worker(taks consumer)"任務消費者",在任務送往broker後,worker從中進行操作,完成任務後生成result

5、result 完成任務後產生的結果

Broker的選擇:

  broker我們可選的有redis和rabbitmq,官網推薦的兩種,我們這裡選redis,相對於rabbitmq更加輕量級,安裝方便。

  所以我們需要安裝redis,redis很容易安裝,這裡就不細講了,po上github連結:https://github.com/MicrosoftArchive/redis/releases

  與此同時 celery還需要python的redis依賴庫這裡注意版本 pip install redis==2.10 最好下2.10版本,這裡是為了後續和django聯用

  若下最新版本,可能會報以下錯:

任務的編寫:

  主要的程式碼層面來了,我們通過流程圖知道,我們需要生產任務,目錄結構如下圖:

        

  1、其中__init__.py是通過啟動專案時,選擇的配置檔案:

1 from celery import Celery
2 
3 app = Celery('demo')
4 app.config_from_object('celery_app.celeryconfig')

  2、celeryconfig.py裡面主要是celery的配置:

from datetime import timedelta
from celery.schedules import crontab
# 配置broker為redis
BROKER_URL = 'redis://localhost:6379/1'
# 配置結果儲存至redis
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
# 時區設定
CELERY_TIMEZONE='Asia/Shanghai'
# 匯入任務
CELERY_IMPORTS = (
    'celery_app.task1',
    'celery_app.task2',
    )
# 配置定時任務的排程器
CELERYBEAT_SCHEDULE={
    # 任務名字
    'task1':{
        # 任務啟動的函式
        'task':'celery_app.task1.add',
        # 定時時間設定,每10秒一次
        'schedule':timedelta(seconds=10),
        # 傳遞的引數
        'args':(2,8)
    },
    'task2':{
        'task':'celery_app.task2.mul',
        # 定時時間設定,16:45
        'schedule':crontab(hour=16,minute=45),
        'args':(4,5)

    }
}

  3、task1的任務:

import time

from celery_app import app

@app.task
def add(x, y):
    # 阻塞測試
    time.sleep(3)
    return x + y  

  4、 task2的任務:

import time

from celery_app import app

@app.task
def mul(x, y):
    time.sleep(3)
    return x * y

執行任務:

  1、開啟redis伺服器:

$ .\redis-server.exe .\redis.windows.conf

 

  2、在celery_app檔案的上一級 shift+右鍵 開啟命令列視窗,win10開啟powershell:

celery worker -A celery_app --pool=solo -l INFO

  此命令開啟worer 任務目標為celery_app windows下需要加--pool=solo 日誌級別為INFO

  3、然後開啟celery beat 啟動定時任務,另開一個命令列視窗:

 celery beat -A celery_app -l INFO

   4、結果如下:

  

  可以看見celery beat一直在隔10秒傳送任務

  再來看worker:

  

  第一次是處理了4秒 其餘是3秒,可以看出windows處理celery還是不太穩定。

  結語:celery是一個強大的庫,能讓我們處理非同步任務,不過最好還是於Linux上執行