1. 程式人生 > >Celery分散式任務佇列框架--基於flask實現

Celery分散式任務佇列框架--基於flask實現

  1. 使用Celery的方法
    Celery是分散式的任務佇列

    特點: 簡單、靈活、高可用

    1) 安裝Celery

    pip install celery

    2) 安裝 redis
    redis可以使用list結構,提供訊息佇列的功能

    3)建立Celery物件並指定broker代理路徑
    broker 格式: redis://[:[email protected]]host:port/db

    app = Celery('tasks',
                broker='redis://127.0.0.1:6379/10’)
    app = Celery('tasks',
backend='redis://:[email protected]:6379/7’, # 返回值存入資料庫 broker='redis://:[email protected]:6379/8') # :密碼@host/post/db @app.task def sendMsg(recievers,html): #注意 在celery的子任務中必須要嘗試獲取 with manage.app.test_request_context( ): msg = Message(subject='tpp使用者啟用-v1.0',
recipients=[recievers], sender='[email protected]') #需要使用html,如果使用text可能會報錯 msg.html = html ext.mail.send(msg) print('郵件傳送成功’) if __name__ == '__main__': print('--批量下訂單--') for i in range(20
): #向celery傳送任務,並獲取非同步結果物件 result:AsyncResult = goOrder.delay('XB99900888'+str(i)) #實時獲取結果(任務執行結果) #result.get(timeout=1,interval=0.5,callback=orderCallback) print(result.get_leaf( )) print('--下訂單已完成--')

4)建立功能函式(完成的時間不確定),並將函式交給Celery後臺執行緒

    @app.task
    def goOrder(orderId):
        # 下訂單的功能
        time.sleep(20)
        print('--下訂單成功--')

5) 執行經過@app.task裝飾的函式
格式: 函式名.delay(函式的引數)
goOrder.delay(100011) # 向celery傳送執行函式的訊號

6) 在window 會出現 ValueError: not enough values to unpack…
原因: Window系統 預設不支援Python的執行緒間的事件處理
解決: pip install eventlet
啟動: celery -A tasks worker –loglevel=info -P eventlet
例:

import time

from celery import Celery
from celery.bin import celery
from celery.result import AsyncResult
from flask_mail import Message

import manage
import ext

app = Celery('tasks',
             backend='redis://:[email protected]:6379/7',
             broker='redis://:[email protected]:6379/8')  #   :密碼@host/post/db

@app.task   #交給Celery佇列去呼叫
def goOrder(order_id):
    print('--goOrder--')
    time.sleep(5)
    print('完成{}的訂單'.format(order_id))

    return '{} 確認完成'.format(order_id)

def orderCallback(id,value):

    print(id,'----訂單完成----',value)

@app.task
def sendMsg(recievers,html):
    #注意 在celery的子任務中必須要嘗試獲取
    with manage.app.test_request_context():
        msg = Message(subject='使用者啟用-v1.0',
                      recipients=[recievers],
                      sender='[email protected]')
        #需要使用html,如果
        msg.html = html
        ext.mail.send(msg)
        print('郵件傳送成功')

if __name__ == '__main__':
    print('--批量下訂單--')
    for i in range(20):
        #向celery傳送任務,並獲取非同步結果物件
        result:AsyncResult = goOrder.delay('XB99900888'+str(i))
        #實時獲取結果(任務執行結果)
        #result.get(timeout=1,interval=0.5,callback=orderCallback)
        print(result.get_leaf())
    print('--下訂單已完成--')

if __name__ == '__main1__':
    celery.worker_main()