1. 程式人生 > >Python並行程式設計(十三):程序池和mpi4py模組

Python並行程式設計(十三):程序池和mpi4py模組

1、基本概念

      多程序庫提供了Pool類來實現簡單的多程序任務。Pool類有以下方法:

      - apply():直到得到結果之前一直阻塞。

      - apply_async():這是apply()方法的一個變體,返回的是一個result物件。這是一個非同步的操作,在所有的子類執行之前不會鎖住主程序。

      - map():這是內建的map函式的並行版本,在得到結果之前一直阻塞,此方法將可迭代的資料的每一個元素作為程序池的一個任務來執行。

      - map_async():這是map的一個變體,返回一個result物件。如果指定了回撥函式,回撥函式應該是callable的,並且只接受一個引數。當result準備好時,會自動呼叫回撥函式,除非呼叫失敗。回撥函式應該立即完成,否則,持有result的程序將被阻塞。

2、測試用例

      建立四個程序池,然後使用map方法進行一個簡單的計算。

import multiprocessing

def function_square(data):
    result = data * data
    return result

if __name__ == "__main__":
    inputs = list(range(100))
    pool = multiprocessing.Pool(processes=4)
    pool_outputs = pool.map(function_square, inputs)
    pool.close()
    pool.join()
    
print("pool: ", pool_outputs)

      pool.map方法將一些獨立的任務提交給程序池。pool.map和內建map的執行結果相同,但pool.map是通過多個並行程序計算的。

3、mpi4py模組

      Python提供了很多MPI模組寫並行程式。其中mpi4py在MPI-1/2頂層構建,提供了面向物件的介面,緊跟C++繫結的MPI-2。MPI是C語言使用者可以無需學習新的介面就可以使用這個庫。

      此模組包含的主要的應用:

      - 點對點通訊

      - 集體通訊

      - 拓撲

4、安裝mpi4py

      安裝mpich:https://www.microsoft.com/en-us/download/confirmation.aspx?id=56727

      下載並安裝msmpisetup.exe

       安裝完成後安裝目錄如下:

       

      將bin目錄新增到系統環境中:

      

      用cmd輸入並顯示如下即為安裝成功

      

      安裝mpi4py

      pip install mpi4py

      MPI測試用例

from mpi4py import MPI

def mpi_test(rank):
    print("I am rank %s" %rank)


if __name__ == "__main__":

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()
    mpi_test(rank)
    print("Hello world from process", rank)

      使用mpi執行檔案

     

      在MPI中,並行程式中不同程序用一個非負整數來區別,如果我們有P個程序,那麼rank會從0到P-1分配。

      MPI拿到rank的函式如下:rank = comm.Get_rank()

      這個函式返回呼叫它的程序的rank,comm叫做交流者,用於區別不同的程序集合:comm = MPI.COMM_WORLD

 5、MPI點對點通訊

      MPI提供的最實用的一個特性是點對點通訊。兩個不同的程序之間可以通過點對點通訊交換資料:一個程序是接收者,一個程序是傳送者。

      Python的mpi4py通過下面兩個函式提供了點對點通訊功能:

      - Comm.Send(data, process_destination):通過它在交流組中的排名來區分發送給不同程序的資料。

      - Comm.Recv(process_source):接收來自源程序的資料,也是通過在交流組中的排名來分分的。

      Comm變量表示交流著,定義了可以互相通訊的程序組:

      comm  = MKPI.COMM_WORLD

      交換資訊測試用例: 

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("My rank is :",rank)

if rank == 0:
    data = 10000000
    destination_process = 4
    comm.send(data, dest=destination_process)
    print("sending data %s to process %d" %(data, destination_process))

if rank == 1:
    destination_process = 8
    data = "hello,I am rank 1"

    comm.send(data, dest=destination_process)
    print("sending data %s to process %d" %(data, destination_process))

if rank == 4:
    data = comm.recv(source=0)
    print("data received is = %s" %data)

if rank == 8:
    data1 = comm.recv(source=1)
    print("data received is = %s" %data1)

      執行結果:

      

      通過mpiexec -n 9執行9個互相通訊的程序,使用rank的值來區分每個程序。

      整個過程分為兩部分,傳送者傳送資料,接收者接收資料,二者必須都指定傳送方/接收方,source=為指定傳送者。如果有傳送的資料沒有被接收,程式會阻塞。

      comm.send()和comm.recv()函式都是阻塞的函式,他們會一直阻塞呼叫者,直到資料使用完成,同時在MPI中,有兩種方式傳送和接收資料:

      - buffer模式

      - 同步模式

      在buffer模式中,只要需要傳送的資料被拷貝到buffer中,執行權就會交回到主程式,此時資料並非已經發送/接收完成。在同步模式中,只有函式真正的結束髮送/接收任務之後才會返回。

