1. 程式人生 > >Python非同步處理中的async with 和async for 用法說明

Python非同步處理中的async with 和async for 用法說明

本文翻譯自Python的開發者指南PEP 492

網上async with和async for的中文資料比較少,我把PEP 492中的官方陳述翻譯一下。

非同步上下文管理器”async with”

非同步上下文管理器指的是在enterexit方法處能夠暫停執行的上下文管理器。

為了實現這樣的功能,需要加入兩個新的方法:__aenter__ 和__aexit__。這兩個方法都要返回一個 awaitable型別的值。

非同步上下文管理器的一種使用方法是:

class AsyncContextManager:
    async def __aenter__(self):
        await log('entering context'
) async def __aexit__(self, exc_type, exc, tb): await log('exiting context')

新語法

非同步上下文管理器使用一種新的語法:

async with EXPR as VAR:
    BLOCK

這段程式碼在語義上等同於:

mgr = (EXPR)
aexit = type(mgr).__aexit__
aenter = type(mgr).__aenter__(mgr)
exc = True

VAR = await aenter
try:
    BLOCK
except:
    if
not await aexit(mgr, *sys.exc_info()): raise else: await aexit(mgr, None, None, None)

和常規的with表示式一樣,可以在一個async with表示式中指定多個上下文管理器。

如果向async with表示式傳入的上下文管理器中沒有__aenter__ 和__aexit__方法,這將引起一個錯誤 。如果在async def函式外面使用async with,將引起一個SyntaxError(語法錯誤)。

例子

使用async with能夠很容易地實現一個數據庫事務管理器。

async def commit(session, data):
    ...
async with session.transaction(): ... await session.update(data) ...

需要使用鎖的程式碼也很簡單:

async with lock:
    ...

而不是:

with (yield from lock):
    ...

非同步迭代器 “async for”

一個非同步可迭代物件(asynchronous iterable)能夠在迭代過程中呼叫非同步程式碼,而非同步迭代器就是能夠在next方法中呼叫非同步程式碼。為了支援非同步迭代:

1、一個物件必須實現__aiter__方法,該方法返回一個非同步迭代器(asynchronous iterator)物件。 
2、一個非同步迭代器物件必須實現__anext__方法,該方法返回一個awaitable型別的值。 
3、為了停止迭代,__anext__必須丟擲一個StopAsyncIteration異常。

非同步迭代的一個例子如下:

class AsyncIterable:
    def __aiter__(self):
        return self

    async def __anext__(self):
        data = await self.fetch_data()
        if data:
            return data
        else:
            raise StopAsyncIteration

    async def fetch_data(self):
        ...

新語法

通過非同步迭代器實現的一個新的迭代語法如下:

async for TARGET in ITER:
    BLOCK
else:
    BLOCK2

這在語義上等同於:

iter = (ITER)
iter = type(iter).__aiter__(iter)
running = True
while running:
    try:
        TARGET = await type(iter).__anext__(iter)
    except StopAsyncIteration:
        running = False
    else:
        BLOCK
else:
    BLOCK2

把一個沒有__aiter__方法的迭代物件傳遞給 async for將引起TypeError。如果在async def函式外面使用async with,將引起一個SyntaxError(語法錯誤)。

和常規的for表示式一樣, async for也有一個可選的else 分句。.

例子1

使用非同步迭代器能夠在迭代過程中非同步地快取資料:

async for data in cursor:
    ...

這裡的cursor是一個非同步迭代器,能夠從一個數據庫中每經過N次迭代預取N行資料。

下面的語法展示了這種新的非同步迭代協議的用法:

class Cursor:
    def __init__(self):
        self.buffer = collections.deque()

    async def _prefetch(self):
        ...

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self.buffer:
            self.buffer = await self._prefetch()
            if not self.buffer:
                raise StopAsyncIteration
        return self.buffer.popleft()

接下來這個Cursor 類可以這樣使用:

async for row in Cursor():
    print(row)
which would be equivalent to the following code:

i = Cursor().__aiter__()
while True:
    try:
        row = await i.__anext__()
    except StopAsyncIteration:
        break
    else:
        print(row)

例子2

下面的程式碼可以將常規的迭代物件變成非同步迭代物件。儘管這不是一個非常有用的東西,但這段程式碼說明了常規迭代器和非同步迭代器之間的關係。

class AsyncIteratorWrapper:
    def __init__(self, obj):
        self._it = iter(obj)

    def __aiter__(self):
        return self

    async def __anext__(self):
        try:
            value = next(self._it)
        except StopIteration:
            raise StopAsyncIteration
        return value

async for letter in AsyncIteratorWrapper("abc"):
    print(letter)

為什麼要丟擲StopAsyncIteration?

協程(Coroutines)內部仍然是基於生成器的。因此在PEP 479之前,下面兩種寫法沒有本質的區別:

def g1():
    yield from fut
    return 'spam'

def g2():
    yield from fut
    raise StopIteration('spam')

自從 PEP 479 得到接受併成為協程 的預設實現,下面這個例子將StopIteration包裝成一個RuntimeError

async def a1():
    await fut
    raise StopIteration('spam')    

告知外圍程式碼迭代已經結束的唯一方法就是丟擲StopIteration。因此加入了一個新的異常類StopAsyncIteration

由PEP 479的規定 , 所有協程中丟擲的StopIteration異常都被包裝在RuntimeError中。