1. 程式人生 > >Celery學習---Celery 分布式隊列介紹及安裝

Celery學習---Celery 分布式隊列介紹及安裝

amqp ron https 失敗 成功 efault 理解 ice 添加

Celery介紹和基本使用

Celery 是一個 基於python開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理, 如果你的業務場景中需要用到異步任務,就可以考慮使用celery, 舉幾個實例場景中可用的例子:

1. 你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等著結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿著這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。

2. 你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個短信祝福

【簡單講:異步 + 定時】

Celery有以下優點

1. 簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的

2. 高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務

3. 快速:一個單進程的celery每分鐘可處理上百萬個任務

4. 靈活: 幾乎celery的各個組件都可以被擴展及自定制

Celery 在執行任務時需要通過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 一般使用rabbitMQ or Redis

技術分享圖片

Celery是一個上層任務,當有用戶請求到來的時候,發送一個任務給celery,這個任務會被中間件Redis/RabbitMQ接收發送給Celery的節點去執行任務,有一個好處就是可以橫向的擴展機器去執行任務。

Celery安裝使用

Celery安裝使用

Celery的默認broker[可理解為中間件Redis/RabbitMQ]是RabbitMQ, 僅需配置一行就可以

	broker_url = ‘amqp://guest:guest@localhost:5672//‘

rabbitMQ 沒裝的話請裝一下,安裝看這裏 http://docs.celeryproject.org/en/latest/getting-started/brokers/rabbitmq.html#id3

使用Redis做broker也可以

安裝redis組件

pip install -U "celery[redis]"

配置

Configuration is easy, just configure the location of your Redis database:
app.conf.broker_url = ‘redis://localhost:6379/0‘
Where the URL is in the format of:【如果中間件有認證操作】
redis://:password@hostname:port/db_number
all fields after the scheme are optional, and will default to localhost on port 6379, using database 0.   如果想獲取每個任務的執行結果,還需要配置一下把任務結果存在哪
If you also want to store the state and return values of tasks in Redis, you should configure these settings:
app.conf.result_backend = ‘redis://localhost:6379/0‘

Celery的基本操作+異步使用  

Win7下安裝celery模塊

pip3 install celery

技術分享圖片

技術分享圖片

[測試發現celery安裝完成後Win7的cmd可以用celery,不需要添加]
如果需要手動添加,則找到celery的安裝路徑,寫入Win7的path裏即可。
E:\PyCharm 2017.2.4\Python3.2.5\Lib\site-packages\celery\bin\

Ubuntu下安裝

首先安裝pip3

sudo apt-get install python3-pip  

技術分享圖片

安裝Celery

pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple celery

技術分享圖片

Ubuntu下安裝成功

技術分享圖片

Ubuntu下運行報錯,缺少Redis

運行Celery任務後報錯

ImportError: Missing redis library (pip install redis)

技術分享圖片

安裝Redis連接模塊:

pip3 install redis   

技術分享圖片

啟動Celery Worker來開始監聽並執行任務

celery -A Celery的Py文件名 worker --loglevel=info

後臺Celery任務的運行

操作前期條件: 安裝並啟動Redis

myCelery.py

from celery import Celery
# 定義了一個Celery的App
app = Celery(‘tasks‘,
             broker=‘redis://192.168.2.105‘,   # Celery和用戶請求的中間代理
             backend=‘redis://192.168.2.105‘)  # 接收Celery返回結果的

@app.task       # 函數變成一個Celery的任務,調用celery實現異步任務
def add(x, y):
    print("running...", x, y)
    return x + y

運行:

celery -A myCelery worker --loglevel=info 【寫文件名稱即可】

運行結果[Win]

技術分享圖片

運行結果[Linux]

技術分享圖片

註: celery文件運行起來後只能接收和執行任務[等待任務狀態...],還需要用戶發送任務

前臺Celery任務的發布

Linux下發布任務

進入文件所在的路徑下

omc@omc-virtual-machine:~/Celery$ cd /home/omc/Celery

進入Python環境

omc@omc-virtual-machine:~/Celery$ python

技術分享圖片

在Python環境下導入文件的函數

Python 3.5.2 (default, Nov 23 2017, 16:37:01) 
[GCC 5.4.0 20160609] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from myCelery import add
>>> add.delay(100,100)
<AsyncResult: 22631adf-e54f-4718-99a1-1e76b5c1575f>
>>> add(100,300)      
running... 100 300
400

技術分享圖片

註:中間件Redis只負責存儲了中間的消息

打開Redis查看存儲的消息內容

omc@omc-virtual-machine:~$ redis-cli

技術分享圖片

127.0.0.1:6379> get celery-task-meta-ebad12b8-d4ff-461b-b6e5-0dab4eaba855

技術分享圖片

技術分享圖片

>>> r = add.delay(20,20000)
>>> r.

技術分享圖片

>>> r.get()
20020

技術分享圖片

返回對象的其他方法的使用:

>>> r = cmd_Celery.my_cmd.delay("df -h")
The ready() method returns whether the task has finished processing or not:
>>> r.ready()
False

You can wait for the r to complete, but this is rarely used since it turns the asynchronous call into a synchronous one:
>>> r.get(timeout=1)  # 設置超時時間
8

In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:
>>> r.get(propagate=False)   #  propagate 擴展,添加propagate後會對報錯進行格式化輸出

If the task raised an exception you can also gain access to the original traceback:
>>> r.traceback     # 報錯調試用
…

當Celery有多個worker的時候[默認是隨機分配的]:

前臺發布:

技術分享圖片

後臺有2個Celery的worker

work1:

技術分享圖片

Work2:

技術分享圖片

Celery異步執行命令

操作前期條件: 安裝並啟動Redis

cmd_Celery.py

from celery import Celery
# 定義了一個Celery的App
app = Celery(‘tasks‘,
             broker=‘redis://192.168.2.105‘,
             # redis://:password@hostname:port/db_number  有密碼認證的連接
             # broker=‘redis://:密碼@192.168.2.105:6379/0‘,
             backend=‘redis://192.168.2.105‘)  # 接收Celery返回結果的

# 函數變成一個Celery的任務,調用celery實現異步任務
import subprocess
import time
@app.task
def my_cmd(cmd):
    print(‘Celery of CMD:‘, cmd)
    time.sleep(5)   # 判斷是否是異步的標誌,會卡5秒後執行cmd的命令
   cmd_obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return cmd_obj.stdout.read().decode("utf-8")  # 返回結果必須是可JSON的,需要編碼

後臺啟動Celery任務:

omc@omc-virtual-machine:~/Celery$ celery -A cmd_Celery worker --loglevel=debug 

技術分享圖片

前臺客戶端發送命令:

>>> python3
>>> import cmd_Celery
>>> r = cmd_Celery.my_cmd.delay("df -h")
>>> r.get()

技術分享圖片

後臺Celery服務器端執行任務:

技術分享圖片

如果返回結果不是可JSON會報錯:

默認返回的是byte類型的,需要進行解碼

技術分享圖片

後臺Celery的終止

2次Ctrl+C

技術分享圖片

後臺啟動/停止多個Celery的worker

前臺啟動命令: celery -A 項目名worker -loglevel=info 

後臺啟動命令: celery multi start w1 -A 項目名 -l info 

後臺重啟命令: celery multi start w1 -A 項目名 -l info 

後臺停止命令: celery multi stop w1 -A 項目名 -l info 

前後臺的區別: 後臺是mult啟動

Celery學習---Celery 分布式隊列介紹及安裝