1. 程式人生 > >非同步任務神器 Celery 簡明筆記

非同步任務神器 Celery 簡明筆記

在程式的執行過程中,我們經常會碰到一些耗時耗資源的操作,為了避免它們阻塞主程式的執行,我們經常會採用多執行緒或非同步任務。比如,在 Web 開發中,對新使用者的註冊,我們通常會給他發一封啟用郵件,而發郵件是個 IO 阻塞式任務,如果直接把它放到應用當中,就需要等郵件發出去之後才能進行下一步操作,此時使用者只能等待再等待。更好的方式是在業務邏輯中觸發一個發郵件的非同步任務,而主程式可以繼續往下執行。

Celery 是一個強大的分散式任務佇列,它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務(async task)和定時任務(crontab)。它的架構組成如下圖:

Celery_framework

可以看到,Celery 主要包含以下幾個模組:

  • 任務模組 Task包含非同步任務和定時任務。其中,非同步任務通常在業務邏輯中被觸發併發往任務佇列,而定時任務由 Celery Beat 程序週期性地將任務發往任務佇列
  • 訊息中介軟體 BrokerBroker,即為任務排程佇列,接收任務生產者發來的訊息(即任務),將任務存入佇列。Celery 本身不提供佇列服務,官方推薦使用 RabbitMQ 和 Redis 等。
  • 任務執行單元 WorkerWorker 是執行任務的處理單元,它實時監控訊息佇列,獲取佇列中排程的任務,並執行它
  • 任務結果儲存 BackendBackend 用於儲存任務的執行結果
    ,以供查詢。同訊息中介軟體一樣,儲存也可使用 RabbitMQ, Redis 和 MongoDB 等。

非同步任務

使用 Celery 實現非同步任務主要包含三個步驟:

  1. 建立一個 Celery 例項
  2. 啟動 Celery Worker
  3. 應用程式呼叫非同步任務

快速入門

為了簡單起見,對於 Broker 和 Backend,這裡都使用 redis。在執行下面的例子之前,請確保 redis 已正確安裝,並開啟 redis 服務,當然,celery 也是要安裝的。可以使用下面的命令來安裝 celery 及相關依賴:

Python
1 $pip install'celery[redis]'

建立 Celery 例項

將下面的程式碼儲存為檔案 tasks.py

Python
12345678910 # -*- coding: utf-8 -*-importtimefromcelery importCelerybroker='redis://127.0.0.1:6379'backend='redis://127.0.0.1:6379/0'app=Celery('my_task',broker=broker,backend=backend)@app.taskdefadd(x,y):time.sleep(5)# 模擬耗時操作returnx+y

上面的程式碼做了幾件事:

  • 建立了一個 Celery 例項 app,名稱為 my_task
  • 指定訊息中介軟體用 redis,URL 為 redis://127.0.0.1:6379
  • 指定儲存用 redis,URL 為 redis://127.0.0.1:6379/0
  • 建立了一個 Celery 任務 add,當函式被 @app.task 裝飾後,就成為可被 Celery 排程的任務;

啟動 Celery Worker

在當前目錄,使用如下方式啟動 Celery Worker:

Python
1 $celery worker-Atasks--loglevel=info

其中:

  • 引數 -A 指定了 Celery 例項的位置,本例是在 tasks.py 中,Celery 會自動在該檔案中尋找 Celery 物件例項,當然,我們也可以自己指定,在本例,使用 -A tasks.app
  • 引數 --loglevel 指定了日誌級別,預設為 warning,也可以使用 -l info 來表示;

在生產環境中,我們通常會使用 Supervisor 來控制 Celery Worker 程序。

啟動成功後,控制檯會顯示如下輸出:

celery

呼叫任務

現在,我們可以在應用程式中使用 delay()apply_async() 方法來呼叫任務。

在當前目錄開啟 Python 控制檯,輸入以下程式碼:

Python
123 >>>fromtasks importadd>>>add.delay(2,8)<AsyncResult:2272ddce-8be5-493f-b5ff-35a0d9fe600f>

在上面,我們從 tasks.py 檔案中匯入了 add 任務物件,然後使用 delay() 方法將任務傳送到訊息中介軟體(Broker),Celery Worker 程序監控到該任務後,就會進行執行。我們將視窗切換到 Worker 的啟動視窗,會看到多了兩條日誌:

