Python非同步處理中的async with 和async for 用法說明
本文翻譯自Python的開發者指南PEP 492。
網上async with和async for的中文資料比較少,我把PEP 492中的官方陳述翻譯一下。
非同步上下文管理器”async with”
非同步上下文管理器指的是在enter
和exit
方法處能夠暫停執行的上下文管理器。
為了實現這樣的功能,需要加入兩個新的方法:__aenter__
和__aexit__
。這兩個方法都要返回一個
awaitable型別的值。
非同步上下文管理器的一種使用方法是:
class AsyncContextManager:
async def __aenter__(self):
await log('entering context' )
async def __aexit__(self, exc_type, exc, tb):
await log('exiting context')
新語法
非同步上下文管理器使用一種新的語法:
async with EXPR as VAR:
BLOCK
這段程式碼在語義上等同於:
mgr = (EXPR)
aexit = type(mgr).__aexit__
aenter = type(mgr).__aenter__(mgr)
exc = True
VAR = await aenter
try:
BLOCK
except:
if not await aexit(mgr, *sys.exc_info()):
raise
else:
await aexit(mgr, None, None, None)
和常規的with
表示式一樣,可以在一個async with
表示式中指定多個上下文管理器。
如果向async with
表示式傳入的上下文管理器中沒有__aenter__
和__aexit__
方法,這將引起一個錯誤
。如果在async def
函式外面使用async with
,將引起一個SyntaxError
(語法錯誤)。
例子
使用async with
能夠很容易地實現一個數據庫事務管理器。
async def commit(session, data):
...
async with session.transaction():
...
await session.update(data)
...
需要使用鎖的程式碼也很簡單:
async with lock:
...
而不是:
with (yield from lock):
...
非同步迭代器 “async for”
一個非同步可迭代物件(asynchronous iterable)能夠在迭代過程中呼叫非同步程式碼,而非同步迭代器就是能夠在next
方法中呼叫非同步程式碼。為了支援非同步迭代:
1、一個物件必須實現__aiter__
方法,該方法返回一個非同步迭代器(asynchronous iterator)物件。
2、一個非同步迭代器物件必須實現__anext__
方法,該方法返回一個awaitable型別的值。
3、為了停止迭代,__anext__
必須丟擲一個StopAsyncIteration
異常。
非同步迭代的一個例子如下:
class AsyncIterable:
def __aiter__(self):
return self
async def __anext__(self):
data = await self.fetch_data()
if data:
return data
else:
raise StopAsyncIteration
async def fetch_data(self):
...
新語法
通過非同步迭代器實現的一個新的迭代語法如下:
async for TARGET in ITER:
BLOCK
else:
BLOCK2
這在語義上等同於:
iter = (ITER)
iter = type(iter).__aiter__(iter)
running = True
while running:
try:
TARGET = await type(iter).__anext__(iter)
except StopAsyncIteration:
running = False
else:
BLOCK
else:
BLOCK2
把一個沒有__aiter__
方法的迭代物件傳遞給 async for
將引起TypeError
。如果在async
def
函式外面使用async with
,將引起一個SyntaxError
(語法錯誤)。
和常規的for
表示式一樣, async for
也有一個可選的else
分句。.
例子1
使用非同步迭代器能夠在迭代過程中非同步地快取資料:
async for data in cursor:
...
這裡的cursor
是一個非同步迭代器,能夠從一個數據庫中每經過N次迭代預取N行資料。
下面的語法展示了這種新的非同步迭代協議的用法:
class Cursor:
def __init__(self):
self.buffer = collections.deque()
async def _prefetch(self):
...
def __aiter__(self):
return self
async def __anext__(self):
if not self.buffer:
self.buffer = await self._prefetch()
if not self.buffer:
raise StopAsyncIteration
return self.buffer.popleft()
接下來這個Cursor
類可以這樣使用:
async for row in Cursor():
print(row)
which would be equivalent to the following code:
i = Cursor().__aiter__()
while True:
try:
row = await i.__anext__()
except StopAsyncIteration:
break
else:
print(row)
例子2
下面的程式碼可以將常規的迭代物件變成非同步迭代物件。儘管這不是一個非常有用的東西,但這段程式碼說明了常規迭代器和非同步迭代器之間的關係。
class AsyncIteratorWrapper:
def __init__(self, obj):
self._it = iter(obj)
def __aiter__(self):
return self
async def __anext__(self):
try:
value = next(self._it)
except StopIteration:
raise StopAsyncIteration
return value
async for letter in AsyncIteratorWrapper("abc"):
print(letter)
為什麼要丟擲StopAsyncIteration?
協程(Coroutines)內部仍然是基於生成器的。因此在PEP 479之前,下面兩種寫法沒有本質的區別:
def g1():
yield from fut
return 'spam'
和
def g2():
yield from fut
raise StopIteration('spam')
自從 PEP 479 得到接受併成為協程 的預設實現,下面這個例子將StopIteration
包裝成一個RuntimeError
。
async def a1():
await fut
raise StopIteration('spam')
告知外圍程式碼迭代已經結束的唯一方法就是丟擲StopIteration
。因此加入了一個新的異常類StopAsyncIteration
。
由PEP 479的規定 , 所有協程中丟擲的StopIteration
異常都被包裝在RuntimeError
中。