1. 程式人生 > >分散式任務佇列--Celery的學習筆記

分散式任務佇列--Celery的學習筆記

一、Celery簡介

  Celery是一個簡單,靈活,可靠的分散式系統,用於處理大量訊息,同時為操作提供維護此類系統所需的工具。它是一個任務佇列,專注於實時處理,同時還支援任務排程。

  所謂任務佇列,是一個邏輯上的概念,可以將抽象中的任務傳送到指定的執行任務的元件,任務佇列可以跨執行緒或機器執行。

  Celery是基於Python開發的分散式非同步訊息任務佇列,通過它可以輕鬆的實現任務的非同步處理, 如果你的業務場景中需要用到非同步任務,就可以考慮使用celery。

 

二、Celery使用場景

  1.高併發的請求任務,比如需要傳送大量請求的網路爬蟲,就可以使用Celery來加速爬取。

  2.非同步任務,將耗時的操作交給Celery來完成,比如傳送/接收郵件、訊息推送等等。

  3.定時任務,需要定時執行的程式,比如每天定時執行爬蟲爬取資料。

 

三、Celery架構

  下圖是我找到的一張表示Celery架構的圖:

  

  任務生產者:產生任務並且把任務提交到任務佇列的就是任務生產者。

  任務排程Beat:Celery會根據配置檔案對任務進行調配,可以按一定時間間隔週期性地執行某些任務。

  中間人Broker:Celery使用訊息進行通訊,需要中間人在客戶端和Worker之間進行傳遞,接收客戶端傳送過來的任務,並將任務分配給Worker。

  在Celery的文件中,可以找到官方給出的實現Broker的工具有:

名稱 狀態 監控 遠端控制
RabbitMQ 穩定
Redis 穩定
Amazon SQS 穩定
Zookeeper 實驗性

  消費者Worker:Worker是執行任務的單元,在Celery任務佇列中屬於消費者。Worker會不斷地監聽佇列,一旦有任務新增進來,就會將任務取出來進行執行。Worker還可以執行在多臺機器上,只要它們都指向同一個Broker就可以。

  結果儲存Backend:結果儲存Backend,顧名思義就是將Worker執行後得到的結果儲存起來。Celery中有幾個內建的結果儲存可供選擇,包括SQLAlchemy / Django ORM、Redis、RabbitMQ、Mamcached等。

 

四、Celery安裝

     Celery4.0版本是支援Python2.7的最後一個版本,所以如果你還在用py2的話,可能要選擇安裝Celery3或者更早的版本。我本人用的Python版本是Python3.7,然後安裝的Celery版本是4.3。安裝的話使用pip安裝就好:

pip install celery

  如果pip安裝出錯的話,可以去這個網址進行下載。在使用pip安裝的時候會自動安裝一些相關依賴,如果這些依賴安裝出錯的話,搜一下相應版本的Wheel檔案下載安裝即可。

  中介軟體Broker我選擇使用的是Redis,這裡就不說Redis怎麼安裝了,上一篇部落格中有Ubuntu下安裝Redis的介紹。

 

五、Celery使用示例

 1.應用

   在使用Celery的時候,第一件事是要建立一個Celery例項,一般稱之為應用,簡稱為app。建立一個test.py,其中程式碼如下:

1 from celery import Celery
2 
3 
4 app = Celery("test", broker="redis://127.0.0.1:6379", backend="redis://127.0.0.1:6379")
5 
6 
7 @app.task
8 def add(x, y):
9     return x + y

2.執行Celery伺服器

  在建立好應用之後,就可以使用Celery命令執行程式執行Worker了:

celery -A test worker -l info

  執行後可以看到如下圖:  

  

  有關可用命令列選項的完整列表,執行如下命令:

celery worker --help

3.呼叫任務

  要呼叫任務,可以使用delay()方法。

  

  該任務會返回一個AsyncResult例項,可用於查詢任務狀態、獲取任務返回值等。此時檢視前面執行的伺服器,會看到有如下資訊:

Received task: test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9]

Task test.add[e7f01461-8c4d-4c29-ab6b-27be5084ecd9] succeeded in 0.006505205000166825s: 5

4.檢視結果

  在前面定義的時候,已經選擇使用Redis作為結果後端了,所以任務執行後的結果會儲存到Redis中。而且,在呼叫任務的時候,還可以進行如下操作:

  

  其中ready()方法會返回該任務是否已經執行,get()方法則會獲取任務返回的結果。

 5.配置檔案

  由於Celery的配置資訊比較多,因此一般會建立一個配置檔案來儲存這些配置資訊,通常會命名為celeryconfig.py。在test.py所在資料夾下新建配置檔案celeryconfig.py,其中的程式碼如下:

 1 # broker(訊息中介軟體來接收和傳送任務訊息)
 2 BROKER_URL = 'redis://127.0.0.1:6379'
 3 # backend(儲存worker執行的結果)
 4 CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379'
 5 
 6 # 設定時間參照,不設定預設使用的UTC時間
 7 CELERY_TIMEZONE = 'Asia/Shanghai'
 8 # 指定任務的序列化
 9 CELERY_TASK_SERIALIZER = 'json'
10 # 指定執行結果的序列化
11 CELERY_RESULT_SERIALIZER = 'json'

  然後修改下test.py中的程式碼:

 1 from celery import Celery
 2 
 3 
 4 app = Celery("test")
 5 app.config_from_object("celerystudy.celeryconfig")
 6 
 7 
 8 @app.task
 9 def add(x, y):
10     return x + y