1. 程式人生 > >python技巧——使用Pool實現多程序並行

python技巧——使用Pool實現多程序並行

簡介

可以使用 Pool來實現多程序並行。

Pool 模組來自於 multiprocessing 模組。

  • multiprocessing 模組是跨平臺版本的多程序模組,像執行緒一樣管理程序,與 threading 很相似,對多核CPU的利用率會比 threading 好的多。
  • Pool 類可以提供指定數量的程序供使用者呼叫,當有新的請求提交到Pool中時,如果池還沒有滿,就會建立一個新的程序來執行請求。如果池滿,請求就會告知先等待,直到池中有程序結束,才會建立新的程序來執行這些請求。

函式

apply()

函式原型:apply(func[, args=()[, kwds={}]])

該函式用於傳遞不定引數,同python中的apply函式一致,主程序會被阻塞直到函式執行結束(不建議使用,並且3.x以後不在出現)。

apply_async()

函式原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

與apply用法一致,但它是非阻塞的且支援結果返回後進行回撥。

map()

函式原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與內建的map函式用法行為基本一致,它會使程序阻塞直到結果返回。

map_async()

函式原型:map_async(func, iterable[, chunksize[, callback]])

與map用法一致,但是它是非阻塞的。其有關事項見apply_async。

阻塞與非阻塞的講解見下面備註。

close()

關閉程序池(pool),使其不在接受新的任務。

terminal()

結束工作程序,不在處理未處理的任務。

join()

主程序阻塞等待子程序的退出, join方法要在close或terminate之後使用。

示例

比如我想同時讓伺服器執行多條 hive 命令,可程式設計如下:

from multiprocessing import Pool
import subprocess

# 定義所有並行語句都回呼叫的函式
def run_sh
(sh): ''' 執行一行shell命令 ''' (statusLoad, outputLoad) = subprocess.getstatusoutput(sh) return (statusLoad, outputLoad) # 將需要執行的多條語句放入到一個list中 sh_list = [] sh_list.append('hive -e "select * from A" > A_result') sh_list.append('hive -e "select * from B" > B_result') sh_list.append('hive -e "select * from C" > C_result') # 開始並行 pool = Pool(len(sh_list)) pool.map(run_sh, sh_list) # 表示將 sh_list 每個元素作為引數遞給 run_sh pool.close() # 將程序池關閉,不再接受新的程序 pool.join() # 主程序阻塞,只有池中所有程序都完畢了才會通過 # 開始處理結果檔案,此時三個 *_result 檔案肯定是存在並且已經寫入完畢的

備註

1、阻塞與非阻塞的區別

map() 會使程序阻塞,即通過 map() 開啟的多程序都結束之後,這個函式才會有返回結果,否則主程序會一直等待,不會往下進行 。

map_async() 為非阻塞,即通過 map_async() 開啟多程序之後,立刻會返回結果,主程序會繼續往下執行。

注意:

如果後面呼叫了 join() 函式,則不管之前用的是 map 還是 map_async,主程序都會等待,直到程序池中所有程序執行完畢,才會繼續往下執行。

2、starmap 函式

Pool 類中,python 3.X 還引入了 starmap 函式,與 map 的區別在於, starmap 支援將多個引數放入到佇列中,不同引數按照順序以元組形式存放,舉例如下:

from multiprocessing import Pool
def func(a, b):
    print(a + b)

if __name__=="__main__":
    args = [(1,2),(3,4),(5,6)]
    pool = Pool(3)
    pool.starmap(func, args)    

輸出

3
7
11

記憶體共享問題

多程序並行有一個特點:多個程序之間並不能共享記憶體。

比如一個人寫出了以下程式碼,期望可以對同一個數進行累加:

from multiprocessing import Pool
def func(dic, c):
    dic['count'] += c

if __name__=="__main__":
    d = dict() 
    d['count'] = 0
    args = [(d, 1), (d, 2), (d, 3)]
    pool = Pool(3)
    pool.starmap(func, args)   
    pool.close()
    pool.join()
    print(f'dic={d}')

但是輸出為:

dic={'count': 0}

不是我們想要的結果。

這是因為,多執行緒和多程序最大的不同在於,多程序中,同一個變數,各自有一份拷貝存在於每個程序中,互不影響,而多執行緒中,所有變數都由所有執行緒共享,所以,任何一個變數都可以被任何一個執行緒修改。

解決辦法

可以使用 multiprocessing.Manager 來建立物件,這樣的物件可以被共享,如:

from multiprocessing import Pool, Manager
def func(dic, c):
    dic['count'] += c

if __name__=="__main__":
    d = Manager().dict()  #生成一個字典,可以在多個程序中傳遞和共享。
    d['count'] = 0
    args = [(d, 1), (d, 2), (d, 3)]
    pool = Pool(3)
    pool.starmap(func, args)   
    pool.close()
    pool.join()
    print(f'dic={d}')

輸出是我們所期望的:

dic={'count': 6}

Manager() 內部有加鎖機制,不允許兩個程序同時修改一份資料,因為程序的資料是獨立的,因此資料是安全的。

另外,如果只要求並行,不要求必須是多程序,可以使用多執行緒來實現共享資料。 參照python技巧——使用threadpool實現多執行緒並行