1. 程式人生 > >Python多程序併發操作程序池Pool

Python多程序併發操作程序池Pool

目錄:

  1. multiprocessing模組
  2. Pool類
  3. apply
  4. apply_async
  5. map
  6. close
  7. terminate
  8. join
  9. 程序例項

multiprocessing模組

如果你打算編寫多程序的服務程式,Unix/Linux無疑是正確的選擇。由於Windows沒有fork呼叫,難道在Windows上無法用Python編寫多程序的程式?由於Python是跨平臺的,自然也應該提供一個跨平臺的多程序支援。multiprocessing模組就是跨平臺版本的多程序模組。multiprocessing模組提供了一個Process

類來代表一個程序物件,這個模組表示像執行緒一樣管理程序,這個是multiprocessing的核心,它與threading很相似,對多核CPU的利用率會比threading好的多。

 看一下Process類的構造方法:

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

引數說明: 
group:程序所屬組。基本不用 
target:表示呼叫物件。 
args:表示呼叫物件的位置引數元組。 
name:別名 
kwargs:表示呼叫物件的字典。

下面看一個簡單的例子

複製程式碼

 1 #coding=utf-8
 2 import multiprocessing
 3 
 4 def do(n) :
 5   #獲取當前執行緒的名字
 6   name = multiprocessing.current_process().name
 7   print(name,'starting')
 8   print("worker ", n)
 9   return 
10 
11 if __name__ == '__main__' :
12   numList = []
13   for i in xrange(5) :
14     p = multiprocessing.Process(target=do, args=(i,))
15     numList.append(p)
16     p.start()
17     p.join()
18     print("Process end.")

複製程式碼

執行結果

複製程式碼

Process-1 starting
worker  0
Process end.
Process-2 starting
worker  1
Process end.
Process-3 starting
worker  2
Process end.
Process-4 starting
worker  3
Process end.
Process-5 starting
worker  4
Process end.

複製程式碼

建立子程序時,只需要傳入一個執行函式和函式的引數,建立一個Process例項,並用其start()方法啟動,這樣建立程序比fork()還要簡單。 join()方法表示等待子程序結束以後再繼續往下執行,通常用於程序間的同步。

注意: 
在Windows上要想使用程序模組,就必須把有關程序的程式碼寫在當前.py檔案的if __name__ == ‘__main__’ :語句的下面,才能正常使用Windows下的程序模組。Unix/Linux下則不需要。

Pool類

 Pool類可以提供指定數量的程序供使用者呼叫,當有新的請求提交到Pool中時,如果池還沒有滿,就會建立一個新的程序來執行請求。如果池滿,請求就會告知先等待,直到池中有程序結束,才會建立新的程序來執行這些請求。 
下面介紹一下multiprocessing 模組下的Pool類下的幾個方法:

1.apply()

函式原型:apply(func[, args=()[, kwds={}]])

該函式用於傳遞不定引數,同python中的apply函式一致,主程序會被阻塞直到函式執行結束(不建議使用,並且3.x以後不在出現)。

2.apply_async

函式原型:apply_async(func[, args=()[, kwds={}[, callback=None]]])

與apply用法一致,但它是非阻塞的且支援結果返回後進行回撥。

3.map()

 函式原型:map(func, iterable[, chunksize=None])

Pool類中的map方法,與內建的map函式用法行為基本一致,它會使程序阻塞直到結果返回。 
注意:雖然第二個引數是一個迭代器,但在實際使用中,必須在整個佇列都就緒後,程式才會執行子程序。

4.map_async()

函式原型:map_async(func, iterable[, chunksize[, callback]])
與map用法一致,但是它是非阻塞的。其有關事項見apply_async。

5.close()

關閉程序池(pool),使其不在接受新的任務。

6.terminal()

結束工作程序,不在處理未處理的任務。

7.join()

主程序阻塞等待子程序的退出, join方法要在close或terminate之後使用。

下面我們看一個簡單的multiprocessing.Pool類的例項:

複製程式碼

 1 # -*- coding: utf-8 -*-
 2 import time
 3 from multiprocessing import Pool
 4 def run(fn):
 5   #fn: 函式引數是資料列表的一個元素
 6   time.sleep(1)
 7   print(fn*fn)
 8 
 9 if __name__ == "__main__":
10   testFL = [1,2,3,4,5,6]
11   print ('shunxu:') #順序執行(也就是序列執行,單程序)
12   s = time.time()
13   for fn in testFL:
14     run(fn)
15   t1 = time.time()
16   print ("順序執行時間:", int(t1 - s))
17 
18   print ('concurrent:') #建立多個程序,並行執行
19   pool = Pool(10)  #建立擁有10個程序數量的程序池
20   #testFL:要處理的資料列表,run:處理testFL列表中資料的函式
21   pool.map(run, testFL)
22   pool.close()#關閉程序池,不再接受新的程序
23   pool.join()#主程序阻塞等待子程序的退出
24   t2 = time.time()
25   print ("並行執行時間:", int(t2-t1))

複製程式碼

輸出結果為:

複製程式碼

shunxu:
1
4
9
16
25
36
順序執行時間: 6
concurrent:
1
4
9
16
25
36
並行執行時間: 1

複製程式碼

上例是一個建立多個程序併發處理與順序執行處理同一資料,所用時間的差別。從結果可以看出,併發執行的時間明顯比順序執行要快很多,但是程序是要耗資源的,所以平時工作中,程序數也不能開太大。 對Pool物件呼叫join()方法會等待所有子程序執行完畢,呼叫join()之前必須先呼叫close(),讓其不再接受新的Process了。