1. 程式人生 > >Celery 筆記一(快速入門)

Celery 筆記一(快速入門)

在一個應用服務中,對於時效性要求沒那麼高的業務場景,我們沒必要等到所有任務執行完才返回結果,例如使用者註冊場景中,儲存了使用者賬號密碼之後,就可以立即返回,後續的賬號啟用郵件,可以用一種非同步的形式去處理,這種非同步操作可以用佇列服務來實現。否則,如果等到郵件傳送成功可能幾秒過去了。

Celery 是什麼?

Celery 是Python語言實現的分散式佇列服務,除了支援即時任務,還支援定時任務,Celery 有5個核心角色。

1、Task

任務(Task)就是你要做的事情,例如一個註冊流程裡面有很多工,給使用者發驗證郵件就是一個任務,這種耗時任務可以交給Celery去處理,還有一種任務是定時任務,比如每天定時統計網站的註冊人數,這個也可以交給Celery週期性的處理。

2、Broker

Broker 的中文意思是經紀人,指為市場上買賣雙方提供中介服務的人。在Celery中它介於生產者和消費者之間經紀人,這個角色相當於資料結構中的佇列。例如一個Web系統中,生產者是處理核心業務的Web程式,業務中可能會產生一些耗時的任務,比如簡訊,將任務傳送給 Broker,就是把這個任務暫時放到佇列中,等待消費者來處理。消費者是 Worker,是專門用於執行任務的後臺服務。Worker將實時監控佇列中是否有任務,如果有就拿出來進行處理。Celery本身不提供佇列服務,一般用Redis或者RabbitMQ來扮演Broker的角色

3、Worker

Worker 就是那個一直在後臺執行任務的人,也稱為任務的消費者,它會實時地監控佇列中有沒有任務,如果有就立即取出來執行。

4、Beat

Beat 是一個定時任務排程器,它會根據配置定時將任務傳送給 Broker,等待 Worker 來消費。

5、Backend

Backend 用於儲存任務的執行結果,每個任務都有返回值,比如傳送郵件的服務會告訴我們有沒有傳送成功,這個結果就是存在Backend中,當然我們並不總是要關心任務的執行結果。

584bbf78e1783.png

記住這5個角色後面理解Celery就輕鬆了。

快速入門

接觸任何新東西,沒有什麼比實際動手學得更快了。假設我們選擇Redis作為broker,你需要安裝redis並且已經啟動了redis服務(這個步驟請自行借用搜尋引擎解決)

0、安裝celery

pip install -U "celery[redis]"

1、建立Celery例項

# tasks.py
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

2、建立任務

假設這個傳送郵件的任務需要5秒鐘才能執行完

# tasks.py
@app.task
def send_mail(email):
    print("send mail to ", email)
    import time
    time.sleep(5)
    return "success"

在沒有Celery的情況下,程式順序執行,每個步驟都需要等上一步執行完成。又叫同步操作

1. 插入記錄到資料庫
2. 發郵件
3. 註冊成功

我們可以把2放在一個任務中交給celery去非同步執行,這樣我們就不需要等待發郵件完成,你只需要安排celery去處理幫我去完成就好了。程式碼就變成了

1. 插入記錄到資料庫
2. celery 幫我去發郵件
3. 註冊成功

第二步是非常快的,它只需要把任務放進佇列裡面去,並不會等任務真正執行完。這跟生活是完全貼切的,例如我們很多事情都不是自己親歷其為去做,而是將一個不太重要或即時性沒那麼高的事情轉交給別人處理,我可以繼續處理後面的事情。

3、啟動Worker

啟動Worker,監聽 Broker 中是否有任務,命令:celery worker,你可能需要指定引數

celery -A tasks worker --loglevel=info

-A: 指定 celery 例項在哪個模組中,例子中,celery例項在tasks.py檔案中,啟動成功後,能看到資訊

celeyrun.png

函式用app.task 裝飾器修飾之後,就會成為Celery中的一個Task。

4、呼叫任務

在主程式中呼叫任務,將任務傳送給 Broker, 而不是真正執行該任務

# user.py
from tasks import send_mail

def register():
    import time
    start = time.time()
    print("1. 插入記錄到資料庫")
    print("2. celery 幫我發郵件")
    send_mail.delay("[email protected]")
    print("3. 告訴使用者註冊成功")
    print("耗時:%s 秒 " % (time.time() - start))

if __name__ == '__main__':
    register()

在主程式中,呼叫函式的.delay方法

目錄結構:

── celery_test
   ├── tasks.py
   └── user.py

執行 python user.py, 啟動應用程式

1. 插入記錄到資料庫
2. celery 幫我發郵件
3. 告訴使用者註冊成功
耗時:0.22688984870910645 秒 

程式花了不到0.23秒就執行完成,如果按照正常的同步邏輯去執行,至少需要5秒鐘,因為發郵件的任務就花了5秒。

在worker服務視窗看日誌資訊

celerywork.png

注意:

1、celery worker 啟動時,如果是root使用者,需要設定環境變數:

$ export C_FORCE_ROOT='true'

2、 Celery4.x 開始不再支援Windows平臺,如果需要在Windows開發,請使用3.x的版本。

3、使用 RabbitMQ 或 Redis 作為 Broker,生產環境永遠不要使用關係資料庫

4、不要使用複雜物件作為任務函式的引數

# Good
@app.task
def my_task(user_id):
    user = User.objects.get(id=user_id)
    print(user.name)
    # ...
# Bad
@app.task
def my_task(user):
    print(user.name)
    # ...

小結

學習Celery,首先需要知道它的應用場景,然後是Celery中的常見角色,最後按照步驟感受一下Celery是如何跑起來的。

參考連結:

  • http://funhacks.net/2016/12/13/celery/
  • https://celery.readthedocs.io/en/latest/userguide/tasks.html#tips-and-best-practices
  • http://celerytaskschecklist.com/

關注公眾號「Python之禪」(id:vttalk)獲取最新文章 python之禪