1. 程式人生 > >Python實現程序同步和通訊

Python實現程序同步和通訊

引例:

如之前建立多程序的例子

# -*- coding:utf-8 -*-
from multiprocessing import Process,Pool
import os,time

def run_proc(name):        ##定義一個函式用於程序呼叫
    for i in range(5):    
        time.sleep(0.2)    #休眠0.2秒
        print 'Run child process %s (%s)' % (name, os.getpid())
#執行一次該函式共需1秒的時間

if __name__ =='__main__'
: #執行主程序 print 'Run the main process (%s).' % (os.getpid()) mainStart = time.time() #記錄主程序開始的時間 p = Pool(8) #開闢程序池 for i in range(16): #開闢14個程序 p.apply_async(run_proc,args=('Process'+str(i),))#每個程序都呼叫run_proc函式, #args表示給該函式傳遞的引數。
print 'Waiting for all subprocesses done ...' p.close() #關閉程序池 p.join() #等待開闢的所有程序執行完後,主程序才繼續往下執行 print 'All subprocesses done' mainEnd = time.time() #記錄主程序結束時間 print 'All process ran %0.2f seconds.' % (mainEnd-mainStart) #主程序執行時間

執行結果:

Run the main process (36652).
Waiting for all subprocesses done …
Run child process Process0 (36708)Run child process Process1 (36748)

Run child process Process3 (36736)
Run child process Process2 (36716)
Run child process Process4 (36768)

如第3行的輸出,偶爾會出現這樣不如意的輸入格式,為什麼呢?
原因是多個程序爭用列印輸出資源的結果。前一個程序為來得急輸出換行符,該資源就切換給了另一個程序使用,致使兩個程序輸出在同一行上,而前一個程序的換行符在下一次獲得資源時才打印輸出。

Lock

為了避免這種情況,需在程序進入臨界區(使程序進入臨界資源的那段程式碼,稱為臨界區)時加鎖。
可以向如下這樣新增鎖後看看執行效果:

# -*- coding:utf-8 -*-

lock = Lock()   #申明一個全域性的lock物件
def run_proc(name):
    global lock      #引用全域性鎖
    for i in range(5):
        time.sleep(0.2)
        lock.acquire()  #申請鎖
        print 'Run child process %s (%s)' % (name, os.getpid())
        lock.release()   #釋放鎖

Semaphore

Semaphore為訊號量機制。當共享的資源擁有多個時,可用Semaphore來實現程序同步。其用法和Lock差不多,s = Semaphore(N),每執行一次s.acquire(),該資源的可用個數將減少1,當資源個數已為0時,就進入阻塞;每執行一次s.release(),佔用的資源被釋放,該資源的可用個數增加1。

多程序的通訊(資訊互動)

不同程序之間進行資料互動,可能不少剛開始接觸多程序的同學會想到共享全域性變數的方式,這樣通過向全域性變數寫入和讀取資訊便能實現資訊互動。但是很遺憾,並不能這樣實現。具體原因,看這篇文章。

下面通過例子,加深對那篇文章的理解:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pool
import os
import time

L1 = [1, 2, 3]

def add(a, b):
    global L1
    L1 += range(a, b)
    print L1

if __name__ == '__main__':
    p1 = Process(target=add, args=(20, 30))
    p2 = Process(target=add, args=(30, 40))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print L1

輸出結果:

