1. 程式人生 > >Python的異步編程[0] -> 協程[1] -> 使用協程建立自己的異步非阻塞模型

Python的異步編程[0] -> 協程[1] -> 使用協程建立自己的異步非阻塞模型

.net post this fab htm true 底層實現 自己 print

使用協程建立自己的異步非阻塞模型


接下來例子中,將使用純粹的Python編碼搭建一個異步模型,相當於自己構建的一個asyncio模塊,這也許能對asyncio模塊底層實現的理解有更大的幫助。主要參考為文末的鏈接,以及自己的補充理解。

完整代碼

技術分享圖片
  1 #!/usr/bin/python
  2 # =============================================================
  3 # File Name: async_base.py
  4 # Author: LI Ke
  5 # Created Time: 1/29/2018 09:18:50
  6
# ============================================================= 7 8 9 import types 10 import time 11 12 13 @types.coroutine 14 def switch(): 15 print(Switch: Start) 16 yield 17 print(Switch: Done) 18 19 async def coro_1(): 20 print(C1: Start) 21 await switch()
22 print(C1: Stop) 23 24 25 async def coro_2(): 26 print(C2: Start) 27 print(C2: 1) 28 print(C2: 2) 29 print(C2: 3) 30 print(C2: Stop) 31 32 c_1 = coro_1() 33 c_2 = coro_2() 34 35 try: 36 c_1.send(None) 37 except StopIteration: 38 pass 39
try: 40 c_2.send(None) 41 except StopIteration: 42 pass 43 try: 44 c_1.send(None) 45 except StopIteration: 46 pass 47 48 print(--------------------------------) 49 50 def run(coros): 51 coros = list(coros) 52 53 while coros: 54 # Duplicate list for iteration so we can remove from original list 55 for coro in list(coros): 56 try: 57 coro.send(None) 58 except StopIteration: 59 coros.remove(coro) 60 61 c_1 = coro_1() 62 c_2 = coro_2() 63 run([c_1, c_2]) 64 65 print(--------------------------------) 66 67 @types.coroutine 68 def action(t): 69 trace=[] 70 while True: 71 trace.append(time.time()) 72 if trace[-1] - trace[0] > t: 73 break # This break will end this function and raise a StopIteration 74 yield 75 76 async def coro_1(): 77 print(C1: Start) 78 await action(2) 79 print(C1: Stop) 80 81 82 async def coro_2(): 83 print(C2: Start) 84 await action(3) 85 print(C2: Stop) 86 87 def timeit(f): 88 def _wrapper(*args, **kwargs): 89 start = time.time() 90 re = f(*args, **kwargs) 91 end = time.time() 92 print(Time cost:, f.__name__, end-start) 93 return re 94 return _wrapper 95 96 c_1 = coro_1() 97 c_2 = coro_2() 98 timeit(run)([c_1]) 99 timeit(run)([c_2]) 100 101 print(--------------------------------) 102 103 c_1 = coro_1() 104 c_2 = coro_2() 105 timeit(run)([c_1, c_2])
View Code

分段解釋

首先會導入需要的模塊,這裏僅僅使用types和time兩個模塊,放棄異步I/O的asyncio模塊。

1 import types
2 import time

接下來定義一個switch函數,利用types.coroutine裝飾器將switch裝飾成一個協程,這個協程將完成一個切換功能。

1 @types.coroutine
2 def switch():
3     print(Switch: Start)
4     yield
5     print(Switch: Done)

隨後定義第一個協程,協程啟動後,會進入一個await,即切入剛才的switch協程,這裏使用async和await關鍵字完成對協程的定義。

1 async def coro_1():
2     print(C1: Start)
3     await switch()
4     print(C1: Stop)

同樣的,再定義第二個協程,第二個協程將從頭到尾順序執行。

1 async def coro_2():
2     print(C2: Start)
3     print(C2: 1)
4     print(C2: 2)
5     print(C2: 3)
6     print(C2: Stop)

有了上面的兩個協程,但我們在異步時,希望在執行完C_1的start後,切換進協程C_2,執行完成後再切換回來。那麽此時就需要一個對協程切換進行控制的程序,具體順序如下,

  1. 啟動協程c_1,啟動後會切換進switch函數,
  2. Switch中由於yield而切出,並保留上下文環境
  3. c_1.send()將獲得返回結果(如果有的話),並繼續執行
  4. 此時c_1已經被中止,啟動c_2,則完成所有執行步驟,捕獲生成器的中止異常
  5. 這時c_2以執行完畢,再次切回c_1(此時會從switch yield之後開始執行)繼續執行。
 1 c_1 = coro_1()
 2 c_2 = coro_2()
 3 
 4 try:
 5     c_1.send(None)
 6 except StopIteration:
 7     pass
 8 try:
 9     c_2.send(None)
