1. 程式人生 > >微信群分享:Python協程入門

微信群分享:Python協程入門

Python語言是由Guido van Rossum大牛在1989年發明,它是當今世界最受歡迎的計算機程式語言之一,也是一門“學了有用、學了能用、學會能久用”的計算生態語言。

為此,CSDN作為國內最大的IT中文社群,特向廣大Python愛好者開設了Python學習班,幫助大家在學習的道路上少走彎路,事半功倍。3月16號晚上8點,我們特邀請知名Python技術專家陳舸老師在班級裡舉行分享活動。

陳舸,8年開發經驗,曾就職華為、烽火通訊,目前創業中。技術涉獵廣泛,嵌入式開發,Linux,Python,iOS,Web均有涉及。《Python Cookbook第三版》譯者,《Linux/Unix系統程式設計手冊 下卷》以及《演算法精解 C語言描述》合作譯者。

以下為昨晚的分享內容:
大家好,今天給大家介紹Python中的協程(coroutine),讓大家對協程能有一個基本的認識。本文將從迭代器、生成器的基礎講起,通過生成器實現協程,最後將簡單介紹Python3.5中新增關鍵字async/await對協程的支援。本次分享中的程式碼示例如不加特別說明將相容Python2和Python3。使用Python2的同學可以通過from future import print_function來匯入print函式。

迭代器(iterator)

大家都知道Python裡有一個for語句。我們可以用for來迴圈迭代一個序列。Python中可迭代的物件有很多,比如我們所熟悉的:

迭代列表

for x in [1,2,3,4,5]:
    print(x)

迭代字典

languages = {'Java':'James Gosling', 'Python':'Guido van Rossum'}
for lang, author in languages.items():
    print("{0} create {1}".format(author, lang))

迭代文字

for line in open("logfile.log"):
    print(line)

迭代字串

for char in "hello world":
    print(char)

為什麼可以迭代許多不同的物件呢?因為Python中存在著一個迭代協議。我們再來看看這個例子:

>>> items = [1, 2, 3]
>>> it = iter(items)
>>> it.next()
1
>>> it.next()
2
>>> it.next()
3
>>> it.next()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

這一次我們沒有通過for來迭代列表items,而是先通過Python的內建函式iter將列表items轉換成一個迭代器,然後不斷呼叫迭代器的next()方法來得到序列中的值,當超出可迭代範圍時,就丟擲了StopIteration異常。

因此對於如下的for迴圈