[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[1, 2, 3, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3]

該程式的原本目的是想將兩個子程序生成的列表加到全域性變數L1中,但用該方法並不能達到想要的效果。既然不能通過全域性變數來實現不同程序間的資訊互動,那有什麼辦法呢。
mutiprocessing為我們可以通過Queue和Pipe來實現程序間的通訊。

Queue

按上面的例子通過Queue來實現:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Queue, Lock

L = [1, 2, 3]


def add(q, lock, a, b):
    lock.acquire()  # 加鎖避免寫入時出現不可預知的錯誤
    L1 = range(a, b)
    lock.release()
    q.put(L1)
    print L1

if __name__ == '__main__':
    q = Queue()
    lock = Lock()
    p1 = Process(target=add, args=(q, lock, 20, 30))
    p2 = Process(target=add, args=(q, lock, 30, 40))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

    L += q.get() + q.get()
    print L

執行結果:

[20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
[30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
[1, 2, 3, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]

下面介紹Queue的常用方法:

  • 定義時可用q = Queue(maxsize = 10)來指定佇列的長度,預設時或maxsize值小於1時佇列為無限長度。
  • q.put(item)方法向佇列放入元素,其還有一個可選引數block,預設為True,此時若佇列已滿則會阻塞等待,直到有空閒位置。而當black值為 False,在該情況下就會丟擲Full異 常
  • Queue是不可迭代的物件,不能通過for迴圈取值,取值時每次呼叫q.get()方法。同樣也有可選引數block,預設為True,若此時佇列為空則會阻塞等待。而black值為False時,在該情況下就會丟擲Empty異常
  • Queue.qsize() 返回佇列的大小
  • Queue.empty() 如果佇列為空,返回True,反之False
  • Queue.full() 如果佇列滿了,返回True,反之False
  • Queue.get([block[, timeout]]) 獲取佇列,timeout等待時間Queue.get_nowait() 相當Queue.get(False) 非阻塞 Queue.put(item) 寫入佇列,timeout等待時間
  • Queue.put_nowait(item) 相當Queue.put(item, False)

Pipe

Pipe管道,可以是單向(half-duplex),也可以是雙向(duplex)。我們通過mutiprocessing.Pipe(duplex=False)建立單向管道 (預設為雙向)。雙向Pipe允許兩端的進即可以傳送又可以接受;單向的Pipe只允許前面的埠用於接收,後面的埠用於傳送。
下面給出例子:

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe

def proc1(pipe):
    s = 'Hello,This is proc1'
    pipe.send(s)

def proc2(pipe):
    while True:
        print "proc2 recieve:", pipe.recv()

if __name__ == "__main__":
    pipe = Pipe()
    p1 = Process(target=proc1, args=(pipe[0],))
    p2 = Process(target=proc2, args=(pipe[1],))
    p1.start()
    p2.start()
    p1.join()
    p2.join(2)   #限制執行時間最多為2秒
    print '\nend all processes.'

執行結果如下:

proc2 recieve: Hello,This is proc1
proc2 recieve:
end all processes.

當第二行輸出後,因為管道中沒有資料傳來,Proc2處於阻塞狀態,2秒後被強制結束。
以下是單向管道的例子,注意pipe[0],pipe[1]的分配。

# -*- coding:utf-8 -*-
from multiprocessing import Process, Pipe

def proc1(pipe):
    s = 'Hello,This is proc1'
    pipe.send(s)

def proc2(pipe):
    while True:
        print "proc2 recieve:", pipe.recv()

if __name__ == "__main__":
    pipe = Pipe(duplex=False)
    p1 = Process(target=proc1, args=(pipe[1],)) #pipe[1]為傳送端
    p2 = Process(target=proc2, args=(pipe[0],)) #pipe[0]為接收端
    p1.start()
    p2.start()
    p1.join()
    p2.join(2)  # 限制執行時間最多為2秒
    print '\nend all processes.'

執行結果同上。

強大的Manage

Queue和Pipe實現的資料共享方式只支援兩種結構 Value 和 Array。Python中提供了強大的Manage專門用來做資料共享,其支援的型別非常多,包括: Value,Array,list, dict,Queue, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event等
其用法如下:

from multiprocessing import Process, Manager
def func(dt, lt):
    for i in range(10):
        key = 'arg' + str(i)
        dt[key] = i * i

    lt += range(11, 16)

if __name__ == "__main__":
    manager = Manager()
    dt = manager.dict()
    lt = manager.list()

    p = Process(target=func, args=(dt, lt))
    p.start()
    p.join()
    print dt, '\n', lt

執行結果:

{‘arg8’: 64, ‘arg9’: 81, ‘arg0’: 0, ‘arg1’: 1, ‘arg2’: 4, ‘arg3’: 9, ‘arg4’: 16, ‘arg5’: 25, ‘arg6’: 36, ‘arg7’: 49}
[11, 12, 13, 14, 15]