1. 程式人生 > >celery (二) task調用

celery (二) task調用

ogre 壓縮 als rop 多少 lse 自定義 delta 優先級

調用 TASK

基礎

task 的調用方式有三種:

  • 類似普通函數的調用方式, 通過 __calling__ 調用 ,類似 function()
  • 通過 apply_async() 調用,能接受較多的參數
  • 通過 delay() 調用 ,是apply_async 方法的快捷方法,可接受的參數較少
task.delay(arg1, arg2, kwarg1=1, kwarg2=2)
等同於
task.apply_async(args=[arg1, arg2], kwargs={‘kwarg‘:1, ‘kwarg2‘:2})

鏈接任務

通過鏈接的方式,可以在一個任務執行完畢之後,執行另一個任務。

add.apply_async(args=(2,2),link=add.s(6))

當第一個task完成之後,task的結果會作為第二個函數參數的的一部分傳入第二個task。

上例第一個task結果為 4, 第二個task執行的是 \(4 + 6\)

如果第一個task失敗,那麽第一個task的 id 會被傳入到第二個task中

@app.task
def error_handler(uuid):
    result = AsyncResult(uuid)
    exc = result.get(propagate=False)
    print(‘Task {0} raised exception: {1!r}\n{2!r}‘.format(
          uuid, exc, result.traceback))
add.apply_async(args=(2), link=error_handler.s())

當然,兩個是可以同時調用的

add.apply_async((2, 2), link=[add.s(16), error_handler21111.s()])

追蹤狀態

通過設置 on_message 回調函數,可以追蹤 task 的狀態變化

@app.task(bind=True)
def hello(self, a, b):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={‘progress‘: 50})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={‘progress‘: 90})
    time.sleep(1)
    return ‘hello world: %i‘ % (a+b)
def on_raw_message(body):
    print(body)

r = hello.apply_async()
print(r.get(on_message=on_raw_message, propagate=False))
{‘task_id‘: ‘5660d3a3-92b8-40df-8ccc-33a5d1d680d7‘,
 ‘result‘: {‘progress‘: 50},
 ‘children‘: [],
 ‘status‘: ‘PROGRESS‘,
 ‘traceback‘: None}
{‘task_id‘: ‘5660d3a3-92b8-40df-8ccc-33a5d1d680d7‘,
 ‘result‘: {‘progress‘: 90},
 ‘children‘: [],
 ‘status‘: ‘PROGRESS‘,
 ‘traceback‘: None}
{‘task_id‘: ‘5660d3a3-92b8-40df-8ccc-33a5d1d680d7‘,
 ‘result‘: ‘hello world: 10‘,
 ‘children‘: [],
 ‘status‘: ‘SUCCESS‘,
 ‘traceback‘: None}
hello world: 10

ETA 和countdown 延遲執行

ETA(預估到達時間)配置一個具體的時間,是一個時間對象,這個時間是相關task的最早的執行時間(也就是說,該任務實際執行時間,可能晚於該時間)。countdown是ETA的快捷方式,countdown 是相對(當前)時間,單位是 秒。它表示該任務會在多少秒之後執行。

>>> from datetime import datetime, timedelta

>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow) # 明天的當前時間執行
>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get()  # 3秒後執行

Expiration 任務保質期

通過配置 expiration 參數給task設置一個 過期時間,來保證task的時效性。當worker收到一個過期的任務之後,會標記該任務為 revoked(取消)狀態。expiration 既可以是相對時間(單位:秒),也可以是絕對時間(時間對象)

add.apply_async((10, 10), expires=60)
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
...                 expires=datetime.now() + timedelta(days=1)

重試機制

celery會在連接失敗的時候,自動嘗試重新發送task。一般收到一個task,都會有一條 收到task的log信息。

通過設置 retry=False 來禁用自動重試。當然也可以通過配置其他參數來,配置celery自動重試的策略。

max_retries 最大重試次數

默認為3,如果設置為 None,表示一直重試。如果超過重試次數依舊失敗,會引發一個導致重試失敗的異常。

interval_start 重試等待時間

在多久之後開始重試,默認為 0 ,即可以重試。但是為 秒

interval_step 延遲重試 步長

連續重試的時候,每次重試之後,其延遲時間都會加上該參數的值。 默認是 0.2 ,單位為 秒

interval_max 重試延遲最大等待時間

每次重試之間,最大等待時間。 默認是 0.2 , 單位為 秒

add.apply_async((2, 2), retry=True, retry_policy={
    ‘max_retries‘: 3,
    ‘interval_start‘: 0,
    ‘interval_step‘: 0.2,
    ‘interval_max‘: 0.2,
})
# 最大重試次數為 3,第一次會在失敗之後,立刻執行; 第二次會在第一次失敗之後,等待0+0.2s執行;第三次會會 0+0.2(兩次重試最大間隔為0.2,所以依舊是0.2,而不是0.4)。所以,三次重試一共耗時 0.2+0.2 = 0.4s

連接失敗,或者是無法建立連接的時候,celery會引發OperationalError 異常。但是如果配置了自動重試,那麽該異常只會在重試次數耗盡之後,依舊無法建立連接的時候,才引發。

>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)

>>> try:
...     add.delay(2, 2)
... except add.OperationalError as exc:
...     logger.exception(‘Sending task raised: %r‘, exc)

序列化

在celery的客戶端和worker之間發送消息的時候,需要對消息進行序列化。默認的序列化方式是 JSON,可以通過在 setting 中配置 task_serializer 來更改默認的序列化方式,當然可以對每個task分別設置序列化方式。支持的序列化方式有:JSON YAML PICKLE msgpck

壓縮

celery 同樣可以在傳送消息的時候,對其進行壓縮。壓縮方式有:gzipbzip2

有如下三種方式來配置壓縮屬性,按優先級分別為:

  • compression 調用task時,配置該參數
  • Task.compression 屬性。配置自定義 Task類的屬性
  • task_compression 在配置文件中配置 該屬性

celery (二) task調用