1. 程式人生 > >python 多程序併發與多執行緒併發總結

python 多程序併發與多執行緒併發總結

本文對python支援的幾種併發方式進行簡單的總結。

Python支援的併發分為多執行緒併發與多程序併發(非同步IO本文不涉及)。概念上來說,多程序併發即執行多個獨立的程式,優勢在於併發處理的任務都由作業系統管理,不足之處在於程式與各程序之間的通訊和資料共享不方便;多執行緒併發則由程式設計師管理併發處理的任務,這種併發方式可以方便地線上程間共享資料(前提是不能互斥)。Python對多執行緒和多程序的支援都比一般程式語言更高階,最小化了需要我們完成的工作。

一.多程序併發

Mark Summerfield指出,對於計算密集型程式,多程序併發優於多執行緒併發。計算密集型程式指的程式的執行時間大部分消耗在CPU的運算處理過程,而硬碟和記憶體的讀寫消耗的時間很短;相對地,IO密集型程式指的則是程式的執行時間大部分消耗在硬碟和記憶體的讀寫上,CPU的運算時間很短。

對於多程序併發,python支援兩種實現方式,一種是採用程序安全的資料結構:multiprocessing.JoinableQueue,這種資料結構自己管理“加鎖”的過程,程式設計師無需擔心“死鎖”的問題;python還提供了一種更為優雅而高階的實現方式:採用程序池。下面一一介紹。

1.佇列實現——使用multiprocessing.JoinableQueue

multiprocessing是python標準庫中支援多程序併發的模組,我們這裡採用multiprocessing中的資料結構:JoinableQueue,它本質上仍是一個FIFO的佇列,它與一般佇列(如queue中的Queue)的區別在於它是多程序安全的,這意味著我們不用擔心它的互斥和死鎖問題。JoinableQueue主要可以用來存放執行的任務和收集任務的執行結果。舉例來看(以下皆省去匯入包的過程):

def read(q):
    while True:
        try:
            value = q.get()
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        finally:
            q.task_done()

def main():
    q = multiprocessing.JoinableQueue()
    pw1 = multiprocessing.Process(target=read, args=(q,))
    pw2 = multiprocessing.Process(target=read, args=(q,))
    pw1.daemon = True
pw2.daemon = True pw1.start() pw2.start() for c in [chr(ord('A')+i) for i in range(26)]: q.put(c) try: q.join() except KeyboardInterrupt: print("stopped by hand") if __name__ == '__main__': main()

對於windows系統的多程序併發,程式檔案裡必須含有“入口函式”(如main函式),且結尾處必須呼叫入口點。例如以if __name__ == '__main__': main()結尾。

在這個最簡單的多程序併發例子裡,我們用多程序實現將26個字母打印出來。首先定義一個存放任務的JoinableQueue物件,然後例項化兩個Process物件(每個物件對應一個子程序),例項化Process物件需要傳送target和args引數,target是實現每個任務工作中的具體函式,args是target函式的引數。

pw1.daemon = True
pw2.daemon = True

這兩句話將子程序設定為守護程序——主程序結束後隨之結束。

pw1.start()
pw2.start()

一旦執行到這兩句話,子程序就開始獨立於父程序運行了,它會在單獨的程序裡呼叫target引用的函式——在這裡即read函式,它是一個死迴圈,將引數q中的數一一讀取並打印出來。

value = q.get()

這是多程序併發的要點,q是一個JoinableQueue物件,支援get方法讀取第一個元素,如果q中沒有元素,程序就會阻塞,直至q中被存入新元素。

因此執行完pw1.start() pw2.start()這兩句話後,子程序雖然開始運行了,但很快就堵塞住。

for c in [chr(ord('A')+i) for i in range(26)]:
        q.put(c)

將26個字母依次放入JoinableQueue物件中,這時候兩個子程序不再阻塞,開始真正地執行任務。兩個子程序都用value = q.get()來讀取資料,它們都在修改q物件,而我們並不用擔心同步問題,這就是multiProcessing.Joinable資料結構的優勢所在——它是多程序安全的,它會自動處理“加鎖”的過程。

try:
        q.join()

