1. 程式人生 > >Python黑魔法 --- 非同步IO( asyncio) 協程

Python黑魔法 --- 非同步IO( asyncio) 協程

python asyncio

網路模型有很多中,為了實現高併發也有很多方案,多執行緒,多程序。無論多執行緒和多程序,IO的排程更多取決於系統,而協程的方式,排程來自使用者,使用者可以在函式中yield一個狀態。使用協程可以實現高效的併發任務。Python的在3.4中引入了協程的概念,可是這個還是以生成器物件為基礎,3.5則確定了協程的語法。下面將簡單介紹asyncio的使用。實現協程的不僅僅是asyncio,tornado和gevent都實現了類似的功能。

  • event_loop 事件迴圈:程式開啟一個無限的迴圈,程式設計師會把一些函式註冊到事件迴圈上。當滿足事件發生的時候,呼叫相應的協程函式。
  • coroutine 協程:協程物件,指一個使用async關鍵字定義的函式,它的呼叫不會立即執行函式,而是會返回一個協程物件。協程物件需要註冊到事件迴圈,由事件迴圈呼叫。
  • task 任務:一個協程物件就是一個原生可以掛起的函式,任務則是對協程進一步封裝,其中包含任務的各種狀態。
  • future: 代表將來執行或沒有執行的任務的結果。它和task上沒有本質的區別
  • async/await 關鍵字:python3.5 用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的非同步呼叫介面。

上述的概念單獨拎出來都不好懂,比較他們之間是相互聯絡,一起工作。下面看例子,再回溯上述概念,更利於理解。

定義一個協程

定義一個協程很簡單,使用async關鍵字,就像定義普通函式一樣:

12345678910111213141516 import timeimport asyncionow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)start=now()coroutine=do_some_work(2)loop=asyncio.get_event_loop()loop.run_until_complete(coroutine)print('TIME: ',now()-start)

通過async關鍵字定義一個協程(coroutine),協程也是一種物件。協程不能直接執行,需要把協程加入到事件迴圈(loop),由後者在適當的時候呼叫協程。asyncio.get_event_loop方法可以建立一個事件迴圈,然後使用run_until_complete將協程註冊到事件迴圈,並啟動事件迴圈。因為本例只有一個協程,於是可以看見如下輸出:

12 Waiting:2TIME:0.0004658699035644531

建立一個task

協程物件不能直接執行,在註冊事件迴圈的時候,其實是run_until_complete方法將協程包裝成為了一個任務(task)物件。所謂task物件是Future類的子類。儲存了協程執行後的狀態,用於未來獲取協程的結果。

123456789101112131415161718 import asyncioimport timenow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)start=now()coroutine=do_some_work(2)loop=asyncio.get_event_loop()# task = asyncio.ensure_future(coroutine)task=loop.create_task(coroutine)print(task)loop.run_until_complete(task)print(task)print('TIME: ',now()-start)

可以看到輸出結果為:

Python
1234 <Task pending coro=<do_some_work()running at/Users/ghost/Rsj217/python3.6/async/async-main.py:17>>Waiting:2<Task finished coro=<do_some_work()done,defined at/Users/ghost/Rsj217/python3.6/async/async-main.py:17>result=None>TIME:0.0003490447998046875

建立task後,task在加入事件迴圈之前是pending狀態,因為do_some_work中沒有耗時的阻塞操作,task很快就執行完畢了。後面列印的finished狀態。

asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以建立一個task,run_until_complete的引數是一個futrue物件。當傳入一個協程,其內部會自動封裝成task,task是Future的子類。isinstance(task, asyncio.Future)將會輸出True。

繫結回撥

繫結回撥,在task執行完畢的時候可以獲取執行的結果,回撥的最後一個引數是future物件,通過該物件可以獲取協程返回值。如果回撥需要多個引數,可以通過偏函式匯入。

123456789101112131415161718192021 import timeimport asyncionow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)return'Done after {}s'.format(x)def callback(future):print('Callback: ',future.result())start=now()coroutine=do_some_work(2)loop=asyncio.get_event_loop()task=asyncio.ensure_future(coroutine)task.add_done_callback(callback)loop.run_until_complete(task)print('TIME: ',now()-start)
1234 def callback(t,future):print('Callback:',t,future.result())task.add_done_callback(functools.partial(callback,2))

