
Learning Celery: A Distributed Task Queue


1. Introduction to Celery and basic usage

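Celery is a distributed asynchronous message/task queue written in Python (loosely speaking, you can think of it as the queue used in Python multiprocessing or multithreading, spread across machines). It makes asynchronous task processing easy. Celery's architecture has three parts: the message broker, the task execution unit (worker), and the task result store. Their responsibilities are:

Message broker

Celery does not provide a messaging service itself, but it integrates easily with third-party message brokers, including RabbitMQ, Redis, MongoDB, etc.

Worker (task execution unit)

The worker is Celery's unit of task execution. Workers run concurrently on the nodes of a distributed system.

Task result store

The task result store holds the results of the tasks executed by workers. Celery can store results in several ways, including Redis, MongoDB, Django ORM, AMQP, etc.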
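To make the three roles concrete, here is a toy analogy built only from the Python standard library. It is not how Celery works internally: a queue.Queue stands in for the broker, threads stand in for workers, and a dict stands in for the result store. The names (broker, results, worker) are made up for illustration.

```python
import queue
import threading

broker = queue.Queue()   # stands in for the message broker
results = {}             # stands in for the task result store
results_lock = threading.Lock()

def add(x, y):
    return x + y

def worker():
    """Take (task_id, func, args) messages off the 'broker' and store the return value."""
    while True:
        message = broker.get()
        if message is None:          # sentinel: shut this worker down
            broker.task_done()
            break
        task_id, func, args = message
        value = func(*args)
        with results_lock:
            results[task_id] = value
        broker.task_done()

# Two "workers" consuming from the same queue, like two Celery worker processes.
threads = [threading.Thread(target=worker) for _ in range(2)]
for t in threads:
    t.start()

# The "client" only publishes messages; it never calls add() itself.
for i in range(5):
    broker.put((i, add, (i, i)))

broker.join()                 # wait until every published message has been processed
for _ in threads:
    broker.put(None)          # one sentinel per worker
for t in threads:
    t.join()

print(sorted(results.items()))  # [(0, 0), (1, 2), (2, 4), (3, 6), (4, 8)]
```

The point of the analogy is the decoupling: the caller only puts messages on a queue, and whoever consumes them writes results somewhere else that the caller can read later.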


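Install Celery together with the Redis components (Redis is used as the broker):

sudo pip install -U "celery[redis]"

(sudo pip install celery installs Celery on its own, without the Redis extras.)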

Testing Celery

  Create a Python file named tasks.py:

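#!/usr/bin/python
from celery import Celery

# 'tasks' is the module name; broker is the message broker; backend is where results are stored
# (backend is optional; without it Celery falls back to DisabledBackend and keeps no results).
# 127.0.0.1:6379 is the Redis server's address and port; /0 is the database number
# (Redis provides 16 databases by default, numbered 0-15).
app = Celery('tasks', broker='redis://127.0.0.1:6379/0', backend='redis://127.0.0.1:6379/0')

@app.task
def add(x, y):
    return x + y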

  Start a Celery worker to listen on the task queue:

    celery -A tasks worker --loglevel=info   # tasks is the name of the module file above

  Call the task

    Open another terminal and start a Python shell:

    from tasks import add

    add.delay(4,4)

  If the task shows up as received and executed in the celery worker's output, Celery is running correctly.

  If a backend is configured, you can look up the task result:

    from tasks import add

    result = add.delay(4,4)

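    # result.ready()        # check whether the worker has finished the task

    result.get(timeout=10)  # return the result; if the task hasn't finished, wait up to 10 s, then raise an exception

    # result.get(propagate=False)   # return the task's exception instead of re-raising it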
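The result object returned by delay() behaves much like a future. As an analogy only (this uses concurrent.futures from the standard library, not Celery), the sketch below mirrors ready(), get(timeout=...) and get(propagate=False); add and fail are hypothetical stand-ins for tasks.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def add(x, y):
    time.sleep(0.5)          # pretend the task takes a while
    return x + y

def fail():
    raise ValueError("task failed")

with ThreadPoolExecutor(max_workers=2) as pool:
    future = pool.submit(add, 4, 4)     # roughly like add.delay(4, 4)
    print(future.done())                # like result.ready(); most likely False right away
    try:
        future.result(timeout=0.1)      # like result.get(timeout=...) with a too-short timeout
    except TimeoutError:
        print("not finished yet")
    print(future.result(timeout=10))    # 8

    bad = pool.submit(fail)
    # Like result.get(propagate=False): look at the exception instead of re-raising it.
    print(repr(bad.exception(timeout=10)))
```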


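A few errors I ran into, and how I fixed them:

  bash: celery: command not found: /usr/bin indeed had no celery. It turned out celery had been installed in /usr/local/python-3.5.2/bin/, which is not on $PATH, so the shell could not find the command. Creating a symlink fixes it: sudo ln -s /usr/local/python-3.5.2/bin/celery /usr/bin/celery

  AttributeError: 'float' object has no attribute 'items': according to https://github.com/celery/celery/issues/5175, this is caused by an incompatibility between Celery and version 3.0 of the redis Python package, so redis has to be downgraded: pip install redis==2.10.6 (check the installed version with pip show redis).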
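A small diagnostic sketch for the two problems above, using only the standard library. find_celery_cli, scripts_dir and redis_major_too_new are hypothetical helper names; the version check just encodes the "redis 3.x is incompatible" workaround from the issue and only applies to the Celery version discussed there.

```python
import shutil
import sysconfig

def find_celery_cli():
    """Return the full path of the celery command if it is on $PATH, else None."""
    return shutil.which("celery")

def scripts_dir():
    """Directory where pip installs console scripts (such as celery) for this interpreter."""
    return sysconfig.get_path("scripts")

def redis_major_too_new(version, limit=3):
    """True if a redis-py version string such as '3.0.1' has a major version >= limit."""
    major = int(version.split(".")[0])
    return major >= limit

if __name__ == "__main__":
    path = find_celery_cli()
    if path is None:
        # Candidate target for the symlink (or for adding to $PATH).
        print("celery is not on $PATH; look in", scripts_dir())
    else:
        print("celery found at", path)
    print(redis_major_too_new("3.0.1"), redis_major_too_new("2.10.6"))  # True False
```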

2. Using Celery in a project

Create a Python project: a package directory proj containing the modules celery.py, tasks.py and config.py below.

celery.py

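#!/usr/bin/python
# coding: utf-8

from __future__ import absolute_import
from celery import Celery
# absolute_import makes "from celery import Celery" resolve to the installed celery library
# instead of this celery.py file in the same directory (matters on Python 2)

app = Celery('proj', broker='redis://localhost',
             backend='redis://localhost', include=['proj.tasks'])
# include: load proj/tasks.py so its tasks are registered

app.config_from_object('proj.config')
# Use config.py as the configuration module; settings can also be written here directly

if __name__ == '__main__':
    app.start()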

tasks.py

#!/usr/bin/python
# coding: utf-8

from __future__ import absolute_import
from proj.celery import app

@app.task
def add(x, y):
    print('running add..')
    return x + y

@app.task
def mul(x, y):
    print('running mul..')
    return x * y

@app.task
def sub(x, y):
    print('running sub..')
    return x - y

config.py

#!/usr/bin/python
# coding: utf-8
from __future__ import absolute_import

# BROKER_URL = 'redis://localhost:6379/0'
# CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
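Since Celery 4 the documentation uses lowercase setting names (broker_url, result_backend, timezone) instead of the old uppercase ones such as BROKER_URL and CELERY_RESULT_BACKEND. A sketch of config.py written that way, assuming Celery 4 or later and a Redis server on localhost:6379; pick one naming style and don't mix the two in one file.

```python
# proj/config.py: sketch with Celery 4 lowercase setting names (assumes Redis on localhost:6379, db 0)
broker_url = 'redis://localhost:6379/0'       # message broker
result_backend = 'redis://localhost:6379/0'   # task result store
timezone = 'Asia/Shanghai'                    # used by celery beat schedules
```

app.config_from_object('proj.config') in celery.py picks these names up as module attributes.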

From the directory that contains proj, start a worker: celery -A proj worker -l info

In another terminal:

    from proj.tasks import add, mul, sub

    add.delay(6, 3)

To run multiple workers in the background (as daemons), see: http://docs.celeryproject.org/en/latest/userguide/daemonizing.html#daemonizing

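3. Periodic tasks with Celery

  http://docs.celeryproject.org/en/latest/userguide/periodic-tasks.html

  The celery beat scheduler handles periodic tasks: once you set when a task should run, Celery runs it automatically at those times (beat sends the task, a worker executes it). Schedules can also be written with crontab from celery.schedules, similar to Linux crontab. Create a file periodic_task.py with the following code: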

#!/usr/bin/python
# coding: utf-8
from celery import Celery
from celery.schedules import crontab

# Without broker=..., Celery defaults to broker_url = 'amqp://guest:guest@localhost:5672//',
# which fails if RabbitMQ isn't installed
app = Celery(broker='redis://localhost')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls test('hello') every 10 seconds.
    sender.add_periodic_task(10.0, test.s('hello'), name='add every 10')

    # Calls test('world') every 30 seconds
    sender.add_periodic_task(30.0, test.s('world'), expires=10)

    # Executes every Monday morning at 7:30 a.m.
    sender.add_periodic_task(
        crontab(hour=7, minute=30, day_of_week=1),
        test.s('Happy Mondays!'),
    )

@app.task
def test(arg):
    print(arg)

  The example above adds periodic tasks with add_periodic_task(); you can also add them through configuration:

app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,
        'args': (16, 16)
    },
}
app.conf.timezone = 'UTC'   # or e.g. 'Asia/Shanghai'

  Meaning of the fields:

    task: The name of the task to execute.
    schedule: The frequency of execution.
        This can be the number of seconds as an integer, a timedelta, or a crontab. You can also define your own custom schedule types by extending the interface of schedule.
    args: Positional arguments (list or tuple).
    kwargs: Keyword arguments (dict).
    options: Execution options (dict).
        This can be any argument supported by apply_async(): exchange, routing_key, expires, and so on.
    relative: If relative is true, timedelta schedules are scheduled "by the clock". This means the frequency is rounded to the nearest second, minute, hour or day depending on the period of the timedelta.
              By default relative is false: the frequency isn't rounded and is relative to the time when celery beat was started.
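As the field list says, schedule may also be a timedelta. Because beat_schedule is an ordinary setting, it can also live in the configuration module loaded by app.config_from_object() instead of being assigned to app.conf. A sketch of such a config module, assuming a task registered under the name tasks.add:

```python
from datetime import timedelta

# Same entry as the app.conf example, with a timedelta instead of a float number of seconds.
beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',                 # registered task name (assumed to exist)
        'schedule': timedelta(seconds=30),   # run every 30 seconds
        'args': (16, 16),                    # positional arguments for add(16, 16)
    },
}
timezone = 'UTC'
```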

  Start the task scheduler, celery beat:

    celery -A periodic_task beat

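    (beat stores each task's last run time in a local file, celerybeat-schedule by default, so it needs write permission in the current directory. You can also choose a different location with -s, as in the following command:)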

     celery -A proj beat -s /home/celery/var/run/celerybeat-schedule
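Because beat must be able to write its schedule file, a quick check before starting it can be handy. schedule_dir_writable is a hypothetical helper, not part of Celery; it only checks that the target directory exists and is writable by the current user.

```python
import os
import tempfile

def schedule_dir_writable(schedule_path):
    """Return True if the directory that will hold the beat schedule file exists and is writable."""
    directory = os.path.dirname(os.path.abspath(schedule_path))
    return os.path.isdir(directory) and os.access(directory, os.W_OK)

if __name__ == "__main__":
    # Default location: celerybeat-schedule in the current directory.
    print(schedule_dir_writable("celerybeat-schedule"))
    # Custom location passed with -s, as in the command above.
    print(schedule_dir_writable("/home/celery/var/run/celerybeat-schedule"))
    # A freshly created temporary directory is writable.
    with tempfile.TemporaryDirectory() as d:
        print(schedule_dir_writable(os.path.join(d, "celerybeat-schedule")))  # True
```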

  Start a worker to execute the tasks (in another terminal):

    celery -A periodic_task worker


  Use crontab to define more complex schedules, similar to the Linux crontab command (on Linux crontab, see http://www.cnblogs.com/peida/archive/2013/01/08/2850483.html).

  For example, every Monday at 7:30 a.m.:

from celery.schedules import crontab

app.conf.beat_schedule = {
    # Executes every Monday morning at 7:30 a.m.
    'add-every-monday-morning': {
        'task': 'tasks.add',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
        'args': (16, 16),
    },
}
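Note that crontab's day_of_week counts from Sunday = 0, so day_of_week=1 above means Monday, whereas Python's datetime.weekday() counts from Monday = 0. The hypothetical helper below computes the next run time of such a weekly schedule to make the numbering explicit; it is a simplified sketch that ignores time zones and is not how Celery computes schedules.

```python
from datetime import datetime, timedelta

def next_weekly_run(now, cron_day_of_week, hour, minute):
    """Next datetime matching a weekly crontab(hour=..., minute=..., day_of_week=...).

    cron_day_of_week uses crontab numbering: Sunday = 0, Monday = 1, ..., Saturday = 6.
    """
    # datetime.weekday() numbers Monday = 0 ... Sunday = 6, so convert.
    target_weekday = (cron_day_of_week - 1) % 7
    days_ahead = (target_weekday - now.weekday()) % 7
    candidate = (now + timedelta(days=days_ahead)).replace(
        hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= now:            # this week's slot has already passed
        candidate += timedelta(days=7)
    return candidate

# Wednesday 2024-01-03 10:00 -> next Monday 7:30
print(next_weekly_run(datetime(2024, 1, 3, 10, 0), 1, 7, 30))  # 2024-01-08 07:30:00
```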

  More crontab examples:

crontab()
    Execute every minute.
crontab(minute=0, hour=0)
    Execute daily at midnight.
crontab(minute=0, hour='*/3')
    Execute every three hours: midnight, 3am, 6am, 9am, noon, 3pm, 6pm, 9pm.
crontab(minute=0, hour='0,3,6,9,12,15,18,21')
    Same as previous.
crontab(minute='*/15')
    Execute every 15 minutes.
crontab(day_of_week='sunday')
    Execute every minute (!) on Sundays.
crontab(minute='*', hour='*', day_of_week='sun')
    Same as previous.
crontab(minute='*/10', hour='3,17,22', day_of_week='thu,fri')
    Execute every ten minutes, but only between 3-4 am, 5-6 pm, and 10-11 pm on Thursdays or Fridays.
crontab(minute=0, hour='*/2,*/3')
    Execute every even hour, and every hour divisible by three. This means: at every hour except 1am, 5am, 7am, 11am, 1pm, 5pm, 7pm, 11pm.
crontab(minute=0, hour='*/5')
    Execute every hour divisible by 5. This means it is triggered at 3pm, not 5pm (since 3pm is "15" on the 24-hour clock, which is divisible by 5).
crontab(minute=0, hour='*/3,8-17')
    Execute every hour divisible by 3, and every hour during office hours (8am-5pm).
crontab(0, 0, day_of_month='2')
    Execute on the second day of every month.
crontab(0, 0, day_of_month='2-30/2')
    Execute on every even-numbered day.
crontab(0, 0, day_of_month='1-7,15-21')
    Execute on the first and third weeks of the month.
crontab(0, 0, day_of_month='11', month_of_year='5')
    Execute on the eleventh of May every year.
crontab(0, 0, month_of_year='*/3')
    Execute daily at midnight in the first month of every quarter.
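To see how the field syntax in these examples expands, here is a simplified, independent parser for a single crontab field. It is not Celery's own parser and supports only *, plain numbers, a-b ranges, /n steps and comma lists; expand_field is a hypothetical name. It reproduces some rows above, e.g. that hour='*/2,*/3' skips exactly 1, 5, 7, 11, 13, 17, 19 and 23.

```python
def expand_field(spec, low, high):
    """Expand one crontab field (e.g. '*/3', '0,3,6', '8-17', '2-30/2') into a sorted list.

    low/high are the field's bounds, e.g. 0-23 for hours or 1-31 for day_of_month.
    """
    values = set()
    for part in spec.split(','):
        step = 1
        if '/' in part:                     # 'range/step'
            part, step_text = part.split('/')
            step = int(step_text)
        if part == '*':                     # whole range of the field
            start, end = low, high
        elif '-' in part:                   # explicit range 'a-b'
            start_text, end_text = part.split('-')
            start, end = int(start_text), int(end_text)
        else:                               # single number
            start = end = int(part)
        values.update(range(start, end + 1, step))
    return sorted(values)

hours = expand_field('*/2,*/3', 0, 23)
print([h for h in range(24) if h not in hours])   # [1, 5, 7, 11, 13, 17, 19, 23]
print(expand_field('*/5', 0, 23))                  # [0, 5, 10, 15, 20]
print(expand_field('1-7,15-21', 1, 31))            # [1, 2, 3, 4, 5, 6, 7, 15, 16, 17, 18, 19, 20, 21]
```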

Official documentation: http://docs.celeryproject.org/en/latest/

Reference blog posts: https://www.cnblogs.com/alex3714/p/6351797.html

     https://www.cnblogs.com/forward-wang/p/5970806.html