q.join()方法會查詢q中的資料是否已讀完——這裡指的就是任務是否執行完,如果沒有,程式會阻塞住等待q中資料讀完才開始繼續執行(可以用Ctrl+C強制停止)。

對Windows系統,呼叫工作管理員應該可以看到有多個子程序在執行。

2.程序池實現——使用concurrent.futures.ProcessPoolExecutor

Python還支援一種更為優雅的多程序併發方式,直接看例子:

def read(q):
        print('Get %s from queue.' % q)
        time.sleep(random.random())

def main():
    futures = set()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for q in (chr(ord('A')+i) for i in range(26)):
            future = executor.submit(read, q)
            futures.add(future)
    try:
        for future in concurrent.futures.as_completed(futures):
            err = future.exception()
            if err is not None:
                raise err
    except KeyboardInterrupt:
        print("stopped by hand")

if __name__ == '__main__':
    main()

這裡我們採用concurrent.futures.ProcessPoolExecutor物件,可以把它想象成一個程序池,子程序往裡“填”。我們通過submit方法例項一個Future物件,然後把這裡Future物件都填到池——futures裡,這裡futures是一個set物件。只要程序池裡有future,就會開始執行任務。這裡的read函式更為簡單——只是把一個字元列印並休眠一會而已。

try:
        for future in concurrent.futures.as_completed(futures):

這是等待所有子程序都執行完畢。子程序執行過程中可能丟擲異常,err = future.exception()可以收集這些異常,便於後期處理。

可以看出用Future物件處理多程序併發更為簡潔,無論是target函式的編寫、子程序的啟動等等,future物件還可以向使用者彙報其狀態,也可以彙報執行結果或執行時的異常。

二.多執行緒併發

對於IO密集型程式,多執行緒併發可能要優於多程序併發。因為對於網路通訊等IO密集型任務來說,決定程式效率的主要是網路延遲,這時候是使用程序還是執行緒就沒有太大關係了。

1.佇列實現——使用queue.Queue

程式與多程序基本一致,只是這裡我們不必使用multiProcessing.JoinableQueue物件了,一般的佇列(來自queue.Queue)就可以滿足要求:

def read(q):
    while True:
        try:
            value = q.get()
            print('Get %s from queue.' % value)
            time.sleep(random.random())
        finally:
            q.task_done()

def main():
    q = queue.Queue()
    pw1 = threading.Thread(target=read, args=(q,))
    pw2 = threading.Thread(target=read, args=(q,))
    pw1.daemon = True
    pw2.daemon = True
    pw1.start()
    pw2.start()
    for c in [chr(ord('A')+i) for i in range(26)]:
        q.put(c)
    try:
        q.join()
    except KeyboardInterrupt:
        print("stopped by hand")

if __name__ == '__main__':
    main()

並且這裡我們例項化的是Thread物件,而不是Process物件,程式的其餘部分看起來與多程序並沒有什麼兩樣。

2. 執行緒池實現——使用concurrent.futures.ThreadPoolExecutor

直接看例子:

def read(q):
        print('Get %s from queue.' % q)
        time.sleep(random.random())

def main():
    futures = set()
    with concurrent.futures.ThreadPoolExecutor(multiprocessing.cpu_count()*4) as executor:
        for q in (chr(ord('A')+i) for i in range(26)):
            future = executor.submit(read, q)
            futures.add(future)
    try:
        for future in concurrent.futures.as_completed(futures):
            err = future.exception()
            if err is not None:
                raise err
    except KeyboardInterrupt:
        print("stopped by hand")

if __name__ == '__main__':
    main()

用ThreadPoolExecutor與用ProcessPoolExecutor看起來沒什麼區別,只是改了一下簽名而已。

不難看出,不管是使用佇列還是使用進/執行緒池,從多程序轉化到多執行緒是十分容易的——僅僅是修改了幾個簽名而已。當然內部機制完全不同,只是python的封裝非常好,使我們可以不用關心這些細節,這正是python優雅之處。

參考文獻:
[1]. Summerfield M. Python 3 程式開發指南[J]. 2011.
[2]. Summerfield M, Python程式設計實戰[J]. 2013.