Python 中的黑暗角落(二):生成器協程的排程問題
前作介紹了 Python 中的 yield
關鍵字。此篇介紹如何使用 yield
表示式,在
Python 中實現一個最基本的協程排程示例,避免 I/O 操作佔用大量 CPU 計算時間。
協程及其特點
協程是一種特殊的子程式,它可以在特定的位置暫停/恢復(而不是像普通函式那樣在邏輯上順序執行);並且每當協程暫停時,呼叫者可以從協程中獲取狀態,決定呼叫者接下來的走向;以及每當協程恢復時,呼叫者可以傳遞資訊給協程,影響協程的行為。
從「可以暫停/恢復」來看,協程類似於 Python 中的迭代器。不過,迭代器僅只是將值返回給呼叫者,其內部的邏輯是確定的,無法與呼叫者做更多的互動。
因為協程可以暫停/恢復,所以,我們可以在多個協程中分別執行不同的任務;然後由排程器管理協程之間的執行,實現多工併發。
此外,協程和呼叫者在同一執行緒中執行;考慮到執行緒是作業系統進行任務排程的最小單元,協程和呼叫者之間的切換,沒有 CPU 上下文切換的開銷。因此,相對使用多執行緒、多程序實現多工併發,協程在這方面的開銷非常小。
同樣由於協程之間共享執行緒,所以使用協程實現的多工併發,無法實現真正的並行。因此,顯而易見,協程適合 I/O 密集型的任務併發,而不適合 CPU 密集型的任務併發。
協程排程基礎
最簡單的協程的例子,我們實際上已經見過了。在「使用 send()
方法與生成器函式通訊」一節中,func
就扮演了協程函式的角色。每當協程函式在 yield
表示式處暫停,呼叫者就收到上一步計算的結果;每當協程函式自 yield
在見識過最簡單的協程示例之後,我們試著看看在排程協程的過程中,需要怎樣處理。
coroutine_basic.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
from collections import deque # 1. class Dispatcher(object): # 2. def __init__(self, tasks): self.tasks = deque(tasks) # 3. |
這段程式碼中,有兩個主要角色:排程器 (2) 和任務 (9)。
從排程器的角度來說,我們自 collections
模組引入了 deque
容器
(1),用於在 (3) 處儲存任務。而後,我們在 (4) 定義了排程器 Dispatcher
的輪詢函式 next()
,它返回下一個尚未終止的任務。在排程器的 run()
函式中,(5)
和 (8) 保證了迴圈處理所有尚未完成的任務並清理已完成的任務,(6) 和 (7) 則負責觸發每個任務的下一步動作。
從任務的角度來說,greeting
是一個生成器函式,是具體的協程任務。在 (10) 處,yield
表示式標記了函式暫停/恢復的位置;它將邏輯上連續的任務,在時間上切分成了若干段。
這段程式碼執行起來結果大致是這樣:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
Hello, Cancan.0! Hello, Sophia.0! Hello, Liam.0! Hello, Cancan.1! Hello, Sophia.1! Hello, Liam.1! Hello, Cancan.2! Hello, Sophia.2! Hello, Liam.2! Hello, Cancan.3! Hello, Sophia.3! Hello, Liam.3! Hello, Cancan.4! Hello, Liam.4! Hello, Cancan.5! |
看起來和多執行緒那種亂七八糟的輸出順序有點像,不是嗎?當然,此處由於使用 deque.pop()
輪詢任務佇列,所以輸出順序大致是有跡可循的。不過,這並不影響我們將其作為協程排程的示例。
在這個例子中,儘管呼叫者和協程之間沒有其他的通訊,協程函式內也沒有真正意義上的 I/O 操作,但我們仍可以進行一些總結。
首先,生成器函式充當了協程函式,實現了協程。
其次,協程任務在邏輯上是連續的,但是我們可以用 yield
表示式在時間上把協程任務分成若干部分。
再次,用 yield
分割的任務,需要有一個機制控制器暫停/恢復。這個機制此處由排程器提供。
再者,對於排程器來說,它需要知道「有哪些協程任務需要恢復」。因此,它必然直接或間接地維護一個事件佇列。此處,我們用 Dispatcher.tasks
完成了這一工作。
最後,對於每個協程(任務)來說,一旦被暫停,其恢復就必須依賴主動喚起。因此,排程器必須「恰到好處」地反覆喚起執行緒——不能多也不能少:多則浪費執行時間,甚至丟擲異常;少則留下未能完成的任務。因此,排程器必須恰當地維護上述佇列,確定何時從佇列中移除已完成的任務。在我們的例子中,(6) 和 (7) 協同完成了這一工作。
非同步 I/O 任務模擬
回顧一下剛才的協程任務。
1 2 3 4 |
def greeting(name, times): for i in range(times): yield print("Hello, %s.%d!" % (name, i)) |
在這個任務裡,yield
表示式將原本在邏輯上連續的迴圈,人為地在時間上切分成了若干份。然而,除了用於演示暫停/恢復的攜程排程之外,這個例子實際上沒有必要使用協程實現。這是因為,在協程任務中,去掉 yield
表示式之後,所有的操作都是立即完成的;不存在需要阻塞以等待
I/O 的空耗 CPU 的情況。
下列程式碼模擬了一個需要阻塞等待 I/O 的任務。
1 2 3 4 5 6 7 |
from time import sleep from random import random as rd def greeting(name, times, duration = 1): # 1. for i in range(times): sleep(2 * duration * rd()) # 2. print("Hello, %s.%d!" % (name, i)) |
此處,新定義的 greeting
函式 (1) 有一個新的引數:duration
。而後,在每次迴圈列印招呼資訊的之前,會現行阻塞一段時間
(2)。這一阻塞就模擬了實際情況中的 I/O 類操作:空佔 CPU 資源,但不進行任何計算。阻塞的時間是 2 * duration * rd()
,這是一個一 duration
為期望的隨機變數,用來模擬預計阻塞 duration
秒但實際情況會有波動的
I/O 任務。
假設 duration
設定為定值 1 而 times
設定為定值
3,那麼執行一次 greeting
函式,平均需要耗時 3 秒。如若順序執行 3 個這樣的函式,平均下來,一共需要耗費 9 秒的時間。而這 9 秒之中,大多數時間 CPU
都僅只在空耗,沒有執行實際的計算任務。因此,我們可以考慮用協程將它們併發起來執行,降低總的空耗的時間。為此,我們有如下思路。
- 將每個 I/O 任務理解為一個事件;
- 維護一個佇列,用於記錄尚在進行中的事件,以便後續操作;
- 當事件生成時,向上述佇列註冊(即將事件新增進佇列);
- 使用輪詢(polling)等方式,捕獲完成的事件;
- 對已完成的事件,進行後續操作(特別地,恢復協程函式),而後從佇列中刪除該事件。
現在,我們開始逐步在這一思路的指導下,實現協程併發。
引出休眠事件(SleepEvent
)
回顧一下新版的 greeting
函式。若要通過生成器實現協程,就必然要新增 yield
表示式。
1 2 3 4 5 6 7 |
from time import sleep from random import random as rd def greeting(name, times, duration = 1): for i in range(times): yield sleep(2 * duration * rd()) # 1. print("Hello, %s.%d!" % (name, i)) |
簡單粗暴地以 (1) 的方式加上 yield
表示式是不行的。這是因為,yield
表示式會對 sleep
函式求值,而後將該值返回給呼叫者並暫停。但是,對 sleep
函式求值的過程,就是模擬的
I/O 操作,會阻塞執行執行緒。在阻塞完畢之後,再通過 yield
暫停,這就沒有意義了。
1 2 3 4 5 6 7 |
def coroutine_sleep(duration): # 1. return SleepEvent(duration) # 2. def greeting(name, times, duration = 1): for i in range(times): yield coroutine_sleep(duration) # 3. print("Hello, %s.%d!" % (name, i)) |
因此,我們需要定義新的 coroutine_sleep
函式 (1)。這個函式會生成一個事件(SleepEvent
),然後不阻塞地立即返回
(2)。因此,在 (3) 處,yield
表示式會將 coroutine_sleep
返回的 SleepEvent
物件傳遞給協程函式的呼叫者,並暫停當前協程函式。
定義事件框架
接下來,我們需要定義事件框架。在實際動手之前,我們應該先分析一下一個事件類需要有哪些功能。
- 首先,事件應該有能力讓外部知道自身存在。因此事件類應該伴隨一個佇列;並且在生成事件物件時,將自身註冊進這個佇列。
- 其次,事件應該有能力讓外部知道自身狀態,以便檢查事件狀態,進而進行下一步操作。因此,事件類應該是一個閉包,儲存生成事件時的一些狀態;並提供一個介面,利用這些狀態檢查事件是否完成。
- 最後,事件應當提供一個介面,記錄在事件完成之後應當做什麼;並且在事件完成之後執行這些操作。
據此,我們應該有如下程式碼。
1 2 3 4 5 6 7 8 9 10 11 12 13 |
events_list = list() # 1. class Event(object): def __init__(self, *args, **kwargs): events_list.append(self) # 2. self._callback = lambda:None # 3. def is_ready(self): # 4. ready = self._is_ready() if ready: self._callback() # 5. return ready def set_callback(self, callback): # 6. self._callback = callback |
這裡,(1) 處我們定義了一個全域性的佇列,用於記錄尚在進行中的事件;與此同時,每當生成事件類物件時,(2) 會將當前事件物件註冊到佇列中。(3) 則定義了回撥函式,用於記錄事件完成之後執行什麼操作。
(4) 和 (6) 分別是對外的介面。(4) 讓外部有能力知道自身狀態,其中 _is_ready()
需要在子類中實現;而 (6) 允許外部記錄在事件完成之後應當做什麼。(5)
則保證了當事件完成之後,(6) 中的設定會被正確執行。
至此,我們可以定義出 SleepEvent
類。
1 2 3 4 5 6 7 8 9 10 |
from time import time as current_time from random import random as rd class SleepEvent(Event): # 1. def __init__(self, duration): super(SleepEvent, self).__init__(duration) self._duration = 2 * rd() * duration # 2. self._start_time = current_time() # 3. def _is_ready(self): return (current_time() - self._start_time >= self._duration)# 4. |
這裡,(1) 處定義了 SleepEvent
事件類,用來模擬 I/O 事件;模擬的核心在於 (2) 處定義的睡眠時長。(3) 則記錄了事件誕生時的狀態,用在 (4)
處確認事件是否已完成。
至此,協程函式這一側的程式碼我們已經完成了,接下來我們看看排程器一側的程式碼如何實現。
用輪詢捕捉已完成的事件
因為我們在 events_list
中儲存了所有尚在執行中的事件。這是相當簡單的工作,所以不作過多的解釋。
1 2 3 4 5 |
while len(events_list): for event in events_list: if event.is_ready(): events_list.remove(event) break |
喚醒邏輯
在 Event
類的定義中,is_ready()
函式會在事件完成後呼叫 _callback
函式。而對於協程函式來說,一個事件完成後,需要做的事情無非是:喚醒,恢復執行到下一個暫停點。因此可以有這樣的喚醒邏輯。
1 2 3 4 5 6 |
def _next(gen_task): try: yielded_event = next(gen_task) # 1. yielded_event.set_callback(lambda: _next(gen_task)) # 2. except StopIteration: pass # 3. |
這裡,(1) 呼叫 Python 內建的 next
函式,喚醒協程函式,執行到下一個暫停點,並接受其返回值,儲存在 yielded_event
當中。而後,在
(2) 處將該 Event
物件設定為 Lambda 函式 lambda:
_next(gen_task)
。顯然,這是一個遞迴呼叫 _next
函式自身的閉包——捕獲了需要繼續喚醒的生成器 gen_task
。若生成器執行完畢,則無需繼續喚醒。因此在
(3) 處,直接 pass
即可。
完整實驗
將上述程式碼整合起來,就可以做實驗了。
coroutine_async.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
#!/usr/bin/env python3 from time import time as current_time from random import random as rd events_list = list() class Event(object): def __init__(self, *args, **kwargs): events_list.append(self) self._callback = lambda:None def is_ready(self): ready = self._is_ready() if ready: self._callback() return ready def set_callback(self, callback): self._callback = callback class SleepEvent(Event): def __init__(self, duration): super(SleepEvent, self).__init__(duration) self._duration = 2 * rd() * duration self._start_time = current_time() def _is_ready(self): return (current_time() - self._start_time >= self._duration) class Dispatcher(object): def __init__(self, tasks): self.tasks = tasks self._start() def _next(self, gen_task): try: yielded_event = next(gen_task) yielded_event.set_callback(lambda: self._next(gen_task)) except StopIteration: pass def _start(self): for task in self.tasks: self._next(task) def polling(self): while len(events_list): for event in events_list: if event.is_ready(): events_list.remove(event) break def coroutine_sleep(duration): return SleepEvent(duration) def greeting(name, times, duration = 1): for i in range(times): yield coroutine_sleep(duration) print("Hello, %s.%d!" % (name, i)) if __name__ == '__main__': def test(): dispatcher = Dispatcher([greeting('Liam', 3), greeting('Sophia', 3), greeting('Cancan', 3)]) dispatcher.polling() import timeit timeit_times = 10 avg_cost = timeit.timeit(lambda: test(), number = timeit_times) / timeit_times print('%.3f' % (avg_cost)) |
可能的執行結果是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
$ python coroutine_async.py Hello, Liam.0! Hello, Liam.1! Hello, Liam.2! Hello, Cancan.0! Hello, Sophia.0! |