Python中協程(coroutine)的初步認識
阿新 • • 發佈:2019-02-05
基本概念的認識
之前在瀏覽相關文章gevent原始碼分析時對協程和程序做了相關比較。
- 相同點:二者都是可以看做是一種執行流, 該執行流可以掛起,並且在將來又可以在 你掛起的地方恢復執行, 這實際上都可以看做是continuation, 我們來看看當我們掛 起一個執行流時我們要儲存的東西
- 棧, 因為如果你不儲存棧,那麼區域性變數你就無法恢復,同時函式的呼叫鏈你也無 法恢復,
- 暫存器的狀態: 這好理解, 比如說EIP,如果你不儲存,那麼你恢復執行流就不知道 到底執行哪一條指令, 在比如說ESP,EBP, 如果你不儲存,那麼你即便有完整的棧 你也不知道怎麼用.
這二者實際就是所謂的上下文,也可以說是continuation. 在執行流切換時必須儲存 這兩個東西, 核心排程程序時也是一回事.
- 不同點:
- 執行流的排程者不同, 程序是核心排程, 而協程是在使用者態排程, 也就是說程序 的上下文是在核心態儲存恢復的,而協程是在使用者態儲存恢復的. 很顯然使用者態的 代價更低
- 程序會被搶佔,而協程不會,也就是說協程如果不主動讓出CPU,那麼其他的協程是不 可能得到執行機會,這實際和早期的作業系統類似,比如DOS, 它有一個yield原語, 一個程序呼叫yield,那麼它就會讓出CPU, 其他的程序也就有機會執行了, 如果一 個程序進入了死迴圈,那麼整個系統也就掛起了,永遠無法執行其他的程序了, 但 對協程而言,這不是問題
- 對記憶體的佔用不同,實際上協程可以只需要4K的棧就夠了, 而程序佔用的記憶體要大 的多.
- 從作業系統的角度講, 多協程的程式是單執行緒,單程序的
- 相同點:二者都是可以看做是一種執行流, 該執行流可以掛起,並且在將來又可以在 你掛起的地方恢復執行, 這實際上都可以看做是continuation, 我們來看看當我們掛 起一個執行流時我們要儲存的東西
《Fluent Python》中對協程有這樣的解釋:
通過客戶呼叫 .send(…) 方法傳送資料或使用 yield from 結構驅動的生成器函式。
從生成器演變成協程
- 普通生成器generator通過呼叫
.send(value)
方法傳送資料,且該資料將作為yield
表示式的值,所有生成器能夠作為協程使用。 - 還添加了
.throw()
,用於丟擲呼叫方異常,在生成器中處理;.close()
,用於終止生成器。 yield from
用於巢狀生成器。
協程的基本操作
協程的基本例子。
def simple_coroutine
- 若建立協程後沒有預激協程
next()
或send(None)
,而是send()
了一個非None的物件,則會丟擲異常,當協程還未啟動時候,是不能呼叫send()
向其傳送一個非None的資料。 - 首次呼叫
next()
後,協程在yield
後暫停,且產出yield
後的值c
,若無c
,實際產出的是None
。 - 當協程暫停時,此時我們呼叫
send('test')
,向協程傳送資料test
,此時協程恢復,yield
表示式計算並得到資料test
並賦值給x
,協程恢復執行直至下一個yield
處暫停,併產出該yield
後的c
的值,即1
。 - 最後恢復執行協程,此時已經到了協程末尾,丟擲
StopIteration
異常。 - 協程的四種狀態。GEN_CREATED等待開始執行、GEN_RUNNING正在執行、GEN_SUSPENDED在yield表示式處暫停、GEN_CLOSED結束。
# 四種狀態 def simple_coroutine_for_state(): print('start') x = yield 1 print(inspect.getgeneratorstate(sc)) # GEN_RUNNING print('get -->%s' % x) sc = simple_coroutine_for_state() print(inspect.getgeneratorstate(sc)) # GEN_CREATED next(sc) print(inspect.getgeneratorstate(sc)) # GEN_SUSPENDED try: sc.send('test state') except StopIteration: print(inspect.getgeneratorstate(sc)) # GEN_CLOSED
- 若建立協程後沒有預激協程
預激協程的裝飾器:
- 如上所述,對於一個協程,
send()
方法的引數會成為yield
表示式的值,僅當協程處於暫停狀態時,才能呼叫send()
非None方法,所以當協程處於未啟用狀態時,必須呼叫next()
或send()
方法來啟用協程。 對於預激,《Fluent Python》中有如下描述:
最先呼叫 next(my_coro) 函式這一步通常稱為“預激”(prime)協程
(即,讓協程向前執行到第一個 yield 表示式,準備好作為活躍的協
程使用)。簡單的預激裝飾器:
def prime_decorator(func): @wraps(func) def wrapper(*args, **kwargs): g = func(*args, **kwargs) next(g) return g return wrapper @prime_decorator def cal_average(): total = 0 count = 0 average = None while True: get = yield average total += get count += 1 average = total / count cal_gen = cal_average() print(cal_gen.send(100)) # 100.0 print(cal_gen.send(500)) # 300.0
Tornado中的
gen.coroutine
也是與預激協程有關:def coroutine(func, replace_callback=True): return _make_coroutine_wrapper(func, replace_callback=True) def _make_coroutine_wrapper(func, replace_callback): ······ @functools.wraps(wrapped) def wrapper(*args, **kwargs): future = TracebackFuture() ······ if isinstance(result, GeneratorType): try: orig_stack_contexts = stack_context._state.contexts yielded = next(result) ······ ······ return future ······ return wrapper
在預激裝飾器內,除了簡單的預激協程,也還能做一些其他更多的事情。
- 如上所述,對於一個協程,
終止協程和異常處理
- 終止協程:利用
generator.close()
方法來關閉協程,關閉後協程的狀態變為GEN_CLOSED
。 - 異常處理:利用
generator.throw()
使協程在暫停的yield表示式處丟擲指定異常,若該異常被處理,則協程向下執行且返回值是yield
的產出值;若該異常未被處理,則向上冒泡,傳到呼叫方的上下文中,此時協程也會停止。
class DemoException(Exception): """""" def handle_exc_gen(): print('start') yielded_value = 0 try: while True: try: x = yield yielded_value except DemoException: print('handle demo exception and continuing') yielded_value = 0 else: print('receive %s' % x) yielded_value += 1 finally: print('end') exc_coro = handle_exc_gen() print(next(exc_coro)) # 預激協程 start 0 print(exc_coro.send('test')) # receive test 1 print(exc_coro.send('throw')) # receive throw 2 res = exc_coro.throw(DemoException) # handle demo exception and continuing print(res) # 0,處理DemoException時 yieled_value重新變為0 # 丟擲未處理異常 exc_coro.throw(TypeError)
- 終止協程:利用
讓協程返回值:在Python3.3之後可以在協程中寫
return
語句而不會產生語法錯誤。此時協程返回資料的流程是:協程執行結束終止->丟擲StopIteration
異常,異常物件的value
屬性儲存著協程的返回值->補貨StopIteration
並獲得返回值。def get_average(): count = 0 total = 0.0 average = None while True: get = yield average if get is None: break total += get count += 1 average = total / count return total, count get_average_coro = get_average() next(get_average_coro) get_average_coro.send(10) get_average_coro.send(30) try: get_average_coro.send(None) except StopIteration as e: print(e) # (40.0, 2)
依舊符合生成器物件的行為:耗盡後丟擲
StopIteration
異常。
認識yield from
簡單使用
yield from
:簡化for迴圈中的yield
# 簡化for迴圈中的yield def for_gen(): for s in 'ABC': yield s for i in range(1, 3): yield i for_g = for_gen() print(list(for_g)) # ['A', 'B', 'C', 1, 2] def simplify_for_gen(): yield from 'ABC' yield from range(1, 3) simplify_for_g = simplify_for_gen() print(list(simplify_for_g)) # ['A', 'B', 'C', 1, 2]
連線可迭代物件
def chain(*iterables): for it in iterables: yield from it s = 'ABC' n = range(1, 3) print(list(chain(s, n))) # ['A', 'B', 'C', 1, 2] def flatten(items, ignore_types=(str, bytes)): for x in items: if isinstance(x, collections.Iterable) and not isinstance(x, ignore_types): yield from flatten(x) else: yield x items = ['a', ['b1', 'b2'], [['c11', 'c12'], ['c21', 'c22']]] print(list(flatten(items))) # ['a', 'b1', 'b2', 'c11', 'c12', 'c21', 'c22']
- 其他
小結
…