python學習筆記——multiprocess 多進程組件Pool
1 進程池Pool基本概述
在使用Python進行系統管理時,特別是同時操作多個文件目錄或者遠程控制多臺主機,並行操作可以節約大量時間,如果操作的對象數目不大時,還可以直接適用Process類動態生成多個進程,幾十個尚可,若上百個甚至更多時,手動限制進程數量就顯得特別繁瑣,此時進程池就顯得尤為重要。
進程池Pool類可以提供指定數量的進程供用戶調用,當有新的請求提交至Pool中時,若進程池尚未滿,就會創建一個新的進程來執行請求;若進程池中的進程數已經達到規定的最大數量,則該請求就會等待,直到進程池中有進程結束,才會創建新的進程來處理該請求。
2 進程池Pool的語法
Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])
processes:使用的工作進程的數量;若processes是None,默認適用os.cpu_count()返回的數量。
initializer:若initializer是None,則每一個工作進程在開始的時候就會調用initializer(*initargs)。
maxtasksperchild:工作進程退出前可以完成的任務數,完成後用一個新的工作進程來替代原進程,讓閑置的資源釋放,maxtasksperchild默認是None,此意味只要Pool存在工作進程就一直存活
context: 用在制定工作進程啟動時的上下文,一般使用multiprocessing.Pool()或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context。
實例方法:
p為進程池對象
p.apply():
apply(func[, args=()[, kwds={}]])
該函數用於傳遞不定參數,主進程會被阻塞直到函數執行結束,實際上這也就說所謂的同步執行。
同步執行,按照加入進程池的順序執行事件,每次執行完一個再執行另一個,無法獲取返回值。
p.apply_async()
apply_async(func[, args=()[, kwds={}[, callback=None]]])
與apply用法一樣,但它是非阻塞且支持結果返回進行回調;實際上也就是異步執行。
異步執行,同時啟動進程池中多個進程執行事件,可以獲取事件返回值 —
p.map()
map(func, iterable[, chunksize=None])
Pool類中的map方法,與內置map函數用法基本一致,它融合了map函數和apply_async()函數的功能;它會使進程阻塞直到返回結果。
註意:雖然第二個參數是一個叠代器,但實際應用中,必須在整個隊列就緒後,程序才會運行子進程。
p.close():關閉進程池,阻止更多的任務提交到進程池Pool,待任務完成後,工作進程會退出
p.terminate():結束工作進程,不再處理未完成的任務
p.join():等待工作線程的退出,必須在close()或terminate()之後使用,因被終止的進程需要被父進程調用wait(join等價於wait),否則進程會成為僵屍進程。
註意:
(1)使用Pool創建進程池對象,同時進程池中進程已經啟動
(2)向進程池對象中添加事件,事件排隊執行
(3)如果主進程退出,則進程池中所有進程都退出
3 實例
3.1 基礎實例
import multiprocessing as mp def test(): pass p = mp.Pool(processes = 5) # 創建5條進程 for i in range(10): p.apply_async(test) # 向進程池添加任務 p.close() # 關閉進程池,不再接受請求 p.join() # 等待所有的子進程結束
說明:
(1)進程池Pool被創建出來後, p.apply_async(test) 語句不停地循環執行,相當於向進程池中提交了10個請求,它們會被放到一個隊列中。
(2) p = mp.Pool(5) 執行完畢後創建了5條進程,但尚未給它們分配各自的任務;也就意味著,無論有多少任務,實際的進程數只有5條,每次最多5條進程並行。
(3)當Pool中有進程任務執行完畢後,這條進程資源會被釋放,Pool會按先進先出的原則取出一個新的請求給空閑的進程繼續執行。
(4)當Pool所有的進程任務完成後,會產生5個僵屍進程,如果主進程/主線程不結束,系統不會自動回收資源,需要調用join函數負責回收。
(5)在創建Pool進程池時,若不指定進程的最大數量,默認創建的進程數為系統的內核數量
(6)如果采用p.apply(test)阻塞方式添加任務,其每次只能向進程池中添加一條任務,然後for循環會被阻塞等待,直到添加的任務被執行完畢,進程池中的5個進程交替執行新來的任務,此時相當於單進程。——該語句需要再深刻理解,尚未完全明白
參考:python的multiprocessing模塊進程創建、資源回收-Process,Pool
3.2 apply方式添加任務
import multiprocessing as mp import os from time import sleep def worker(msg): print(os.getpid()) sleep(2) print(msg) return msg #創建進程池對象 p = mp.Pool(processes = 4)#創建4條進程 pool_result = [] for i in range(10): msg = ‘hello-%d‘%i r = p.apply(worker,(msg,))#向進程池中添加事件 pool_result.append(r) #獲取事件函數的返回值 for r in pool_result: print(‘return:‘,r) p.close()# 關閉進程池,不再接受請求 p.join() # 等待進程池中的事件執行完畢,回收進程池
運行
8419 hello-0 8418 hello-1 8420 hello-2 8421 hello-3 8419 hello-4 8418 hello-5 8420 hello-6 8421 hello-7 8419 hello-8 8418 hello-9 return: hello-0 return: hello-1 return: hello-2 return: hello-3 return: hello-4 return: hello-5 return: hello-6 return: hello-7 return: hello-8 return: hello-9
這段代碼運行較慢,和進程阻塞有關。相當於單線程!
當將代碼(22行)中的 print(‘return:‘,r) 修改為 print(‘return:‘,r.get()) 時
8670 hello-0 8671 hello-1 8672 hello-2 8673 hello-3 8670 hello-4 8671 hello-5 8672 hello-6 8673 hello-7 8670 hello-8 8671 hello-9 Traceback (most recent call last): File "test1.py", line 22, in <module> print(‘return:‘,r.get()) AttributeError: ‘str‘ object has no attribute ‘get‘
最後報錯: AttributeError: ‘str‘ object has no attribute ‘get‘
3.3 applay_async方式添加任務
import multiprocessing as mp import os from time import sleep def worker(msg): print(os.getpid()) sleep(2) print(msg) return msg #創建進程池對象 p = mp.Pool(processes = 4) #創建4條進程 pool_result = [] for i in range(10): msg = ‘hello-%d‘%i r = p.apply_async(worker,(msg,)) #向進程池中添加事件 pool_result.append(r) #獲取事件函數的返回值 for r in pool_result: print(‘return:‘,r) p.close()#關閉進程池,不再接受請求 p.join()# 等待進程池中的事件執行完畢,回收進程池
運行
return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e37d68> return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e37e80> return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e37f98> return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e410f0> return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41208> return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41320> return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41438> return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41550> return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41668> return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41780> 8739 8740 8742 8741 hello-0 hello-3 8742 hello-1 8739 8740 hello-2 8741 hello-5 8739 hello-6 8740 hello-7 hello-4 hello-8 hello-9
註意:
(1)由於這個是異步方式添加任務,所以運行非常快
(2)由於for是內置循環函數,執行效率較高,所以在結果的前10行均為for語句執行結果
(3) r = p.apply_async(worker,(msg,)) 執行結果為進度對象。
(4)由於任務是異步執行,所以在結果中是“亂序”;並不像applay那樣有序打印。
同樣將代碼(22行)中的 print(‘return:‘,r) 修改為 print(‘return:‘,r.get()) 時,
運行結果
8839 8840 8841 8842 hello-0 hello-1 hello-3 8839 hello-2 8842 8841 8840 return: hello-0 return: hello-1 return: hello-2 return: hello-3 hello-4 hello-5 8839 hello-6 8842 hello-7 return: hello-4 return: hello-5 return: hello-6 return: hello-7 hello-9 hello-8 return: hello-8 return: hello-9
python學習筆記——multiprocess 多進程組件Pool