1. 程式人生 > >python學習筆記——multiprocess 多進程組件Pool

python學習筆記——multiprocess 多進程組件Pool

3.1 spa AR file 取出 int 分配 這也 內核

1 進程池Pool基本概述

在使用Python進行系統管理時,特別是同時操作多個文件目錄或者遠程控制多臺主機,並行操作可以節約大量時間,如果操作的對象數目不大時,還可以直接適用Process類動態生成多個進程,幾十個尚可,若上百個甚至更多時,手動限制進程數量就顯得特別繁瑣,此時進程池就顯得尤為重要。

進程池Pool類可以提供指定數量的進程供用戶調用,當有新的請求提交至Pool中時,若進程池尚未滿,就會創建一個新的進程來執行請求;若進程池中的進程數已經達到規定的最大數量,則該請求就會等待,直到進程池中有進程結束,才會創建新的進程來處理該請求。

2 進程池Pool的語法

Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

processes:使用的工作進程的數量;若processes是None,默認適用os.cpu_count()返回的數量。

initializer:若initializer是None,則每一個工作進程在開始的時候就會調用initializer(*initargs)。

maxtasksperchild:工作進程退出前可以完成的任務數,完成後用一個新的工作進程來替代原進程,讓閑置的資源釋放,maxtasksperchild默認是None,此意味只要Pool存在工作進程就一直存活

context: 用在制定工作進程啟動時的上下文,一般使用multiprocessing.Pool()或者一個context對象的Pool()方法來創建一個池,兩種方法都適當的設置了context。

實例方法:

p為進程池對象

p.apply():

apply(func[, args=()[, kwds={}]])

該函數用於傳遞不定參數,主進程會被阻塞直到函數執行結束,實際上這也就說所謂的同步執行。

同步執行,按照加入進程池的順序執行事件,每次執行完一個再執行另一個,無法獲取返回值

p.apply_async()

apply_async(func[, args=()[, kwds={}[, callback=None]]])

與apply用法一樣,但它是非阻塞且支持結果返回進行回調;實際上也就是異步執行。

異步執行,同時啟動進程池中多個進程執行事件,可以獲取事件返回值 —

<multiprocessing.pool.ApplyResult object at 0x7f7f6e4357f0>

p.map()

map(func, iterable[, chunksize=None])

Pool類中的map方法,與內置map函數用法基本一致,它融合了map函數和apply_async()函數的功能;它會使進程阻塞直到返回結果。

註意:雖然第二個參數是一個叠代器,但實際應用中,必須在整個隊列就緒後,程序才會運行子進程。

p.close():關閉進程池,阻止更多的任務提交到進程池Pool,待任務完成後,工作進程會退出

p.terminate():結束工作進程,不再處理未完成的任務

p.join():等待工作線程的退出,必須在close()或terminate()之後使用,因被終止的進程需要被父進程調用wait(join等價於wait),否則進程會成為僵屍進程。

註意:

(1)使用Pool創建進程池對象,同時進程池中進程已經啟動

(2)向進程池對象中添加事件,事件排隊執行

(3)如果主進程退出,則進程池中所有進程都退出

3 實例

3.1 基礎實例

import multiprocessing as mp

def test():
    pass

p = mp.Pool(processes = 5) # 創建5條進程

for i in range(10):
    p.apply_async(test) # 向進程池添加任務

p.close() # 關閉進程池,不再接受請求
p.join() # 等待所有的子進程結束

說明:

(1)進程池Pool被創建出來後, p.apply_async(test) 語句不停地循環執行,相當於向進程池中提交了10個請求,它們會被放到一個隊列中。

(2) p = mp.Pool(5) 執行完畢後創建了5條進程,但尚未給它們分配各自的任務;也就意味著,無論有多少任務,實際的進程數只有5條,每次最多5條進程並行。

(3)當Pool中有進程任務執行完畢後,這條進程資源會被釋放,Pool會按先進先出的原則取出一個新的請求給空閑的進程繼續執行。

(4)當Pool所有的進程任務完成後,會產生5個僵屍進程,如果主進程/主線程不結束,系統不會自動回收資源,需要調用join函數負責回收。