6、避免死鎖

      mpi4py沒有提供特定的功能來解決這種情況,但是提供了一些程式設計師必須遵守的規則來避免死鎖的問題。

      出現死鎖的情況:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is :",rank)

if rank == 1:
    data_send = "a"
    destination_process = 5
    source_process = 5
    data_received = comm.recv(source=source_process)
    comm.send(data_send, dest=destination_process)

    print("sending data %s to process %d" %(data_send, destination_process))
    print("data received is = %s" %data_received)

if rank == 5:
    data_send = "b"
    destination_process = 1
    source_process = 1
    data_received = comm.recv(source=source_process)
    comm.send(data_send, dest=destination_process)
    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

      執行結果:

      

      程序1和程序5產生阻塞,程式阻塞。

      此時兩個程序都在等待對方,發生阻塞,因為recv和send都是阻塞的,兩個函式都先使用的recv,所以呼叫者都在等待他們完成。所以講上述程式碼改為如下即可解決阻塞:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is :",rank)

if rank == 1:
    data_send = "a"
    destination_process = 5
    source_process = 5
    comm.send(data_send, dest=destination_process)
    data_received = comm.recv(source=source_process)

    print("sending data %s to process %d" %(data_send, destination_process))
    print("data received is = %s" %data_received)

if rank == 5:
    data_send = "b"
    destination_process = 1
    source_process = 1
    data_received = comm.recv(source=source_process)
    comm.send(data_send, dest=destination_process)
    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

      將其中一個函式的recv和send順序調換。

      執行結果:

      

      也可通過Sendrecv函式解決,程式碼如下:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
print("my rank is :",rank)

if rank == 1:
    data_send = "a"
    destination_process = 5
    source_process = 5
    # comm.send(data_send, dest=destination_process)
    # data_received = comm.recv(source=source_process)
    data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process)

    print("sending data %s to process %d" %(data_send, destination_process))
    print("data received is = %s" %data_received)

if rank == 5:
    data_send = "b"
    destination_process = 1
    source_process = 1
    # data_received = comm.recv(source=source_process)
    # comm.send(data_send, dest=destination_process)
    data_received = comm.sendrecv(data_send, dest=destination_process, source=source_process)
    print("sending data %s to process %d" % (data_send, destination_process))
    print("data received is = %s" % data_received)

      執行結果:

      

7、集體通訊:Broadcast

      在並行程式碼的開發中,會經常需要在多個程序間共享某個變數執行時的值,或操作多個程序提供的變數。MPI庫提供了在多個程序之間交換資訊的方法,將所有程序變成通訊者的這種方法叫做集體交流。因此,一個集體交流通常是2個以上的程序,也可以稱為廣播——一個程序將訊息傳送給其他程序。mpi4py模組通過以下方式提供廣播的功能:

buf = comm.bcast(data_to_share, rank_of_root_process)

      這個函式將root訊息中包含的資訊傳送給屬於comm通訊組其他的程序,每個程序必須通過相同的root和comm來呼叫它。

      

      測試程式碼:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

if rank == 0:
    variable_to_share = 100
else:
    variable_to_share = None

variable_to_share = comm.bcast(variable_to_share, root=0)
print("process = %d  variable shared = %d" %(rank, variable_to_share))

      執行結果:

      

      rank等於0的root程序初始化了一個變數,variable_to_share,值為100,然後聲明瞭一個廣播variable_to_share = comm.bcast(variable_to_share, root=0)

      這個變數將通過通訊組傳送給其他程序。

      集體通訊允許組中的多個程序同時進行資料交流。在mpi4py模組中,只提供了阻塞版本的集體通訊(阻塞呼叫者,直到快取中的資料全部安全傳送。)

      廣泛應用的集體通訊應該是:

            - 組中的程序提供通訊的屏障

            - 通訊方式包括:

                  - 將一個程序的資料廣播到組中其他程序中

                  - 從其他程序收集資料發給一個程序

                  - 從一個程序散播資料到其他程序中

            - 減少操作

 8、集體通訊:Scatter

      scatter函式和廣播很像,但是不同的是comm.bcast將相同的資料傳送給所有在監聽的程序,comm.scatter可以將資料放在資料中,傳送給不同的程序。

      

      comm.scatter函式接收一個array,根據程序的rank將其中的元素髮給不同的程序,第一個元素髮送給程序0,第二個元素髮給程序1,以此類推。

      測試用例:

from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# array_to_share = ["a","b","c","d","e","f","g","h","i","j"]
if rank == 0:
    array_to_share = [0,1,2,3,4,5,6,7,8,9]
