1. 程式人生 > >cerely異步分布式

cerely異步分布式

edi process python環境 pass utf-8 code 測試 evel -s

1、釋義:

  Celery 是一個 基於python開發的分布式異步消息任務隊列,通過它可以輕松的實現任務的異步處理, 如果你的業務場景中需要用到異步任務,就可以考慮使用celery。

舉幾個實例場景中可用的例子:

  • 你想對100臺機器執行一條批量命令,可能會花很長時間 ,但你不想讓你的程序等著結果返回,而是給你返回 一個任務ID,你過一段時間只需要拿著這個任務id就可以拿到任務執行結果, 在任務執行ing進行時,你可以繼續做其它的事情。
  • 你想做一個定時任務,比如每天檢測一下你們所有客戶的資料,如果發現今天 是客戶的生日,就給他發個短信祝福

Celery 本身並不提供消息服務,在執行任務時需要通過一個消息中間件來接收和發送任務消息,以及存儲任務結果, 一般使用rabbitMQ or Redis

2、Celery的優點:

  • 簡單:一單熟悉了celery的工作流程後,配置和使用還是比較簡單的
  • 高可用:當任務執行失敗或執行過程中發生連接中斷,celery 會自動嘗試重新執行任務
  • 快速:一個單進程的celery每分鐘可處理上百萬個任務
  • 靈活: 幾乎celery的各個組件都可以被擴展及自定制

3、Celery基本工作流程圖

技術分享圖片

4、示例

這裏我們使用redis
連接url的格式為:
redis://:password@hostname:port/db_number
例如:
BROKER_URL = ‘redis://localhost:6379/0‘

安裝celery和redis

  • pip install celery
  • pip install redis

使用celery包含三個方面:

  • 定義任務函數
  • 運行celery服務
  • 客戶應用程序的調用

先創建一個腳本 tasks.py

from celery import Celery        #導入了celery

broker = ‘redis://172.16.94.85:6379/1‘
backend = ‘redis://172.16.94.85:6379/2‘
app = Celery(‘tasks‘, broker=broker, backend=backend) #創建了celery實例app,實力話的過程中指定任務名tasks
(和文件名一致),傳入了broker和backend

@app.task #裝飾器
def add(x, y): #創建任務函數add
print("running...", x, y)
return x + y

在當前命令行終端運行(啟動worker,worker名要和腳本名一致):

celery -A tasks worker --loglevel=info

此時會看見一對輸出,包括註冊的任務

新建 test.py並執行:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time : 2018/5/26 8:17
# @Author : JWQ
# @File : demo1.py

from tasks import add #導入tasks模塊

re = add.delay(10, 20)
print(re.result) #獲取結果
print(re.ready) #是否處理
print(re.get(timeout=1)) #獲取結果
print(re.status) #是否處理

執行test.py後在celery行能看到相關的操作日誌:

[2018-05-25 11:31:28,373: WARNING/ForkPoolWorker-1] (‘running...‘, 4, 4)
[2018-05-25 11:31:28,394: INFO/ForkPoolWorker-1] Task tasks.add[30b145f9-14f7-4cd8-aa5e-7b6105c52325] succeeded in 0.0216804221272s: 8
[2018-05-25 11:31:58,991: INFO/MainProcess] Received task: tasks.add[7f8207cb-d561-4567-8ae7-7c035af02762]
[2018-05-25 11:31:58,995: WARNING/ForkPoolWorker-1] (‘running...‘, 4, 4)
[2018-05-25 11:31:58,998: INFO/ForkPoolWorker-1] Task tasks.add[7f8207cb-d561-4567-8ae7-7c035af02762] succeeded in 0.00274921953678s: 8

打開 backend的redis,也可以看見celery執行的信息。

在python環境中調用的add函數,實際上是在應用程序中調用這個方法。需要註意,如果把返回值賦值給一個變量,那麽原來的應用程序也會被阻塞,需要等待異步任務返回的結果。因此,實際使用中,不需要把結果賦值。

