Python並行程式設計(十一):基於程序的並行
1、基本概念
多程序主要用multiprocessing和mpi4py這兩個模組。
multiprocessing是Python標準庫中的模組,實現了共享記憶體機制,可以讓執行在不同處理器核心的程序能讀取共享記憶體。
mpi4py庫實現了訊息傳遞的程式設計範例(設計模式)。簡單來說就是程序之間不靠任何共享資訊來進行通訊,所有的交流都通過傳遞資訊代替。
這與使用共享記憶體通訊、加鎖或類似機制實現互斥的技術形成對比。在資訊傳遞的程式碼中,程序通過send和receive進行交流。
2、建立一個程序
由父程序建立子程序。父程序既可以在產生子程序之後繼續非同步執行,也可以暫停等待子程序建立完成之後再繼續執行。建立程序的步驟如下:
1. 建立程序物件
2. 呼叫start()方法,開啟程序的活動
3. 呼叫join()方法,在程序結束之前一直等待
3、建立程序用例
# coding : utf-8 import multiprocessing def foo(i):print('called function in process: %s' %i) return if __name__ == '__main__': Process_jobs = [] for i in range(5): p = multiprocessing.Process(target=foo, args=(i, )) Process_jobs.append(p) p.start() p.join()
執行結果:
建立程序物件的時候需要分配一個函式,作為程序的執行任務,本例為foo()。最後程序物件呼叫join()方法,如果沒有join主程序退出之後子程序會留在idle中。
提示:為了預防無限遞迴呼叫,可以在不同指令碼檔案中定義目標函式,然後匯入進來使用。
4、程序命名
程序命名和執行緒命名大同小異。
使用示例:
# coding:utf-8 import multiprocessing import time def foo(): # get name of process name = multiprocessing.current_process().name print("Starting %s \n" %name) time.sleep(3) print("Exiting %s \n" %name) if __name__ == '__main__': # create process with DIY name process_with_name = multiprocessing.Process(name='foo_process', target=foo) # process_with_name.daemon = True # create process with default name process_with_default_name = multiprocessing.Process(target=foo) process_with_name.start() process_with_default_name.start()
5、殺死一個程序
通過terminate方法殺死一個程序,也可以使用is_alive方法判斷一個程序是否存活。
測試用例:
import multiprocessing, time def foo(): print('Starting function') time.sleep(0.1) print('Finished function') if __name__ == '__main__': p = multiprocessing.Process(target=foo) print('Process before execution:', p, p.is_alive()) p.start() print('Process running:', p, p.is_alive()) p.terminate() print('Process terminated:', p, p.is_alive()) p.join() print('Process joined:', p, p.is_alive()) print('Process exit code:',p.exitcode)
執行結果:
正常結束返回值為0,且foo會被執行。exitcode為0為正常結束,為負表示訊號殺死,大於0程序有錯誤。
6、子類中使用程序
實現一個自定義的程序子類,需要以下三步:
- 定義Process子類
- 覆蓋__init__(self [,args])方法來新增額外的引數
- 覆蓋run方法來實現Process啟動的時候執行的任務
建立Process子類之後,可以建立它的例項。並且通過start方法啟動它,啟動之後會執行run方法。
測試用例:
# coding:utf-8 import multiprocessing class MyProcess(multiprocessing.Process): def run(self): print('called run method in process: %s' %self.name) return if __name__ == '__main__': jobs = [] for i in range(5): p = MyProcess() jobs.append(p) p.start() p.join()
執行結果:
7、程序之間交換物件
並行應用常常需要在程序之間交換資料。Multiprocessing庫有兩個Communication Channel可以交換物件:佇列queue和管道pipe。
使用佇列交換物件:
Queue返回一個程序共享的佇列,是執行緒安全的,也是程序安全的。任何可序列化的物件(Python通過pickable模組序列化物件)都可以通過它進行交換。
測試用例:
import multiprocessing import random import time class Producer(multiprocessing.Process): def __init__(self, queue): multiprocessing.Process.__init__(self) self.queue = queue def run(self): for i in range(10): item = random.randint(0,256) self.queue.put(item) print("Process Producer:item %d appended to queue %s" %(item, self.name)) time.sleep(1) print("The size of queue is %s" % self.queue.qsize()) class Consumer(multiprocessing.Process): def __init__(self, queue): multiprocessing.Process.__init__(self) self.queue = queue def run(self): while True: if self.queue.empty(): print("The queue is empty") break else: time.sleep(2) item = self.queue.get() print("Process Consumer:item %d popped from by %s \n" %(item, self.name)) time.sleep(1) if __name__ == "__main__": # create Queue in the main process queue = multiprocessing.Queue() # create process_producer = Producer(queue) process_consumer = Consumer(queue) process_producer.start() process_consumer.start() process_producer.join() process_consumer.join()
執行結果:
Process Producer:item 106 appended to queue Producer-1 The size of queue is 1 Process Producer:item 167 appended to queue Producer-1 The size of queue is 2 Process Producer:item 202 appended to queue Producer-1 Process Consumer:item 106 popped from by Consumer-2 The size of queue is 2 Process Producer:item 124 appended to queue Producer-1 The size of queue is 3 Process Producer:item 19 appended to queue Producer-1 The size of queue is 4 Process Producer:item 5 appended to queue Producer-1 Process Consumer:item 167 popped from by Consumer-2 The size of queue is 4 Process Producer:item 178 appended to queue Producer-1 The size of queue is 5 Process Producer:item 207 appended to queue Producer-1 The size of queue is 6 Process Producer:item 154 appended to queue Producer-1 Process Consumer:item 202 popped from by Consumer-2 The size of queue is 6 Process Producer:item 228 appended to queue Producer-1 The size of queue is 7 Process Consumer:item 124 popped from by Consumer-2 Process Consumer:item 19 popped from by Consumer-2 Process Consumer:item 5 popped from by Consumer-2 Process Consumer:item 178 popped from by Consumer-2 Process Consumer:item 207 popped from by Consumer-2 Process Consumer:item 154 popped from by Consumer-2 Process Consumer:item 228 popped from by Consumer-2 The queue is empty
佇列補充:
佇列還有一個JoinaleQueue子類,有以下兩個額外的方法:
- task_done():此方法意味著之前入隊的一個任務已經完成,比如,get方法從佇列取回item之後呼叫。所以此方法只能被佇列的消費者呼叫。
- join():此方法將程序阻塞,直到佇列中的item全部被取出並執行。
因為使用佇列進行通訊是一個單向的、不確定的過程,所以你不知道什麼時候佇列的元素被取出來了,所以使用task_done來表示佇列裡的一個任務已經完成,這個方法一般和join一起使用,當佇列的所有任務都處理之後,也就是說put到佇列的每個任務都呼叫task_done方法後,join才會完成阻塞。
JoinaleQueue測試用例:
from multiprocessing import Process, JoinableQueue import time,random def consumer(name, q): while True: time.sleep(1) get_res = q.get() print("%s got %s" %(name, get_res)) q.task_done() def producer(seq, q): for item in seq: # time.sleep(1) q.put(item) print("Produced %s" %item) # block main process and don't run "print("Ended")" q.join() if __name__ == "__main__": q = JoinableQueue() seq = ("item-%s" %i for i in range(10)) c1 = Process(target=consumer, args=("c1", q)) c2 = Process(target=consumer, args=("c2", q)) c3 = Process(target=consumer, args=("c3", q)) c1.daemon = True c2.daemon = True c3.daemon = True c1.start() c2.start() c3.start() # start producer producer(seq,q) # run the command when all the item is consumed print("Ended")
使用管道交換物件:
一個管道可以做一下事情:
- 返回一對被管道連線的連線物件
- 然後物件使用send/receive方法可以在程序之間通訊
簡單示例:
import multiprocessing def create_items(pipe): output_pipe, _ = pipe for item in range(10): output_pipe.send(item) output_pipe.close() def multiply_items(pipe_1, pipe_2): close, input_pipe = pipe_1 close.close() output_pipe, _ = pipe_2 try: while True: item = input_pipe.recv() output_pipe.send(item * item) except EOFError: output_pipe.close() if __name__ == "__main__": # The first pipe sends numbers pipe_1 = multiprocessing.Pipe(True) process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,)) process_pipe_1.start() # The second pipe receives numbers and Calculations pipe_2 = multiprocessing.Pipe(True) process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2)) process_pipe_2.start() pipe_1[0].close() pipe_2[0].close() try: while True: # print(pipe_2) print(pipe_2[1].recv()) except EOFError: print("End")
上述程式碼定義兩個程序,一個傳送數字0-9到管道pipe_1,另一個程序通過receive獲取pipe_1的數字,並進行平方,然後將結果輸出到管道pipe_2中。最後通過recv獲取pipe_2的資料。