(5)在創建Pool進程池時,若不指定進程的最大數量,默認創建的進程數為系統的內核數量

(6)如果采用p.apply(test)阻塞方式添加任務,其每次只能向進程池中添加一條任務,然後for循環會被阻塞等待,直到添加的任務被執行完畢,進程池中的5個進程交替執行新來的任務,此時相當於單進程。——該語句需要再深刻理解,尚未完全明白

參考:python的multiprocessing模塊進程創建、資源回收-Process,Pool

3.2 apply方式添加任務

import  multiprocessing as mp
import os
from time import sleep

def worker(msg):
    print(os.getpid())
    sleep(2)
    print(msg)
    return msg

#創建進程池對象
p = mp.Pool(processes = 4)#創建4條進程

pool_result = []
for i in range(10):
    msg = hello-%d%i
    r = p.apply(worker,(msg,))#向進程池中添加事件
    pool_result.append(r)

#獲取事件函數的返回值
for r in pool_result:
    print(return:,r) 

p.close()# 關閉進程池,不再接受請求
p.join() # 等待進程池中的事件執行完畢,回收進程池

運行

8419
hello-0
8418
hello-1
8420
hello-2
8421
hello-3
8419
hello-4
8418
hello-5
8420
hello-6
8421
hello-7
8419
hello-8
8418
hello-9
return: hello-0
return: hello-1
return: hello-2
return: hello-3
return: hello-4
return: hello-5
return: hello-6
return: hello-7
return: hello-8
return: hello-9

這段代碼運行較慢,和進程阻塞有關。相當於單線程!

當將代碼(22行)中的 print(return:,r) 修改為 print(return:,r.get())

8670
hello-0
8671
hello-1
8672
hello-2
8673
hello-3
8670
hello-4
8671
hello-5
8672
hello-6
8673
hello-7
8670
hello-8
8671
hello-9
Traceback (most recent call last):
  File "test1.py", line 22, in <module>
    print(return:,r.get())
AttributeError: str object has no attribute get

最後報錯: AttributeError: str object has no attribute get

3.3 applay_async方式添加任務

import multiprocessing as mp
import os 
from time import sleep 

def worker(msg):
    print(os.getpid())
    sleep(2)
    print(msg)
    return msg

#創建進程池對象
p = mp.Pool(processes = 4) #創建4條進程

pool_result = []
for i in range(10):
    msg = hello-%d%i
    r = p.apply_async(worker,(msg,)) #向進程池中添加事件
    pool_result.append(r)

#獲取事件函數的返回值
for r in pool_result:
    print(return:,r)

p.close()#關閉進程池,不再接受請求
p.join()# 等待進程池中的事件執行完畢,回收進程池

運行

return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e37d68>
return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e37e80>
return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e37f98>
return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e410f0>
return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41208>
return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41320>
return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41438>
return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41550>
return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41668>
return: <multiprocessing.pool.ApplyResult object at 0x7f66d0e41780>
8739
8740
8742
8741
hello-0
hello-3
8742
hello-1
8739
8740
hello-2
8741
hello-5
8739
hello-6
8740
hello-7
hello-4
hello-8
hello-9

註意:

(1)由於這個是異步方式添加任務,所以運行非常快

(2)由於for是內置循環函數,執行效率較高,所以在結果的前10行均為for語句執行結果

(3) r = p.apply_async(worker,(msg,)) 執行結果為進度對象。

(4)由於任務是異步執行,所以在結果中是“亂序”;並不像applay那樣有序打印。

同樣將代碼(22行)中的 print(return:,r) 修改為 print(return:,r.get()) 時,

運行結果

8839
8840
8841
8842
hello-0
hello-1
hello-3
8839
hello-2
8842
8841
8840
return: hello-0
return: hello-1
return: hello-2
return: hello-3
hello-4
hello-5
8839
hello-6
8842
hello-7
return: hello-4
return: hello-5
return: hello-6
return: hello-7
hello-9
hello-8
return: hello-8
return: hello-9

python學習筆記——multiprocess 多進程組件Pool