5、Celery模塊調用

既然celery是一個分布式的任務調度模塊,那麽celery是如何和分布式掛鉤呢,celery可以支持多臺不通的計算機執行不同的任務或者相同的任務。

如果要說celery的分布式應用的話,我覺得就要提到celery的消息路由機制,就要提一下AMQP協議。具體的可以查看AMQP的文檔。簡單地說就是可以有多個消息隊列(Message Queue),不同的消息可以指定發送給不同的Message Queue,而這是通過Exchange來實現的。發送消息到Message Queue中時,可以指定routiing_key,Exchange通過routing_key來把消息路由(routes)到不同的Message Queue中去,如圖:

技術分享圖片

6、多worker,多隊列

先寫腳本task.py

[root@localhost celery]# cat tasks.py #!/usr/bin/env python
#-*- coding:utf-8 -*-
from celery import Celery

app = Celery()
app.config_from_object("celeryconfig")

@app.task
def taskA(x,y):
return x + y

@app.task
def taskB(x,y,z):
return x + y + z
上面的代碼中,首先定義了一個Celery的對象,然後通過celeryconfig.py對celery對象進行設置。之後又分別定義了三個task,分別是taskA, taskB和add。
接下來寫celeryconfig.py,需要註意代碼的縮進格式:
[root@localhost celery]# cat celeryconfig.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-

from kombu import Exchange,Queue

BROKER_URL = "redis://192.168.48.131:6379/1"
CELERY_RESULT_BACKEND = "redis://192.168.48.131:6379/2"

CELERY_QUEUES = (
Queue("default",Exchange("default"),routing_key="default"),
Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
)

CELERY_ROUTES = {
‘tasks.taskA‘:{"queue":"for_task_A","routing_key":"for_task_A"},
‘tasks.taskB‘:{"queue":"for_task_B","routing_key":"for_task_B"}
}
配置文件一般單獨寫在一個文件中
啟動一個worker來指定taskA
celery -A tasks worker -l info -n workerA.%h -Q for_task_A
celery -A tasks worker -l info -n workerB.%h -Q for_task_B
腳本測試:
from tasks import *
re1 = taskA.delay(100, 200)
print(re1.result)
re2 = taskB.delay(1, 2, 3)
print(re2.result)
re3 = add.delay(1, 2, 3)
print(re3.status) #PENDING
我們看到add的狀態是PENDING,表示沒有執行,這個是因為沒有celeryconfig.py文件中指定改route到哪一個Queue中,所以會被發動到默認的名字celery的Queue中,但是我們還沒有啟動worker執行celery中的任務。下面,我們來啟動一個worker來執行celery隊列中的任務。
celery -A tasks worker -l info -n worker.%h -Q celery
print(re3.status) #SUCCESS 7、Celery與定時任務 在celery中執行定時任務非常簡單,只需要設置celery對象中的CELERYBEAT_SCHEDULE屬性即可。 下面我們接著在celeryconfig.py中添加CELERYBEAT_SCHEDULE變量:
CELERY_TIMEZONE = ‘UTC‘
CELERYBEAT_SCHEDULE = {
‘taskA_schedule‘ : {
‘task‘:‘tasks.taskA‘,
‘schedule‘:20,
‘args‘:(5,6)
},
‘taskB_scheduler‘ : {
‘task‘:"tasks.taskB",
"schedule":200,
"args":(10,20,30)
},
‘add_schedule‘: {
"task":"tasks.add",
"schedule":10,
"args":(1,2)
}
註意格式,否則會有問題
Celery啟動定時任務:
celery –A tasks beat
技術分享圖片

Celery啟動定時任務:

這樣taskA每20秒執行一次taskA.delay(5, 6)
taskB每200秒執行一次taskB.delay(10, 20, 30)
Celery每10秒執行一次add.delay(1, 2)

cerely異步分布式