1. 程式人生 > >深入Asyncio(八)異步叠代器

深入Asyncio(八)異步叠代器

重載 相同 wait 關鍵點 exce 如何 事件 ash imp

Async Iterators: async for

除了async defawait語法外,還有一些其它的語法,本章學習異步版的for循環與叠代器,不難理解,普通叠代器是通過__iter____next__兩個特殊方法實現的,如下例。

>>> class A:
...     def __iter__(self):    # 1
...             self.x = 0     # 2
...             return self    # 3
...     def __next__(self):    # 4
...             if self.x > 2:
...                     raise StopIteration
...             else:
...                     self.x += 1
...                     return self.x
>>> for i in A():
...     print(i)
...
1
2
3
  1. 叠代器必須支持__iter__方法;

  2. 值初始化;

  3. 返回一個可叠代對象,這個對象可以執行__next__方法,這裏A本身就實現了__next__方法,所以返回它本身即可;

  4. 在每次叠代時調用。


__next__()方法聲明為異步函數,要允許await某些IO操作,除開命名的差別外,幾乎是相同的定義,PEP 492中規範說明了如何實現一個異步叠代器:
1. 實現__aiter__()方法(不需要用async def);
2. __aiter__()方法必須返回一個支持__anext__()方法的對象;
3. __anext__()必須返回叠代器的每個值,並在結束叠代時拋出StopAsyncIteration

異常。

舉個例子,比如Redis的key對應的value是一個很大的集合,想叠代這些key的value會出現嚴重的網絡IO,異步叠代器可以這樣實現:

import asyncio
from aioredis import create_redis

async def main():   # 1
    redis = await create_redis((‘localhost‘, 6379))    # 2
    keys = [‘America‘, ‘Africa‘, ‘Europe‘, ‘Asia‘]  # 3

    async for value in OneAtTime(redis, keys):  # 4
        await process(value)    # 5

class OneAtTime:
    def __init__(self, redis, keys):    # 6
        self.redis = redis
        self.keys = keys
    def __aiter__(self):    # 7
        self.ikeys = iter(self.keys)
        return self
    async def __anext__(self):  # 8
        try:
            k = next(self.ikeys)    # 9
        except StopIteration:   # 10
            raise StopAsyncIteration
        value = await redis.get(k)  # 11
        return value

asyncio.get_event_loop().run_until_complete(main())
  1. 主程序入口,用於在loop.run_until_complete()方法中調用;

  2. 使用aioredis庫獲取異步連接;

  3. 假設每個key對應的value實例非常大;

  4. 使用async for循環,關鍵點是這裏的叠代器可以在等待數據時切換任務;

  5. 在得到返回值後用協程去處理這個值,假設這個函數也是IO綁定的;

  6. 用一個實例來存儲redis連接和keys表;

  7. 像普通叠代器一樣,初始化一些值,這裏我們創建一個叠代器作值,由於這個類也重載了__anext__方法,所以直接返回自身;

  8. __anext__方法用async def聲明;

  9. 叠代這個普通的叠代器;

  10. 處理普通異常並重新拋出一個異步異常;

  11. 這個調用是網絡IO,因此用await切換它。


通過以上實現,將可以用一個異步for循環來叠代一些處理網絡IO的異步叠代器,好處是可以只用一個事件loop就能處理大量的數據。

深入Asyncio(八)異步叠代器