alert("Hellofor x in iterableObj:
    print(x) 

Python直譯器是這樣為我們處理的:

_iter = iter(iterableObj)
while True:
    try:
        x = _iter.next()
    except StopIteration:
        break
    print(x)

所以,Python的迭代協議就是要求物件的iter()方法返回一個特殊的迭代器物件,並且該物件必須實現next()方法,並使用StopIteration異常來通知迭代的完成。滿足上述要求的物件,我們就認為它是迭代器物件。這樣,在for語句中迭代時,Python會自動為我們呼叫迭代器的next()方法。

也就是說,我們自定義的物件如果也想用在for語句中來迭代的話,只需要滿足迭代協議的要求,實現iter()和next()方法,並在next()中捕獲StopIteration異常就可以了。好啦,那我們就根據上述要求,自己實現一個迭代器物件吧。
自定義可迭代物件, 迭代器版

class LowerLetters(object):
    def __init__(self):
        self.current = 'a'

    def __next__(self):
        if self.current > 'z':
            raise StopIteration

        result = self.current
        self.current = chr(ord(result)+1)
        return result

def __iter__(self):
    ''' 只需要返回self即可 '''
        return self

letters = LowerLetters()
for i in letters:
    print(i)  # 輸出a-z的小寫字母

生成器(generator)

剛剛說了迭代器,我們再來看看Python中的生成器。先看看怎麼定義一個Python的生成器。

def countdown(n):
print("Counting down from", n)
while n > 0:
    yield n
    n -= 1

看起來和普通的Python函式並無什麼區別,都是用def來定義函式而已啊。只是while迴圈中的那個yield好像不太熟,而且函式裡沒有出現過return。先不管那麼多,我們呼叫一下上面這個函式看看。

>>> x = countdown(10)     # 注意,並沒有打印出任何內容
>>> x
<generator object at 0x58490>
>>>

奇怪了,明明函式定義裡有一個print列印啊,呼叫它居然沒有列印資訊出來,這說明函式就沒有開始執行嘛。沒錯,檢視x我們發現它是一個generator物件,也就是生成器物件。函式countdown呼叫之後,只是返回了一個生成器物件而已,函式中的語句並沒有立刻開始執行。那怎麼樣才能讓函式開始執行呢?我們在迭代器中講過的next()又要登場了。

注意,Python2中生成器物件的next方法在Python3中更改為next
這裡以Python3為示例,用Python2的同學需要用next()

x.__next__()
Counting down from 10
10
>>>
>>> x.__next__()
9
>>> x.__next__()
8
...
>>> x.__next__()
1
>>> x.__next__()
Traceback (most recent call last):
File "<stdin>", line 1, in ?
StopIteration
>>>

通過對生成器物件x呼叫next()方法,我們發現countdown函式體中的語句終於開始執行了,而且還伴隨著一個現象,那就是每次執行next(),函式就在yield語句處返回一個值,然後就停住不動了,直到下一次呼叫next()時才會又繼續執行下去,如此往復,直到函式返回。

剛剛我們看到丟擲StopIteration異常了,此時函式countdown已經返回了。這裡是不是有似曾相似的感覺?之前講的迭代器裡,也有StopIteration異常,也出現了next。那到底迭代器和生成器有什麼區別呢?

如果我們檢視生成器x所包含的方法:

>>> dir(x)
['__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__next__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'gi_code', 'gi_frame', 'gi_running', 'gi_yieldfrom', 'send', 'throw']
>>> '__iter__' in dir(x)
True
>>> '__next__' in dir(x)
True

我們發現生成器物件中也包含有iter()和next()方法! 那麼根據Python中的迭代協議,可以得知生成器其實也是一種迭代器。既然它們本質上都是迭代器,那生成器的好處體現在哪裡了呢?還記得我們在講迭代器時實現的那個自定義類LowerLetters嗎?為了滿足Python迭代協議,我們分別實現了iter()和next()方法,並在next()中處理StopIteration異常。可是你再看看我們的生成器函式countdown,我們搞了那麼多東西嗎?沒有!因為這裡Python自動幫我們搞定了迭代協議,簡單多了。讓我們用生成器重新定義一個LowerLetter。

自定義可迭代物件,生成器版

def lowerLetter():
lowerLetter.current = 'a'
while lowerLetter.current <= 'z':
    result = lowerLetter.current
    lowerLetter.current = chr(ord(result)+1)
    yield result   #用yield產生值

for i in lowerLetter():
...     print(i)  # 輸出a-z

可以看到,相比迭代器版本,生成器版本的實現簡潔多了,一個magic方法都不涉及,你甚至都不需要寫一個類。生成器可以用來簡化實現迭代器物件。

迭代器和生成器都有一種特質,它們都可迭代(iterable),但是隻可以迭代一輪,一旦迭代結束,所有產生的值都不會儲存,如果你要再次迭代,那麼需要重新呼叫迭代器/生成器一次。這和Python內建的列表等資料結構有很大不同,我們知道,一個list你是可以反覆迭代多次的。

>>> x = countdown(10)
>>> for v in x:
...     print(v)
... 
Counting down from 10
10
...
1
>>> for v in x:
...     print(v)
... 
>>> #再次迭代不會產生任何值了

看到了嗎,第一輪for結束後,再次迭代就不會產生任何值了,因為已經到StopIteration了。中間產生的值都是按需生成,每次由Python幫我們呼叫next()得到,不會儲存起來。這麼做的好處是體現了惰性求值,即,需要的時候再計算,而不是像list那樣不管你要不要,我一次性全扔記憶體裡。當迭代的序列較大時,生成器/迭代器相比list會顯著減少記憶體的佔用。而且,有時候如果無法預先知道要迭代的上限時,這時就只能用迭代器/生成器來解決了。

我們需要區分可迭代物件(iterable)和迭代器(iterator)。迭代器一定是可迭代物件,但可迭代物件不一定是迭代器,list就是最好的例子。list可迭代,但它不包含next方法,因此根據Python迭代協議,它不是迭代器。可以通過下列方法來判斷物件是否是迭代器。

>>> from collections import Iterator
>>> isinstance(lowerLetter(), Iterator)
True
>>> isinstance([], Iterator)
False
>>> isinstance({}, Iterator)
False

協程(coroutine)

好了,前面扯了那麼多,終於要到正題了。那麼什麼是協程呢?從名字上來看,協表示協作,協程就是互相協作的例程,也正對應了其英文稱謂cooperative routine。 據Donald Knuth所說,其實協程的概念早在1958年就由Melvin Conway提出了,而第一本介紹協程的出版物則在1963年出現。它到底是什麼?協程和我們前面提到的迭代器、生成器有什麼關係嗎?

還記得前面講生成器時給出的例子吧,我們用到了yield這個關鍵字。函式中出現了yield,使得函式不再是普通的函式,而變成了生成器。yield不但可以產生值,還會使得生成器儲存執行上下文後暫停執行,然後在下一次next()時從上次暫停的地方繼續接著執行。正是由於這一特點,使得Python中的生成器具有了協程的部分特徵(可自我暫停,稍後再恢復執行)。而自從Python2.5開始,通過PEP 342 – Coroutines via Enhanced Generators的引入,Python終於可以通過生成器來實現協程了。

Python2.5中為生成器物件增加了send()和close()方法,並且支援了yield表示式。這有什麼用處呢?我們還是先看一個簡單的例子。

簡單的字串匹配, 若傳送過來的字串中包含pattern則打印出來

def grep(pattern):
print("Looking for %s" % pattern)
while True:
    line = (yield)  # yield表示式. 注意和之前例子中yield的寫法做比較
    if pattern in line:
        print(line)

OK,我們來分析一下上面的程式碼。首先,因為出現了yield,grep不再是普通的函數了,呼叫它將產生一個生成器物件,這和我們之前講過的沒有區別。

>>> g = grep("python")
>>> g
<generator object grep at 0x106620cd0>
>>> 

由於g是生成器物件,函式體不會立刻執行。需要先呼叫一次next(),Python2中則是呼叫next()。

>>> g.__next__()
Looking for python

現在函式體開始執行了,然後在yield處暫停,這也和我們之前討論的行為是一致的。
我們再來看看line = (yield),這個時候yield寫在了=的右邊,使其成為了一個yield表示式。可以理解為通過yield得到了某個值然後賦值給了line。那這個值是什麼呢?又是如何通過yield得到這個值的呢?

這裡就是send()方法開始顯現威力的時候了。我們可以對生成器物件g呼叫send方法,傳送資料給它,而傳送的資料就通過yield得到並賦值給了line。

>>> g.send('I like python')
I like python  #匹配了pattern, 打印出來
>>> g.send('this is a test')
>>> # 沒有匹配pattern, 無列印

OK,我們再對grep的整個執行流程來一次梳理。g = grep(“python”)產生了一個生成器物件,此時函式體沒有開始執行。我們先對g呼叫一次next()或者通過g.send(None)來啟動生成器,這時函式體開始執行,到line = (yield)這一句時暫停執行。接下來,通過g.send(“I like python”)傳送資料給g,函式體又恢復了執行,此時傳送過來的字串”I like python”就通過yield賦值給了line,然後繼續執行後面的判斷邏輯,發現匹配到了pattern,打印出了字串。然後繼續while迴圈,又遇到了yield,此時函式的執行再次暫停。如此反覆通過send,函式體就不斷的yield出新值處理一下,然後再次在yield處暫停。

發現什麼了嗎?這裡的grep實際上就和我們的主程式通過send呼叫形成了一種協作式的處理流程。主程式通過呼叫g.send()喚起grep函式體的執行,grep函式體處理完後又在yield處自己暫停執行,等待主程式再次通過send喚起自己,有一種你方唱罷我登場的感覺。這正體現出了協程的特點:任務的執行流可自我掛起,其他程式又可以喚起任務,讓它繼續執行。
如果主程式呼叫了g.close()會如何呢?簡單,那樣的話grep協程就徹底退出了,不再是掛起。

協程的管道式處理

我們再來看一個稍複雜一點的例子。在這個例子裡我們會說明生成器和協程的區別。在給出具體的程式碼前,我們先定義一個幫助函式。

def coroutine(func):
def start(*args, **kwargs):
    cr = func(*args, **kwargs)
    cr.send(None)  # 自動幫我們啟動協程, 讓其在yield處掛起
    return cr
return start

有經驗的同學應該能意識到,這個函式是用來做裝飾器的。還記得之前我們定義好生成器之後,要讓生成器的函式體得到執行必須要先呼叫send(None)或者next()嗎?這是為了先啟動生成器讓它在yield處掛起。而這個啟動的步驟有可能會忘記做,那麼我們就用裝飾器來幫我們自動處理這個啟動的步驟。

接下來定義兩個函式。

import time
def follow(thefile, target):
thefile.seek(0, 2)  # go to the end of the file
while True:
    line = thefile.readline()  # 資料來源,這裡得到文字行
    if not line:
        time.sleep(0.1)
        continue
    target.send(line)           # 傳送給協程target

@coroutine
def printer():
while True:
    line = (yield)
    print(line)

把上面兩個函式組合起來使用

f = open("somefile.txt")
follow(f, printer())

圖片描述
用一個示意圖來表示上面程式碼的流程。實際上我們將follow和printer組成了一個管道。follow是管道的源頭,它負責產生資料,然後通過send把資料發給了協程printer做處理。
還可以在管道中增加協程嗎?當然可以,我們就把前面的grep拿過來改改,再放在管道中。現在變成了這樣:

@coroutine
def grep(pattern,target):
    while True:
        line = (yield)          # 從資料來源得到資料
        if pattern in line:
            target.send(line)   # 自己處理完再發給下一個協程繼續處理



f = open("access-log")
follow(f, grep('python', printer()))

圖片描述
這裡follow仍當做管道的源頭,是資料來源,驅動整個管道的執行。grep協程在這裡起到了過濾的作用,檢視follow發過來的資料中是否包含有字串python,若匹配到了就把資料發給下一個協程printer打印出來,然後自己在yield處掛起等待下一次follow的send呼叫。這裡的follow(),grep()和printer()一起協同工作,實現了檢視log檔案中是否包含有特定字串的功能。當然你還可以繼續擴充套件這個程式完成更多的功能。

看到這裡,熟悉Unix/Linux的同學應該倍感親切啊。這和Unix/Linux的哲學很相似:提供一組簡單的工具,每個工具只處理一種任務,但你可以通過各種不同的組合將它們連在一起處理更復雜的任務。其實這個簡單的例子也可以用更一般化的迭代來處理:

def do_process_in_one_func(thefile, pattern):
thefile.seek(0, 2)  # go to the end of the file
while True:
    line = thefile.readline()
    if not line:
        time.sleep(0.1)
        continue
    else:
        if pattern in line:
            print(line)

我們稍微修改了一下例子,把所有的處理邏輯都放在一個函式裡,通過迭代檔案的每一行來達到相同的目的。只是這樣一來就把讀取檔案行、過濾、列印三種不同的任務都寫在了一起,這其實不利於任務的劃分,而且這個函式也沒法再和其他的工具一起協作了,喪失了靈活性。也就是說,協程其實可以幫助我們劃分程式模組,使得每個任務變得簡單單一,同時也能夠和其他的程式協作,提高複用性。越是複雜的程式,這麼做就越有利。

下面該說說生成器和協程的區別了。到這裡估計很多同學把生成器和協程搞混了,這兩貨也確實很相似,因為都包含有yield。生成器我們可以看做是生產者,它負責產生資料,通常是用來做迭代使用的。而協程是消費者,通過yield接收其他程式send給它的資料然後處理。那協程有沒有可能也是生產者呢?有可能哦,上面例子裡的grep就有雙重身份,它既處理(消費)管道上游發來的資料,而處理完之後又通過呼叫下游協程的send方法把處理過的資料發給下游(生產)。因此協程一定會消費,如果看不到消費而只有生產,那麼是生成器,否則就是協程。好在Python 3.5引入的新關鍵字async/await,徹底解放了我們,這個我們稍後再談。

用協程來模擬任務,實現使用者態執行緒

由於協程具有自我掛起(不能被其他協程搶佔控制權,除非自己放棄)稍後再恢復執行的特質,我們可以利用協程來模擬任務,進而實現使用者態執行緒。為什麼說是使用者態執行緒呢?因為協程的排程執行完全在使用者空間,作業系統核心不感知。也就是說,協程的排程需要由我們自己來控制。相比系統級的執行緒和程序,協程佔用的資源極少,任務的切換也不需要核心來排程,省去了上下文切換的開銷。這樣的特性使得單機建立大量協程成為可能(百萬級)。下面的例子我們就來實現一個簡單的使用者態排程器,並用協程來模擬作業系統中的任務。

from queue import Queue

模擬任務

class Task(object):
taskid = 0
def __init__(self, target):
    Task.taskid += 1
    self.tid = Task.taskid
    self.target = target
    self.sendval = None

def run(self):
    return self.target.send(self.sendval)

排程器類, mainloop方法實現了簡單的eventloop

class Scheduler(object):
def __init__(self):
    self.ready = Queue()
    self.taskmap = {}

def new(self, target):
    newtask = Task(target)
    self.taskmap[newtask.tid] = newtask
    self.schedule(newtask)
    return newtask.tid

def exit(self, task):
    print("Task %d terminated" % task.tid)
    del self.taskmap[task.tid]

def schedule(self, task):
    self.ready.put(task)

def mainloop(self):
    while self.taskmap:
        task = self.ready.get()
        try:
            result = task.run()
            if isinstance(result, SystemCall):
                result.task = task
                result.sched = self
                result.handle()
                continue
        except StopIteration:
            self.exit(task)
            continue
        self.schedult(task)

系統呼叫基類

class SystemCall(object): 
def handle(self):
    pass

獲取任務id的系統呼叫

class GetTid(SystemCall):
def handle(self):
    self.task.sendval = self.task.tid
    self.sched.schedule(self.task)

定義兩個協程, 將作為我們的任務由排程器排程執行

def foo():
mytid = yield GetTid()
print("I am foo", mytid)
yield

def bar():
mytid = yield GetTid()
print("I am bar", mytid)
yield

if __name__ == '__main__':
sched = Scheduler()

迴圈一百萬次, 共建立兩百萬個任務

for task in range(1000000):
    sched.new(foo())
    sched.new(bar())
sched.mainloop()

上面這個程式一共建立了兩百萬個任務,通過我們簡單實現的事件迴圈不斷交錯排程執行(完全沒有用到執行緒,當然除了程式自身的主執行緒之外)。程式大部分時間花在建立任務上了,真正執行的時候是非常快的。看到這裡,有的同學會問了,如果要用協程,那豈不是還要我們自己寫排程程式,而實現一個功能完備的事件迴圈並不是人人都能輕鬆完成的啊?由作業系統排程系統原生的執行緒、程序不是更方便嗎?沒錯,的確是這樣,由於協程完全處於使用者態,要做到互相交錯執行,的確需要把OS的排程功能移到使用者態來完成,好在許多優秀的庫(如gevent,以及Python3.4中加入標準庫的asynio)已經幫我們完成了這些任務,真正要用的時候並不需要我們自己完成,這裡僅僅只是一個簡單的例子用以說明。學從難處學,用從易處用。

總結一下,目前業界用來處理高併發的方案一般有兩種:
1.以Node.js為代表的非同步回撥方案。Python的Twisted網路庫也是同樣的思路。這種方案利用事件迴圈、非阻塞IO和非同步回撥機制。簡單來說就是,每當遇到IO或者其他耗時的操作時,註冊一個回撥到事件迴圈中,這時程式接著幹其他的事情,當IO完成後由事件迴圈回撥我們之前註冊的callback。這種方式讓程式儘可能的執行,而不需要我們自己建立其他執行緒。
這種方式的缺點是容易遇到callback hell,回撥套回撥。因為所有的阻塞操作都必須是非同步的,否則只要有一環阻塞,系統就卡死了。另外就是非同步的方式有些違反人類的思維習慣,人類還是習慣用同步的方式來思考。當然,針對這些問題,現在也已經有了比較好的解決方法,這裡就不多說了。

2.協程。協程似乎天生就適合於處理這類問題(IO密集型,程式大部分時間在等待IO變得可用,但實際處理的任務相對簡單,並不會佔用大量CPU資源)。協程是執行在使用者態的輕量級執行緒,資源佔用極少,因此單機建立大量的協程成為了可能。而且協程的任務切換完全在使用者空間解決,無需作業系統核心干預,大量減少了因為任務切換而產生的排程開銷。

本文僅介紹Python的協程,關於非同步IO以及高併發程式設計等主題,這裡就不適合深入討論了。

Python 3.5對協程的支援

看到這裡的同學應該已經對Python的協程有了較清晰的認識了,但可能還是有一件事覺得不爽。Python語言一向號稱簡潔優美,程式碼可讀性高,可是剛剛講了這麼多,Python的協程功能居然從2.5版本後才可以通過生成器變相實現出來,有時候必須讀程式碼才能搞清楚到底是生成器還是協程(其實在語言層面上Python並不認識什麼協程,我們只是用生成器實現了程式設計概念中的協程。可以說,在Python3.5之前,語言原生不支援協程,我們前面看到的只是利用生成器實現了協程的功能)。如果能像Go語言定義goroutine(可簡單理解為Go語言對協程的實現)那樣簡單就好了,不然怎麼也對不起Python簡潔優美易讀的美譽啊。Python 3.5中新增的關鍵字async/await解決了這個“痛點”。
Python3.5中定義協程的方式

async def native_coroutine():
await awaitableObj

現在好了,凡是由async def定義的函式,在Python 3.5中將被認為是原生協程(雖然在Python直譯器中仍然是利用生成器來實現的)。

>>> async def native_coro():
...     pass
... 
>>> a = native_coro()
>>> a #a現在是一個coroutine物件了!
<coroutine object native_coro at 0x109893678>
>>> 

而由async def定義的協程中將不再支援使用yield了,否則會被認為是語法錯誤。取而代之的是await,await表示式用於獲得協程執行的結果。只是await只可以接受awaitable物件。具體的語法細節和規則請參考PEP 492:Coroutines with async and await syntax,
這裡也有一篇中文翻譯,讀一讀可以瞭解到Python社群為什麼要做出這樣的改變,我這裡就不再贅述了,官方文件永遠是最好的學習資料。

參考資料

歡迎學習Python語言,熱愛交流技術的同學加入我們的CSDN Python學習班。入群請掃下方群二維碼。

目前群已滿,請掃描下面的小助手賬號,備註:Python 申請入群
圖片描述