1. 程式人生 > >python 歸納 (十五)_多進程使用Pool

python 歸納 (十五)_多進程使用Pool

res http nat epo 接受 read data ren alt

1 usePool.py

#coding: utf-8
"""
學習進程池使用 multiprocessing.Pool
總結:
  1. Pool 池用於處理 多進程,並不是多線程
  2. 池有大小的概念
  3. 並不是所有的子進程添加完了,才開始啟動子進程。 當第一個進程添加到池中的時候,馬上就啟動了

使用:
  1. 創建進程池對象 pool = multiprocessing.Pool(processes = 3)
  2. 往池中添加進程  主要:pool.apply_async(func, (參數, )) or pool.apply(func, (參數, ))
  3. 調用 pool.close(); pool.join() (一般伴隨 apply_async),等待所有子進程結束

其他:
terminate()    結束工作進程,不再處理未完成的任務
map(...) 將一個集合數據 映射到 同一個函數,  根據集合大小 執行多次子進程
get()  從子進程獲取返回結果
""" import multiprocessing import time # 進程代碼 def func(msg): print "sub begin:", msg time.sleep(2) print "sub end:",msg if __name__ == "__main__": pool = multiprocessing.Pool(processes = 3) # 創建進程池 for i in xrange(5): msg = " %d" %(i) # apply_async 非阻塞,一般和join一起使用, apply 阻塞 主進程等待子進程一個接一個執行完
# apply_async 維持執行的進程總數為processes,當一個進程執行完畢後會添加新的進程進去 # apply_async 這裏創建的都是守護進程 pool.apply_async(func, (msg, )) # 實際開發中,每個子線程執行不同的邏輯 time.sleep(1) print "alread start sub,%d\n" % i print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" pool.close()
# 關閉pool使其不在接受新的任務,必須有 pool.join() # 等待所有子進程結束 調用join之前,先調用close函數, print "Sub-process(es) done." """ pool.apply_async Out: sub begin: 0 alread start sub,0 sub begin: 1 alread start sub,1 sub begin: 2 sub end: 0 sub end: 1alread start sub,2 sub begin: 3 alread start sub,3 sub begin: 4 sub end: 2 sub end:alread start sub,4 3 Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ sub end: 4 Sub-process(es) done. """ """ pool.apply Out: sub begin: 0 sub end: 0 alread start sub,0 sub begin: 1 sub end: 1 alread start sub,1 sub begin: 2 sub end: 2 alread start sub,2 sub begin: 3 sub end: 3 alread start sub,3 sub begin: 4 sub end: 4 alread start sub,4 Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ Sub-process(es) done. """

2 usePoolmap.py

# -*- coding: utf-8 -*-
"""
使用 multiprocessing.Pool.map 執行多進程

邏輯:
   有10個大小的列表,進程池4個大小
   使用map執行完

總結:
   可以簡化啟動子進程代碼

使用:
    1. 創建進程池對象 pool = multiprocessing.Pool(processes = 3)
    2. 準備list 數據  i_list = range(10)
    3. 準備子進程執行代碼 函數 sub_process_code
    4. 調用 pool.map(sub_process_code, i_list)
       或
            pool.map_async(sub_process_code, i_list)
            pool.close()
            pool.join()

"""
import multiprocessing
import time
import os
def sub_process_code(x):
    # 打印  hh:ss 編號 進程ID
    print time.strftime(%M:%S,time.localtime(time.time())),x * x,os.getpid()
    time.sleep(3)

if __name__ == __main__:
    pool = multiprocessing.Pool(multiprocessing.cpu_count()) # 根據CPU數量創建進程池,這裏是4個
    i_list = range(10)
    pool.map(sub_process_code, i_list)

    ## 下面3行代碼 = 上面一行代碼
    # pool.map_async(sub_process_code, i_list) # 異步
    # pool.close()
    # pool.join()  # 如果沒有join,主進程 結束後,所有子進程馬上結束了
    print "end"

"""
Out:

24:20 0 5960
24:20 1 5840
24:20 4 5892
24:20 9 6944
24:23 16 5960
24:23 25 5840
24:23 36 5892
24:23 49 6944
24:26 64 5960
24:26 81 5840
end

"""

3 usePoolgetData.py

# -*- coding: utf-8 -*-
"""
使用進程池 multiprocessing.Pool,獲取子進程的返回數據

使用:
    1. 創建進程池對象 pool = multiprocessing.Pool(processes = 3)
    2. 往池中添加進程,同時拿到Result對象 p_ApplyResult_obj = pool.apply_async(func, (參數, ))
    3. 調用 pool.close(); pool.join() 等待所有子進程結束
    4. 獲取子進程的返回數據  p_ApplyResult_obj.get()
"""

import multiprocessing
import time

#  子進程代碼,會return 數據給主進程
def func(msg):
    time.sleep(3)
    print "end"
    return "return " + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4)
    result = []   # 存儲Result對象

    for i in xrange(3):
        msg = "hello %d" %(i)
        # 添加子進程的同時,獲取它的返回對象
        p_ApplyResult_obj = pool.apply_async(func, (msg, ))
        print id(p_ApplyResult_obj)  # 打印pool對象 ID
        result.append(p_ApplyResult_obj)
    pool.close()
    pool.join()
    for res in result:
        print ":::", res.get()   # 獲取子進程的return結果
    print "Sub-process(es) done."

"""
Out:

41974752
41974864
41975032
endend

end
::: return hello 0
::: return hello 1
::: return hello 2
Sub-process(es) done.
"""

參考:

python進程池:multiprocessing.pool

python 歸納 (十五)_多進程使用Pool