Python併發程式設計系列之協程
1 引言
2 協程的意義
2.1 什麼是協程
2.2 協程的作用
2.3 相關概念
3 定義協程
4 使用協程
4.1 單個協程
4.2多協程併發
4.3 獲取返回值
4.4 繫結回撥函式
4.5 協程的巢狀使用
5 總結
1 引言
協程是近幾年併發程式設計的一個熱門話題,與Python多程序、多執行緒相比,協程在很多方面優勢明顯。本文從協程的定義和意義出發,結合asyncio模組詳細講述了協程的使用。
2 協程的意義
2.1 什麼是協程
協程,又稱微執行緒,英文名為Coroutine。對於多執行緒,在執行一個個不同任務時,遇到阻塞(例如IO操作)時,作業系統會自動將CPU資源切換給另一個執行緒。
2.2 協程的作用
與執行緒不同的是,協程需要使用者自己進行手動切換——當某執行緒在執行任務中的函式A(協程A))時,可任意終端,手動切換到任務中的另一個函式B(協程B),然後在適當的時候在回到函式A(協程A)中繼續執行,這樣雖然繁瑣,但也提供了更大的操作自由度,同時協程A和協程B都屬於同一執行緒,切換效率相比於執行緒或程序間的切換有極大地優勢。另外,協程不需要多執行緒的鎖機制,因為都屬於同一個執行緒,也不存在同時寫變數衝突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多。
我們以爬蟲為例子,說明協程的應用。啟動一個爬蟲程式一定屬於一個程序,這是毋庸置疑的,但程序本身並不會執行任何操作,所有操作都是通過執行緒來完成,所以一個程序有一個主執行緒。一般爬蟲的步驟包括髮送request請求、寫入檔案等操作,而這些都是IO操作,當執行緒執行到這些操作時,要麼等待這一操作完成要麼切換到其他執行緒。如果使用了協程呢?如果協程遇到了此類IO操作,可以立即切換到其他操作,例如直接傳送下一個request請求,甚至傳送第二個、第三個請求……直至原來的協程中IO請求完成,那麼回到原來的協程繼續下一步操作。這就是協程的工作原理,充分利用執行緒的工作效率,也沒有多執行緒切換的開銷,所以在處理IO操作時協程非常高效。
簡單總結一下協程的優缺點:
優點:
1)無需執行緒上下文切換的開銷(還是單執行緒);
2)無需原子操作的鎖定和同步的開銷;
3)方便切換控制流,簡化程式設計模型;
4)高併發+高擴充套件+低成本:一個cpu支援上萬的協程都沒有問題,適合用於高併發處理。
缺點:
1)無法利用多核的資源,協程本身是個單執行緒,它不能同時將單個cpu的多核用上,協程需要和程序配合才能運用到多cpu上(協程是跑線上程上的);
2)進行阻塞操作時會阻塞掉整個程式:如io;
現在,各位讀者應該已經對協程的概念又說了解了,也感受到了協程的魅力了吧!那麼該怎麼使用協程了……
2.3 相關概念
在Python中, asyncio、tornado和gevent等模組都實現了協程的功能。本篇中主要介紹asyncio。
在介紹通過asyncio的使用協程之前,首先有必要先介紹一下asyncio中涉及的幾個概念,要想掌握asyncio這這幾個貫穿始終的概念必須好好理解:
1)event_loop事件迴圈:程式開啟一個無限的迴圈,程式設計師會把一些函式(協程)註冊到事件迴圈上。當滿足事件發生的時候,呼叫相應的協程函式。
2)coroutine 協程:協程物件,指一個使用async關鍵字定義的函式,它的呼叫不會立即執行函式,而是會返回一個協程物件。協程物件需要註冊到事件迴圈,由事件迴圈呼叫。
3)future 物件: 代表將來執行或沒有執行的任務的結果。它和task上沒有本質的區別
4)task 任務:一個協程物件就是一個原生可以掛起的函式,任務則是對協程進一步封裝,其中包含任務的各種狀態。Task 物件是 Future 的子類,它將coroutine和Future聯絡在一起,將 coroutine 封裝成一個 Future 物件。
5)async/await 關鍵字:python3.5 用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的非同步呼叫介面,await就類似於說下面過程阻塞,暫時執行別的協程。await關鍵字添加了一個新的協程到迴圈裡,而不需要明確地新增協程到這個事件迴圈裡。
3 定義協程
定義協程比定義程序或者執行緒還要簡單,你只需要在普通函式定義時在“def”關鍵字前面加上一個“asyncio”,即可把普通函式定義為一個協程:
async def firstCorouctine(path=‘a.txt’): print(‘協程執行開始……’) await asyncio.sleep(1) print(‘協程執行結束……’)
如果單單看firstCorouctine(),我想大家都看得出這個函式的功能。有 “async”這個關鍵字在前面修飾之後,這個函式就變成了一個協程——這就是定義協程的方法。所以說,協程某種意義上就是一個函式。我們可以通過asyncio.iscoroutinefunction函式來檢視某個函式到底是不是協程,如果是協程則返回True,否則返回False:
import asyncio async def firstCorouctine(): print(‘協程執行開始……’) await asyncio.sleep(1) print(‘協程執行結束……’) def fun(path): print(‘這是一個普通函式’) print(‘firstCorouctine是協程嗎:{}’.format(asyncio.iscoroutinefunction(firstCorouctine))) print(‘fun是協程嗎:{}’.format(asyncio.iscoroutinefunction(fun)))
輸出結果:
firstCorouctine是協程嗎:True
fun是協程嗎:False
怎麼樣,簡單吧?不過我猜,你心裡還是一團懵,甚至在想這有什麼用呢?請繼續往下看。
4 使用協程
4.1 單個協程
使用協程須得經過一下幾個步驟:定義協程->(封裝成task->)獲取事件迴圈->將task放到事件迴圈中執行。定義好的協程並不能直接使用,需要將其包裝成為了一個任務(task物件),然後放到事件迴圈中才能被執行。所謂task物件是Future類的一個子類,儲存了協程執行後的狀態,用於未來獲取協程的結果。在上面的步驟中,之所以在封裝task這一個步驟上加上括號,是因為我們也可以選擇直接將協程放到事件迴圈中,事件迴圈會自動幫我們完成這一操作。
所以,從定義好一個協程,到執行一個協程就有不同的方法:
第一種,通過asyncio 再帶的ensure_future()函式建立task,然後執行
import asyncio async def firstCorouctine(): # 定義協程 print(‘協程執行開始……’) await asyncio.sleep(1) print(‘協程執行結束……’) coroutine = firstCorouctine() # 將協程賦值給coroutine task = asyncio.ensure_future(coroutine) # 封裝為task loop = asyncio.get_event_loop() # 獲取事件迴圈 loop.run_until_complete(task) # 執行
第二種,直接通過事件迴圈的create_task方法建立task,然後執行:
import asyncio async def firstCorouctine(): # 定義協程 print(‘協程執行開始……’) await asyncio.sleep(1) print(‘協程執行結束……’) coroutine = firstCorouctine() # 將協程賦值給coroutine loop = asyncio.get_event_loop() # 獲取事件迴圈 task = loop.create_task(coroutine) # 封裝為task loop.run_until_complete(task) # 執行
第三種:直接將協程放到事件迴圈中執行。這種方法並不是說不用將協程封裝為task,而是事件迴圈內部會自動幫我們完成這一步驟。
import asyncio async def firstCorouctine(): # 定義協程 print(‘協程執行開始……’) await asyncio.sleep(1) print(‘協程執行結束……’) coroutine = firstCorouctine() # 將協程賦值給coroutine loop = asyncio.get_event_loop() # 獲取事件迴圈 loop.run_until_complete(coroutine) # 執行
當然,無論是上述哪一種方法,最終都需要通過run_until_complete方法去執行我們定義好的協程。run_until_complete 是一個阻塞(blocking)呼叫,直到協程執行結束,它才返回。這一點從函式名中就可以看得出來。
4.2 多協程併發
協程往往是多個一起應用在事件迴圈裡的,將多個協程加入事件迴圈需要藉助 asyncio.gather 函式或者asyncio.wait函式,兩個函式功能極其相似,不同的是,gather接受的引數是多個協程,而wait接受的是一個協程列表。async.wait會返回兩個值:done和pending,done為已完成的task,pending為超時未完成的task。而async.gather只返回已完成task。
使用waiter函式:
import asyncio async def firstCorouctine(n): # 定義協程 print('協程{}開始執行……'.format(n)) await asyncio.sleep(1) print('協程{}結束執行……'.format(n)) task_list = [ asyncio.ensure_future(firstCorouctine(1)), asyncio.ensure_future(firstCorouctine(2)), asyncio.ensure_future(firstCorouctine(3)) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(task_list)) # 使用wait函式
輸出結果:
協程1開始執行……
協程2開始執行……
協程3開始執行……
協程1結束執行……
協程2結束執行……
協程3結束執行……
使用gather函式:
import asyncio async def firstCorouctine(n): # 定義協程 print('協程{}開始執行……'.format(n)) await asyncio.sleep(1) print('協程{}結束執行……'.format(n)) task1 = asyncio.ensure_future(firstCorouctine(1)) task2 = asyncio.ensure_future(firstCorouctine(2)) task3 = asyncio.ensure_future(firstCorouctine(3)) tasks = asyncio.gather(task1, task2, task3)#如果協程有返回值時,最好賦值給一個tasks,方便回去返回結果 loop = asyncio.get_event_loop() loop.run_until_complete(tasks)
輸出結果:
協程1開始執行……
協程2開始執行……
協程3開始執行……
協程1結束執行……
協程2結束執行……
協程3結束執行……
4.3 獲取返回值
前面的章節中說到,協程本質上來說也是一種函式,既然是函式就可以返回值。那麼,協程執行完後,怎麼獲取它的返回值呢?task是future例項化物件,它封裝有一個result()方法,通過task呼叫result()方法,可以獲取協程的返回值:
import asyncio async def firstCorouctine(): # 定義協程 await asyncio.sleep(1) return ‘1234567890’ coroutine = firstCorouctine() # 將協程賦值給coroutine task = asyncio.ensure_future(coroutine) # 封裝為task loop = asyncio.get_event_loop() # 獲取事件迴圈 loop.run_until_complete(task) # 執行 return_value = task.result() # 獲取協程返回值 print(‘協程返回的值為:{}’.format(return_value))
輸出結果:
協程返回的值為:1234567890
上面的例子是單個協程是獲取返回值,如果多個協程呢?使用多個協程併發時,將多個task列表傳入事件迴圈中執行,返回的task列表中的每一個task物件就包含了返回值:
import asyncio async def firstCorouctine(n): # 定義協程 print('協程{}開始執行……'.format(n)) await asyncio.sleep(1) print('協程{}結束執行……'.format(n)) return n task_list = [ asyncio.ensure_future(firstCorouctine(1)), asyncio.ensure_future(firstCorouctine(2)), asyncio.ensure_future(firstCorouctine(3)) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(task_list)) for task in task_list: print(task.result())
輸出結果:
協程1開始執行……
協程2開始執行……
協程3開始執行……
協程1結束執行……
協程2結束執行……
協程3結束執行……
1
2
3
4.4 繫結回撥函式
在實際應用中,協程執行結束之後並不意味這整個任務就完成了,還需要執行其他函式,且其他函式也會用到協程的返回值,這就涉及到回撥函式。協程中設定回撥函式時需要將future物件(也就是我們建立的task)傳入函式中,不過這個傳參是自動完成的,所以回撥函式必須至少設定一個形參:
import asyncio async def firstCorouctine(): # 定義協程 await asyncio.sleep(1) def callBack(future): # 定義一個回撥函式 print('我是回撥函式,協程返回值為:{}'.format(future.result())) coroutine = firstCorouctine() task = asyncio.ensure_future(coroutine) task.add_done_callback(callBack) # 繫結回撥函式 loop = asyncio.get_event_loop() loop.run_until_complete(task)
輸出結果:
我是回撥函式,協程返回值為:1234567890
如果還需要傳入其他引數,就需要藉助偏函式(functools.partial)來輔助使用了,這時候切記future物件是放在最後的:
import asyncio import functools async def firstCorouctine(): # 定義協程 await asyncio.sleep(1) return '1234567890' def callBack(value , future): # 定義一個回撥函式 print('我是回撥函式,你輸入的第一個引數為:{}'.format(value)) print('我是回撥函式,協程返回值為:{}'.format(future.result())) coroutine = firstCorouctine() task = asyncio.ensure_future(coroutine) task.add_done_callback(functools.partial(callBack , 123)) # 繫結回撥函式 loop = asyncio.get_event_loop() loop.run_until_complete(task)
輸出結果:
我是回撥函式,你輸入的第一個引數為:123
我是回撥函式,協程返回值為:1234567890
4.5 協程的巢狀使用
事件迴圈執行協程時是通過run_until_complete方法,這個方法只接收一個協程或者future物件作為引數。在前面章節中,我們在介紹多協程併發操作時,用的是asyncio.wait函式和asyncio.gather函式,這兩個函式本身也是一個協程,當接收多個協程作為引數時,實際上是在wait(或gather)協程裡面執行了我們傳入的多個協程,然後把結果返回。這就證明,協程是可以巢狀的。我們也可以通過wait和gather來寫我們自己的巢狀協程。
使用wait函式巢狀:
import asyncio async def innerCorouctine(n): # 巢狀在裡層的協程 print('innerCorouctine-{}開始執行……'.format(n)) await asyncio.sleep(1) print('innerCorouctine-{}結束執行……'.format(n)) return n async def outerCorouctine(): print('outerCorouctine開始執行……') coroutine1 = innerCorouctine(1) coroutine2 = innerCorouctine(2) coroutine3 = innerCorouctine(3) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] dones, pendings = await asyncio.wait(tasks) for task in dones: print('協程返回值:{} '.format(task.result())) print('outerCorouctine結束行……') loop = asyncio.get_event_loop() loop.run_until_complete(outerCorouctine())
輸出結果:
outerCorouctine開始執行……
innerCorouctine-1開始執行……
innerCorouctine-2開始執行……
innerCorouctine-3開始執行……
innerCorouctine-1結束執行……
innerCorouctine-2結束執行……
innerCorouctine-3結束執行……
協程返回值:1
協程返回值:3
協程返回值:2
outerCorouctine結束行……
使用gather方法進行巢狀:
import asyncio async def innerCorouctine(n): # 巢狀在裡層的協程 print('innerCorouctine-{}開始執行……'.format(n)) await asyncio.sleep(1) print('innerCorouctine-{}結束執行……'.format(n)) return n async def outerCorouctine(): print('outerCorouctine開始執行……') coroutine1 = innerCorouctine(1) coroutine2 = innerCorouctine(2) coroutine3 = innerCorouctine(3) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] tasks = await asyncio.gather(*tasks) for task in tasks: print('協程返回值:{} '.format(task)) print('outerCorouctine結束行……') loop = asyncio.get_event_loop() loop.run_until_complete(outerCorouctine())
輸出結果:
outerCorouctine開始執行……
innerCorouctine-1開始執行……
innerCorouctine-2開始執行……
innerCorouctine-3開始執行……
innerCorouctine-1結束執行……
innerCorouctine-2結束執行……
innerCorouctine-3結束執行……
協程返回值:1
協程返回值:2
協程返回值:3
outerCorouctine結束行……
當然,也還有第三種方法進行巢狀,那就是使用run_until_complete函式:
import asyncio async def innerCorouctine(n): # 巢狀在裡層的協程 print('innerCorouctine-{}開始執行……'.format(n)) await asyncio.sleep(1) print('innerCorouctine-{}結束執行……'.format(n)) return n async def outerCorouctine(): print('outerCorouctine開始執行……') coroutine1 = innerCorouctine(1) coroutine2 = innerCorouctine(2) coroutine3 = innerCorouctine(3) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] for task in asyncio.as_completed(tasks):#使用as_completed函式 result = await task print('協程返回值: {}'.format(result)) loop = asyncio.get_event_loop() loop.run_until_complete(outerCorouctine())
輸出結果:
outerCorouctine開始執行……
innerCorouctine-1開始執行……
innerCorouctine-2開始執行……
innerCorouctine-3開始執行……
innerCorouctine-1結束執行……
innerCorouctine-2結束執行……
innerCorouctine-3結束執行……
協程返回值: 1
協程返回值: 2
協程返回值: 3
5 總結
本文介紹了協程概念、意義和單執行緒下協程基本使用方法,但對於多執行緒下如何使用協程並未涉及,後續再進行補充。
參考資料:
http://python.jobbole.com/87541/
http://python.jobbole.com/87310/