Python並行程式設計(十四):非同步程式設計
1、基本概念
除了順序執行和並行執行的模型以外,還有非同步模型,這是事件驅動模型的基礎。非同步活動的執行模型可以只有一個單一的主控制流,能在單核心繫統和多核心繫統中執行。
在併發執行的非同步模型中,許多工被穿插在同一時間線上,所有的任務都由一個控制流執行(單一執行緒)。任務的執行可能被暫停或恢復,中間的這段時間執行緒將會執行其他任務。大致如下:
如上圖所示,任務(不同的顏色表示不同的任務)可能被其他任務插入,但是都處在同一個執行緒下。這表明當某一個任務執行的時候,其他任務都暫停了。與多執行緒程式設計模型很大的一點不同是,多執行緒的某個任務在時間線上什麼時候掛起某個活動或恢復某個活動由系統決定,而在非同步中,程式設計師必須假設執行緒可能在任何時間被掛起和替換。
程式設計師可以將任務編寫成許多可以間隔執行的小步驟,如果一個任務需要另一個任務的輸出,那麼被依賴的任務必須接收它的輸入。
2、使用Python的concurrent.futures模組
這個模組具有執行緒池和程序池、管理並行程式設計任務、處理非確定性的執行流程、程序/執行緒同步等功能。
此模組由一下部分組成:
- concurrent.futures.Executor:這是一個虛擬基類,提供了非同步執行的方法。
- submit(function, argument):排程函式(可呼叫的物件)的執行,將argument作為引數傳入。
- map(function, argument):將argument作為引數執行函式,以非同步的方式。
- shutdown(Wait=True):發出讓執行者釋放所有資源的訊號。
- concurrent.futures.Future:其中包括函式的非同步執行。Future物件是submit任務(即帶有引數的functions)到executor的例項。
Executor是抽象類,可以通過子類訪問,即執行緒或程序的ExecutorPools。因為執行緒或程序的例項是依賴於資源的任務,所以最好以池的形式將他們組織在一起,作為可以重用的launcher和executor。
執行緒池和程序池是用於在程式中優化和簡化執行緒/程序的使用。通過池可以提交任務給executor。池由兩部分組成,一部分是內部的佇列,存放著待執行的任務;另一部分是一系列的程序或執行緒,用於執行這些任務。池的概念主要目的是為了重用:讓執行緒或程序在生命週期內可以多次使用。他減少了建立執行緒和程序的開銷,提高了程式效能。重用不是必須的規則,但它是程式設計師在應用中使用池的主要原因。
current.Futures提供了兩種Executor的子類,各自獨立操作一個執行緒池和一個程序池。這兩個子類分別是:
- concurrent.futures.ThreadPoolExecutor(max_workers)
- concurrent.futures.ProcessPoolExecutor(max_workers)
max_workers引數表示最多有多少個worker並行執行任務
程式碼測試:
import concurrent.futures import time number_list = [1,2,3,4,5,6,7,8,9,10] def evaluate_item(x): #For time consuming result_item = count(x) return result_item def count(number): for i in range(0, 10000000): i = i + 1 return i * number if __name__ == "__main__": # Sequential execution start_time = time.time() for item in number_list: print(evaluate_item(item)) print("Sequential execution in %s seconds" %(str(time.time() - start_time))) # Thread pool execution start_time_1 = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] for future in concurrent.futures.as_completed(futures): print(future.result()) print("Thread pool execution in %s seconds" %(str(time.time() - start_time_1))) # Process pool execution start_time_2 = time.time() with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))
執行結果:
10000000 20000000 30000000 40000000 50000000 60000000 70000000 80000000 90000000 100000000 Sequential execution in 8.975373029708862 seconds 10000000 20000000 30000000 40000000 60000000 70000000 50000000 80000000 90000000 100000000 Thread pool execution in 8.699156045913696 seconds Process pool execution in 5.916198968887329 seconds
建立一個list存放10個數字,然後使用一個迴圈計算從1加到10000000,打印出和與number_list的乘積。
number_list = [1,2,3,4,5,6,7,8,9,10] def evaluate_item(x): #For time consuming result_item = count(x) return result_item def count(number): for i in range(0, 10000000): i = i + 1 return i * number
在主程式中,首先順序執行了一次程式並列印其執行時間:
start_time = time.time() for item in number_list: print(evaluate_item(item)) print("Sequential execution in %s seconds" %(str(time.time() - start_time)))
其次使用futures.ThreadPoolExecutor模組的執行緒池並列印其時間:
start_time_1 = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] for future in concurrent.futures.as_completed(futures): print(future.result()) print("Thread pool execution in %s seconds" %(str(time.time() - start_time_1)))
ThreadPoolExecutor使用執行緒池中的一個執行緒執行給定任務。池中一共有5個執行緒,每一個執行緒從池中取得一個任務然後執行它,當任務執行完成,再從池中拿到另一個任務。
最後是使用程序池:
start_time_2 = time.time() with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor: futures = [executor.submit(evaluate_item, item) for item in number_list] print("Process pool execution in %s seconds" %(str(time.time() - start_time_2)))
和ThreadPoolExecutor一樣,ProcessPoolExecutor是一個executor,使用一個執行緒池來並行執行任務。因為ProcessPoolExecutor使用了多核處理的模組,讓我們可以不受GIL的限制,大大縮短執行時間。
幾乎所有需要處理多個客戶端請求的服務應用都會使用池。也有應用要求需要立即執行,或者要求對任務的執行緒有更多的控制器,這種情況下,池不是一個最佳選擇。
3、使用Asyncio管理事件迴圈
先入為主:
import asyncio import datetime import time def function_1(end_time, loop): print("function_1 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_2, end_time, loop) else: loop.stop() def function_2(end_time, loop): print("function_2 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_3, end_time, loop) else: loop.stop() def function_3(end_time, loop): print("function_3 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_1, end_time, loop) else: loop.stop() def function_4(end_time, loop): print("function_4 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_4, end_time, loop) else: loop.stop() loop = asyncio.get_event_loop() print(loop.time()) end_loop = loop.time() + 9.0 print(end_loop) loop.call_soon(function_1, end_loop, loop) #loop.call_soon(function_4, end_loop, loop) loop.run_forever() loop.close()
執行結果:
上述例子定義了三個非同步任務,相繼執行,如圖所示:
首先,我們要得到這個事件迴圈:
loop = asyncio.get_event_loop()
然後我們通過call_soon方法呼叫了function_1()函式。
end_loop = loop.time() + 9.0
loop.call_soon(function_1, end_loop, loop)
function_1:
def function_1(end_time, loop): print("function_1 called") if (loop.time() + 1.0) < end_time: loop.call_later(1, function_2, end_time, loop) else: loop.stop()
- end_time定義了function_1可以執行的最長時間,並通過call_later方法傳入到function_2中作為引數
- loop通過get_event_loop()方法得到的事件迴圈
任務執行結束之後,它會比較loop.time() + 1s和設定的執行時間,如果沒有超過,使用call_later在1秒之後執行function_2(),function_2和3作用類似
如果執行時間超過了設定,時間迴圈終止。
概念解釋:
Python的Asyncio模組提供了管理事件、協程、任務和執行緒方法,以及編寫併發程式碼的原語。主要元件和概念包括:
- 事件迴圈:在Asyncio模組中,每一個程序都有一個事件迴圈。
- 協程:這是子程式的泛化概念。協程可以在執行期間暫停,這樣就可以等待外部的處理(例如IO)完成之後,從之前暫停的地方恢復執行。
- Futures:定義了Future物件,和concurrent.futures模組一樣,表示尚未完成的計算。
- Tasks:這是Asyncio的子類,用於封裝和管理並行模式下的協程。
事件迴圈:
在計算機系統中,可以產生事件的實體叫做事件源,能處理事件的實體叫做事件處理者,還有一些第三方實體叫做事件迴圈。它的作用是管理所有的事件,在整個程式執行過程中不斷迴圈執行,追蹤事件發生的順序將他們放到佇列中,當主執行緒空閒的時候,呼叫相應的事件處理者處理事件。
Asyncio管理事件迴圈的方法:
- loop = get_event_loop():得到當前上下文的事件迴圈。
- loop.call_later(time_delay, callback, argument):延後time_delay秒再執行callback方法。
- loop.call_soon(callback, argument):儘可能快的呼叫callback。call_soon()函式結束,主執行緒回到事件迴圈之後就會馬上呼叫callback。
- loop.time():以float型別返回當前時間迴圈的內部時間。
- asyncio.set_event_loop():為當前上下文設定時間迴圈。
- asyncio.new_event_loop():根據此策略建立一個新的時間迴圈並返回。
- loop.run_forever():在呼叫stop()之前將一直執行。run_forever真正開始執行函式。
4、使用Asyncio管理協程
上述例子中一個程式變得很大而且複雜時,將其劃分為子程式,每一部分實現特定的任務。子程式不能單獨執行,只能在主程式的請求下執行,主程式負責協調使用各個子程式。協程是子程式的泛化,和子程式一樣的是,協程只負責計算任務的一步;不同的是協程沒有主程式來進行排程。因為協程通過管道連線在一起,沒有監視函式負責順序呼叫他們。在協程中,執行點可以被掛起,可以被之前掛起的點恢復執行。通過協程池就可以插入到計算中:執行第一個任務,直到它返回yield執行權,然後執行下一個,這樣順著執行下去。
這種插入的控制組件就是前文提到的事件迴圈,它持續追蹤所有的協程並執行它們。
協程的另外一些重要特性如下:
- 協程可以有多個入口點,並可以yield多次
- 協程可以將執行權交給其他協程
yield表示協程在此暫停,並且將執行權交給其他協程,因為協程可以將值與控制權一起傳遞給另一個協程,所以yield一個值就表示將值傳給下一個執行的協程。
測試用例:
import asyncio import time from random import randint @asyncio.coroutine def StartState(): print("Start State called \n") input_value = randint(0,1) time.sleep(1) print("I am StartState.input_value is %s" %input_value) if (input_value == 0): result = yield from State2(input_value) else: result = yield from State1(input_value) print("Resume of the Transition : \nStart State calling %s" %result) @asyncio.coroutine def State1(transition_value): outputValue = str("State 1 with transition value = %s \n" %transition_value) input_value = randint(0,1) time.sleep(1) print("...Evaluation...") print("I am State1.input_value is %s" %input_value) if input_value == 0: result = yield from State3(input_value) else: result = yield from State2(input_value) result = "State 1 calling %s" %result return outputValue + str(result) @asyncio.coroutine def State2(transition_value): outputValue = str("State 2 with transition value = %s \n" %transition_value) input_value = randint(0,1) time.sleep(1) print("...Evaluation...") print("I am State2.input_value is %s" %input_value) if input_value == 0: result = yield from State1(input_value) else: result = yield from State3(input_value) result = "State 2 calling %s" %result return outputValue + str(result) @asyncio.coroutine def State3(transition_value): outputValue = str("State 3 with transition value = %s \n" %transition_value) input_value = randint(0,1) time.sleep(1) print("...Evaluation...") print("I am State3.input_value is %s" %input_value) if input_value == 0: result = yield from State1(input_value) else: result = yield from EndState(input_value) result = "State 1 calling %s" %result return outputValue + str(result) @asyncio.coroutine def EndState(transition_value): outputValue = str("End State with transition value = %s \n" %transition_value) print("I am EndState.outputValue is %s" %outputValue) print("...Stop Computation...") return outputValue if __name__ == "__main__": print("Finite State Machine simulation with Asyncio Coroutine") loop = asyncio.get_event_loop() loop.run_until_complete(StartState())
上述程式碼為使用Asyncio的協程來模擬有限狀態機(一個數學模型,不僅在工程領域應用廣泛,在科學領域也很著名)。模擬的狀態機如下:
系統有四個狀態,0和1是狀態機可以從一個狀態到另一個狀態的值,這個過程叫轉換。
執行結果(結果不唯一):
每一個狀態都由一個裝飾器裝飾:@asyncio.coroutine
通過yield from命令呼叫下一個協程。
啟動事件迴圈:
if __name__ == "__main__": print("Finite State Machine simulation with Asyncio Coroutine") loop = asyncio.get_event_loop() loop.run_until_complete(StartState())
5、使用Asyncio控制任務
Asyncio是用來處理事件迴圈中的非同步程序和併發任務執行的。它還提供了asyncio.Task()類,可以在任務中使用協程。它的作用是在同一事件迴圈中,執行某一個任務的同時可以併發地執行多個任務。當協程被包在任務中,它會自動將任務和事件迴圈連線起來,當事件迴圈啟動的時候,任務自動執行。這樣就提供了一個可以自動驅動協程的機制。
Asyncio模組為我們提供了asyncio.Task(coroutine)方法來處理計算任務,它可以排程協程的執行。任務對協程物件在事件迴圈的執行負責。如果被包裹的協程要從future yield,那麼任務會被掛起,等待future的計算結果。
當future計算完成,被包裹的協程將會拿到future返回的結果或異常(exception)繼續執行。另外,需要注意的是事件迴圈一次只能執行一個任務,除非還有其它事件迴圈在不同的執行緒並行執行,此任務才有可能和其他任務並行。當一個任務在等待future執行的期間,事件迴圈會執行一個新的任務。
測試用例:
import asyncio @asyncio.coroutine def factorial(number): f = 1 for i in range(2, number + 1): print("Asyncio.Task: Compute factorial(%s)" %i) yield from asyncio.sleep(0.5) f *= i print("Asyncio.Task - factorial(%s) = %s" %(number, f)) @asyncio.coroutine def fibonacci(number): a,b = 0,1 for i in range(number): print("Asyncio.Task: Compute fibonacci(%s)" %i) yield from asyncio.sleep(0.5) a, b = b, a+b print("Asyncio.Task - fibonacci(%s) = %s" %(number, a)) @asyncio.coroutine def binomialCoeff(n, k): result = 1 for i in range(1, k+1): result = result * (n-i+1)/i print("Asyncio.Task:Compute binomialCoeff(%s)" %i) yield from asyncio.sleep(0.5) print("Asyncio.Task - binomialCoeff(%s, %s) = %s" %(n, k, result)) if __name__ == "__main__": tasks = [asyncio.Task(factorial(10)), asyncio.Task(fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) loop.close()
執行結果:
Asyncio.Task: Compute factorial(2) Asyncio.Task: Compute fibonacci(0) Asyncio.Task:Compute binomialCoeff(1) Asyncio.Task: Compute factorial(3) Asyncio.Task: Compute fibonacci(1) Asyncio.Task:Compute binomialCoeff(2) Asyncio.Task: Compute factorial(4) Asyncio.Task: Compute fibonacci(2) Asyncio.Task:Compute binomialCoeff(3) Asyncio.Task: Compute factorial(5) Asyncio.Task: Compute fibonacci(3) Asyncio.Task:Compute binomialCoeff(4) Asyncio.Task: Compute factorial(6) Asyncio.Task: Compute fibonacci(4) Asyncio.Task:Compute binomialCoeff(5) Asyncio.Task: Compute factorial(7) Asyncio.Task: Compute fibonacci(5) Asyncio.Task:Compute binomialCoeff(6) Asyncio.Task: Compute factorial(8) Asyncio.Task: Compute fibonacci(6) Asyncio.Task:Compute binomialCoeff(7) Asyncio.Task: Compute factorial(9) Asyncio.Task: Compute fibonacci(7) Asyncio.Task:Compute binomialCoeff(8) Asyncio.Task: Compute factorial(10) Asyncio.Task: Compute fibonacci(8) Asyncio.Task:Compute binomialCoeff(9) Asyncio.Task - factorial(10) = 3628800 Asyncio.Task: Compute fibonacci(9) Asyncio.Task:Compute binomialCoeff(10) Asyncio.Task - fibonacci(10) = 55 Asyncio.Task - binomialCoeff(20, 10) = 184756.0
上述例子定義了三個執行緒,factorial,fibonacci,binomialCoeff,每一個都帶有asyncio.coroutine裝飾器:
將三個task放入到一個list中:
tasks = [asyncio.Task(factorial(10)), asyncio.Task(fibonacci(10)), asyncio.Task(binomialCoeff(20, 10))]
通過run_until_complete並行執行三個協程,asyncio.wait(tasks)表示執行直到所有給定的協程都完成。
最後關閉事件迴圈:loop.close()
6、使用Asyncio和Futures
Asyncio模組的另一個重要的元件是Futures。它和concurrent.futures.Futures很像,但是針對Asyncio的事件迴圈做了很多定製。asyncio.Futures類代表還未完成的結果,有可能是一個Exception,所以綜合來說,它是一種抽象的代表還沒有做完的事情。
實際上,必須處理一些結果的回撥函式被加入到了這個類的例項中。
基本方法:
- cancel():取消future的執行,排程回撥函式
- result():返回future代表的結果
- exception():返回future中的Exception
- add_done_callback(fn):新增一個回撥函式,當future執行的時候會呼叫這個回撥函式
- remove_done_callback(fn):從call when done列表中移除所有的callback的例項
- set_result(result):將future標為執行完成,並且設定result的值
- set_exception(exception):將future標為執行完成,並設定Exception
測試用例:
# coding : utf-8 import asyncio import sys @asyncio.coroutine def first_coroutine(future, n): # 計算前n個數的和 count = 0 for i in range(1, n+1): count = count + i print("first yield") yield from asyncio.sleep(2) print("first_coroutine finished") # 將future標記為已完成,並設定result的值 future.set_result("first coroutine (sum of n integers) result = %s" %str(count)) @asyncio.coroutine def second_coroutine(future, n): count = 1 for i in range(2, n+1): count *= i print("second yield") yield from asyncio.sleep(1) print("second_coroutine finished") future.set_result("second coroutine (factorial) result = %s" %str(count)) def got_result(future): # 獲取future的set_result結果 print(future.result()) if __name__ == "__main__": N1 = int(sys.argv[1]) N2 = int(sys.argv[2]) loop = asyncio.get_event_loop() future1 = asyncio.Future() future2 = asyncio.Future() tasks = [first_coroutine(future1, N1), second_coroutine(future2, N2)] # 添加回調函式 future1.add_done_callback(got_result) future2.add_done_callback(got_result) loop.run_until_complete(asyncio.wait(tasks)) loop.close()
執行結果: