1. 程式人生 > >生成器和協程乾貨

生成器和協程乾貨

理解生成器

定義生成器 

yield關鍵字,可以讓我們定義一個生成器函式。

def generator_func():
    print('a')
    yield 1

g = generator_func()
print(g)

>>> <generator object generator_func at 0x10e178b88>

推動生成器

使用next函式從生成器中取值

def generator_func():
    print('a')
    yield 1


g = generator_func()
ret1 = next(g)
print(ret1)

>>>
a
1

 

使用next可以推動生成器的執行,下面的程式碼,我們可以看到每一次執行next可以讓generator_func中的程式碼從上一個位置開始繼續執行到yield,並且將yield後面的值返回到函式外部,最終我們可以執行到yield 3。

def generator_func():
    print('a')
    yield 1
    print('b')
    yield 2
    print('c')
    yield 3
    print('d')

g = generator_func()
ret1 = next(g)
print(ret1)
ret2 = next(g)
print(ret2)
ret3 = next(g)
print(ret3)

>>>

  a
  1
  b
  2
  c
  3

當函式中已經沒有更多的yield時繼續執行next(g),遇到StopIteration

def generator_func():
    print('a')
    yield 1
    print('b')
    yield 2
    print('c')
    yield 3
    print('d')

g = generator_func()
ret1 = next(g)
print(ret1)
ret2 = next(g)
print(ret2)
ret3 = next(g)
print(ret3)
next(g)
next和StopIteration

 

send向生成器中傳送資料。send的作用相當於next,只是在驅動生成器繼續執行的同時還可以向生成器中傳遞資料。

import numbers
def cal_sum():
    sum_num = 0
    while True:
        num = yield
        if isinstance(num,numbers.Integral):
            sum_num += num
            print('sum :',sum_num)
        elif num is None:
            break

g = cal_sum()
g.send(None)   # 相當於next(g),預啟用生成器
g.send(31)
g.send(25)
g.send(17)
g.send(8)

>>>
sum : 31
sum : 56
sum : 73
sum : 81

  

生成器中的return和StopIteration

import numbers
def cal_sum():
    sum_num = 0
    while True:
        num = yield
        if isinstance(num,numbers.Integral):
            sum_num += num
            print('sum :',sum_num)
        elif num is None:
            break
    return sum_num

g = cal_sum()
g.send(None)   # 相當於next(g),預啟用生成器
g.send(31)
g.send(25)
g.send(17)
g.send(8)
g.send(None)   # 停止生成器

>>>
sum : 31
sum : 56
sum : 73
sum : 81
Traceback (most recent call last):
  File "/Users/jingliyang/PycharmProjects/python的進階/manager.py", line 19, in <module>
    g.send(None)
StopIteration: 81
import numbers
def cal_sum():
    sum_num = 0
    while True:
        num = yield
        if isinstance(num,numbers.Integral):
            sum_num += num
            print('sum :',sum_num)
        elif num is None:
            break
    return sum_num

g = cal_sum()
g.send(None)   # 相當於next(g),預啟用生成器
g.send(31)
g.send(25)
g.send(17)
g.send(8)
try:
    g.send(None)   # 停止生成器
except StopIteration as e:
    print(e.value)
異常處理以及獲取return的值

生成器的close和throw

使用throw向生成器中拋一個異常

def throw_test():
    print('a')
    yield 1
    print('b')
    yield 2

g = throw_test()
next(g)
g.throw(Exception,'value error')

>>>
a
Traceback (most recent call last):
File "/Users/jingliyang/PycharmProjects/python的進階/manager.py", line 32, in <module>
g.throw(ValueError,'value error') # throw和send、next相同,都是驅動生成器繼續執行,只不過throw用來向生成器中拋一個異常
File "/Users/jingliyang/PycharmProjects/python的進階/manager.py", line 26, in throw_test
yield 1
ValueError: value error
def throw_test():
    print('a')
    try:
        yield 1
    except ValueError:
        pass
    print('b')
    yield 2

g = throw_test()
next(g)
ret = g.throw(ValueError,'value error')  # throw和send、next相同,都是驅動生成器繼續執行,只不過throw用來向生成器中拋一個異常
print(ret)

>>>
a
b
2
throw+異常處理

使用close關閉一個生成器

def throw_test():
    print('a')
    yield 1
    print('b')
    yield 2

g = throw_test()
ret1 = next(g)
print(ret1)
g.close()
next(g)

