1. 程式人生 > >Python 中的黑暗角落(二):生成器協程的排程問題

Python 中的黑暗角落(二):生成器協程的排程問題

前作介紹了 Python 中的 yield 關鍵字。此篇介紹如何使用 yield 表示式,在 Python 中實現一個最基本的協程排程示例,避免 I/O 操作佔用大量 CPU 計算時間。

協程及其特點

協程是一種特殊的子程式,它可以在特定的位置暫停/恢復(而不是像普通函式那樣在邏輯上順序執行);並且每當協程暫停時,呼叫者可以從協程中獲取狀態,決定呼叫者接下來的走向;以及每當協程恢復時,呼叫者可以傳遞資訊給協程,影響協程的行為

從「可以暫停/恢復」來看,協程類似於 Python 中的迭代器。不過,迭代器僅只是將值返回給呼叫者,其內部的邏輯是確定的,無法與呼叫者做更多的互動。

因為協程可以暫停/恢復,所以,我們可以在多個協程中分別執行不同的任務;然後由排程器管理協程之間的執行,實現多工併發。

此外,協程和呼叫者在同一執行緒中執行;考慮到執行緒是作業系統進行任務排程的最小單元協程和呼叫者之間的切換,沒有 CPU 上下文切換的開銷。因此,相對使用多執行緒、多程序實現多工併發,協程在這方面的開銷非常小。

同樣由於協程之間共享執行緒,所以使用協程實現的多工併發,無法實現真正的並行。因此,顯而易見,協程適合 I/O 密集型的任務併發,而不適合 CPU 密集型的任務併發

協程排程基礎

最簡單的協程的例子,我們實際上已經見過了。在「使用 send() 方法與生成器函式通訊」一節中,func 就扮演了協程函式的角色。每當協程函式在 yield 表示式處暫停,呼叫者就收到上一步計算的結果;每當協程函式自 yield

 表示式處恢復,協程函式就用接收到的數進行下一輪計算。

在見識過最簡單的協程示例之後,我們試著看看在排程協程的過程中,需要怎樣處理。

coroutine_basic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
from collections import deque               # 1.

class Dispatcher(object):                   # 2.
    def __init__(self, tasks):
        self.tasks = deque(tasks)           # 3.
def next(self): return self.tasks.pop() # 4. def run(self): while len(self.tasks): # 5. task = self.next() try: next(task) # 6. except StopIteration: pass # 7. else: self.tasks.appendleft(task) # 8. def greeting(name, times): # 9. for i in range(times): yield # 10. print("Hello, %s.%d!" % (name, i)) dispatcher = Dispatcher([greeting('Liam', 5), greeting('Sophia', 4), greeting('Cancan', 6)]) dispatcher.run()

這段程式碼中,有兩個主要角色:排程器 (2) 和任務 (9)。

從排程器的角度來說,我們自 collections 模組引入了 deque 容器 (1),用於在 (3) 處儲存任務。而後,我們在 (4) 定義了排程器 Dispatcher 的輪詢函式 next(),它返回下一個尚未終止的任務。在排程器的 run() 函式中,(5) 和 (8) 保證了迴圈處理所有尚未完成的任務並清理已完成的任務,(6) 和 (7) 則負責觸發每個任務的下一步動作。

從任務的角度來說,greeting 是一個生成器函式,是具體的協程任務。在 (10) 處,yield 表示式標記了函式暫停/恢復的位置;它將邏輯上連續的任務,在時間上切分成了若干段。

這段程式碼執行起來結果大致是這樣:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Hello, Cancan.0!
Hello, Sophia.0!
Hello, Liam.0!
Hello, Cancan.1!
Hello, Sophia.1!
Hello, Liam.1!
Hello, Cancan.2!
Hello, Sophia.2!
Hello, Liam.2!
Hello, Cancan.3!
Hello, Sophia.3!
Hello, Liam.3!
Hello, Cancan.4!
Hello, Liam.4!
Hello, Cancan.5!

看起來和多執行緒那種亂七八糟的輸出順序有點像,不是嗎?當然,此處由於使用 deque.pop() 輪詢任務佇列,所以輸出順序大致是有跡可循的。不過,這並不影響我們將其作為協程排程的示例。

在這個例子中,儘管呼叫者和協程之間沒有其他的通訊,協程函式內也沒有真正意義上的 I/O 操作,但我們仍可以進行一些總結。

首先,生成器函式充當了協程函式,實現了協程。

其次,協程任務在邏輯上是連續的,但是我們可以用 yield 表示式在時間上把協程任務分成若干部分。

再次,用 yield 分割的任務,需要有一個機制控制器暫停/恢復。這個機制此處由排程器提供。

再者,對於排程器來說,它需要知道「有哪些協程任務需要恢復」。因此,它必然直接或間接地維護一個事件佇列。此處,我們用 Dispatcher.tasks 完成了這一工作。

最後,對於每個協程(任務)來說,一旦被暫停,其恢復就必須依賴主動喚起。因此,排程器必須「恰到好處」地反覆喚起執行緒——不能多也不能少:多則浪費執行時間,甚至丟擲異常;少則留下未能完成的任務。因此,排程器必須恰當地維護上述佇列,確定何時從佇列中移除已完成的任務。在我們的例子中,(6) 和 (7) 協同完成了這一工作。

非同步 I/O 任務模擬

回顧一下剛才的協程任務。

1
2
3
4
def greeting(name, times):
    for i in range(times):
        yield
        print("Hello, %s.%d!" % (name, i))

在這個任務裡,yield 表示式將原本在邏輯上連續的迴圈,人為地在時間上切分成了若干份。然而,除了用於演示暫停/恢復的攜程排程之外,這個例子實際上沒有必要使用協程實現。這是因為,在協程任務中,去掉 yield 表示式之後,所有的操作都是立即完成的;不存在需要阻塞以等待 I/O 的空耗 CPU 的情況。

下列程式碼模擬了一個需要阻塞等待 I/O 的任務。

1
2
3
4
5
6
7
from time import sleep
from random import random as rd

def greeting(name, times, duration = 1):    # 1.
    for i in range(times):
        sleep(2 * duration * rd())          # 2.
        print("Hello, %s.%d!" % (name, i))

此處,新定義的 greeting 函式 (1) 有一個新的引數:duration。而後,在每次迴圈列印招呼資訊的之前,會現行阻塞一段時間 (2)。這一阻塞就模擬了實際情況中的 I/O 類操作:空佔 CPU 資源,但不進行任何計算。阻塞的時間是 2 * duration * rd(),這是一個一 duration 為期望的隨機變數,用來模擬預計阻塞 duration 秒但實際情況會有波動的 I/O 任務。

假設 duration 設定為定值 1 而 times 設定為定值 3,那麼執行一次 greeting 函式,平均需要耗時 3 秒。如若順序執行 3 個這樣的函式,平均下來,一共需要耗費 9 秒的時間。而這 9 秒之中,大多數時間 CPU 都僅只在空耗,沒有執行實際的計算任務。因此,我們可以考慮用協程將它們併發起來執行,降低總的空耗的時間。為此,我們有如下思路。

  • 將每個 I/O 任務理解為一個事件;
  • 維護一個佇列,用於記錄尚在進行中的事件,以便後續操作;
  • 當事件生成時,向上述佇列註冊(即將事件新增進佇列);
  • 使用輪詢(polling)等方式,捕獲完成的事件;
  • 對已完成的事件,進行後續操作(特別地,恢復協程函式),而後從佇列中刪除該事件。

現在,我們開始逐步在這一思路的指導下,實現協程併發。

引出休眠事件(SleepEvent

回顧一下新版的 greeting 函式。若要通過生成器實現協程,就必然要新增 yield 表示式。

1
2
3
4
5
6
7
from time import sleep
from random import random as rd

def greeting(name, times, duration = 1):
    for i in range(times):
        yield sleep(2 * duration * rd())          # 1.
        print("Hello, %s.%d!" % (name, i))

簡單粗暴地以 (1) 的方式加上 yield 表示式是不行的。這是因為,yield 表示式會對 sleep 函式求值,而後將該值返回給呼叫者並暫停。但是,對 sleep 函式求值的過程,就是模擬的 I/O 操作,會阻塞執行執行緒。在阻塞完畢之後,再通過 yield 暫停,這就沒有意義了。

1
2
3
4
5
6
7
def coroutine_sleep(duration):              # 1.
    return SleepEvent(duration)             # 2.

def greeting(name, times, duration = 1):
    for i in range(times):
        yield coroutine_sleep(duration)     # 3.
        print("Hello, %s.%d!" % (name, i))

因此,我們需要定義新的 coroutine_sleep 函式 (1)。這個函式會生成一個事件(SleepEvent),然後不阻塞地立即返回 (2)。因此,在 (3) 處,yield 表示式會將 coroutine_sleep 返回的 SleepEvent 物件傳遞給協程函式的呼叫者,並暫停當前協程函式。

定義事件框架

接下來,我們需要定義事件框架。在實際動手之前,我們應該先分析一下一個事件類需要有哪些功能。

  • 首先,事件應該有能力讓外部知道自身存在。因此事件類應該伴隨一個佇列;並且在生成事件物件時,將自身註冊進這個佇列。
  • 其次,事件應該有能力讓外部知道自身狀態,以便檢查事件狀態,進而進行下一步操作。因此,事件類應該是一個閉包,儲存生成事件時的一些狀態;並提供一個介面,利用這些狀態檢查事件是否完成。
  • 最後,事件應當提供一個介面,記錄在事件完成之後應當做什麼;並且在事件完成之後執行這些操作。

據此,我們應該有如下程式碼。

1
2
3
4
5
6
7
8
9
10
11
12
13
events_list = list()                    # 1.

class Event(object):
    def __init__(self, *args, **kwargs):
        events_list.append(self)        # 2.
        self._callback = lambda:None    # 3.
    def is_ready(self):                 # 4.
        ready = self._is_ready()
        if ready:
            self._callback()            # 5.
        return ready
    def set_callback(self, callback):   # 6.
        self._callback = callback

這裡,(1) 處我們定義了一個全域性的佇列,用於記錄尚在進行中的事件;與此同時,每當生成事件類物件時,(2) 會將當前事件物件註冊到佇列中。(3) 則定義了回撥函式,用於記錄事件完成之後執行什麼操作。

(4) 和 (6) 分別是對外的介面。(4) 讓外部有能力知道自身狀態,其中 _is_ready() 需要在子類中實現;而 (6) 允許外部記錄在事件完成之後應當做什麼。(5) 則保證了當事件完成之後,(6) 中的設定會被正確執行。

至此,我們可以定義出 SleepEvent 類。

1
2
3
4
5
6
7
8
9
10
from time import time as current_time
from random import random as rd

class SleepEvent(Event):                                    # 1.
    def __init__(self, duration):
        super(SleepEvent, self).__init__(duration)
        self._duration = 2 * rd() * duration                # 2.
        self._start_time = current_time()                   # 3.
    def _is_ready(self):
        return (current_time() - self._start_time >= self._duration)# 4.

這裡,(1) 處定義了 SleepEvent 事件類,用來模擬 I/O 事件;模擬的核心在於 (2) 處定義的睡眠時長。(3) 則記錄了事件誕生時的狀態,用在 (4) 處確認事件是否已完成。

至此,協程函式這一側的程式碼我們已經完成了,接下來我們看看排程器一側的程式碼如何實現。

用輪詢捕捉已完成的事件

因為我們在 events_list 中儲存了所有尚在執行中的事件。這是相當簡單的工作,所以不作過多的解釋。

1
2
3
4
5
while len(events_list):
    for event in events_list:
        if event.is_ready():
            events_list.remove(event)
            break

喚醒邏輯

在 Event 類的定義中,is_ready() 函式會在事件完成後呼叫 _callback 函式。而對於協程函式來說,一個事件完成後,需要做的事情無非是:喚醒,恢復執行到下一個暫停點。因此可以有這樣的喚醒邏輯。

1
2
3
4
5
6
def _next(gen_task):
    try:
        yielded_event = next(gen_task)                      # 1.
        yielded_event.set_callback(lambda: _next(gen_task)) # 2.
    except StopIteration:
        pass                                                # 3.

這裡,(1) 呼叫 Python 內建的 next 函式,喚醒協程函式,執行到下一個暫停點,並接受其返回值,儲存在 yielded_event 當中。而後,在 (2) 處將該 Event 物件設定為 Lambda 函式 lambda: _next(gen_task)。顯然,這是一個遞迴呼叫 _next 函式自身的閉包——捕獲了需要繼續喚醒的生成器 gen_task。若生成器執行完畢,則無需繼續喚醒。因此在 (3) 處,直接 pass即可。

完整實驗

將上述程式碼整合起來,就可以做實驗了。

coroutine_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#!/usr/bin/env python3

from time import time as current_time
from random import random as rd

events_list = list()

class Event(object):
    def __init__(self, *args, **kwargs):
        events_list.append(self)
        self._callback = lambda:None
    def is_ready(self):
        ready = self._is_ready()
        if ready:
            self._callback()
        return ready
    def set_callback(self, callback):
        self._callback = callback

class SleepEvent(Event):
    def __init__(self, duration):
        super(SleepEvent, self).__init__(duration)
        self._duration = 2 * rd() * duration
        self._start_time = current_time()
    def _is_ready(self):
        return (current_time() - self._start_time >= self._duration)

class Dispatcher(object):
    def __init__(self, tasks):
        self.tasks = tasks
        self._start()
    def _next(self, gen_task):
        try:
            yielded_event = next(gen_task)
            yielded_event.set_callback(lambda: self._next(gen_task))
        except StopIteration:
            pass
    def _start(self):
        for task in self.tasks:
            self._next(task)
    def polling(self):
        while len(events_list):
            for event in events_list:
                if event.is_ready():
                    events_list.remove(event)
                    break

def coroutine_sleep(duration):
    return SleepEvent(duration)

def greeting(name, times, duration = 1):
    for i in range(times):
        yield coroutine_sleep(duration)
        print("Hello, %s.%d!" % (name, i))

if __name__ == '__main__':
    def test():
        dispatcher = Dispatcher([greeting('Liam', 3), greeting('Sophia', 3), greeting('Cancan', 3)])
        dispatcher.polling()

    import timeit
    timeit_times = 10
    avg_cost = timeit.timeit(lambda: test(), number = timeit_times) / timeit_times
    print('%.3f' % (avg_cost))

可能的執行結果是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
$ python coroutine_async.py
Hello, Liam.0!
Hello, Liam.1!
Hello, Liam.2!
Hello, Cancan.0!
Hello, Sophia.0!

            
           

相關推薦

Python 黑暗角落生成器排程問題

前作介紹了 Python 中的 yield 關鍵字。此篇介紹如何使用 yield 表示式,在 Python 中實現一個最基本的協程排程示例,避免 I/O 操作佔用大量 CPU 計算時間。 協程及其特點 協程是一種特殊的子程式,它可以在特定的位置暫停/恢復(而不是像

Python 黑暗角落模組與包

如果你用過 Python,那麼你一定用過 import 關鍵字載入過各式各樣的模組。但你是否熟悉 Python 中的模組與包的概念呢?或者,以下幾個問題,你是否有明確的答案? 什麼是模組?什麼又是包?from matplotlib.ticker import Format

Python 黑暗角落理解 yield 關鍵字

Python 是非常靈活的語言,其中 yield 關鍵字是普遍容易困惑的概念。 此篇將介紹 yield 關鍵字,及其相關的概念。 迭代、可迭代、迭代器 迭代(iteration)與可迭代(iterable) 迭代是一種操作;可迭代是物件的一種特性。 很多資料

Python-MongoDB連線搭建Python連線MongoDB

作者:WenWu_Both 出處:http://blog.csdn.net/wenwu_both/article/ 版權:本文版權歸作者和CSDN部落格共有 轉載:歡迎轉載,但未經作者同意,必須保留此段宣告;必須在文章中給出原文連結;否則必究法律責任

Python+OGR庫學習讀取點向量檔案,複製特定屬性值點並另存為shp檔案

程式碼思路: 1、匯入相關庫包,切換到當前資料夾 2、註冊驅動,開啟點向量檔案,獲取圖層 3、建立輸出檔案,並獲取圖層(沒有屬性定義) 4、定義輸出圖層欄位屬性:假設已知檔案所有屬性欄位定義(即ID和cover) (1)讀取輸入檔案中某一要素 (2)獲取ID、cover欄位定義 (3

Android使用OrmLite複雜條件查詢

在資料庫中經常會使用到複雜的條件查詢,來完成業務,下面學習下如何使用OrmLite進行復雜條件查詢。 在預設情況下,Android只打印info級別的日誌資訊。在ormlite中提供了AndroidLog類來設定adb log的級別。為了更方便的檢視OrmLite日誌,檢視增刪改查的sql語句,我們可以在

Python與機器學習Windows下科學計算環境搭建

【注意:安裝numpy和scipy模組時注意與Python版本保持一致】 1.安裝numpy 測試: 沒有報錯,bingo~ 2.安裝scipy 在官網中下載scipy3.4版本:scipy-0

python知識梳理-基礎資料型別

python3 pycharm   一、字串 用單引號、雙引號、三引號引用起來 a='test,abc' b="quit,wer" c='''i love_you,name!''' print(a,b,c) print(type(a),type(b),type(c))--------

Python socket程式設計之socket的選項設定

1.簡介 socket選項主要是由setsockopt和getsockopt函式完成 2.函式介紹 int getsockopt(int sockfd, int level, int optname,                    void *optval,

python使用SQLAlchemy

 由於目前工作比較忙,就借鑑其他大神的學習筆記,即使沒有聽課,但是也是看得懂下面的內容關於SQLAlchemy使用建表、插入和查詢內容,點選python中使用SQLAlchemy檢視。繼續以前面建立的student表作為示例進行後面的操作。1 2 3 4 5 6 7 8 9 10 11 12 13 14 15

深入理解JS的物件new 的工作原理

**目錄** - 序言 - 不同返回值的建構函式 - 深入 new 呼叫函式原理 - 總結 - 參考 **1.序言** 在 [深入理解JS中的物件(一):原型、原型鏈和建構函式](https://www.cnblogs.com/forcheng/p/12866827.html) 中,我們分析了JS中

Spark SQL筆記整理DataFrame編模型與操作案例

代碼 最重要的 ssi func nbu 產生 michael array image DataFrame原理與解析 Spark SQL和DataFrame 1、Spark SQL是Spark中的一個模塊,主要用於進行結構化數據的處理。它提供的最核心的編程抽象,就是Data

TensorFlow 從入門到精通MNIST 例原始碼分析

按照上節步驟, TensorFlow 預設安裝在 /usr/lib/python/site-packages/tensorflow/ (也有可能是 /usr/local/lib……)下,檢視目錄結構: # tree -d -L 3 /usr/lib/pyt

python關於操作時間的方法使用datetime模塊

log time模塊 bsp lib .py nth mon target ear 使用datetime模塊來獲取當前的日期和時間 1 import datetime 2 i=datetime.datetime.now() 3 print ("當前的日期和時間是%

python使用multiprocessing的常見問題

簡介在python的直譯器中,CPython是應用範圍最廣的一種,其具有豐富的擴充套件包,方便了開發者的使用。當然CPython也不是完美的,由於全域性解釋鎖(GIL)的存在,python的多執行緒可以近似看作單執行緒。為此,開發者推出了multiprocessing,這裡介紹一下使用中的常見問題。 環境&

Python和C|C++的混編利用Cython進行混編

cde uil 有時 當前 class def 將在 python 混編 還能夠使用Cython來實現混編 1 下載Cython。用python setup.py install進行安裝 2 一個實例 ① 創建helloworld文件夾創建hellowor

JavaString、StringBuilder、StringBuffer常用源碼分析及比較StringBuilder、StringBuffer源碼分析

string類型 character private 字符 代碼 less pri des over StringBuilder: 一、構造方法: /** * Constructs a string builder with no characters in i

觀察者模式(Observer Pattern)HeadFirst的氣象站的實現

att dex mov min first return 狀態 size sdi 1 觀察者模式的原理,首先由一個主題,當主題發送變化的時候,通知該主題的訂閱者 按照上面的分析我們來進行設計 1.抽象主題Subject public interface Subject {

在 Windows Server Container 運行 Azure Storage Emulator使用自定義的 SQL Server Instance

manage span contain target ros 結果 images 兩種方法 ini   上一章,我們解決了 Azure Storage Emulator 自定義監聽地址的問題,這遠遠不夠,因為在我們 DEV/QA 環境有各自的 SQL Server Inst

java枚舉即對java枚舉的例子進行拓展

枚舉/* 知識點:枚舉 枚舉是從java5開始提供的一種新的數據類型,是一個特殊的類,就是多個常量對象的集合 定義格式: [修飾符] enum 枚舉類名 { 常量A, 常量B, 常量C; } */ //定義枚舉 enum Weekday { Mond