python語法——使用Pool實現多程序並行
簡介
Pool
模組來自於 multiprocessing
模組。
multiprocessing
模組是跨平臺版本的多程序模組,像執行緒一樣管理程序,與 threading 很相似,對多核CPU的利用率會比 threading 好的多。Pool
類可以提供指定數量的程序供使用者呼叫,當有新的請求提交到Pool中時,如果池還沒有滿,就會建立一個新的程序來執行請求。如果池滿,請求就會告知先等待,直到池中有程序結束,才會建立新的程序來執行這些請求。
函式
apply()
函式原型:apply(func[, args=()[, kwds={}]])
該函式用於傳遞不定引數,同python中的apply函式一致,主程序會被阻塞直到函式執行結束(不建議使用,並且3.x以後不在出現)。
apply_async
函式原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])
與apply用法一致,但它是非阻塞的且支援結果返回後進行回撥。
map()
函式原型:map(func, iterable[, chunksize=None])
Pool類中的map方法,與內建的map函式用法行為基本一致,它會使程序阻塞直到結果返回。
map_async()
函式原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的。其有關事項見apply_async。
阻塞與非阻塞的講解見下面備註。
close()
關閉程序池(pool),使其不在接受新的任務。
terminal()
結束工作程序,不在處理未處理的任務。
join()
主程序阻塞等待子程序的退出, join方法要在close或terminate之後使用。
示例
比如我想同時讓伺服器執行多條 hive 命令,可程式設計如下:
from multiprocessing import Pool
import subprocess
# 定義所有並行語句都回呼叫的函式
def run_sh (sh):
'''
執行一行shell命令
'''
(statusLoad, outputLoad) = subprocess.getstatusoutput(sh)
return (statusLoad, outputLoad)
# 將需要執行的多條語句放入到一個list中
sh_list = []
sh_list.append('hive -e "select * from A" > A_result')
sh_list.append('hive -e "select * from B" > B_result')
sh_list.append('hive -e "select * from C" > C_result')
# 開始並行
pool = Pool(len(sh_list))
pool.map(run_sh, sh_list) # 表示將 sh_list 每個元素作為引數遞給 run_sh
pool.close() # 將程序池關閉,不再接受新的程序
pool.join() # 主程序阻塞,只有池中所有程序都完畢了才會通過
# 開始處理結果檔案,此時三個 *_result 檔案肯定是存在並且已經寫入完畢的
備註
阻塞與非阻塞的區別
map()
會使程序阻塞,即通過 map()
開啟的多程序都結束之後,這個函式才會有返回結果,否則主程序會一直等待,不會往下進行 。
map_async()
為非阻塞,即通過 map_async()
開啟多程序之後,立刻會返回結果,主程序會繼續往下執行。
注意:
如果後面呼叫了 join()
函式,則不管之前用的是 map
還是 map_async
,主程序都會等待,直到程序池中所有程序執行完畢,才會繼續往下執行。
starmap
函式
Pool
類中,python 3.X 還引入了 starmap
函式,與 map
的區別在於, starmap
支援將多個引數放入到佇列中,不同引數按照順序以元組形式存放,舉例如下:
from multiprocessing import Pool
def func(a, b):
print(a + b)
if __name__=="__main__":
args = [(1,2),(3,4),(5,6)]
pool = Pool(3)
pool.starmap(func, args)
輸出
3
7
11