1. 程式人生 > >我說帶你五分鐘學會Python協程就五分鐘!不會超過六分鐘!

我說帶你五分鐘學會Python協程就五分鐘!不會超過六分鐘!

協程由於由程式主動控制切換,沒有執行緒切換的開銷,所以執行效率極高。對於IO密集型任務非常適用,如果是cpu密集型,推薦多程序+協程的方式。

在Python3.4之前,官方沒有對協程的支援,存在一些三方庫的實現,比如gevent和Tornado。3.4之後就內建了asyncio標準庫,官方真正實現了協程這一特性。

而Python對協程的支援,是通過Generator實現的,協程是遵循某些規則的生成器。因此,我們在瞭解協程之前,我們先要學習生成器。

def test():

print("generator start")

n = 1

while True:

yield_expression_value = yield n

print("yield_expression_value = %d" % yield_expression_value)

n += 1

# ①建立generator物件

generator = test()

print(type(generator))

print("---------------")

# ②啟動generator

next_result = generator.__next__()

print("next_result = %d" % next_result)

print("---------------")

# ③傳送值給yield表示式

send_result = generator.send(666)

print("send_result = %d" % send_result)

執行結果:

---------------

generator start

next_result = 1

---------------

yield_expression_value = 666

send_result = 2

理解這個demo的關鍵是:生成器啟動或恢復執行一次,將會在yield處暫停。上面的第②步僅僅執行到了yield n,並沒有執行到賦值語句,到了第③步,生成器恢復執行才給yield_expression_value賦值。

生產者和消費者模型

進群:943752371 即可獲取數十套PDF!

上面的例子中,程式碼中斷-->切換執行,體現出了協程的部分特點。

我們再舉一個生產者、消費者的例子,

傳統的生產者-消費者模型是一個執行緒寫訊息,一個執行緒取訊息,通過鎖機制控制佇列和等待,但一不小心就可能死鎖。

現在改用協程,生產者生產訊息後,直接通過yield跳轉到消費者開始執行,待消費者執行完畢後,切換回生產者繼續生產,效率極高。

def consumer():

print("[CONSUMER] start")

r = 'start'

while True:

n = yield r

if not n:

print("n is empty")

continue

print("[CONSUMER] Consumer is consuming %s" % n)

r = "200 ok"

def producer(c):

# 啟動generator

start_value = c.send(None)

print(start_value)

n = 0

while n < 3:

n += 1

print("[PRODUCER] Producer is producing %d" % n)

r = c.send(n)

print('[PRODUCER] Consumer return: %s' % r)

# 關閉generator

c.close()

# 建立生成器

c = consumer()

# 傳入generator

producer(c)

執行結果:

[CONSUMER] start

start

[PRODUCER] producer is producing 1

[CONSUMER] consumer is consuming 1

[PRODUCER] Consumer return: 200 ok

[PRODUCER] producer is producing 2

[CONSUMER] consumer is consuming 2

[PRODUCER] Consumer return: 200 ok

[PRODUCER] producer is producing 3

[CONSUMER] consumer is consuming 3

[PRODUCER] Consumer return: 200 ok

注意到consumer函式是一個generator,把一個consumer傳入produce後:

1.首先呼叫c.send(None)啟動生成器;

2.然後,一旦生產了東西,通過c.send(n)切換到consumer執行;

3.consumer通過yield拿到訊息,處理,又通過yield把結果傳回;

4.produce拿到consumer處理的結果,繼續生產下一條訊息;

5.produce決定不生產了,通過c.close()關閉consumer,整個過程結束。

整個流程無鎖,由一個執行緒執行,produce和consumer協作完成任務,所以稱為“協程”,而非執行緒的搶佔式多工。

yield from表示式

Python3.3版本新增yield

from語法,新語法用於將一個生成器部分操作委託給另一個生成器。此外,允許子生成器(即yield

from後的“引數”)返回一個值,該值可供委派生成器(即包含yield from的生成器)使用。並且在委派生成器中,可對子生成器進行優化。

我們先來看最簡單的應用,例如:

# 子生成器

def test(n):

i = 0

while i < n:

yield i

i += 1

# 委派生成器

def test_yield_from(n):

print("test_yield_from start")

yield from test(n)

print("test_yield_from end")

for i in test_yield_from(3):

print(i)

輸出:

test_yield_from start

0

1

2

test_yield_from end

這裡我們僅僅給這個生成器添加了一些列印,如果是正式的程式碼中,你可以新增正常的執行邏輯。

如果上面的test_yield_from函式中有兩個yield from語句,將序列執行。比如將上面的test_yield_from函式改寫成這樣:

def test_yield_from(n):

print("test_yield_from start")

yield from test(n)

print("test_yield_from doing")

yield from test(n)

print("test_yield_from end")

將輸出:

test_yield_from start

0

1

2