Python
12 [2016-12-1012:00:50,376:INFO/MainProcess]Received task:tasks.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f][2016-12-1012:00:55,385:INFO/PoolWorker-4]Task tasks.add[2272ddce-8be5-493f-b5ff-35a0d9fe600f]succeeded in5.00642602402s:10

這說明任務已經被排程並執行成功。

另外,我們如果想獲取執行後的結果,可以這樣做:

Python
123456789 >>>result=add.delay(2,6)>>>result.ready()# 使用 ready() 判斷任務是否執行完畢False>>>result.ready()False>>>result.ready()True>>>result.get()# 使用 get() 獲取任務結果8

在上面,我們是在 Python 的環境中呼叫任務。事實上,我們通常在應用程式中呼叫任務。比如,將下面的程式碼儲存為 client.py:

Python
12345 # -*- coding: utf-8 -*-fromtasks importadd# 非同步任務add.delay(2,8)print'hello world'

執行命令 $ python client.py,可以看到,雖然任務函式 add 需要等待 5 秒才返回執行結果,但由於它是一個非同步任務,不會阻塞當前的主程式,因此主程式會往下執行 print 語句,打印出結果。

使用配置

在上面的例子中,我們直接把 Broker 和 Backend 的配置寫在了程式當中,更好的做法是將配置項統一寫入到一個配置檔案中,通常我們將該檔案命名為 celeryconfig.py。Celery 的配置比較多,可以在官方文件查詢每個配置項的含義。

下面,我們再看一個例子。專案結構如下:

Python
1234567 celery_demo# 專案根目錄├──celery_app# 存放 celery 相關檔案│├──__init__.py│├──celeryconfig.py# 配置檔案│├──task1.py# 任務檔案 1│└──task2.py# 任務檔案 2└──client.py# 應用程式

__init__.py 程式碼如下:

Python
1234 # -*- coding: utf-8 -*-fromcelery importCeleryapp=Celery('demo')# 建立 Celery 例項app.config_from_object('celery_app.celeryconfig')# 通過 Celery 例項載入配置模組

celeryconfig.py 程式碼如下:

Python
12345678 BROKER_URL='redis://127.0.0.1:6379'# 指定 BrokerCELERY_RESULT_BACKEND='redis://127.0.0.1:6379/0'# 指定 BackendCELERY_TIMEZONE='Asia/Shanghai'# 指定時區,預設是 UTC# CELERY_TIMEZONE='UTC'                             CELERY_IMPORTS=(# 指定匯入的任務模組'celery_app.task1','celery_app.task2')

task1.py 程式碼如下:

Python
123456 importtimefromcelery_app importapp@app.taskdefadd(x,y):time.sleep(2)returnx+y

task2.py 程式碼如下:

Python
123456 importtimefromcelery_app importapp@app.taskdefmultiply(x,y):time.sleep(2)returnx*y

client.py 程式碼如下:

Python
123456 # -*- coding: utf-8 -*-fromcelery_app importtask1fromcelery_app importtask2task1.add.apply_async(args=[2,8])# 也可用 task1.add.delay(2, 8)task2.multiply.apply_async(args=[3,7])# 也可用 task2.multiply.delay(3, 7)print'hello world'

現在,讓我們啟動 Celery Worker 程序,在專案的根目錄下執行下面命令:

Python
1 celery_demo$celery-Acelery_app worker--loglevel=info

接著,執行 $ python client.py,它會發送兩個非同步任務到 Broker,在 Worker 的視窗我們可以看到如下輸出:

Python
1234 [2016-12-1013:51:58,939:INFO/MainProcess]Received task:celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa][2016-12-1013:51:58,941:INFO/MainProcess]Received task:celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a][2016-12-1013:52:00,948:INFO/PoolWorker-3]Task celery_app.task1.add[9ccffad0-aca4-4875-84ce-0ccfce5a83aa]succeeded in2.00600231002s:10[2016-12-1013:52:00,949:INFO/PoolWorker-4]Task celery_app.task2.multiply[64b1f889-c892-4333-bd1d-ac667e677a8a]succeeded in2.00601326401s:21

delay 和 apply_async

在前面的例子中,我們使用 delay()apply_async() 方法來呼叫任務。事實上,delay 方法封裝了 apply_async,如下:

Python
123 defdelay(self,*partial_args,**partial_kwargs):"""Shortcut to :meth:`apply_async` using star arguments."""returnself.apply_async(partial_args,partial_kwargs)

也就是說,delay 是使用 apply_async 的快捷方式。apply_async 支援更多的引數,它的一般形式如下: