1. 程式人生 > >Python中協程(coroutine)的初步認識

Python中協程(coroutine)的初步認識

基本概念的認識

  1. 之前在瀏覽相關文章gevent原始碼分析時對協程和程序做了相關比較。

    • 相同點:二者都是可以看做是一種執行流, 該執行流可以掛起,並且在將來又可以在 你掛起的地方恢復執行, 這實際上都可以看做是continuation, 我們來看看當我們掛 起一個執行流時我們要儲存的東西
      1. 棧, 因為如果你不儲存棧,那麼區域性變數你就無法恢復,同時函式的呼叫鏈你也無 法恢復,
      2. 暫存器的狀態: 這好理解, 比如說EIP,如果你不儲存,那麼你恢復執行流就不知道 到底執行哪一條指令, 在比如說ESP,EBP, 如果你不儲存,那麼你即便有完整的棧 你也不知道怎麼用.
        這二者實際就是所謂的上下文,也可以說是continuation. 在執行流切換時必須儲存 這兩個東西, 核心排程程序時也是一回事.
    • 不同點:
      1. 執行流的排程者不同, 程序是核心排程, 而協程是在使用者態排程, 也就是說程序 的上下文是在核心態儲存恢復的,而協程是在使用者態儲存恢復的. 很顯然使用者態的 代價更低
      2. 程序會被搶佔,而協程不會,也就是說協程如果不主動讓出CPU,那麼其他的協程是不 可能得到執行機會,這實際和早期的作業系統類似,比如DOS, 它有一個yield原語, 一個程序呼叫yield,那麼它就會讓出CPU, 其他的程序也就有機會執行了, 如果一 個程序進入了死迴圈,那麼整個系統也就掛起了,永遠無法執行其他的程序了, 但 對協程而言,這不是問題
      3. 對記憶體的佔用不同,實際上協程可以只需要4K的棧就夠了, 而程序佔用的記憶體要大 的多.
      4. 從作業系統的角度講, 多協程的程式是單執行緒,單程序的
  2. 《Fluent Python》中對協程有這樣的解釋:

    通過客戶呼叫 .send(…) 方法傳送資料或使用 yield from 結構驅動的生成器函式。

從生成器演變成協程

  1. 普通生成器generator通過呼叫.send(value)方法傳送資料,且該資料將作為yield表示式的值,所有生成器能夠作為協程使用。
  2. 還添加了.throw(),用於丟擲呼叫方異常,在生成器中處理;.close(),用於終止生成器。
  3. yield from用於巢狀生成器。

協程的基本操作

  1. 協程的基本例子。

      def simple_coroutine
    ():
    c = 0 print('coroutine start') x = yield c print('get --> %s' % x) c += 1 yield c sc = simple_coroutine() try: sc.send(1) except TypeError as e: print(e) # can't send non-None value to a just-started generator # coroutime start print(next(sc)) # 0 # get --> test print(sc.send('test')) # 1 try: next(sc) except StopIteration: print('coroutine end')
    • 若建立協程後沒有預激協程next()send(None),而是send()了一個非None的物件,則會丟擲異常,當協程還未啟動時候,是不能呼叫send()向其傳送一個非None的資料。
    • 首次呼叫next()後,協程在yield後暫停,且產出yield後的值c,若無c,實際產出的是None
    • 當協程暫停時,此時我們呼叫send('test'),向協程傳送資料test,此時協程恢復,yield表示式計算並得到資料test並賦值給x,協程恢復執行直至下一個yield處暫停,併產出該yield後的c的值,即1
    • 最後恢復執行協程,此時已經到了協程末尾,丟擲StopIteration異常。
    • 協程的四種狀態。GEN_CREATED等待開始執行、GEN_RUNNING正在執行、GEN_SUSPENDED在yield表示式處暫停、GEN_CLOSED結束。
      
      # 四種狀態
      
      def simple_coroutine_for_state():
         print('start')
         x = yield 1
         print(inspect.getgeneratorstate(sc))  # GEN_RUNNING
         print('get -->%s' % x)
      
      sc = simple_coroutine_for_state()
      print(inspect.getgeneratorstate(sc))  # GEN_CREATED
      next(sc)
      print(inspect.getgeneratorstate(sc))  # GEN_SUSPENDED
      try:
         sc.send('test state')
      except StopIteration:
         print(inspect.getgeneratorstate(sc))  # GEN_CLOSED
  2. 預激協程的裝飾器:

    • 如上所述,對於一個協程,send()方法的引數會成為yield表示式的值,僅當協程處於暫停狀態時,才能呼叫send()非None方法,所以當協程處於未啟用狀態時,必須呼叫next()send()方法來啟用協程。
    • 對於預激,《Fluent Python》中有如下描述:

      最先呼叫 next(my_coro) 函式這一步通常稱為“預激”(prime)協程
      (即,讓協程向前執行到第一個 yield 表示式,準備好作為活躍的協
      程使用)。

    • 簡單的預激裝飾器:

      def prime_decorator(func):
       @wraps(func)
       def wrapper(*args, **kwargs):
           g = func(*args, **kwargs)
           next(g)
           return g
       return wrapper
      
      
      @prime_decorator
      def cal_average():
       total = 0
       count = 0
       average = None
       while True:
           get = yield average
           total += get
           count += 1
           average = total / count
      
      cal_gen = cal_average()
      print(cal_gen.send(100))  # 100.0
      print(cal_gen.send(500))  # 300.0
    • Tornado中的gen.coroutine也是與預激協程有關:

      def coroutine(func, replace_callback=True):
          return _make_coroutine_wrapper(func, replace_callback=True)
      
       def _make_coroutine_wrapper(func, replace_callback):
           ······
           @functools.wraps(wrapped)
           def wrapper(*args, **kwargs):
               future = TracebackFuture()
               ······
               if isinstance(result, GeneratorType):
                   try:
                       orig_stack_contexts = stack_context._state.contexts
                       yielded = next(result)
                       ······
               ······
               return future
           ······
       return wrapper

      在預激裝飾器內,除了簡單的預激協程,也還能做一些其他更多的事情。

  3. 終止協程和異常處理

    • 終止協程:利用generator.close()方法來關閉協程,關閉後協程的狀態變為GEN_CLOSED
    • 異常處理:利用generator.throw()使協程在暫停的yield表示式處丟擲指定異常,若該異常被處理,則協程向下執行且返回值是yield的產出值;若該異常未被處理,則向上冒泡,傳到呼叫方的上下文中,此時協程也會停止。
         class DemoException(Exception):
            """"""
      
         def handle_exc_gen():
             print('start')
             yielded_value = 0
             try:
                 while True:
                     try:
                         x = yield yielded_value
                     except DemoException:
                         print('handle demo exception and continuing')
                         yielded_value = 0
                     else:
                         print('receive %s' % x)
                         yielded_value += 1
             finally:
                 print('end')
      
        exc_coro = handle_exc_gen()
        print(next(exc_coro))  # 預激協程 start  0
        print(exc_coro.send('test'))  # receive test   1
        print(exc_coro.send('throw'))  # receive throw  2
        res = exc_coro.throw(DemoException)  # handle demo exception and continuing
        print(res)  # 0,處理DemoException時 yieled_value重新變為0
        # 丟擲未處理異常
        exc_coro.throw(TypeError)
  4. 讓協程返回值:在Python3.3之後可以在協程中寫return語句而不會產生語法錯誤。此時協程返回資料的流程是:協程執行結束終止->丟擲StopIteration異常,異常物件的value屬性儲存著協程的返回值->補貨StopIteration並獲得返回值。

       def get_average():
           count = 0
           total = 0.0
           average = None
           while True:
               get = yield average
               if get is None:
                   break
               total += get
               count += 1
               average = total / count
           return total, count
    
       get_average_coro = get_average()
       next(get_average_coro)
       get_average_coro.send(10)
       get_average_coro.send(30)
       try:
           get_average_coro.send(None)
       except StopIteration as e:
           print(e)  # (40.0, 2)

    依舊符合生成器物件的行為:耗盡後丟擲StopIteration異常。

認識yield from

  1. 簡單使用yield from

    • 簡化for迴圈中的yield

         # 簡化for迴圈中的yield
         def for_gen():
             for s in 'ABC':
                 yield s
             for i in range(1, 3):
                 yield i
      
         for_g = for_gen()
         print(list(for_g))  # ['A', 'B', 'C', 1, 2]
      
      
         def simplify_for_gen():
             yield from 'ABC'
             yield from range(1, 3)
      
         simplify_for_g = simplify_for_gen()
         print(list(simplify_for_g))  # ['A', 'B', 'C', 1, 2]
    • 連線可迭代物件

       def chain(*iterables):
           for it in iterables:
               yield from it
      
       s = 'ABC'
       n = range(1, 3)
       print(list(chain(s, n)))  # ['A', 'B', 'C', 1, 2]
      
      
       def flatten(items, ignore_types=(str, bytes)):
           for x in items:
               if isinstance(x, collections.Iterable) and not isinstance(x, ignore_types):
                   yield from flatten(x)
               else:
                   yield x
      
       items = ['a', ['b1', 'b2'], [['c11', 'c12'], ['c21', 'c22']]]
       print(list(flatten(items)))  # ['a', 'b1', 'b2', 'c11', 'c12', 'c21', 'c22']   
  2. 其他

小結