1. 程式人生 > >Python並行程式設計(十一):基於程序的並行

Python並行程式設計(十一):基於程序的並行

1、基本概念

      多程序主要用multiprocessing和mpi4py這兩個模組。

      multiprocessing是Python標準庫中的模組,實現了共享記憶體機制,可以讓執行在不同處理器核心的程序能讀取共享記憶體。

      mpi4py庫實現了訊息傳遞的程式設計範例(設計模式)。簡單來說就是程序之間不靠任何共享資訊來進行通訊,所有的交流都通過傳遞資訊代替。

      這與使用共享記憶體通訊、加鎖或類似機制實現互斥的技術形成對比。在資訊傳遞的程式碼中,程序通過send和receive進行交流。

 

2、建立一個程序

      由父程序建立子程序。父程序既可以在產生子程序之後繼續非同步執行,也可以暫停等待子程序建立完成之後再繼續執行。建立程序的步驟如下:

      1. 建立程序物件

      2. 呼叫start()方法,開啟程序的活動

      3. 呼叫join()方法,在程序結束之前一直等待

 

3、建立程序用例

# coding : utf-8

import multiprocessing

def foo(i):
    
print('called function in process: %s' %i) return if __name__ == '__main__': Process_jobs = [] for i in range(5): p = multiprocessing.Process(target=foo, args=(i, )) Process_jobs.append(p) p.start() p.join()

      執行結果:

      

      建立程序物件的時候需要分配一個函式,作為程序的執行任務,本例為foo()。最後程序物件呼叫join()方法,如果沒有join主程序退出之後子程序會留在idle中。

      提示:為了預防無限遞迴呼叫,可以在不同指令碼檔案中定義目標函式,然後匯入進來使用。

 

4、程序命名

      程序命名和執行緒命名大同小異。

      使用示例:

# coding:utf-8

import multiprocessing
import time

def foo():
    # get name of process
    name = multiprocessing.current_process().name
    print("Starting %s \n" %name)
    time.sleep(3)
    print("Exiting %s \n" %name)

if __name__ == '__main__':
    # create process with DIY name
    process_with_name = multiprocessing.Process(name='foo_process', target=foo)
    # process_with_name.daemon = True

    # create process with default name
    process_with_default_name = multiprocessing.Process(target=foo)

    process_with_name.start()
    process_with_default_name.start()

 

5、殺死一個程序

      通過terminate方法殺死一個程序,也可以使用is_alive方法判斷一個程序是否存活。

      測試用例:

import multiprocessing, time

def foo():
    print('Starting function')
    time.sleep(0.1)
    print('Finished function')

if __name__ == '__main__':
    p = multiprocessing.Process(target=foo)
    print('Process before execution:', p, p.is_alive())
    p.start()
    print('Process running:', p, p.is_alive())
    p.terminate()
    print('Process terminated:', p, p.is_alive())
    p.join()
    print('Process joined:', p, p.is_alive())
    print('Process exit code:',p.exitcode)

      執行結果:

      

      正常結束返回值為0,且foo會被執行。exitcode為0為正常結束,為負表示訊號殺死,大於0程序有錯誤。

 

6、子類中使用程序

      實現一個自定義的程序子類,需要以下三步:

      - 定義Process子類

      - 覆蓋__init__(self [,args])方法來新增額外的引數

      - 覆蓋run方法來實現Process啟動的時候執行的任務

      建立Process子類之後,可以建立它的例項。並且通過start方法啟動它,啟動之後會執行run方法。

      測試用例:

# coding:utf-8

import multiprocessing

class MyProcess(multiprocessing.Process):
    def run(self):
        print('called run method in process: %s' %self.name)
        return

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = MyProcess()
        jobs.append(p)
        p.start()
        p.join()

      執行結果:

      

 

7、程序之間交換物件

      並行應用常常需要在程序之間交換資料。Multiprocessing庫有兩個Communication Channel可以交換物件:佇列queue和管道pipe。

      使用佇列交換物件:

            Queue返回一個程序共享的佇列,是執行緒安全的,也是程序安全的。任何可序列化的物件(Python通過pickable模組序列化物件)都可以通過它進行交換。

      測試用例:

import multiprocessing
import random
import time

class Producer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        for i in range(10):
            item = random.randint(0,256)
            self.queue.put(item)
            print("Process Producer:item %d appended to queue %s" %(item, self.name))
            time.sleep(1)
            print("The size of queue is %s" % self.queue.qsize())

