1. 程式人生 > >multiprocessing在python中的高階應用-共享資料與同步

multiprocessing在python中的高階應用-共享資料與同步

通常,程序之間彼此是完全孤立的,唯一的通訊方式是佇列或管道。但可以使用兩個物件來表示共享資料。其實,這些物件使用了共享記憶體(通過mmap模組)使訪問多個程序成為可能。

Value( typecode, arg1, … argN, lock )
在共享內容中常見ctypes物件。typecode要麼是包含array模組使用的相同型別程式碼(如’i’,’d’等)的字串,要麼是來自ctypes模組的型別物件(如ctypes.c_int、ctypes.c_double等)。所有額外的位置引數arg1, arg2 ….. argN將傳遞給指定型別的建構函式。lock是隻能使用關鍵字呼叫的引數,如果把它置為True(預設值),將建立一個新的鎖定來包含對值的訪問。如果傳入一個現有鎖定,比如Lock或RLock例項,該鎖定將用於進行同步。如果v是Value建立的共享值的例項,便可使用v.value訪問底層的值。例如,讀取v.value將獲取值,而賦值v.value將修改值。

RawValue( typecode, arg1, … ,argN)
同Value物件,但不存在鎖定。

Array( typecode, initializer, lock )
在共享記憶體中建立ctypes陣列。typecode描述了陣列的內容,意義與Value()函式中的相同。initializer要麼是設定陣列初始大小的整數,要麼是專案序列,其值和大小用於初始化陣列。lock是隻能使用關鍵字呼叫的引數,意義與Value()函式中相同。如果a是Array建立的共享陣列的例項,便可使用標準的python索引、切片和迭代操作訪問它的內容,其中每種操作均由鎖定進行同步。對於位元組字串,a還具有a.value屬性,可以吧整個陣列當做一個字串進行訪問。

RawArray(typecode, initializer )
同Array物件,但不存在鎖定。當所編寫的程式必須一次性操作大量的陣列項時,如果同時使用這種資料型別和用於同步的單獨鎖定(如果需要的話),效能將得到極大的提升。
除了使用Value()和Array()建立的共享值之外,multiprocessing模組還提供一下同步源於的共享版本。
這裡寫圖片描述
這些物件的行為與threading模組中定義的名稱相同的同步原語相似。請參考threading文件瞭解更多細節。
應該注意,使用多程序後,通常不必再擔心與鎖定、訊號量或類似構造的底層同步,這一點與執行緒不相伯仲。在某種程度上,管道上的send()和receive()操作,以及佇列上的put()和get()操作已經提供了同步功能。但是,在某寫特定的設定下還是需要用到共享值和鎖定。下面這個例子說明了如何使用共享陣列代替管道,將一個浮點數的python列表傳送給另一個程序:

import multiprocessing
class FloatChannel(object):
    def __init__(self,maxsize):
        self.buffer=multiprocessing.RawArray('d',maxsize)
        self.buffer_len=multiprocessing.Value('i')
        self.empty=multiprocessing.Semaphore(1)
        self.full=multiprocessing.Semaphore(0)
    def send(self,values):
        self.empty.acquire()  #只在快取為空時繼續
        nitems=len(values)  
        self.buffer_len=nitems  #設定緩衝區大小
        self.buffer[:nitems]=values #將複製到緩衝區中
        self.full.release() #發訊號通知緩衝區已滿
    def recv(self):
        self.full.acquire()     #只在緩衝區已滿時繼續
        values=self.buffer[:self.buffer_len.value]  #複製值
        self.empty.release()        #傳送訊號通知緩衝區為空
        return values
    #效能測試 接收多條訊息
def consume_test(count,ch):
    for i in xrange(count):
        values=ch.recv()

#效能測試 傳送多條訊息
def produce_test(count,values,ch):
    for i in xrange(count):
        ch.send(values)
if __name__=="__main__":
    ch=FloatChannel(100000)
    p=multiprocessing.Process(target=consume_test,args=(1000,ch))
    p.start()
    values=[float(x) for x in xrange(100000)]
    produce_test(1000,values,ch)
    print "Done"
    p.join()

在我的計算機上執行效能測試時,通過FloatChannel傳送一個較大的浮點數列表,速度比通過Pipe傳送快大約80%,因為後者必須對所有值進行序列化和反序列化。