1. 程式人生 > >【Python】【五】【asyncio】

【Python】【五】【asyncio】

ces __name__ har pyc self. target mes 頻率 設定

# -*- coding:utf-8 -*-
"""
#18.1 線程&協程
#栗子18-1 threading
import sys
import time
import itertools
import threading

class Signal:
go = True

def spin(msg, signal):
write, flush = sys.stdout.write, sys.stdout.flush
for char in itertools.cycle(‘|/-\\‘):
status = char + ‘ ‘ + msg
write(status)
flush()
write(‘\x08‘ * len(status))
time.sleep(.1)
if not signal.go:
break
write(‘ ‘* len(status) + ‘\x08‘*len(status))

def slow_function():
time.sleep(1)
return 42



def supervisor():
signal = Signal()
spinner = threading.Thread(target=spin,args=(‘thinking!‘,signal))
print(‘spinner object:‘,spinner)
spinner.start()
result = slow_function()
signal.go = False
spinner.join()
return result

def main():
result = supervisor()
print(‘Answer:‘,result)
if __name__ == ‘__main__‘:
main()


‘‘‘
spinner object: <Thread(Thread-1, initial)>
| thinking!
/ thinking!
- thinking!
\ thinking!
| thinking!
/ thinking!
- thinking!
\ thinking!
| thinking!
/ thinking!
Answer: 42
‘‘‘


#栗子18-2 asyncio 實現

import asyncio
import sys
import itertools

@asyncio.coroutine
def spin(msg):
write,flush = sys.stdout.write,sys.stdout.flush
for char in itertools.cycle(‘|/-\\‘):
status = char + ‘ ‘ + msg
write(status)
flush()
write(‘\x08‘*len(status)) #這是顯示文本式動畫的訣竅所在:使用退格符(\x08)把光標移回來
try:
yield from asyncio.sleep(.1)
except asyncio.CancelledError:
break
write(‘ ‘*len(status) + ‘\x08‘*len(status)) #使用空格清除狀態消息,把光標移回開頭
@asyncio.coroutine
def slow_function():
# 假裝等到I/O一段時間
yield from asyncio.sleep(1) #yield from asyncio.sleep(3) 表達式把控制權交給主循環,在休眠結束後恢復這個協程
return 42
@asyncio.coroutine
def supervisor():
spinner = asyncio.async(spin(‘thinking!‘))
print(‘spinner object:‘,spinner)
result = yield from slow_function() #驅動 slow_function() 函數。結束後,獲取返回值。同時,事件循環繼續運行,因為slow_function 函數最後使用 yield from asyncio.sleep(3) 表達式把控制權交回給了主循環。
spinner.cancel()
return result

def main():
loop = asyncio.get_event_loop() #獲取事件循環的引用。
result = loop.run_until_complete(supervisor()) #驅動 supervisor 協程,讓它運行完畢;這個協程的返回值是這次調用的返回值
loop.close()
print(‘Answer :‘,result)
if __name__ == ‘__main__‘:
main()
‘‘‘
spinner object: <Task pending coro=<spin() running at C:/Users/wangxue1/PycharmProjects/fluentPython/kongzhiliucheng/asyncio/__init__.py:69>>
| thinking!
/ thinking!
- thinking!
\ thinking!
| thinking!
/ thinking!
- thinking!
\ thinking!
| thinking!
/ thinking!
Answer : 42
‘‘‘


‘‘‘
#【比較】
這兩種 supervisor 實現之間的主要區別概述如下。
asyncio.Task 對象差不多與 threading.Thread 對象等效。 Victor Stinner(本章的
特約技術審校)指出, “Task 對象像是實現協作式多任務的庫(例如 gevent)中的
綠色線程(green thread) ”。
Task 對象用於驅動協程, Thread 對象用於調用可調用的對象。
Task 對象不由自己動手實例化,而是通過把協程傳給 asyncio.async(...) 函數或
loop.create_task(...) 方法獲取。
獲取的 Task 對象已經排定了運行時間(例如,由 asyncio.async 函數排
定); Thread 實例則必須調用 start 方法,明確告知讓它運行。
在線程版 supervisor 函數中, slow_function 函數是普通的函數,直接由線程調
用。在異步版 supervisor 函數中, slow_function 函數是協程,由 yield from
驅動。
沒有 API 能從外部終止線程,因為線程隨時可能被中斷,導致系統處於無效狀態。
如果想終止任務,可以使用 Task.cancel() 實例方法,在協程內部拋出
CancelledError 異常。協程可以在暫停的 yield 處捕獲這個異常,處理終止請
求。
supervisor 協程必須在 main 函數中由 loop.run_until_complete 方法執行。
上述比較應該能幫助你理解,與更熟悉的 threading 模型相比, asyncio 是如何編排並
發作業的。
線程與協程之間的比較還有最後一點要說明:如果使用線程做過重要的編程,你就知道寫
出程序有多麽困難,因為調度程序任何時候都能中斷線程。必須記住保留鎖,去保護程序
中的重要部分,防止多步操作在執行的過程中中斷,防止數據處於無效狀態。
而協程默認會做好全方位保護,以防止中斷。我們必須顯式產出才能讓程序的余下部分運
行。對協程來說,無需保留鎖,在多個線程之間同步操作,協程自身就會同步,因為在任
意時刻只有一個協程運行。想交出控制權時,可以使用 yield 或 yield from 把控制權
交還調度程序。這就是能夠安全地取消協程的原因:按照定義,協程只能在暫停的 yield
處取消,因此可以處理 CancelledError 異常,執行清理操作
‘‘‘



#18.1.1 故意不阻塞
‘‘‘
asyncio.Future 類與 concurrent.futures.Future 類的接口基本一致,不過實現方
式不同,不可以互換。 “PEP 3156—Asynchronous IO Support Rebooted:
the‘asyncio’Module”(https://www.python.org/dev/peps/pep-3156/)對這個不幸狀況是這樣說
的:
未來可能會統一 asyncio.Future 和 concurrent.futures.Future 類實現的期物
(例如,為後者添加兼容 yield from 的 __iter__ 方法)。

總之,因為 asyncio.Future 類的目的是與 yield from 一起使用,所以通常不需要使
用以下方法。
無需調用 my_future.add_done_callback(...),因為可以直接把想在期物運行結
束後執行的操作放在協程中 yield from my_future 表達式的後面。這是協程的一
大優勢:協程是可以暫停和恢復的函數。
無需調用 my_future.result(),因為 yield from 從期物中產出的值就是結果
(例如, result = yield from my_future)
‘‘‘




#18.2 使用asyncio和aiohttp下載

import os
import sys
import time

import requests

import asyncio
import aiohttp

BASE_URL = ‘http://images.cnblogs.com/cnblogs_com/suren2017/1102909‘

POP20_CC = ‘T_JINGSE2 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20‘.split()

DEST_DIR = ‘downloads‘

MAX_WORKERS = 20
def save_flag(img,filename):
path = os.path.join(sys.path[0],DEST_DIR,filename)
path = path.replace(‘\\‘,‘/‘)
with open(path,‘wb‘) as fp:
fp.write(img)

@asyncio.coroutine
def get_flag(cc):
url = ‘{}/{cc}.PNG‘.format(BASE_URL,cc=cc.lower())
resp = yield from aiohttp.request(‘GET‘,url)
image = yield from resp.read()
return image

def show(text):
print(text,end=‘ ‘)
sys.stdout.flush()

@asyncio.coroutine
def download_one(cc):
image = yield from get_flag(cc)
show(cc)
save_flag(image,cc.lower() + ‘.PNG‘)
return cc


def download_many(cc_list):
loop = asyncio.get_event_loop()
to_do = [download_one(cc) for cc in sorted(cc_list)]
wait_coro = asyncio.wait(to_do) #雖然函數的名稱是 wait,但它不是阻塞型函數。 wait 是一個協程,等傳給它的所有協程運行完畢後結束
‘‘‘
asyncio.wait(...) 協程的參數是一個由期物或協程構成的可叠代對象; wait 會分別
把各個協程包裝進一個 Task 對象。最終的結果是, wait 處理的所有對象都通過某種方
式變成 Future 類的實例。 wait 是協程函數,因此返回的是一個協程或生成器對
象; wait_coro 變量中存儲的正是這種對象。為了驅動協程,我們把協程傳給
loop.run_until_complete(...) 方法
‘‘‘
res,_ = loop.run_until_complete(wait_coro) #執行事件循環,直到 wait_coro 運行結束;事件循環運行的過程中,這個腳本會在這裏阻塞。我們忽略 run_until_complete 方法返回的第二個元素
‘‘‘
loop.run_until_complete 方法的參數是一個期物或協程。如果是協
程, run_until_complete 方法與 wait 函數一樣,把協程包裝進一個 Task 對象中。協
程、期物和任務都能由 yield from 驅動,這正是 run_until_complete 方法對 wait
函數返回的 wait_coro 對象所做的事。 wait_coro 運行結束後返回一個元組,第一個元
素是一系列結束的期物,第二個元素是一系列未結束的期物。在示例 18-5 中,第二個元
素始終為空,因此我們把它賦值給 _,將其忽略。但是 wait 函數有兩個關鍵字參數,如
果設定了可能會返回未結束的期物;這兩個參數是 timeout 和 return_when
‘‘‘
loop.close()
return len(res)

def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = ‘\n{} flags downloaded in {:.2f}s‘
print(msg.format(count,elapsed))

if __name__ == ‘__main__‘:
main(download_many) #19 flags downloaded in 0.25s
‘‘‘
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038C9470>
t_jingse7 t_jingse11 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000388F128>
T_JINGSE4 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000000003877BE0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000387E8D0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000388FE48>
t_jingse8 T_jingse17 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038BC7B8>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000000003872C88>
t_jingse6 t_jingse10 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038BCBE0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000388F5F8>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000388FA20>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038B0B00>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000387E3C8>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x000000000387E390>
T_JINGSE3 t_jingse9 T_JINGSE5 t_jingse13 T_jingse20 T_jingse16 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x0000000003868F28>
T_JINGSE2 t_jingse12 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038B02B0>
Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038BC390>
T_jingse19 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038C9048>
T_jingse15 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038B06D8>
T_jingse18 Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x00000000038B0F28>
T_jingse14
19 flags downloaded in 0.45s
‘‘‘
#【小結】
‘‘‘
使用 asyncio 包時,我們編寫的異步代碼中包含由 asyncio 本身驅動的
協程(即委派生成器),而生成器最終把職責委托給 asyncio 包或第三方庫(如
aiohttp)中的協程。這種處理方式相當於架起了管道,讓 asyncio 事件循環(通過我
們編寫的協程)驅動執行低層異步 I/O 操作的庫函數
‘‘‘





‘‘‘
18.3 避免阻塞型調用
Ryan Dahl(Node.js 的發明者)在介紹他的項目背後的哲學時說: “我們處理 I/O 的方式徹
底錯了。 ” 他把執行硬盤或網絡 I/O 操作的函數定義為阻塞型函數,主張不能像對待非
阻塞型函數那樣對待阻塞型函數。為了說明原因,他展示了表 18-1 中的前兩列。
“Introduction to Node.js”(https://www.youtube.com/watch?v=M-sc73Y-zQA)視頻 4:55 處。
表18-1:使用現代的電腦從不同的存儲介質中讀取數據的延遲情況;第三欄按比例換
算成具體的時間,便於人類理解
存儲介質 CPU 周期 按比例換算成“人類時間”
L1 緩存 3 3 秒
L2 緩存 14 14 秒
RAM 250 250 秒
硬盤 41 000 000 1.3 年
網絡 240 000 000 7.6 年
為了理解表 18-1,請記住一點:現代的 CPU 擁有 GHz 數量級的時鐘頻率,每秒鐘能運行
幾十億個周期。假設 CPU 每秒正好運行十億個周期,那麽 CPU 可以在一秒鐘內讀取 L1
緩存 333 333 333 次,讀取網絡 4 次(只有 4 次)。表 18-1 中的第三欄是拿第二欄中的各
個值乘以固定的因子得到的。因此,在另一個世界中,如果讀取 L1 緩存要用 3 秒,那麽
讀取網絡要用 7.6 年!
有兩種方法能避免阻塞型調用中止整個應用程序的進程:
在單獨的線程中運行各個阻塞型操作
把每個阻塞型操作轉換成非阻塞的異步調用使用
‘‘‘

#18.4 改進asyncio下載腳本
#示例 18-7 flags2_asyncio.py:腳本的前半部分;余下的代碼在示例 18-8 中
from enum import Enum
HTTPStatus = Enum(‘Status‘, ‘ok not_found error‘)
import collections
from collections import namedtuple
Result = namedtuple(‘Result‘,‘status cc‘)

import os
import sys
import time

import requests

BASE_URL = ‘http://images.cnblogs.com/cnblogs_com/suren2017/1102909‘

POP20_CC = ‘T_JINGSE200 T_JINGSE3 T_JINGSE4 T_JINGSE5 t_jingse6 t_jingse7 t_jingse8 t_jingse9 t_jingse10 t_jingse11 t_jingse12 t_jingse13 T_jingse14 T_jingse15 T_jingse16 T_jingse17 T_jingse18 T_jingse19 T_jingse20‘.split()

DEST_DIR = ‘downloads‘

MAX_WORKERS = 20

import asyncio
import aiohttp
from aiohttp import web
import tqdm

DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000

class FetchError(Exception):
def __init__(self,country_code):
self.country_code = country_code


def save_flag(img,filename):
path = os.path.join(sys.path[0],DEST_DIR,filename)
path = path.replace(‘\\‘,‘/‘)
with open(path,‘wb‘) as fp:
fp.write(img)

@asyncio.coroutine
def get_flag(cc):
url = ‘{}/{cc}.PNG‘.format(BASE_URL,cc=cc.lower())
#resp = yield from aiohttp.request(‘GET‘,url)
resp = yield from aiohttp.ClientSession().get(url)
if ‘200‘ in resp.text:
image = yield from resp.read()
return image
elif ‘404‘ in resp.text:
raise web.HTTPNotFound()
else:
raise aiohttp.HttpProcessingError(code=resp.status,message=resp.reason,headers=resp.headers)


@asyncio.coroutine
def download_one(cc,semaphore,verbose):
try:
with (yield from semaphore): #在 yield from 表達式中把 semaphore 當成上下文管理器使用,防止阻塞整個系統:如果 semaphore 計數器的值是所允許的最大值,只有這個協程會阻塞。
image = yield from get_flag(cc)
except web.HTTPNotFound as exc:
status = HTTPStatus.not_found
msg = ‘not found‘
res = exc.response
res.status_code = 404
res.reason = ‘NOT FOUND‘
raise
except Exception as exc:
raise FetchError(cc) from exc #引入的raise X from Y 句法鏈接原來的異常
else:
save_flag(image,cc.lower() + ‘.PNG‘)
status = HTTPStatus.ok
msg = ‘OK‘

if verbose and msg: #如果在命令行中設定了 -v/--verbose 選項,顯示國家代碼和狀態消息;這就是詳細模式中看到的進度信息
print(cc,msg)
return Result(status,cc)


from tqdm import tqdm
@asyncio.coroutine
def download_coro(cc_list,verbose,concur_req):
counter = collections.Counter()
semaphore = asyncio.Semaphore(concur_req)
to_do = [download_one(cc,semaphore,verbose) for cc in sorted(cc_list)]
to_do_iter = asyncio.as_completed(to_do)
if not verbose:
to_do_iter = tqdm(to_do_iter,total=len(cc_list))
for future in to_do_iter:
try:
res = yield from future
except FetchError as exc:
country_code = exc.country_code
try:
error_msg = exc.__cause__.args[0]
except IndexError:
error_msg = exc.__cause__.__class__.__name__
if verbose and error_msg:
msg = ‘*** Error for {}: {}‘
print(msg.format(country_code,error_msg))
status = HTTPStatus.error
else:
status = res.status
counter[status] += 1

return counter

def download_many(cc_list,verbose,concur_req):
loop = asyncio.get_event_loop()
coro = download_coro(cc_list,verbose,concur_req) #download_many 函數只是實例化 downloader_coro 協程,然後通過run_until_complete 方法把它傳給事件循環
counts = loop.run_until_complete(coro)
loop.close()
if loop.is_closed():
sys.exit(0)
return counts

def main():
t0 = time.time()
count = download_many(POP20_CC,verbose=False,concur_req=2)
elapsed = time.time() - t0
msg = ‘\n{} flags downloaded in {:.2f}s‘
print(msg.format(count,elapsed))

if __name__ == ‘__main__‘:
main()



#自己栗子1
import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
print(‘Waiting: ‘,x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)

print(‘TIME: ‘,now() - start)



#自己栗子2
import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
print(‘Waiting: ‘,x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print(‘TIME: ‘,now() - start)
‘‘‘
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting: 2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME: 0.0010001659393310547

‘‘‘



#自己栗子3
‘‘‘
協程對象不能直接運行,在註冊事件循環的時候,其實是run_until_complete方法將協程包裝成為了一個任務(task)對象。所謂task對象是Future類的子類。保存了協程運行後的狀態,用於未來獲取協程的結果‘‘‘

import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
print(‘Waiting: ‘,x)

start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
print(task)
loop.run_until_complete(task)
print(task)
print(‘TIME: ‘,now() - start)
‘‘‘
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以創建一個task,run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task,task是Future的子類。isinstance(task, asyncio.Future)將會輸出True
‘‘‘
print(isinstance(task,asyncio.Future))
‘‘‘
<Task pending coro=<do_some_work() running at C:\Python36\lib\asyncio\coroutines.py:208>>
Waiting: 2
<Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=None>
TIME: 0.0009999275207519531
True
‘‘‘


#自己栗子4 :綁定回調

import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
print(‘Waiting: ‘,x)
return ‘Done after {}s‘.format(x)

def callback(future):
print(‘Result: ‘,future)


start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)
loop.run_until_complete(task)

print(‘TIME: ‘,now() - start)
‘‘‘
Waiting: 2
Result: <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=‘Done after 2s‘>
TIME: 0.002000093460083008
‘‘‘

#自己栗子5:綁定回調 ,如回調需要多個參數


import asyncio
import time

now = lambda : time.time()

@asyncio.coroutine
def do_some_work(x):
print(‘Waiting: ‘,x)
return ‘Done after {}s‘.format(x)

def callback(t,future):
print(‘Result: ‘,t,future)


start = now()

coroutine = do_some_work(2)

loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
import functools
task.add_done_callback(functools.partial(callback,2))
loop.run_until_complete(task)

print(‘TIME: ‘,now() - start)

‘‘‘
Waiting: 2
Result: 2 <Task finished coro=<do_some_work() done, defined at C:\Python36\lib\asyncio\coroutines.py:208> result=‘Done after 2s‘>
TIME: 0.002000093460083008
‘‘‘


#自己栗子6: future 和 result 。回調一致是很多異步編程的噩夢,程序員更喜歡用同步的編寫方式寫異步代碼

import asyncio
import time

now = lambda : time.time()


async def do_some_work(x):
print(‘Waiting {}‘.format(x))
return ‘Done after {}s‘.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print(‘Task result:{}‘.format(task.result))
print(‘TIME: {}‘.format(now() - start))


‘‘‘
Waiting 2
Task result:<built-in method result of _asyncio.Task object at 0x0000000002F73AE8>
TIME: 0.002000093460083008
‘‘‘


#自己栗子7: 阻塞和await

import asyncio
import time

now = lambda : time.time()


async def do_some_work(x):
print(‘Waiting {}‘.format(x))
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

start = now()

coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
task = asyncio.ensure_future(coroutine)
loop.run_until_complete(task)

print(‘Task result:{}‘.format(task.result))
print(‘TIME: {}‘.format(now() - start))

‘‘‘
Waiting 2
Task result:<built-in method result of _asyncio.Task object at 0x0000000002F73A60>
TIME: 2.001114845275879
‘‘‘


#自己栗子8:並發&並行
#每當有阻塞任務時候就用await

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
print(‘Task result: ‘,task.result())

print(‘Time: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
Time: 3.9912283420562744
‘‘‘


#自己栗子9 協程嵌套 [一] dones, pendings = await asyncio.wait(tasks)



import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

dones,pendings = await asyncio.wait(tasks)

for task in dones:
print(‘Task result: ‘,task.result())


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print(‘TIME: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 2s
Task result: Done after 4s
Task result: Done after 1s
TIME: 4.007229328155518
‘‘‘


#自己栗子10 協程嵌套 [二] 如果使用的是 asyncio.gather創建協程對象,那麽await的返回值就是協程運行的結果

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

results = await asyncio.gather(*tasks)

for result in results:
print(‘Task result: ‘,result)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

print(‘TIME: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
TIME: 3.9892282485961914
‘‘‘


#自己栗子11 協程嵌套 [三] 不在main協程函數裏處理結果,直接返回await的內容,那麽最外層的run_until_complete將會返回main協程的結果


import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

return await asyncio.gather(*tasks)




loop = asyncio.get_event_loop()
results = loop.run_until_complete(main())

for result in results:
print(‘Task result: ‘, result)

print(‘TIME: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
TIME: 4.0052289962768555
‘‘‘

#自己栗子12 協程嵌套 [四 ] 不在main協程函數裏處理結果,直接返回await的內容,那麽最外層的run_until_complete將會返回main協程的結果,使用asyncio.wait方式掛起協程。

import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

return await asyncio.wait(tasks)




loop = asyncio.get_event_loop()
dones,pendings = loop.run_until_complete(main())

for task in dones:
print(‘Task result: ‘, task.result())

print(‘TIME: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 2s
Task result: Done after 4s
Task result: Done after 1s
TIME: 3.9912283420562744
‘‘‘


#自己栗子13 協程嵌套 [五]使用asyncio的as_completed方法


import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

for task in asyncio.as_completed(tasks):
result = await task
print(‘Task result: {}‘.format(result))




loop = asyncio.get_event_loop()
loop.run_until_complete(main())


print(‘TIME: ‘,now() - start)
‘‘‘
Waiting : 1
Waiting : 2
Waiting : 4
Task result: Done after 1s
Task result: Done after 2s
Task result: Done after 4s
TIME: 3.9912281036376953
‘‘‘


#自己栗子14 協程停止 【一】 main函數外進行事件循環的調用。這個時候,main相當於最外出的一個task,那麽處理包裝的main函數即可
‘‘‘
上面見識了協程的幾種常用的用法,都是協程圍繞著事件循環進行的操作。future對象有幾個狀態:

Pending
Running
Done
Cancelled
創建future的時候,task為pending,事件循環調用執行的時候當然就是running,調用完畢自然就是done,如果需要停止事件循環,就需要先把task取消。可以使用asyncio.Task獲取事件循環的task‘

啟動事件循環之後,馬上ctrl+c,會觸發run_until_complete的執行異常 KeyBorardInterrupt。然後通過循環asyncio.Task取消future。

‘‘‘


import asyncio
import time

now = lambda : time.time()

start = now()

async def do_some_work(x):
print(‘Waiting : ‘,x)
await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

async def main():
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

done,pending = await asyncio.wait(tasks)
for task in done:
print(‘Task result: ‘,task.result())


loop = asyncio.get_event_loop()
task = asyncio.ensure_future(main())
try:
loop.run_until_complete(task)
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
print(‘*******************‘)
print(asyncio.gather(*asyncio.Task.all_tasks()).cancel())
loop.stop()
loop.run_forever() #True表示cannel成功,loop stop之後還需要再次開啟事件循環,最後在close,不然還會拋出異常
finally:
loop.close()

print(‘TIME: ‘,now() - start)

‘‘‘
#不能再pycharm通過Ctrl+C,只能在Python交互環境裏
Waiting: 1
Waiting: 2
Waiting: 4
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
*******************
True
TIME: 2.0158370780944824
‘‘‘



#自己栗子15 協程停止 【二】 tasks在外層,沒有被包含在main函數裏面
import asyncio

import time

now = lambda: time.time()
start = now()
async def do_some_work(x):
print(‘Waiting: ‘, x)

await asyncio.sleep(x)
return ‘Done after {}s‘.format(x)

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

start = now()

loop = asyncio.get_event_loop()
try:
loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt as e:
print(asyncio.Task.all_tasks())
for task in asyncio.Task.all_tasks():
print(task.cancel())
loop.stop()
loop.run_forever()
finally:
loop.close()

print(‘TIME: ‘, now() - start)


‘‘‘
打印四個True,而不是三個,原因我也不知道
Waiting: 1
Waiting: 2
Waiting: 4
{<Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x101230648>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1032b10a8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>, <Task pending coro=<wait() running at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:307> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317d38>()]> cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>, <Task pending coro=<do_some_work() running at /Users/ghost/Rsj217/python3.6/async/async-main.py:18> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x103317be8>()]> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/tasks.py:374]>}
True
True
True
True
TIME: 0.8858370780944824
‘‘‘
"""



#自己栗子16 不同線程的時間循環
‘‘‘
很多時候,我們的事件循環用於註冊協程,而有的協程需要動態的添加到事件循環中。一個簡單的方式就是使用多線程。當前線程創建一個事件循環,然後在新建一個線程,在新線程中啟動事件循環。當前線程不會被block。
啟動上述代碼之後,當前線程不會被block,新線程中會按照順序執行call_soon_threadsafe方法註冊的more_work方法,後者因為time.sleep操作是同步阻塞的,因此運行完畢more_work需要大致6 + 3
‘‘‘

from threading import Thread
import asyncio

import time

now = lambda: time.time()
start = now()





















【Python】【五】【asyncio】