深入Asyncio(八)異步叠代器
Async Iterators: async for
除了async def
和await
語法外,還有一些其它的語法,本章學習異步版的for循環與叠代器,不難理解,普通叠代器是通過__iter__
和__next__
兩個特殊方法實現的,如下例。
>>> class A: ... def __iter__(self): # 1 ... self.x = 0 # 2 ... return self # 3 ... def __next__(self): # 4 ... if self.x > 2: ... raise StopIteration ... else: ... self.x += 1 ... return self.x >>> for i in A(): ... print(i) ... 1 2 3
叠代器必須支持__iter__方法;
值初始化;
返回一個可叠代對象,這個對象可以執行__next__方法,這裏A本身就實現了__next__方法,所以返回它本身即可;
在每次叠代時調用。
將__next__()
方法聲明為異步函數,要允許await
某些IO操作,除開命名的差別外,幾乎是相同的定義,PEP 492中規範說明了如何實現一個異步叠代器:
1. 實現__aiter__()
方法(不需要用async def);
2. __aiter__()
方法必須返回一個支持__anext__()
方法的對象;
3. __anext__()
必須返回叠代器的每個值,並在結束叠代時拋出StopAsyncIteration
舉個例子,比如Redis的key對應的value是一個很大的集合,想叠代這些key的value會出現嚴重的網絡IO,異步叠代器可以這樣實現:
import asyncio from aioredis import create_redis async def main(): # 1 redis = await create_redis((‘localhost‘, 6379)) # 2 keys = [‘America‘, ‘Africa‘, ‘Europe‘, ‘Asia‘] # 3 async for value in OneAtTime(redis, keys): # 4 await process(value) # 5 class OneAtTime: def __init__(self, redis, keys): # 6 self.redis = redis self.keys = keys def __aiter__(self): # 7 self.ikeys = iter(self.keys) return self async def __anext__(self): # 8 try: k = next(self.ikeys) # 9 except StopIteration: # 10 raise StopAsyncIteration value = await redis.get(k) # 11 return value asyncio.get_event_loop().run_until_complete(main())
主程序入口,用於在loop.run_until_complete()方法中調用;
使用aioredis庫獲取異步連接;
假設每個key對應的value實例非常大;
使用async for循環,關鍵點是這裏的叠代器可以在等待數據時切換任務;
在得到返回值後用協程去處理這個值,假設這個函數也是IO綁定的;
用一個實例來存儲redis連接和keys表;
像普通叠代器一樣,初始化一些值,這裏我們創建一個叠代器作值,由於這個類也重載了__anext__方法,所以直接返回自身;
__anext__
方法用async def聲明;叠代這個普通的叠代器;
處理普通異常並重新拋出一個異步異常;
這個調用是網絡IO,因此用await切換它。
通過以上實現,將可以用一個異步for循環來叠代一些處理網絡IO的異步叠代器,好處是可以只用一個事件loop就能處理大量的數據。
深入Asyncio(八)異步叠代器