>>>
a
1
Traceback (most recent call last):
  File "/Users/jingliyang/PycharmProjects/python的進階/manager.py", line 45, in <module>
    next(g)
StopIteration

yield from關鍵字

yield from關鍵字可以直接返回一個生成器

l = ['h','e','l']
dic = {'l':'v1','o':'v2'}
s = 'eva'
def yield_from_gen():
    for i in l:
        yield i
    for j in dic:
        yield j
    for k in s:
        yield k
for item in yield_from_gen():
    print(item,end='')

>>>helloeva

l = ['h','e','l']
dic = {'l':'v1','o':'v2'}
s = 'eva'
def yield_from_gen():
    yield from l
    yield from dic
    yield from s
for item in yield_from_gen():
    print(item,end='')

>>>helloeva
from itertools import chain
l = ['h','e','l']
dic = {'l':'v1','o':'v2'}
s = 'eva'
def yield_from_gen():
    yield from chain(l,dic,s)

for item in yield_from_gen():
    print(item,end='')
chain和yield from

利用yield from完成股票的計算,yield from能夠完成一個委派生成器的作用,在子生成器和呼叫者之間建立起一個雙向通道。

def son_gen():
    avg_num = 0
    sum_num = 0
    count = 1
    while True:
        num = yield avg_num
        if num:
            sum_num += num
            avg_num = sum_num/count
            count += 1
        else:break
    return avg_num

def depute_gen(key):
    while True:
        ret = yield from son_gen()
        print(key,ret)

def main():
    shares_list= {
        'sogou':[6.4,6.5,6.6,6.2,6.1,6.6,6.7],
        'alibaba':[181.72,184.58,183.54,180,88,169.88,178.21,189.32],
        '美團':[59.7,52.6,47.2,55.4,60.7,66.1,74.0]
    }
    for key in shares_list:
        g = depute_gen(key)
        next(g)
        for v in shares_list[key]:
            g.send(v)
        g.send(None)

main()

協程

概念

  根據維基百科給出的定義,“協程 是為非搶佔式多工產生子程式的計算機程式元件,協程允許不同入口點在不同位置暫停或開始執行程式”。從技術的角度來說,“協程就是你可以暫停執行的函式”。如果你把它理解成“就像生成器一樣”,那麼你就想對了。

使用yield實現協程

#基於yield實現非同步
import time
def consumer():
    '''任務1:接收資料,處理資料'''
    while True:
        x=yield

def producer():
    '''任務2:生產資料'''
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)

producer()

 

使用yield from實現的協程

import datetime
import heapq    # 堆模組
import types
import time


class Task:
    def __init__(self, wait_until, coro):
        self.coro = coro
        self.waiting_until = wait_until

    def __eq__(self, other):
        return self.waiting_until == other.waiting_until

    def __lt__(self, other):
        return self.waiting_until < other.waiting_until


class SleepingLoop:

    def __init__(self, *coros):
        self._new = coros
        self._waiting = []

    def run_until_complete(self):
        for coro in self._new:
            wait_for = coro.send(None)
            heapq.heappush(self._waiting, Task(wait_for, coro))
        while self._waiting:
            now = datetime.datetime.now()
            task = heapq.heappop(self._waiting)
            if now < task.waiting_until:
                delta = task.waiting_until - now
                time.sleep(delta.total_seconds())
                now = datetime.datetime.now()
            try:
                print('*'*50)
                wait_until = task.coro.send(now)
                print('-'*50)
                heapq.heappush(self._waiting, Task(wait_until, task.coro))
            except StopIteration:
                pass

def sleep(seconds): now = datetime.datetime.now() wait_until = now + datetime.timedelta(seconds=seconds) print('before yield wait_until') actual = yield wait_until # 返回一個datetime資料型別的時間 print('after yield wait_until') return actual - now def countdown(label, length, *, delay=0): print(label, 'waiting', delay, 'seconds before starting countdown') delta = yield from sleep(delay) print(label, 'starting after waiting', delta) while length: print(label, 'T-minus', length) waited = yield from sleep(1) length -= 1 print(label, 'lift-off!') def main(): loop = SleepingLoop(countdown('A', 5), countdown('B', 3, delay=2), countdown('C', 4, delay=1)) start = datetime.datetime.now() loop.run_until_complete() print('Total elapsed time is', datetime.datetime.now() - start) if __name__ == '__main__': main()

await和async關鍵字

使用 async function 可以定義一個 非同步函式,在async關鍵字定義的函式中不能出現yield和yield from