else:
    array_to_share = None

recvbuf = comm.scatter(array_to_share, root=0)
print("Process = %d  recvbuf = %s" %(rank, recvbuf))

      執行結果:

      

      注意:列表中的元素個數,需要個程序保持一致。否則會出現如下錯誤。

      

 9、集體通訊:gather

      gather函式基本上是反向的scatter,即收集所有程序傳送到root程序資料。方法如下:

recvbuf = comm.gather(sendbuf, rank_of_root_process)

      sendbuf是要傳送的資料,rank_of_root_process代表要接收資料的程序。

      

      測試用例:

from mpi4py import MPI


comm = MPI.COMM_WORLD
size = comm.Get_size()
# print(size)
rank = comm.Get_rank()
data = "process %s" %rank
# print("start %s"%data)
data = comm.gather(data, root=0)
# print(data)
if rank == 0:
    print("rank = %s receiving data to other process" %rank)
    for i in range(1, size):
        #data[i] = (i+1) ** 2
        value = data[i]
        print("process %s receiving %s from process %s" %(rank, value, i))
    # print(data)

      執行結果:

      

 10、使用Alltoall通訊

      Alltoall集體通訊結合了scatter和gather的功能。在mpi4py中,有以下三類的Alltoall集體通訊。

      - comm.Alltoall(sendbuf, recvbuf);

      - comm.Alltoallv(sendbuf, recvbuf);

      - comm.Alltoallw(sendbuf, recvbuf);

      Alltoall測試用例:

from mpi4py import MPI
import numpy

comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

a_size = 1

# print("numpy arange: %s" %numpy.arange(size, dtype=int))
senddata = (rank+1)*numpy.arange(size, dtype=int)
recvdata = numpy.empty(size * a_size, dtype=int)
print("senddata is %s , recvdata is %s" %(senddata, recvdata))
# print("Recvdata is %s: , \n numpy.empty is %s" %(recvdata, numpy.empty(size * a_size, dtype=int)))

comm.Alltoall(senddata, recvdata)
print("process %s sending %s, receiving %s" %(rank, senddata, recvdata))

      執行結果:

      

      comm.alltoall方法將task j的sendbuf的第j個物件拷貝到task i中,recvbuf的第j個物件,一一對應。傳送過程如圖:

      

      可以將左右兩個方格看做xy軸,結果一一對應,如左圖的(0,0)對應的值為0,其對應的有圖的值為右圖的(0,0)也為0。左圖的3,4對應的值為16,右圖(4,3)也為16。

      P0包含的資料[0 1 2 3 4],它將值0賦值給自己,1傳給程序P1,2傳給程序P2,3傳給程序P3,以此類推。

      相同的P1的資料為[0 2 4 6 8] , 它將0傳給P0,2傳給P1,4傳給P2,以此類推。

      All-to-all定製通訊也叫全部交換,這種操作經常用於各種併發演算法中,比如快速傅立葉變換,矩陣變換,樣本排序以及一些資料庫的 Join 操作。

 11、簡化操作

      同comm.gather一樣,comm.reduce接收一個數組,每一個元素是一個程序的輸入,然後返回一個數組,每一個元素是程序的輸出,返回給root程序。輸出的元素包含了簡化的結果。

      簡化定義如下:comm.Reduce(sendbuf, recvbuf, rank_of_root_process, op = type_of_reduction_operation)

      這裡需要注意的是,引數op和comm.gather不同,它代表你想應用在資料上的操作,mpi4py模組代表定義了一系列的簡化操作,包括:

      - MPI.MAX:返回最大的元素

      - MPI.MIN:返回最小的元素

      - MPI.SUM:對所有的元素相加

      - MPI.PROD:對所有元素相乘

      - MPI.LAND:對所有元素進行邏輯操作

      - MPI.MAXLOC:返回最大值,以及擁有它的程序

      - MPI.MINLOC:返回最小值,以及擁有它的程序

      測試用例:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
size = comm.size
rank = comm.rank
array_size = 3
recvdata = np.zeros(array_size, dtype=np.int)
senddata = (rank+1)*np.arange(size, dtype=np.int)
print("+++++++++++++%s+++++++++++++%s++++++++++++" %(recvdata, senddata))
print("Process %s sending %s" %(rank, senddata))
comm.Reduce(senddata, recvdata, root=0, op=MPI.SUM)
print("on task %s, after Reduce: data = %s" %(rank, recvdata))

      執行結果:

      

      MPI.SUM為求和操作,過程如下:

      

      簡化操作將每個task的第i個元素相加,然後放回到P0程序(root程序)的第i個元素中。