1. 程式人生 > >Win10下Celery4.2.1基於redis的部署與錯誤

Win10下Celery4.2.1基於redis的部署與錯誤

Celery是一個分散式非同步任務的神器,由Python開發但是其通訊協議可以支援其它語言。它還可以設定定時任務,設定多個任務佇列並路由任務到指定的佇列;同時還提供了執行時的一些監控和管理介面。

安裝

  • 安裝python3.7(官網下載直接安裝)
  • 安裝celery庫(pip install celery)
  • 安裝redis庫(pip install redis)

配置\啟動worker

安裝完成之後就可以配置celery的測試程式碼了。首先是clelry的執行配置檔案celeryconfig.py(當然你也可以從命令列傳入)

BROKER_URL = 'redis://pcma.xxx.com:6379/0'
CELERY_RESULT_BACKEND = 'redis://pcma.xxx.com:6379/0'

CELERY_REDIS_MAX_CONNECTIONS = 4
CELERYD_CONCURRENCY = 4
BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 5}    # 5min

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']  # Ignore other content
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True

接著建立celery的worker程式碼,celery_worker.py

from celery import Celery

app = Celery('celery_work')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y

然後我們就可以啟動worker服務了,此後該worker會一直等待任務並執行。具體啟動命令如下:

celery -A celery_server worker --loglevel=info

這種方式預設是多程序啟動worker。如果你希望使用單程序啟動的話,命令如下:

celery -A celery_server worker --loglevel=info -P solo

另外,你還可以使用協程的方式啟動。當然首先你需要安裝eventlet。(pip install eventlet)

celery -A celery_server worker --loglevel=info -P eventlet

任務呼叫

worker啟動就緒之後,我們要做的事情就是新增要執行的任務。具體程式碼如下:

from celery_worker import add

result = add.delay(4, 4)

如果你希望能獲取到執行的結果,如下介面可以滿足你的需求:

result.ready()    # 獲取任務執行狀態
result.get(timeout=1, propagate=False)    # 獲取任務執行結果
result.traceback    # 獲取任務執行異常時的堆疊資訊

注意:如果你需要獲取任務的結果,那邊就需要配置CELERY_RESULT_BACKEND選項。否則celery不會儲存結果。

問題及解決

由於安裝的是celery4.2.1的版本。其中有2個bug需要針對性的解決。

  1. Python 3.7 syntax error: async is a reserved keyword #4849
  2. Unable to run tasks under Windows #4081

第1個是因為async在Python3.7已經是關鍵字了,但是celery版本沒有更新導致的。下一個釋出版本中會修復。沒有新版本之前,我們只要修改celery檔案中的async為另外的字元即可。比如我修改為了async_2。(注意定義和引用需要修改全套的)

第2個是因為windows下沒有fork多程序的模組,而celery預設啟動方式就是多程序的,並且還用了fork的方式。解決方式有多種:

  • 使用單程序啟動方式(上面已列出,另注意CELERYD_CONCURRENCY配置項取消)
  • 在建立celery例項之前,配置系統環境變數。os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')
  • 使用協程的方式啟動(上面已列出)

問題描述

問題1:

  File "celery/backends/redis.py", line 22
    from . import async, base
                      ^
SyntaxError: invalid syntax

問題2:

[2017-06-08 15:31:49,416: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
File "c:\program files\python36\lib\site-packages\billiard\pool.py", line 359, in workloop
result = (True, prepare_result(fun(*args, **kwargs)))
File "c:\program files\python36\lib\site-packages\celery\app\trace.py", line 518, in _fast_trace_task
tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)