test_yield_from doing

0

1

2

test_yield_from end

在這裡,yield from起到的作用相當於下面寫法的簡寫形式

for item in test(n):

yield item

看起來這個yield

from也沒做什麼大不了的事,其實它還幫我們處理了異常之類的。具體可以看stackoverflow上的這個問題:In practice,

what are the main uses for the new “yield from” syntax in Python 3.3?

協程(Coroutine)

Python3.4開始,新增了asyncio相關的API,語法使用@asyncio.coroutine和yield from實現協程

Python3.5中引入async/await語法,參見PEP492

我們先來看Python3.4的實現。

@asyncio.coroutine

Python3.4中,使用@asyncio.coroutine裝飾的函式稱為協程。不過沒有從語法層面進行嚴格約束。

對於Python原生支援的協程來說,Python對協程和生成器做了一些區分,便於消除這兩個不同但相關的概念的歧義:

標記了@asyncio.coroutine裝飾器的函式稱為協程函式,iscoroutinefunction()方法返回True

呼叫協程函式返回的物件稱為協程物件,iscoroutine()函式返回True

舉個栗子,我們給上面yield from的demo中新增@asyncio.coroutine:

import asyncio

...

@asyncio.coroutine

def test_yield_from(n):

...

# 是否是協程函式

print(asyncio.iscoroutinefunction(test_yield_from))

# 是否是協程物件

print(asyncio.iscoroutine(test_yield_from(3)))

毫無疑問輸出結果是True。

可以看下@asyncio.coroutine的原始碼中檢視其做了什麼,我將其原始碼簡化下,大致如下:

import functools

import types

import inspect

def coroutine(func):

# 判斷是否是生成器

if inspect.isgeneratorfunction(func):

coro = func

else:

# 將普通函式變成generator

@functools.wraps(func)

def coro(*args, **kw):

res = func(*args, **kw)

res = yield from res

return res

# 將generator轉換成coroutine

wrapper = types.coroutine(coro)

# For iscoroutinefunction().

wrapper._is_coroutine = True

return wrapper

將這個裝飾器標記在一個生成器上,就會將其轉換成coroutine。

然後,我們來實際使用下@asyncio.coroutine和yield from:

import asyncio

@asyncio.coroutine

def compute(x, y):

print("Compute %s + %s ..." % (x, y))

yield from asyncio.sleep(1.0)

return x + y

@asyncio.coroutine

def print_sum(x, y):

result = yield from compute(x, y)

print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()

print("start")

# 中斷呼叫,直到協程執行結束

loop.run_until_complete(print_sum(1, 2))

print("end")

loop.close()

執行結果:

start

Compute 1 + 2 ...

1 + 2 = 3

end

print_sum這個協程中呼叫了子協程compute,它將等待compute執行結束才返回結果。

這個demo點呼叫流程如下圖:

EventLoop將會把print_sum封裝成Task物件

流程圖展示了這個demo的控制流程,不過沒有展示其全部細節。比如其中“暫停”的1s,實際上建立了一個future物件, 然後通過BaseEventLoop.call_later()在1s後喚醒這個任務。

值得注意的是,@asyncio.coroutine將在Python3.10版本中移除。

async/await

Python3.5開始引入async/await語法(PEP 492),用來簡化協程的使用並且便於理解。

async/await實際上只是@asyncio.coroutine和yield from的語法糖:

把@asyncio.coroutine替換為async

把yield from替換為await

即可。

比如上面的例子:

import asyncio

async def compute(x, y):

print("Compute %s + %s ..." % (x, y))

await asyncio.sleep(1.0)

return x + y

async def print_sum(x, y):

result = await compute(x, y)

print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()

print("start")

loop.run_until_complete(print_sum(1, 2))

print("end")

loop.close()

我們再來看一個asyncio中Future的例子:

import asyncio

future = asyncio.Future()

async def coro1():

print("wait 1 second")

await asyncio.sleep(1)

print("set_result")

future.set_result('data')

async def coro2():

result = await future

print(result)

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait([

coro1()

coro2()

]))

loop.close()

輸出結果:

wait 1 second

(大約等待1秒)

set_result

data

這裡await後面跟隨的future物件,協程中yield from或者await後面可以呼叫future物件,其作用是:暫停協程,直到future執行結束或者返回result或丟擲異常。

而在我們的例子中,await future必須要等待future.set_result('data')後才能夠結束。將coro2()作為第二個協程可能體現得不夠明顯,可以將協程的呼叫改成這樣:

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait([

# coro1(),

coro2(),

coro1()

]))

loop.close()

輸出的結果仍舊與上面相同。

其實,async這個關鍵字的用法不止能用在函式上,還有async with非同步上下文管理器,async for非同步迭代器. 對這些感興趣且覺得有用的可以網上找找資料,這裡限於篇幅就不過多展開了。