10 except StopIteration:
11     pass
12 try:
13     c_1.send(None)
14 except StopIteration:
15     pass

最終得到結果如下,可以看到,整個過程完全按期望的流程進行,

C1: Start
Switch: Start
C2: Start
C2: 1
C2: 2
C2: 3
C2: Stop
Switch: Done
C1: Stop

但是這裏的協程運行部分仍需改善,於是接下來便定義一個run函數用於執行一個協程列表。

run函數首先會遍歷協程列表的副本,並不斷嘗試啟動列表中的協程,當協程結束後便將協程從協程列表中刪除,直到所有的協程都執行完畢為止。

 1 def run(coros):
 2     coros = list(coros)
 3 
 4     while coros:
 5         # Duplicate list for iteration so we can remove from original list
 6         for coro in list(coros):
 7             try:
 8                 coro.send(None)
 9             except StopIteration:
10                 coros.remove(coro)
11 
12 c_1 = coro_1()
13 c_2 = coro_2()
14 run([c_1, c_2])

測試一下run函數,得到結果與前面相同,

C1: Start
Switch: Start
C2: Start
C2: 1
C2: 2
C2: 3
C2: Stop
Switch: Done
C1: Stop

到目前為止,完成了一個簡單的異步模型的搭建,即c_2無需等待c_1執行完成再繼續執行,而是由c_1交出了控制權進行協作完成,同時也不存在多線程的搶占式任務,因為由始至終都只有一個線程在運行,而且也沒有混亂的回調函數存在。

但是,還存在一個阻塞問題沒有解決,也就是說,如果c_1中的switch函數是一個耗時的I/O操作或其他阻塞型操作,則此時需要等待switch的阻塞操作完成才能交出控制權,可如果希望在等待這個耗時操作時,先去執行c_2的任務,再回來檢測c_1中的耗時操作是否完成,則需要使用非阻塞的方式。

首先,對剛才的switch進行改造,完成一個action協程,這個協程會根據傳入的參數,執行對應時間後,再退出協程引發StopIteration,實現方式如下,每次切換進action中都會記錄下時間,然後將時間和第一次進入的時間進行對比,如果超過了設置的時間便退出,如果沒超過限制時間,則切出協程交還出控制權。

1 @types.coroutine
2 def action(t):
3     trace=[]
4     while True:
5         trace.append(time.time())
6         if trace[-1] - trace[0] > t:
7             break # This break will end this function and raise a StopIteration
8         yield

接著定義兩個協程,分別執行action時間為2秒和3秒,同時定義一個計算時間的裝飾器,用於時間記錄。

 1 async def coro_1():
 2     print(C1: Start)
 3     await action(2)
 4     print(C1: Stop)
 5 
 6 
 7 async def coro_2():
 8     print(C2: Start)
 9     await action(3)
10     print(C2: Stop)
11 
12 def timeit(f):
13     def _wrapper(*args, **kwargs):
14         start = time.time()
15         re = f(*args, **kwargs)
16         end = time.time()
17         print(Time cost:, f.__name__, end-start)
18         return re
19     return _wrapper

然後我們先分別運行兩個協程進行一個實驗,

1 c_1 = coro_1()
2 c_2 = coro_2()
3 timeit(run)([c_1])
4 timeit(run)([c_2])

從輸出的結果可以看到兩個協程的耗時與action執行的時間基本相同,且順序執行的時間為兩者之和,

C1: Start
C1: Stop
Time cost: run 2.030202865600586
C2: Start
C2: Stop
Time cost: run 3.0653066635131836

接下來,利用異步非阻塞的方式來執行這兩個協程,

1 c_1 = coro_1()
2 c_2 = coro_2()
3 timeit(run)([c_1, c_2])

最後得到結果

C1: Start
C2: Start
C1: Stop
C2: Stop
Time cost: run 3.0743072032928467

從結果中可以看到,此時的運行方式是異步的形式,c_1啟動後由於進入一個耗時action,且action被我們設置為非阻塞形式,因此c_1交出了控制權,控制權回到run函數後,啟動了c_2,而c_2同樣也進入到action中,這時兩個協程都在等待任務完成,而監視run則在兩個協程中不停輪詢,不斷進入action中查看各自的action操作是否完成,當有協程完成後,將繼續啟動這個協程的後續操作,直到最終所有協程結束。

按照非阻塞異步協程的方式,可以以單線程運行,避免資源鎖的建立,也消除了線程切換的開銷,並且最終獲得了類似多線程運行的時間性能。

相關閱讀


1. 協程和 async / await

參考鏈接


http://www.oschina.net/translate/playing-around-with-await-async-in-python-3-5

Python的異步編程[0] -> 協程[1] -> 使用協程建立自己的異步非阻塞模型