python中多程序(multiprocessing)
python中多程序(multiprocessing)
一、multiprocessing中使用子程序概念
from multiprocessing import Process
可以通過Process來構造一個子程序
p = Process(target=fun,args=(args))
再通過p.start()來啟動子程序
再通過p.join()方法來使得子程序執行結束後再執行父程序
1 |
|
from multiprocessing import Process import os # 子程序要執行的程式碼 def run_proc(name): print 'Run child process %s (%s)...' % (name, os.getpid()) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Process(target=run_proc, args=('test',)) print 'Process will start.' p.start() p.join() print 'Process end.'
1 |
|
1 |
|
如果需要多個子程序時可以考慮使用程序池(pool)來管理
from multiprocessing import Pool from multiprocessing import Pool import os, time def long_time_task(name): print 'Run task %s (%s)...' % (name, os.getpid()) start = time.time() time.sleep(3) end = time.time() print 'Task %s runs %0.2f seconds.' % (name, (end - start)) if __name__=='__main__': print 'Parent process %s.' % os.getpid() p = Pool() for i in range(5): p.apply_async(long_time_task, args=(i,)) print 'Waiting for all subprocesses done...' p.close() p.join() print 'All subprocesses done.'
pool建立子程序的方法與Process不同,是通過
p.apply_async(func,args=(args))實現,一個池子裡能同時執行的任務是取決你電腦的cpu數量,如我的電腦現在是有4個cpu,那會子程序task0,task1,task2,task3可以同時啟動,task4則在之前的一個某個程序結束後才開始
http://my.oschina.net/yangyanxing/blog/296052 結果見連線
上面的程式執行後的結果其實是按照上圖中1,2,3分開進行的,先列印1,3秒後列印2,再3秒後列印3
程式碼中的p.close()是關掉程序池子,是不再向裡面新增程序了,對Pool
join()
方法會等待所有子程序執行完畢,呼叫join()
之前必須先呼叫close()
,呼叫close()
之後就不能繼續新增新的Process
了。
當時也可以是例項pool的時候給它定義一個程序的多少
如果上面的程式碼中p=Pool(5)那麼所有的子程序就可以同時進行
三、多個子程序間的通訊
多個子程序間的通訊就要採用第一步中說到的Queue,比如有以下的需求,一個子程序向佇列中寫資料,另外一個程序從佇列中取資料,
#coding:gbk from multiprocessing import Process, Queue import os, time, random # 寫資料程序執行的程式碼: def write(q): for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) time.sleep(random.random()) # 讀資料程序執行的程式碼: def read(q): while True: if not q.empty(): value = q.get(True) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': # 父程序建立Queue,並傳給各個子程序: q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) # 啟動子程序pw,寫入: pw.start() # 等待pw結束: pw.join() # 啟動子程序pr,讀取: pr.start() pr.join() # pr程序裡是死迴圈,無法等待其結束,只能強行終止: print print '所有資料都寫入並且讀完'
四、關於上面程式碼的幾個有趣的問題
if __name__=='__main__': # 父程序建立Queue,並傳給各個子程序: q = Queue() p = Pool() pw = p.apply_async(write,args=(q,)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有資料都寫入並且讀完'
如果main函式寫成上面的樣本,本來我想要的是將會得到一個佇列,將其作為引數傳入程序池子裡的每個子程序,但是卻得到
RuntimeError: Queue objects should only be shared between processes through inheritance
的錯誤,查了下,大意是佇列物件不能在父程序與子程序間通訊,這個如果想要使用程序池中使用佇列則要使用multiprocess的Manager類
if __name__=='__main__': manager = multiprocessing.Manager() # 父程序建立Queue,並傳給各個子程序: q = manager.Queue() p = Pool() pw = p.apply_async(write,args=(q,)) time.sleep(0.5) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有資料都寫入並且讀完'
這樣這個佇列物件就可以在父程序與子程序間通訊,不用池則不需要Manager,以後再擴充套件multiprocess中的Manager類吧
關於鎖的應用,在不同程式間如果有同時對同一個佇列操作的時候,為了避免錯誤,可以在某個函式操作佇列的時候給它加把鎖,這樣在同一個時間內則只能有一個子程序對佇列進行操作,鎖也要在manager物件中的鎖
#coding:gbk from multiprocessing import Process,Queue,Pool import multiprocessing import os, time, random # 寫資料程序執行的程式碼: def write(q,lock): lock.acquire() #加上鎖 for value in ['A', 'B', 'C']: print 'Put %s to queue...' % value q.put(value) lock.release() #釋放鎖 # 讀資料程序執行的程式碼: def read(q): while True: if not q.empty(): value = q.get(False) print 'Get %s from queue.' % value time.sleep(random.random()) else: break if __name__=='__main__': manager = multiprocessing.Manager() # 父程序建立Queue,並傳給各個子程序: q = manager.Queue() lock = manager.Lock() #初始化一把鎖 p = Pool() pw = p.apply_async(write,args=(q,lock)) pr = p.apply_async(read,args=(q,)) p.close() p.join() print print '所有資料都寫入並且讀完'