class Consumer(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            if self.queue.empty():
                print("The queue is empty")
                break
            else:
                time.sleep(2)
                item = self.queue.get()
                print("Process Consumer:item %d popped from by %s \n" %(item, self.name))
                time.sleep(1)

if __name__ == "__main__":
    # create Queue in the main process
    queue = multiprocessing.Queue()
    # create 
    process_producer = Producer(queue)
    process_consumer = Consumer(queue)

    process_producer.start()
    process_consumer.start()
    process_producer.join()
    process_consumer.join()

      執行結果:

Process Producer:item 106 appended to queue Producer-1
The size of queue is 1
Process Producer:item 167 appended to queue Producer-1
The size of queue is 2
Process Producer:item 202 appended to queue Producer-1
Process Consumer:item 106 popped from by Consumer-2 

The size of queue is 2
Process Producer:item 124 appended to queue Producer-1
The size of queue is 3
Process Producer:item 19 appended to queue Producer-1
The size of queue is 4
Process Producer:item 5 appended to queue Producer-1
Process Consumer:item 167 popped from by Consumer-2 

The size of queue is 4
Process Producer:item 178 appended to queue Producer-1
The size of queue is 5
Process Producer:item 207 appended to queue Producer-1
The size of queue is 6
Process Producer:item 154 appended to queue Producer-1
Process Consumer:item 202 popped from by Consumer-2 

The size of queue is 6
Process Producer:item 228 appended to queue Producer-1
The size of queue is 7
Process Consumer:item 124 popped from by Consumer-2 

Process Consumer:item 19 popped from by Consumer-2 

Process Consumer:item 5 popped from by Consumer-2 

Process Consumer:item 178 popped from by Consumer-2 

Process Consumer:item 207 popped from by Consumer-2 

Process Consumer:item 154 popped from by Consumer-2 

Process Consumer:item 228 popped from by Consumer-2 

The queue is empty

      佇列補充:

            佇列還有一個JoinaleQueue子類,有以下兩個額外的方法:

            - task_done():此方法意味著之前入隊的一個任務已經完成,比如,get方法從佇列取回item之後呼叫。所以此方法只能被佇列的消費者呼叫。

            - join():此方法將程序阻塞,直到佇列中的item全部被取出並執行。

            因為使用佇列進行通訊是一個單向的、不確定的過程,所以你不知道什麼時候佇列的元素被取出來了,所以使用task_done來表示佇列裡的一個任務已經完成,這個方法一般和join一起使用,當佇列的所有任務都處理之後,也就是說put到佇列的每個任務都呼叫task_done方法後,join才會完成阻塞。

            JoinaleQueue測試用例:

from multiprocessing import Process, JoinableQueue
import time,random
def consumer(name, q):
    while True:
        time.sleep(1)
        get_res = q.get()
        print("%s got %s" %(name, get_res))
        q.task_done()

def producer(seq, q):
    for item in seq:
        # time.sleep(1)
        q.put(item)
        print("Produced %s" %item)
    # block main process and don't run "print("Ended")"
    q.join()

if __name__ == "__main__":
    q = JoinableQueue()
    seq = ("item-%s" %i for i in range(10))

    c1 = Process(target=consumer, args=("c1", q))
    c2 = Process(target=consumer, args=("c2", q))
    c3 = Process(target=consumer, args=("c3", q))

    c1.daemon = True
    c2.daemon = True
    c3.daemon = True

    c1.start()
    c2.start()
    c3.start()

    # start producer
    producer(seq,q)

    # run the command when all the item is consumed
    print("Ended")

      使用管道交換物件:

      一個管道可以做一下事情:

      - 返回一對被管道連線的連線物件

      - 然後物件使用send/receive方法可以在程序之間通訊

      簡單示例:

import multiprocessing

def create_items(pipe):
    output_pipe, _ = pipe
    for item in range(10):
        output_pipe.send(item)
    output_pipe.close()

def multiply_items(pipe_1, pipe_2):
    close, input_pipe = pipe_1
    close.close()
    output_pipe, _ = pipe_2
    try:
        while True:
            item = input_pipe.recv()
            output_pipe.send(item * item)
    except EOFError:
        output_pipe.close()


if __name__ == "__main__":
    # The first pipe sends numbers
    pipe_1 = multiprocessing.Pipe(True)
    process_pipe_1 = multiprocessing.Process(target=create_items, args=(pipe_1,))
    process_pipe_1.start()

    # The second pipe receives numbers and Calculations
    pipe_2 = multiprocessing.Pipe(True)
    process_pipe_2 = multiprocessing.Process(target=multiply_items, args=(pipe_1, pipe_2))
    process_pipe_2.start()

    pipe_1[0].close()
    pipe_2[0].close()

    try:
        while True:
            # print(pipe_2)
            print(pipe_2[1].recv())
    except EOFError:
        print("End")

      上述程式碼定義兩個程序,一個傳送數字0-9到管道pipe_1,另一個程序通過receive獲取pipe_1的數字,並進行平方,然後將結果輸出到管道pipe_2中。最後通過recv獲取pipe_2的資料。