# [Source Code Analysis] Parallel Distributed Framework Celery: Worker Startup (2)

[toc]

## 0x00 Abstract

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It focuses on real-time asynchronous task queues and also supports task scheduling. Celery relies on its Worker component to carry out the actual task processing.

The previous article covered the first half of Celery's startup process; this article continues with the second half.

Related articles in this series:

- [\[Source Code Analysis\] Message Queue Kombu: mailbox](https://www.cnblogs.com/rossiXYZ/p/14455431.html)
- [\[Source Code Analysis\] Message Queue Kombu: Hub](https://www.cnblogs.com/rossiXYZ/p/14455294.html)
- [\[Source Code Analysis\] Message Queue Kombu: Consumer](https://www.cnblogs.com/rossiXYZ/p/14455093.html)
- [\[Source Code Analysis\] Message Queue Kombu: Producer](https://www.cnblogs.com/rossiXYZ/p/14455186.html)
- [\[Source Code Analysis\] Message Queue Kombu: Startup Process](https://www.cnblogs.com/rossiXYZ/p/14454934.html)
- [\[Source Code Analysis\] Message Queue Kombu: Basic Architecture](https://www.cnblogs.com/rossiXYZ/p/14454761.html)

as well as:

- [\[Source Code Analysis\] Parallel Distributed Framework Celery: Architecture (1)](https://www.cnblogs.com/rossiXYZ/p/14562308.html)
- [\[Source Code Analysis\] Parallel Distributed Framework Celery: Architecture (2)](https://www.cnblogs.com/rossiXYZ/p/14562310.html)
- [\[Source Code Analysis\] Parallel Distributed Framework Celery: Worker Startup (1)](https://www.cnblogs.com/rossiXYZ/p/14563763.html)

## 0x01 Recap

As described in the previous article, a series of steps brought us to the Worker logic. This article picks up from there and follows the rest of the startup process for the worker as a program.

```python
                                     +----------------------+
      +----------+                   |  @cached_property    |
      |   User   |                   |      Worker          |
      +----+-----+            +--->  |                      |
           |                  |      |                      |
           |  worker_main     |      |  Worker application  |
           |                  |      |  celery/app/base.py  |
           v                  |      +----------------------+
 +---------+------------+     |
 |        Celery        |     |
 |                      |     |
 |  Celery application  |     |
 |  celery/app/base.py  |     |
 |                      |     |
 +---------+------------+     |
           |                  |
           |  celery.main     |
           |                  |
           v                  |
 +---------+------------+     |
 |  @click.pass_context |     |
 |       celery         |     |
 |                      |     |
 |                      |     |
 |    CeleryCommand     |     |
 | celery/bin/celery.py |     |
 |                      |     |
 +---------+------------+     |
           |                  |
           |                  |
           |                  |
           v                  |
+----------+------------+     |
|   @click.pass_context |     |
|        worker         |     |
|                       |     |
|                       |     |
|     WorkerCommand     |     |
| celery/bin/worker.py  |     |
+-----------+-----------+     |
            |                 |
            +-----------------+
```

To make this easier to follow, here is the final flow chart:

![](https://img2020.cnblogs.com/blog/1850883/202103/1850883-20210321200854832-1138498285.png)

## 0x02 Worker as a program

The worker here is the core of the business logic and deserves a detailed look.

The code now reaches celery/apps/worker.py.

```python
class Worker(WorkController):
"""Worker as a program.""" ``` 例項化的過程呼叫到了WorkController基類的init。 初始化基本就是: - loader 載入各種配置; - setup_defaults做預設設定; - setup_instance 就是正式建立,包括配置存放訊息的queue。 - 通過Blueprint來建立 Worker 內部的各個子模組。 程式碼位於celery/apps/worker.py。 ```python class WorkController: """Unmanaged worker instance.""" app = None pidlock = None blueprint = None pool = None semaphore = None #: contains the exit code if a :exc:`SystemExit` event is handled. exitcode = None class Blueprint(bootsteps.Blueprint): """Worker bootstep blueprint.""" name = 'Worker' default_steps = { 'celery.worker.components:Hub', 'celery.worker.components:Pool', 'celery.worker.components:Beat', 'celery.worker.components:Timer', 'celery.worker.components:StateDB', 'celery.worker.components:Consumer', 'celery.worker.autoscale:WorkerComponent', } def __init__(self, app=None, hostname=None, **kwargs): self.app = app or self.app # 設定app屬性 self.hostname = default_nodename(hostname) # 生成node的hostname self.startup_time = datetime.utcnow() self.app.loader.init_worker() # 呼叫app.loader的init_worker()方法 self.on_before_init(**kwargs) # 呼叫該初始化方法 self.setup_defaults(**kwargs) # 設定預設值 self.on_after_init(**kwargs) self.setup_instance(**self.prepare_args(**kwargs)) # 建立例項 ``` 此時會呼叫app.loader的init_worker方法, ### 2.1 loader 此處的app.loader,是在Celery初始化的時候設定的loader屬性,該值預設是celery.loaders.app:AppLoader。其作用就是匯入各種配置
The loader base class lives in celery/loaders/base.py. The `loader` property itself belongs to the Celery application (celery/app/base.py) and is defined as follows:

```python
@cached_property
def loader(self):
    """Current loader instance."""
    return get_loader_cls(self.loader_cls)(app=self)
```

`get_loader_cls` is as follows:

```python
def get_loader_cls(loader):
    """Get loader class by name/alias."""
    return symbol_by_name(loader, LOADER_ALIASES, imp=import_from_cwd)
```

The loader instance at this point is an `AppLoader`, whose `init_worker` method is then called:

```python
def init_worker(self):
    if not self.worker_initialized:  # the worker has not been initialized yet
        self.worker_initialized = True  # mark it as initialized
        self.import_default_modules()  # import the default modules
        self.on_worker_init()
```

`import_default_modules` is shown below. Its main job is to import the modules that the app configuration asks to be imported.

```python
def import_default_modules(self):
    responses = signals.import_modules.send(sender=self.app)
    # Prior to this point loggers are not yet set up properly, need to
    # check responses manually and re-raise exceptions if any, otherwise
    # they'll be silenced, making it incredibly difficult to debug.
    for _, response in responses:  # re-raise any exception a receiver returned
        if isinstance(response, Exception):
            raise response
    # import the modules the project needs
    return [self.import_task_module(m) for m in self.default_modules]
```

### 2.2 setup_defaults in worker

Next we look at the `self.setup_defaults` method called during `Worker` initialization, which assigns values to the parameters needed at runtime, after which `self.pool_cls`