可以看到,coroutine執行結束時候會呼叫回撥函式。並通過引數future獲取協程執行的結果。我們建立的task和回撥裡的future物件,實際上是同一個物件。

future 與 result

回撥一直是很多非同步程式設計的惡夢,程式設計師更喜歡使用同步的編寫方式寫非同步程式碼,以避免回撥的惡夢。回撥中我們使用了future物件的result方法。前面不繫結回撥的例子中,我們可以看到task有fiinished狀態。在那個時候,可以直接讀取task的result方法。

12345678910111213 async def do_some_work(x):print('Waiting {}'.format(x))return'Done after {}s'.format(x)start=now()coroutine=do_some_work(2)loop=asyncio.get_event_loop()task=asyncio.ensure_future(coroutine)loop.run_until_complete(task)print('Task ret: {}'.format(task.result()))print('TIME: {}'.format(now()-start))

可以看到輸出的結果:

123 Waiting:2Task ret:Done after2sTIME:0.0003650188446044922

阻塞和await

使用async可以定義協程物件,使用await可以針對耗時的操作進行掛起,就像生成器裡的yield一樣,函式讓出控制權。協程遇到await,事件迴圈將會掛起該協程,執行別的協程,直到其他的協程也掛起或者執行完畢,再進行下一個協程的執行。

耗時的操作一般是一些IO操作,例如網路請求,檔案讀取等。我們使用asyncio.sleep函式來模擬IO操作。協程的目的也是讓這些IO操作非同步化。

12345678910111213141516171819 import asyncioimport timenow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)await asyncio.sleep(x)return'Done after {}s'.format(x)start=now()coroutine=do_some_work(2)loop=asyncio.get_event_loop()task=asyncio.ensure_future(coroutine)loop.run_until_complete(task)print('Task ret: ',task.result())print('TIME: ',now()-start)

在 sleep的時候,使用await讓出控制權。即當遇到阻塞呼叫的函式的時候,使用await方法將協程的控制權讓出,以便loop呼叫其他的協程。現在我們的例子就用耗時的阻塞操作了。

併發和並行

併發和並行一直是容易混淆的概念。併發通常指有多個任務需要同時進行,並行則是同一時刻有多個任務執行。用上課來舉例就是,併發情況下是一個老師在同一時間段輔助不同的人功課。並行則是好幾個老師分別同時輔助多個學生功課。簡而言之就是一個人同時吃三個饅頭還是三個人同時分別吃一個的情況,吃一個饅頭算一個任務。

asyncio實現併發,就需要多個協程來完成任務,每當有任務阻塞的時候就await,然後其他協程繼續工作。建立多個協程的列表,然後將這些協程註冊到事件迴圈中。

12345678910111213141516171819202122232425262728293031 import asyncioimport timenow=lambda:time.time()async def do_some_work(x):print('Waiting: ',x)await asyncio.sleep(x)return'Done after {}s'.format(x)start=now()coroutine1=do_some_work(1)coroutine2=do_some_work(2)coroutine3=do_some_work(4)tasks=[asyncio.ensure_future(coroutine1),asyncio.ensure_future(coroutine2),asyncio.ensure_future(coroutine3)]loop=asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))fortask intasks:print('Task ret: ',task.result())print('TIME: ',now()-start)

結果如下

1234567 Waiting:1Waiting:2Waiting:4Task ret:Done after1sTask ret:Done after2sTask ret:Done after4sTIME:4.003541946411133

總時間為4s左右。4s的阻塞時間,足夠前面兩個協程執行完畢。如果是同步順序的任務,那麼至少需要7s。此時我們使用了aysncio實現了併發。asyncio.wait(tasks) 也可以使用 asyncio.gather(*tasks) ,前者接受一個task列表,後者接收一堆task。

協程巢狀

使用async可以定義協程,協程用於耗時的io操作,我們也可以封裝更多的io操作過程,這樣就實現了巢狀的協程,即一個協程中await了另外一個協程,如此連線起來。