celery (二) task調用
調用 TASK
基礎
task 的調用方式有三種:
- 類似普通函數的調用方式, 通過
__calling__
調用 ,類似function()
- 通過
apply_async()
調用,能接受較多的參數 - 通過
delay()
調用 ,是apply_async
方法的快捷方法,可接受的參數較少
task.delay(arg1, arg2, kwarg1=1, kwarg2=2)
等同於
task.apply_async(args=[arg1, arg2], kwargs={‘kwarg‘:1, ‘kwarg2‘:2})
鏈接任務
通過鏈接的方式,可以在一個任務執行完畢之後,執行另一個任務。
add.apply_async(args=(2,2),link=add.s(6))
當第一個task完成之後,task的結果會作為第二個函數參數的的一部分傳入第二個task。
上例第一個task結果為 4, 第二個task執行的是 \(4 + 6\) 。
如果第一個task失敗,那麽第一個task的 id 會被傳入到第二個task中
@app.task def error_handler(uuid): result = AsyncResult(uuid) exc = result.get(propagate=False) print(‘Task {0} raised exception: {1!r}\n{2!r}‘.format( uuid, exc, result.traceback))
add.apply_async(args=(2), link=error_handler.s())
當然,兩個是可以同時調用的
add.apply_async((2, 2), link=[add.s(16), error_handler21111.s()])
追蹤狀態
通過設置 on_message
回調函數,可以追蹤 task 的狀態變化
@app.task(bind=True) def hello(self, a, b): time.sleep(1) self.update_state(state="PROGRESS", meta={‘progress‘: 50}) time.sleep(1) self.update_state(state="PROGRESS", meta={‘progress‘: 90}) time.sleep(1) return ‘hello world: %i‘ % (a+b)
def on_raw_message(body):
print(body)
r = hello.apply_async()
print(r.get(on_message=on_raw_message, propagate=False))
{‘task_id‘: ‘5660d3a3-92b8-40df-8ccc-33a5d1d680d7‘,
‘result‘: {‘progress‘: 50},
‘children‘: [],
‘status‘: ‘PROGRESS‘,
‘traceback‘: None}
{‘task_id‘: ‘5660d3a3-92b8-40df-8ccc-33a5d1d680d7‘,
‘result‘: {‘progress‘: 90},
‘children‘: [],
‘status‘: ‘PROGRESS‘,
‘traceback‘: None}
{‘task_id‘: ‘5660d3a3-92b8-40df-8ccc-33a5d1d680d7‘,
‘result‘: ‘hello world: 10‘,
‘children‘: [],
‘status‘: ‘SUCCESS‘,
‘traceback‘: None}
hello world: 10
ETA 和countdown 延遲執行
ETA(預估到達時間)配置一個具體的時間,是一個時間對象,這個時間是相關task的最早的執行時間(也就是說,該任務實際執行時間,可能晚於該時間)。countdown是ETA的快捷方式,countdown 是相對(當前)時間,單位是 秒。它表示該任務會在多少秒之後執行。
>>> from datetime import datetime, timedelta
>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow) # 明天的當前時間執行
>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get() # 3秒後執行
Expiration 任務保質期
通過配置 expiration 參數給task設置一個 過期時間,來保證task的時效性。當worker收到一個過期的任務之後,會標記該任務為 revoked(取消)狀態。expiration 既可以是相對時間(單位:秒),也可以是絕對時間(時間對象)
add.apply_async((10, 10), expires=60)
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
... expires=datetime.now() + timedelta(days=1)
重試機制
celery會在連接失敗的時候,自動嘗試重新發送task。一般收到一個task,都會有一條 收到task的log信息。
通過設置 retry=False
來禁用自動重試。當然也可以通過配置其他參數來,配置celery自動重試的策略。
max_retries 最大重試次數
默認為3,如果設置為 None,表示一直重試。如果超過重試次數依舊失敗,會引發一個導致重試失敗的異常。
interval_start 重試等待時間
在多久之後開始重試,默認為 0 ,即可以重試。但是為 秒
interval_step 延遲重試 步長
連續重試的時候,每次重試之後,其延遲時間都會加上該參數的值。 默認是 0.2 ,單位為 秒
interval_max 重試延遲最大等待時間
每次重試之間,最大等待時間。 默認是 0.2 , 單位為 秒
add.apply_async((2, 2), retry=True, retry_policy={
‘max_retries‘: 3,
‘interval_start‘: 0,
‘interval_step‘: 0.2,
‘interval_max‘: 0.2,
})
# 最大重試次數為 3,第一次會在失敗之後,立刻執行; 第二次會在第一次失敗之後,等待0+0.2s執行;第三次會會 0+0.2(兩次重試最大間隔為0.2,所以依舊是0.2,而不是0.4)。所以,三次重試一共耗時 0.2+0.2 = 0.4s
連接失敗,或者是無法建立連接的時候,celery會引發OperationalError
異常。但是如果配置了自動重試,那麽該異常只會在重試次數耗盡之後,依舊無法建立連接的時候,才引發。
>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)
>>> try:
... add.delay(2, 2)
... except add.OperationalError as exc:
... logger.exception(‘Sending task raised: %r‘, exc)
序列化
在celery的客戶端和worker之間發送消息的時候,需要對消息進行序列化。默認的序列化方式是 JSON
,可以通過在 setting
中配置 task_serializer
來更改默認的序列化方式,當然可以對每個task分別設置序列化方式。支持的序列化方式有:JSON
YAML
PICKLE
msgpck
壓縮
celery 同樣可以在傳送消息的時候,對其進行壓縮。壓縮方式有:gzip
和bzip2
。
有如下三種方式來配置壓縮屬性,按優先級分別為:
- compression 調用task時,配置該參數
- Task.compression 屬性。配置自定義 Task類的屬性
- task_compression 在配置文件中配置 該屬性
celery (二) task調用