# 例1
async def download(url):   # 加入新的關鍵字 async ,可以將任何一個普通函式變成協程
    return 'eva'

ret = download('http://www.baidu.com/')
print(ret)  # <coroutine object download at 0x108b3a5c8>
ret.send(None)  # StopIteration: eva

# 例2
async def download(url):
    return 'eva'
def run(coroutine):
    try:
        coroutine.send(None)
    except StopIteration as e:
        return e.value

coro = download('http://www.baidu.com/')
ret = run(coro)
print(ret)

async關鍵字不能和yield一起使用,引入coroutine裝飾器來裝飾downloader生成器。

await 操作符後面必須跟一個awaitable物件(通常用於等待一個會有io操作的任務, 它只能在非同步函式 async function 內部使用

# 例3
import types

@types.coroutine      # 將一個生成器變成一個awaitable的物件
def downloader(url):
    yield 'aaa'


async def download_url(url):   # 協程
    waitable = downloader(url)
    print(waitable)   # <generator object downloader at 0x1091e2c78>     生成器
    html = await waitable
    return html

coro = download_url('http://www.baidu.com')
print(coro)                # <coroutine object download_url at 0x1091c9d48>
ret = coro.send(None)
print(ret)

 

asyncio模組

asyncio是Python 3.4版本引入的標準庫,直接內建了對非同步IO的支援。

asyncio的程式設計模型就是一個訊息迴圈。我們從asyncio模組中直接獲取一個EventLoop的引用,然後把需要執行的協程扔到EventLoop中執行,就實現了非同步IO。

coroutine+yield from
import asyncio

@asyncio.coroutine
def hello():
    print("Hello world!")
    # 非同步呼叫asyncio.sleep(1):
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 獲取EventLoop:
loop = asyncio.get_event_loop()
# 執行coroutine
loop.run_until_complete(hello())
loop.close()
async+await
import asyncio

async def hello():
    print("Hello world!")
    # 非同步呼叫asyncio.sleep(1):
    r = await asyncio.sleep(1)
    print("Hello again!")

# 獲取EventLoop:
loop = asyncio.get_event_loop()
# 執行coroutine
loop.run_until_complete(hello())
loop.close()

 

執行多個任務

import asyncio

async def hello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again!")
    return 'done'

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([hello(),hello()]))
loop.close()

 

獲取返回值

import asyncio

async def hello():
    print("Hello world!")
    await asyncio.sleep(1)
    print("Hello again!")
    return 'done'

loop = asyncio.get_event_loop()
task = loop.create_task(hello())
loop.run_until_complete(task)
ret = task.result()
print(ret)

 

執行多個任務獲取返回值

import asyncio

async def hello(i):
    print("Hello world!")
    await asyncio.sleep(i)
    print("Hello again!")
    return 'done',i

loop = asyncio.get_event_loop()
task1 = loop.create_task(hello(2))
task2 = loop.create_task(hello(1))
task_l = [task1,task2]
tasks = asyncio.wait(task_l)
loop.run_until_complete(tasks)
for t in task_l:
    print(t.result())

 

執行多個任務按照返回的順序獲取返回值

import asyncio

async def hello(i):
    print("Hello world!")
    await asyncio.sleep(i)
    print("Hello again!")
    return 'done',i

async def main():
    tasks = []
    for i in range(20):
        tasks.append(asyncio.ensure_future(hello((20-i)/10)))
    for res in asyncio.as_completed(tasks):
        result = await res
        print(result)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

 

asyncio使用協程完成http訪問

import asyncio

async def get_url():
    reader,writer = await asyncio.open_connection('www.baidu.com',80)
    writer.write(b'GET / HTTP/1.1\r\nHOST:www.baidu.com\r\nConnection:close\r\n\r\n')
    all_lines = []
    async for line in reader:
        data = line.decode()
        all_lines.append(data)
    html = '\n'.join(all_lines)
    return html

async def main():
    tasks = []
    for url in range(20):
        tasks.append(asyncio.ensure_future(get_url()))
    for res in asyncio.as_completed(tasks):
        result = await res
        print(result)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())  # 處理一個任務
    loop.run_until_complete(asyncio.wait([main()]))  # 處理多個任務

    task = loop.create_task(main())  # 使用create_task獲取返回值
    loop.run_until_complete(task)
    loop.run_until_complete(asyncio.wait([task]))

 

gevent模組實現協程

http://www.cnblogs.com/Eva-J/articles/8324673.html

 

    &nbs