multiprocessing解析(一):Process的解析
最近專案中為了提升Python的執行效率,要使用到multiprocessing模組,場景其實不算複雜但是為了避免帶來一些隱晦的bug,我覺得還是深入瞭解一下它比較好。因為當我使用multiprocessing的時候我覺得Python替我做的事情太多了(其實也就是用multiprocessing太方便了)給我帶來了快感卻沒有安全感,還有一個主要的原因就是我覺得fork
是最”奇幻”的函式。
可能會分好幾個章節,也有可能會太監。
1. 文件是最先需要了解的,讀完文件可能會有很多的意外的收穫同時也會留下疑惑,對於一般的使用我覺得讀完文件就差不多了,除非一些很有疑惑的地方你可能需要再深入的瞭解一下。我讀文件的目的第一個就是為了找出疑惑然後帶著疑惑去讀原始碼,還有一個目的就是為了後面讀原始碼提供指導。
2. multiprocessing.Process
daemon
這個屬性,千萬不要被名字迷惑了,它跟我們常見的Linux daemon程序沒有任何關係了。簡單點說它的功能就是:一個程序退出(exit)的時候會嘗試把它所有daemonic(設定了daemon屬性)的子程序終止(terminate)。 3.
multiprocessing.Process
原始碼分析 當我們
import multiprocessing
的時候就會生成一個_MainProcess
類,這是代表當前主程序,內容比較簡單。當我們在主程序中呼叫Process()
_MainProcess
中繼承一些屬性,例如:daemon
屬性。當呼叫Process.start()
開始執行一個程序的時候首先會有三條assert
斷言語句:不能重複start一個程序、只能start當前程序自己建立的程序、daemon程序不能有子程序。接著呼叫_cleanup()
清理當前已經執行完的程序(join
,然後把它從當前程序的子程序集合中去掉)。到了動真格的時候,就是要fork
生成子程序了,在Process
中用Popen
封裝了子程序的操作,用屬性_popen
來代表子程序。下面是Process.start()
的原始碼:
def start(self) :
'''
Start child process
'''
assert self._popen is None, 'cannot start a process twice'
assert self._parent_pid == os.getpid(), \
'can only start a process object created by current process'
assert not _current_process._daemonic, \
'daemonic processes are not allowed to have children'
_cleanup()
if self._Popen is not None:
Popen = self._Popen
else:
from .forking import Popen
self._popen = Popen(self)
_current_process._children.add(self)
我們看到呼叫Popen
建立子程序,然後把生成的Process
物件加入到當前程序的子程序集合中後我們就算完成了子程序的初始化、執行,這個步驟看起來比較簡單,其實真正的乾貨實在Popen
中,所以我們接著來看forking.Popen
類。
4. multiprocessing.forking.Popen
原始碼分析
Popen
首先把stdin、stdout
重新整理一下,接著就是代表子程序退出碼的returncode
,然後就是呼叫os.fork
來真正生成子程序。fork
後當前程序(也就是父程序)返回並且記錄子程序的程序ID(Popen
物件的pid
屬性),新生成的子程序呼叫Process
物件的_bootstrap()
來執行我們的target
,最後呼叫os._exit()
直接退出。
Process.start()->Popen.__init__()->os.fork->Process._bootstrap()
我們又回到了Process
了,感覺是不是有點繞呢。當我們再次回到Process.__bootstrap()
的時候一定要記住我們是在一個克隆的新空間中了,不再是前文中我們一直提到的當前程序了,而是我們在當前程序新生成的子程序中了。在__bootstrap()
中會先重新設定children、_count
屬性以及代表當前程序的全域性變數_current_process
,之所以要重新設定就是我們已經新生成了一個程序空間,我們當前是在這個新空間中執行著,特別是代表當前程序的_current_process
變數需要好好理解一下。接著就是清空_finalizer_registry
(這個我們留到後面再說),呼叫util._run_after_fork()
來做一些fork
之後執行target
之前的操作,當然預設情況下是沒有任何操作的,不過我們也可以呼叫util.register_after_fork()
來註冊一些函式,util.register_after_fork()
這個函式在文件中是沒有提及到的,算是一個”偏門”。最後就是呼叫Process.run
執行我們的target了,到此時我們的子程序就執行到了我們自己要執行的目標中了。執行完後會呼叫util._exit_function()
來做一些掃尾工作,這個函式是我們接下來的重點內容。
5. 程序退出的清理工作,_exit_function
原始碼分析
無論是主程序還是生成的子程序在程序退出之前都需要做一些清理工作,避免一些系統資源被一直佔用而不釋放。在主程序中當我們import multiprocessing.forking
的時候會間接的import multiprocessing.util
,此時就會在util中呼叫atexit.register(_exit_function)
來註冊主程序的exit handler。而子程序在上面我們看到執行完我們的target以後會主動呼叫_exit_function()
,所以無論是主程序還是子程序都是呼叫_exit_function()
來做最後的”善後”工作。
_exit_function()
的功能其實也比較簡單,呼叫_run_finalizers(0)
、終止(terminate)daemonic
子程序、join
回收子程序、再次呼叫_run_finalizers()
(引數有變化)。這裡的程式碼也很好的幫我理解了程序的daemon
屬性是幹嘛的了。
def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
active_children=active_children,
current_process=current_process):
# NB: we hold on to references to functions in the arglist due to the
# situation described below, where this function is called after this
# module's globals are destroyed.
global _exiting
info('process shutting down')
debug('running all "atexit" finalizers with priority >= 0')
_run_finalizers(0)
if current_process() is not None:
# NB: we check if the current process is None here because if
# it's None, any call to ``active_children()`` will throw an
# AttributeError (active_children winds up trying to get
# attributes from util._current_process). This happens in a
# variety of shutdown circumstances that are not well-understood
# because module-scope variables are not apparently supposed to
# be destroyed until after this function is called. However,
# they are indeed destroyed before this function is called. See
# issues 9775 and 15881. Also related: 4106, 9205, and 9207.
for p in active_children():
if p._daemonic:
info('calling terminate() for daemon %s', p.name)
p._popen.terminate()
for p in active_children():
info('calling join() for process %s', p.name)
p.join()
debug('running the remaining "atexit" finalizers')
_run_finalizers()
接下來就是_run_finalizer()
了,這個函式就是把優先順序高於傳入的minpriority
引數的finalizer函式按照建立時間的倒序執行一遍,當前程序的finalizer函式都是儲存在全域性變數_finalizer_registry
中,這個預設是為空的,也就是沒有任何的額外finalizer函式會呼叫,不過使用者可以通過建立multiprocessing.util.Finalize
物件來註冊finalizer函式。Finalize
這個類在multiprocessing的文件中也沒有公開,所以這也算是一個隱藏的功能。這個類我們可以在什麼時候用到呢?比如你在一個Process中建立了一個數據庫連線,需要在程序退出之前關閉這個連線,你除了在你的target程式碼中執行關閉以外還可以通過例項化multiprocessing.util.Finalize
物件來實現這個功能。在multiprocessing內部程式碼中就有很多的地方使用Finalize
來實現清理工作。
6. 至此Process
的出生到滅亡我們都有涉及了,其它的一些功能大體從程式碼中都能看明白了,無非就是對子程序的pid做一些os.waitpid
、os.kill
之類的呼叫,不再一一討論了。Process
只是multiprocessing中最基礎的部分,後面我會接著寫一下最重要的multiprocessing.pool
。