Python並行程式設計(十三):程序池和mpi4py模組
1、基本概念
多程序庫提供了Pool類來實現簡單的多程序任務。Pool類有以下方法:
- apply():直到得到結果之前一直阻塞。
- apply_async():這是apply()方法的一個變體,返回的是一個result物件。這是一個非同步的操作,在所有的子類執行之前不會鎖住主程序。
- map():這是內建的map函式的並行版本,在得到結果之前一直阻塞,此方法將可迭代的資料的每一個元素作為程序池的一個任務來執行。
- map_async():這是map的一個變體,返回一個result物件。如果指定了回撥函式,回撥函式應該是callable的,並且只接受一個引數。當result準備好時,會自動呼叫回撥函式,除非呼叫失敗。回撥函式應該立即完成,否則,持有result的程序將被阻塞。
2、測試用例
建立四個程序池,然後使用map方法進行一個簡單的計算。
import multiprocessing def function_square(data): result = data * data return result if __name__ == "__main__": inputs = list(range(100)) pool = multiprocessing.Pool(processes=4) pool_outputs = pool.map(function_square, inputs) pool.close() pool.join()print("pool: ", pool_outputs)
pool.map方法將一些獨立的任務提交給程序池。pool.map和內建map的執行結果相同,但pool.map是通過多個並行程序計算的。
3、mpi4py模組
Python提供了很多MPI模組寫並行程式。其中mpi4py在MPI-1/2頂層構建,提供了面向物件的介面,緊跟C++繫結的MPI-2。MPI是C語言使用者可以無需學習新的介面就可以使用這個庫。
此模組包含的主要的應用:
- 點對點通訊
- 集體通訊
- 拓撲
4、安裝mpi4py
安裝mpich:https://www.microsoft.com/en-us/download/confirmation.aspx?id=56727
下載並安裝msmpisetup.exe
安裝完成後安裝目錄如下:
將bin目錄新增到系統環境中:
用cmd輸入並顯示如下即為安裝成功
安裝mpi4py
pip install mpi4py
MPI測試用例
from mpi4py import MPI def mpi_test(rank): print("I am rank %s" %rank) if __name__ == "__main__": comm = MPI.COMM_WORLD rank = comm.Get_rank() mpi_test(rank) print("Hello world from process", rank)
使用mpi執行檔案
在MPI中,並行程式中不同程序用一個非負整數來區別,如果我們有P個程序,那麼rank會從0到P-1分配。
MPI拿到rank的函式如下:rank = comm.Get_rank()
這個函式返回呼叫它的程序的rank,comm叫做交流者,用於區別不同的程序集合:comm = MPI.COMM_WORLD
5、MPI點對點通訊
MPI提供的最實用的一個特性是點對點通訊。兩個不同的程序之間可以通過點對點通訊交換資料:一個程序是接收者,一個程序是傳送者。
Python的mpi4py通過下面兩個函式提供了點對點通訊功能:
- Comm.Send(data, process_destination):通過它在交流組中的排名來區分發送給不同程序的資料。
- Comm.Recv(process_source):接收來自源程序的資料,也是通過在交流組中的排名來分分的。
Comm變量表示交流著,定義了可以互相通訊的程序組:
comm = MKPI.COMM_WORLD
交換資訊測試用例:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.rank print("My rank is :",rank) if rank == 0: data = 10000000 destination_process = 4 comm.send(data, dest=destination_process) print("sending data %s to process %d" %(data, destination_process)) if rank == 1: destination_process = 8 data = "hello,I am rank 1" comm.send(data, dest=destination_process) print("sending data %s to process %d" %(data, destination_process)) if rank == 4: data = comm.recv(source=0) print("data received is = %s" %data) if rank == 8: data1 = comm.recv(source=1) print("data received is = %s" %data1)
執行結果:
通過mpiexec -n 9執行9個互相通訊的程序,使用rank的值來區分每個程序。
整個過程分為兩部分,傳送者傳送資料,接收者接收資料,二者必須都指定傳送方/接收方,source=為指定傳送者。如果有傳送的資料沒有被接收,程式會阻塞。
comm.send()和comm.recv()函式都是阻塞的函式,他們會一直阻塞呼叫者,直到資料使用完成,同時在MPI中,有兩種方式傳送和接收資料:
- buffer模式
- 同步模式
在buffer模式中,只要需要傳送的資料被拷貝到buffer中,執行權就會交回到主程式,此時資料並非已經發送/接收完成。在同步模式中,只有函式真正的結束髮送/接收任務之後才會返回。
6、避免死鎖
mpi4py沒有提供特定的功能來解決這種情況,但是提供了一些程式設計師必須遵守的規則來避免死鎖的問題。
出現死鎖的情況:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.rank print("my rank is :",rank) if rank == 1: data_send = "a" destination_process = 5 source_process = 5 data_received = comm.recv(source=source_process) comm.send(data_send, dest=destination_process) print("sending data %s to process %d" %(data_send, destination_process)) print("data received is = %s" %data_received) if rank == 5: data_send = "b" destination_process = 1 source_process = 1 data_received = comm.recv(source=source_process) comm.send(data_send, dest=destination_process) print("sending data %s to process %d" % (data_send, destination_process)) print("data received is = %s" % data_received)
執行結果:
程序1和程序5產生阻塞,程式阻塞。
此時兩個程序都在等待對方,發生阻塞,因為recv和send都是阻塞的,兩個函式都先使用的recv,所以呼叫者都在等待他們完成。所以講上述程式碼改為如下即可解決阻塞:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.rank print("my rank is :",rank) if rank == 1: data_send = "a" destination_process = 5 source_process = 5 comm.send(data_send, dest=destination_process) data_received = comm.recv(source=source_process) print("sending data %s to process %d" %(data_send, destination_process)) print("data received is = %s" %data_received) if rank == 5: data_send = "b" destination_process = 1 source_process = 1 data_received = comm.recv(source=source_process) comm.send(data_send, dest=destination_process) print("sending data %s to process %d" % (data_send, destination_process)) print("data received is = %s" % data_received)
將其中一個函式的recv和send順序調換。
執行結果:
也可通過Sendrecv函式解決,程式碼如下:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.rank print("my rank is :",rank) if rank == 1: data_send = "a" destination_process = 5 source_process = 5 # comm.send(data_send, dest=destination_process) # data_received = comm.recv(source=source_process) data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process) print("sending data %s to process %d" %(data_send, destination_process)) print("data received is = %s" %data_received) if rank == 5: data_send = "b" destination_process = 1 source_process = 1 # data_received = comm.recv(source=source_process) # comm.send(data_send, dest=destination_process) data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process) print("sending data %s to process %d" % (data_send, destination_process)) print("data received is = %s" % data_received)
執行結果:
7、集體通訊:Broadcast
在並行程式碼的開發中,會經常需要在多個程序間共享某個變數執行時的值,或操作多個程序提供的變數。MPI庫提供了在多個程序之間交換資訊的方法,將所有程序變成通訊者的這種方法叫做集體交流。因此,一個集體交流通常是2個以上的程序,也可以稱為廣播——一個程序將訊息傳送給其他程序。mpi4py模組通過以下方式提供廣播的功能:
buf = comm.bcast(data_to_share, rank_of_root_process)
這個函式將root訊息中包含的資訊傳送給屬於comm通訊組其他的程序,每個程序必須通過相同的root和comm來呼叫它。
測試程式碼:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() if rank == 0: variable_to_share = 100 else: variable_to_share = None variable_to_share = comm.bcast(variable_to_share, root=0) print("process = %d variable shared = %d" %(rank, variable_to_share))
執行結果:
rank等於0的root程序初始化了一個變數,variable_to_share,值為100,然後聲明瞭一個廣播variable_to_share = comm.bcast(variable_to_share, root=0)
這個變數將通過通訊組傳送給其他程序。
集體通訊允許組中的多個程序同時進行資料交流。在mpi4py模組中,只提供了阻塞版本的集體通訊(阻塞呼叫者,直到快取中的資料全部安全傳送。)
廣泛應用的集體通訊應該是:
- 組中的程序提供通訊的屏障
- 通訊方式包括:
- 將一個程序的資料廣播到組中其他程序中
- 從其他程序收集資料發給一個程序
- 從一個程序散播資料到其他程序中
- 減少操作
8、集體通訊:Scatter
scatter函式和廣播很像,但是不同的是comm.bcast將相同的資料傳送給所有在監聽的程序,comm.scatter可以將資料放在資料中,傳送給不同的程序。
comm.scatter函式接收一個array,根據程序的rank將其中的元素髮給不同的程序,第一個元素髮送給程序0,第二個元素髮給程序1,以此類推。
測試用例:
from mpi4py import MPI comm = MPI.COMM_WORLD rank = comm.Get_rank() # array_to_share = ["a","b","c","d","e","f","g","h","i","j"] if rank == 0: array_to_share = [0,1,2,3,4,5,6,7,8,9] else: array_to_share = None recvbuf = comm.scatter(array_to_share, root=0) print("Process = %d recvbuf = %s" %(rank, recvbuf))
執行結果:
注意:列表中的元素個數,需要個程序保持一致。否則會出現如下錯誤。
9、集體通訊:gather
gather函式基本上是反向的scatter,即收集所有程序傳送到root程序資料。方法如下:
recvbuf = comm.gather(sendbuf, rank_of_root_process)
sendbuf是要傳送的資料,rank_of_root_process代表要接收資料的程序。
測試用例:
from mpi4py import MPI comm = MPI.COMM_WORLD size = comm.Get_size() # print(size) rank = comm.Get_rank() data = "process %s" %rank # print("start %s"%data) data = comm.gather(data, root=0) # print(data) if rank == 0: print("rank = %s receiving data to other process" %rank) for i in range(1, size): #data[i] = (i+1) ** 2 value = data[i] print("process %s receiving %s from process %s" %(rank, value, i)) # print(data)
執行結果:
10、使用Alltoall通訊
Alltoall集體通訊結合了scatter和gather的功能。在mpi4py中,有以下三類的Alltoall集體通訊。
- comm.Alltoall(sendbuf, recvbuf);
- comm.Alltoallv(sendbuf, recvbuf);
- comm.Alltoallw(sendbuf, recvbuf);
Alltoall測試用例:
from mpi4py import MPI import numpy comm = MPI.COMM_WORLD size = comm.Get_size() rank = comm.Get_rank() a_size = 1 # print("numpy arange: %s" %numpy.arange(size, dtype=int)) senddata = (rank+1)*numpy.arange(size, dtype=int) recvdata = numpy.empty(size * a_size, dtype=int) print("senddata is %s , recvdata is %s" %(senddata, recvdata)) # print("Recvdata is %s: , \n numpy.empty is %s" %(recvdata, numpy.empty(size * a_size, dtype=int))) comm.Alltoall(senddata, recvdata) print("process %s sending %s, receiving %s" %(rank, senddata, recvdata))
執行結果:
comm.alltoall方法將task j的sendbuf的第j個物件拷貝到task i中,recvbuf的第j個物件,一一對應。傳送過程如圖:
可以將左右兩個方格看做xy軸,結果一一對應,如左圖的(0,0)對應的值為0,其對應的有圖的值為右圖的(0,0)也為0。左圖的3,4對應的值為16,右圖(4,3)也為16。
P0包含的資料[0 1 2 3 4],它將值0賦值給自己,1傳給程序P1,2傳給程序P2,3傳給程序P3,以此類推。
相同的P1的資料為[0 2 4 6 8] , 它將0傳給P0,2傳給P1,4傳給P2,以此類推。
All-to-all定製通訊也叫全部交換,這種操作經常用於各種併發演算法中,比如快速傅立葉變換,矩陣變換,樣本排序以及一些資料庫的 Join 操作。
11、簡化操作
同comm.gather一樣,comm.reduce接收一個數組,每一個元素是一個程序的輸入,然後返回一個數組,每一個元素是程序的輸出,返回給root程序。輸出的元素包含了簡化的結果。
簡化定義如下:comm.Reduce(sendbuf, recvbuf, rank_of_root_process, op = type_of_reduction_operation)
這裡需要注意的是,引數op和comm.gather不同,它代表你想應用在資料上的操作,mpi4py模組代表定義了一系列的簡化操作,包括:
- MPI.MAX:返回最大的元素
- MPI.MIN:返回最小的元素
- MPI.SUM:對所有的元素相加
- MPI.PROD:對所有元素相乘
- MPI.LAND:對所有元素進行邏輯操作
- MPI.MAXLOC:返回最大值,以及擁有它的程序
- MPI.MINLOC:返回最小值,以及擁有它的程序
測試用例:
import numpy as np from mpi4py import MPI comm = MPI.COMM_WORLD size = comm.size rank = comm.rank array_size = 3 recvdata = np.zeros(array_size, dtype=np.int) senddata = (rank+1)*np.arange(size, dtype=np.int) print("+++++++++++++%s+++++++++++++%s++++++++++++" %(recvdata, senddata)) print("Process %s sending %s" %(rank, senddata)) comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM) print("on task %s, after Reduce: data = %s" %(rank, recvdata))
執行結果:
MPI.SUM為求和操作,過程如下:
簡化操作將每個task的第i個元素相加,然後放回到P0程序(root程序)的第i個元素中。