1. 程式人生 > >[原始碼解析] 並行分散式框架 Celery 之 worker 啟動 (1)

[原始碼解析] 並行分散式框架 Celery 之 worker 啟動 (1)

# [原始碼解析] 並行分散式框架 Celery 之 worker 啟動 (1) [toc] ## 0x00 摘要 Celery是一個簡單、靈活且可靠的,處理大量訊息的分散式系統,專注於實時處理的非同步任務佇列,同時也支援任務排程。Celery 是呼叫其Worker 元件來完成具體任務處理。 ```shell $ celery --app=proj worker -l INFO $ celery -A proj worker -l INFO -Q hipri,lopri $ celery -A proj worker --concurrency=4 $ celery -A proj worker --concurrency=1000 -P eventlet $ celery worker --autoscale=10,0 ``` 所以我們本文就來講解 worker 的啟動過程。 ## 0x01 Celery的架構 前面我們用幾篇文章分析了 Kombu,為 Celery 的分析打下了基礎。 [[原始碼分析\] 訊息佇列 Kombu 之 mailbox](https://www.cnblogs.com/rossiXYZ/p/14455431.html) [[原始碼分析\] 訊息佇列 Kombu 之 Hub](https://www.cnblogs.com/rossiXYZ/p/14455294.html) [[原始碼分析\] 訊息佇列 Kombu 之 Consumer](https://www.cnblogs.com/rossiXYZ/p/14455093.html) [[原始碼分析\] 訊息佇列 Kombu 之 Producer](https://www.cnblogs.com/rossiXYZ/p/14455186.html) [[原始碼分析\] 訊息佇列 Kombu 之 啟動過程](https://www.cnblogs.com/rossiXYZ/p/14454934.html) [[原始碼解析\] 訊息佇列 Kombu 之 基本架構](https://www.cnblogs.com/rossiXYZ/p/14454761.html) 以及 [原始碼解析 並行分散式框架 Celery 之架構 (2)](https://www.cnblogs.com/rossiXYZ/p/14562308.html) [[原始碼解析] 並行分散式框架 Celery 之架構 (2)](https://www.cnblogs.com/rossiXYZ/p/14562310.html) 下面我們再回顧下 Celery 的結構。Celery的架構圖如下所示: ```python +-----------+ +--------------+ | Producer | | Celery Beat | +-------+---+ +----+---------+ | | | | v v +-------------------------+ | Broker | +------------+------------+ | | | +-------------------------------+ | | | v v v +----+-----+ +----+------+ +-----+----+ | Exchange | | Exchange | | Exchange | +----+-----+ +----+------+ +----+-----+ | | | v v v +-----+ +-------+ +-------+ |queue| | queue | | queue | +--+--+ +---+---+ +---+---+ | | | | | | v v v +---------+ +--------+ +----------+ | worker | | Worker | | Worker | +-----+---+ +---+----+ +----+-----+ | | | | | | +-----------------------------+ | | v +---+-----+ | backend | +---------+ ``` ## 0x02 示例程式碼 其實網上難以找到除錯Celery worker的辦法。我們可以去其原始碼看看,發現如下: ```python # def test_worker_main(self): # from celery.bin import worker as worker_bin # # class worker(worker_bin.worker): # # def execute_from_commandline(self, argv): # return argv # # prev, worker_bin.worker = worker_bin.worker, worker # try: # ret = self.app.worker_main(argv=['--version']) # assert ret == ['--version'] # finally: # worker_bin.worker = prev ``` 所以我們可以模仿來進行,使用如下啟動worker,進行除錯。 ```python from celery import Celery app = Celery('tasks', broker='redis://localhost:6379') @app.task() def add(x, y): return x + y if __name__ == '__main__': app.worker_main(argv=['worker']) ``` ## 0x03 邏輯概述 當啟動一個worker的時候,這個worker會與broker建立連結(tcp長連結),然後如果有資料傳輸,則會建立相應的channel, 這個連線可以有多個channel。然後,worker就會去borker的佇列裡面取相應的task來進行消費了,這也是典型的消費者生產者模式。 這個worker主要是有四部分組成的,task_pool, consumer, scheduler, mediator。其中,task_pool主要是用來存放的是一些worker,當啟動了一個worker,並且提供併發引數的時候,會將一些worker放在這裡面。 celery預設的併發方式是prefork,也就是多程序的方式,這裡只是celery對multiprocessing pool進行了輕量的改造,然後給了一個新的名字叫做prefork,這個pool與多程序的程序池的區別就是這個task_pool只是存放一些執行的worker。 consumer也就是消費者,主要是從broker那裡接受一些message,然後將message轉化為`celery.worker.request.Request` 的一個例項。 Celery 在適當的時候,會把這個請求包裝進Task中,Task就是用裝飾器app_celery.task()裝飾的函式所生成的類,所以可以在自定義的任務函式中使用這個請求引數,獲取一些關鍵的資訊。此時,已經瞭解了task_pool和consumer。 接下來,這個worker具有兩套資料結構,這兩套資料結構是並行執行的,他們分別是 'ET時刻表' 、就緒佇列。 就緒佇列:那些 立刻就需要執行的task, 這些task到達worker的時候會被放到這個就緒佇列中等待consumer執行。 我們下面看看如何啟動Celery。 ## 0x04 Celery應用 程式首先會來到Celery類,這是Celery的應用。 可以看到主要就是:各種類名稱,TLS, 初始化之後的各種signal。 位置在:celery/app/base.py,其定義如下: ```python class Celery: """Celery application.""" amqp_cls = 'celery.app.amqp:AMQP' backend_cls = None events_cls = 'celery.app.events:Events' loader_cls = None log_cls = 'celery.app.log:Logging' control_cls = 'celery.app.control:Control' task_cls = 'celery.app.task:Task' registry_cls = 'celery.app.registry:TaskRegistry' #: Thread local storage. _local = None _fixups = None _pool = None _conf = None _after_fork_registered = False #: Signal sent when app is loading configuration. on_configure = None #: Signal sent after app has prepared the configuration. on_after_configure = None #: Signal sent after app has been finalized. on_after_finalize = None #: Signal sent by every new process after fork. on_after_fork = None ``` 對於我們的示例程式碼,入口是: ```python def worker_main(self, argv=None): if argv is None: argv = sys.argv if 'worker' not in argv: raise ValueError( "The worker sub-command must be specified in argv.\n" "Use app.start() to programmatically start other commands." ) self.start(argv=argv) ``` ### 4.1 新增子command celery/bin/celery.py 會進行新增 子command,我們可以看出來。 這些 Commnd 是可以在命令列作為子命令直接使用的
。 ```python celery.add_command(purge) celery.add_command(call) celery.add_command(beat) celery.add_command(list_) celery.add_command(result) celery.add_command(migrate) celery.add_command(status) celery.add_command(worker) celery.add_command(events) celery.add_command(inspect) celery.add_command(control) celery.add_command(graph) celery.add_command(upgrade) celery.add_command(logtool) celery.add_command(amqp) celery.add_command(shell) celery.add_command(multi) ``` 每一個都是command。我們以worker為例,具體如下: ```python worker = {CeleryDaemonC