PEP525--非同步生成器
PEP 525 --非同步生成器
簡述
PEP492引入了對Python 3.5的原生協程和async/await句法的支援。本次提案添加了對非同步生成器的支援進而來擴充套件Python的非同步功能。
理論和目標
常規生成器(在PEP 255中引入)的實現,使得編寫複雜資料變得更優雅,它們的行為類似於迭代器。
當時沒有提供async for使用的非同步生成器。 編寫非同步資料生成器變得非常複雜,因為必須定義一個實現__aiter__和__anext__的方法,才能在async for語句中使用它。
為了說明非同步生成器的重要性,專門做了效能測試,測試結果表明使用非同步生成器要比使用非同步迭代器快2倍多。
下面的程式碼是演示了在迭代的過程中等待幾秒
class Ticker: """Yield numbers from 0 to `to` every `delay` seconds.""" def __init__(self, delay, to): self.delay = delay self.i = 0 self.to = to def __aiter__(self): return self async def __anext__(self): i = self.i if i >= self.to: raise StopAsyncIteration self.i += 1 if i: await asyncio.sleep(self.delay) return i
我們那可以使用下面的程式碼實現同樣的功能:
async def ticker(delay, to): """Yield numbers from 0 to `to` every `delay` seconds.""" for i in range(to): yield i await asyncio.sleep(delay)
詳細說明
非同步生成器
我們直到在函式中使用一個或多個yield該函式將變成一個生成器。
def func():# 方法 return def genfunc():# 生成器方法 yield
我們提議使用類似的功能實現下面非同步生成器:
async def coro():# 一個協程方法 await smth() async def asyncgen():# 一個非同步生成器方法 await smth() yield 42
呼叫非同步生成器函式的結果是非同步生成器物件,它實現了PEP 492中定義的非同步迭代協議。
注意:在非同步生成器中使用非空return語句會引發SyntaxError錯誤。
對非同步迭代協議的支援
該協議需要實現兩種特殊方法:
__aiter__方法返回一個非同步迭代器。
__anext__方法返回一個awaitable物件,它使用StopIteration異常來捕獲yield的值,使用StopAsyncIteration異常來表示迭代結束。
非同步生成器定義了這兩種方法。 讓我們實現一個一個簡單的非同步生成器:
import asyncio async def genfunc(): yield 1 yield 2 gen = genfunc() async def start(): assert gen.__aiter__() is gen assert await gen.__anext__() == 1 assert await gen.__anext__() == 2 await gen.__anext__()# This line will raise StopAsyncIteration. if __name__ == '__main__': asyncio.run(start())
終止
PEP 492提到需要使用事件迴圈或排程程式來執行協程。 因為非同步生成器是在協程使用的,所以還需要建立一個事件迴圈來執行。
非同步生成器可以有try..finally塊,也可以用async with非同步上下文管理程式碼快。 重要的是提供一種保證,即使在部分迭代時,也可以進行垃圾收集,生成器可以安全終止。
async def square_series(con, to): async with con.transaction(): cursor = con.cursor( 'SELECT generate_series(0, $1) AS i', to) async for row in cursor: yield row['i'] ** 2 async for i in square_series(con, 1000): if i == 100: break
上面程式碼演示了非同步生成器在async with中使用,然後使用async for對非同步生成器物件進行迭代處理,同時我們也可以設定一箇中斷條件。
square_series()生成器將被垃圾收集,並沒有非同步關閉生成器的機制,Python直譯器將無法執行任何操作。
為了解決這個問題,這裡提出以下改進建議:
1.在非同步生成器上實現一個aclose方法,返回一個特殊awaittable 物件。 當awaitable丟擲GeneratorExit異常的時候,丟擲到掛起的生成器中並對其進行迭代,直到發生GeneratorExit或StopAsyncIteration。這就是在常規函式中使用close方法關閉物件一樣,只不過aclose需要一個事件迴圈去執行。
2.不要在非同步生成器中使用yield語句,只能用await。
3.在sys模組中加兩個方法:set_asyncgen_hooks() and get_asyncgen_hooks().
sys.set_asyncgen_hooks()背後的思想是允許事件迴圈攔截非同步生成器的迭代和終結,這樣終端使用者就不需要關心終結問題了,一切正常。
sys.set_asyncgen_hooks() 可以結束兩個引數
firstiter:一個可呼叫的,當第一次迭代非同步生成器時將呼叫它。
finalizer:一個可呼叫的,當非同步生成器即將被GC時將被呼叫。
當第一迭代非同步生成器時,它會引用到當前的finalizer。
當非同步生成器即將被垃圾收集時,它會呼叫其快取的finalizer。假想在事件迴圈啟用非同步生成器開始迭代的時候,finalizer將呼叫一個aclose()方法.
例如,以下是如何修改asyncio以允許安全地完成非同步生成器:
# asyncio/base_events.py class BaseEventLoop: def run_forever(self): ... old_hooks = sys.get_asyncgen_hooks() sys.set_asyncgen_hooks(finalizer=self._finalize_asyncgen) try: ... finally: sys.set_asyncgen_hooks(*old_hooks) ... def _finalize_asyncgen(self, gen): self.create_task(gen.aclose())
第二個引數firstiter,允許事件迴圈維護在其控制下例項化的弱非同步生成器集。這使得可以實現“shutdown”機制,來安全地開啟的生成器並關閉事件迴圈。
sys.set_asyncgen_hooks()是特定執行緒,因此在多個事件迴圈並行的時候是安全的。
sys.get_asyncgen_hooks()返回一個帶有firstiter和finalizer欄位的類似於類的結構。
asyncio
asyncio事件迴圈將使用sys.set_asyncgen_hooks()API來維護所有被排程的弱非同步生成器,並在生成器被垃圾回收時侯排程它們的aclose()方法。
為了確保asyncio程式可以可靠地完成所有被排程的非同步生成器,我們建議新增一個新的事件迴圈協程方法loop.shutdown_asyncgens()。 該方法將使用aclose()呼叫關閉所有當前開啟的非同步生成器。
在呼叫loop.shutdown_asyncgens()方法之後,首次迭代新的非同步生成器,事件迴圈就會發出警告。 我們的想法是,在請求關閉所有非同步生成器之後,程式不應該執行迭代新非同步生成器的程式碼。
下面是一個關於如何使用Ashutdown_asyncgens的例子:
try: loop.run_forever() finally: loop.run_until_complete(loop.shutdown_asyncgens())#關閉所有非同步迭代器 loop.close()
非同步生成器物件
該物件以標準Python生成器物件為模型。 本質上非同步生成器的行為複製了同步生成器的行為,唯一的區別在於API是非同步的。
定義了以下方法和屬性:
1.agen.__aiter__(): 返回agen.
2.agen.__anext__(): 返回一個awaitable物件, 呼叫一次非同步生成器的元素。
3.agen.asend(val): 返回一個awaitable物件,它在agen生成器中推送val物件。 當agen還沒迭代時,val必須為None。
上面的方法類似同步生成器的使用。
程式碼例子:
import asyncio async def gen(): await asyncio.sleep(0.1) v = yield 42 print(v) await asyncio.sleep(0.2) async def start(): g = gen() await g.asend(None)# Will return 42 after sleeping # for 0.1 seconds. await g.asend('hello')# Will print 'hello' and # raise StopAsyncIteration # (after sleeping for 0.2 seconds.) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(start()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
4.agen.athrow(typ, [val, [tb]]): 返回一個awaitable物件, 這會向agen生成器丟擲一個異常。
程式碼如下:
import asyncio async def gen(): try: await asyncio.sleep(0.1) yield 'hello' except IndexError: await asyncio.sleep(0.2) yield 'world' async def start(): g = gen() v = await g.asend(None) print(v)# Will print 'hello' after # sleeping for 0.1 seconds. v = await g.athrow(IndexError) print(v)# Will print 'world' after # $ sleeping 0.2 seconds. if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(start()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
5.agen.aclose(): 返回一個awaitable物件, 呼叫該方法會丟擲一個異常給生成器。
import asyncio async def gen(): try: await asyncio.sleep(0.1) v = yield 42 print(v) await asyncio.sleep(0.2) except: print("執行結束") async def start(): g = gen() v=await g.asend(None) print(v) await g.aclose() #不做異常處理會報錯 if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(start()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close()
6.agen.__name__ and agen.__qualname__:可以返回非同步生成器函式的名字。
async def gen(): try: await asyncio.sleep(0.1) v = yield 42 print(v) await asyncio.sleep(0.2) except: print("執行結束") async def start(): g = gen() print(g.__aiter__())#輸出async_generator物件 print(g.__name__)#輸出gen print(g.__qualname__)#輸出gen
其他的方法
agen.ag_await: 正等待的物件(None). 類似當前可用的 gi_yieldfrom for generators and cr_await for coroutines. agen.ag_frame, agen.ag_running, and agen.ag_code: 同生成器一樣
StopIteration and StopAsyncIteration 被替換為 RuntimeError,並且不上拋。
原始碼實現細節
非同步生成器物件(PyAsyncGenObject)與PyGenObject共享結構佈局。 除此之外,參考實現還引入了三個新物件:
PyAsyncGenASend:實現__anext__和asend()方法的等待物件。
PyAsyncGenAThrow:實現athrow()和aclose()方法的等待物件。
_PyAsyncGenWrappedValue:來自非同步生成器的每個直接生成的物件都隱式地裝入此結構中。 這就是生成器實現如何使用常規迭代協議從使用非同步迭代協議生成的物件中分離出的物件。
PyAsyncGenASend和PyAsyncGenAThrow是awaitable物件(它們有__await__方法返回self)類似於coroutine的物件(實現__iter,__ next ,send()和throw()方法)。 本質上,它們控制非同步生成器的迭代方式
PyAsyncGenASend and PyAsyncGenAThrow
PyAsyncGenASend類似生成器物件驅動__anext__ and asend() 方法,實裝了非同步迭代協議。
agen.asend(val) 和agen.__anext__() 返回一個PyAsyncGenASend物件的一個引用。 (它將引用儲存回父類agen物件。)
資料流定義如下:
1.首次呼叫PyAsyncGenASend.send(val)時, val將推入到父類agen物件 (PyGenObject利用現有物件。)
對PyAsyncGenASend物件進行後續迭代,將None推送到agen。
2.首次呼叫_PyAsyncGenWrappedValue物件時,它將被拆箱,並且以未被裝飾的值作為引數會引發StopIteration異常。
3.非同步生成器中的return語句引發StopAsyncIteration異常,該異常通過PyAsyncGenASend.send()和PyAsyncGenASend.throw()方法傳播。
4.PyAsyncGenAThrow與PyAsyncGenASend非常相似。 唯一的區別是PyAsyncGenAThrow.send()在第一次呼叫時會向父類agen物件丟擲異常(而不是將值推入其中。)
新的標準庫方法和Types
1.types.AsyncGeneratorType -- 判斷是否是非同步生成器物件
2.sys.set_asyncgen_hooks()和 sys.get_asyncgen_hooks()--
在事件迴圈中設定非同步生成器終結器和迭代攔截器。
3.inspect.isasyncgen()和 inspect.isasyncgenfunction() :方法內省。
4.asyncio加入新方法:loop.shutdown_asyncgens().
5.collections.abc.AsyncGenerator:抽象基類的新增。
是否支援向後相容
該提案完全支援向後相容
在python3.5,async def裡使用yield會報錯,因此在python3.6引入了安全的非同步生成器
效能展示
常規生成器
import time def gen(): i = 0 while i < 100000000: yield i i += 1 if __name__ == '__main__': start = time.time() list(gen()) end = time.time() print("totals time", end - start)
輸出
totals time 14.837260007858276
15s左右
非同步迭代器的改進
import time import asyncio N = 10 ** 7 class AIter: def __init__(self): self.i = 0 def __aiter__(self): return self async def __anext__(self): i = self.i if i >= N: raise StopAsyncIteration self.i += 1 return i async def start(): [_ async for _ in AIter()] if __name__ == '__main__': s=time.time() loop=asyncio.get_event_loop() try: loop.run_until_complete(start()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close() e=time.time() print("total time",e-s)
輸出
total time 5.441649913787842
很明顯迭代非同步生成器的速度比迭代普通生成器不只是快了兩倍。
我們可以做一個更簡單的非同步生成器
import time import asyncio async def ticker(delay, to): """Yield numbers from 0 to `to` every `delay` seconds.""" for i in range(to): yield i await asyncio.sleep(delay) async def start(): async for item in ticker(0.000001,100): print(item) if __name__ == '__main__': s=time.time() loop=asyncio.get_event_loop() try: loop.run_until_complete(start()) finally: loop.run_until_complete(loop.shutdown_asyncgens()) loop.close() e=time.time() print("total time",e-s)
設計中要注意的事項
內建函式:aiter() and anext()
最初,PEP 492將__aiter__定義為應返回等待物件的方法,從而產生非同步迭代器。
但是,在CPython 3.5.2中,重新定義了__aiter__可以直接返回非同步迭代器。
為了避免破壞向後相容性,決定Python 3.6將支援兩種方式:__aiter__仍然可以在發出DeprecationWarning時返回等待狀態。由於Python 3.6中__aiter__的這種雙重性質,我們無法新增內建的aiter()的同步實現。 因此,建議等到Python 3.7。
非同步list/dict/set 推導式
將放在單獨的pep中也就是後來的pep530.
非同步yield from
對於非同步生成器,yield from也不那麼重要,因為不需要提供在協程之上實現另一個協同程式協議的機制。為了組合非同步生成器,可以使用async for簡化這個過程:
async def g1(): yield 1 yield 2 async def g2(): async for v in g1(): yield v
為了asend()和athrow()是必須的
它們可以使用非同步生成器實現類似於contextlib.contextmanager的概念。 例如,可以實現以下模式:
@async_context_manager async def ctx(): await open() try: yield finally: await close() async with ctx(): await ...
另一個原因是從__anext__物件返回的物件來推送資料並將異常丟擲到非同步生成器中,很難正確地執行此操作。 新增顯式的asend()和athrow()更獲取異常後的資料。
在實現方面,asend()是__anext__更通用的版本,而athrow()與aclose()非常相似。 因此,為非同步生成器定義這些方法不會增加任何額外的複雜性。
程式碼示例
async def ticker(delay, to): for i in range(to): yield i await asyncio.sleep(delay) async def run(): async for i in ticker(1, 10): print(i) import asyncio loop = asyncio.get_event_loop() try: loop.run_until_complete(run()) finally: loop.close()
這程式碼將打出0-9,每個數字之間的間隔為1s。
提議者
Guido, 2016年9月6日
參考資料
[1] https://github.com/1st1/cpython/tree/async_gen [2] https://mail.python.org/pipermail/python-dev/2016-September/146267.html [3] http://bugs.python.org/issue28003
版權宣告
本文章翻譯整理自,pep525
Source: https://github.com/python/peps/blob/master/pep-0525.txt
本文由CXA翻譯自:2019年1月25日,翻譯能力有限還請大家多多指教。