1. 程式人生 > >Python併發程式設計系列之協程

Python併發程式設計系列之協程

1 引言

2 協程的意義

  2.1 什麼是協程

  2.2 協程的作用

  2.3 相關概念

3 定義協程

4 使用協程

  4.1 單個協程

  4.2多協程併發

  4.3 獲取返回值

  4.4 繫結回撥函式

  4.5 協程的巢狀使用

5 總結

 

1 引言

  協程是近幾年併發程式設計的一個熱門話題,與Python多程序、多執行緒相比,協程在很多方面優勢明顯。本文從協程的定義和意義出發,結合asyncio模組詳細講述了協程的使用。

2 協程的意義

2.1 什麼是協程

  協程,又稱微執行緒,英文名為Coroutine。對於多執行緒,在執行一個個不同任務時,遇到阻塞(例如IO操作)時,作業系統會自動將CPU資源切換給另一個執行緒。

2.2 協程的作用

         與執行緒不同的是,協程需要使用者自己進行手動切換——當某執行緒在執行任務中的函式A(協程A))時,可任意終端,手動切換到任務中的另一個函式B(協程B),然後在適當的時候在回到函式A(協程A)中繼續執行,這樣雖然繁瑣,但也提供了更大的操作自由度,同時協程A和協程B都屬於同一執行緒,切換效率相比於執行緒或程序間的切換有極大地優勢。另外,協程不需要多執行緒的鎖機制,因為都屬於同一個執行緒,也不存在同時寫變數衝突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多執行緒高很多。

         我們以爬蟲為例子,說明協程的應用。啟動一個爬蟲程式一定屬於一個程序,這是毋庸置疑的,但程序本身並不會執行任何操作,所有操作都是通過執行緒來完成,所以一個程序有一個主執行緒。一般爬蟲的步驟包括髮送request請求、寫入檔案等操作,而這些都是IO操作,當執行緒執行到這些操作時,要麼等待這一操作完成要麼切換到其他執行緒。如果使用了協程呢?如果協程遇到了此類IO操作,可以立即切換到其他操作,例如直接傳送下一個request請求,甚至傳送第二個、第三個請求……直至原來的協程中IO請求完成,那麼回到原來的協程繼續下一步操作。這就是協程的工作原理,充分利用執行緒的工作效率,也沒有多執行緒切換的開銷,所以在處理IO操作時協程非常高效。

       簡單總結一下協程的優缺點:

       優點:

  1)無需執行緒上下文切換的開銷(還是單執行緒);

  2)無需原子操作的鎖定和同步的開銷;

  3)方便切換控制流,簡化程式設計模型;

  4)高併發+高擴充套件+低成本:一個cpu支援上萬的協程都沒有問題,適合用於高併發處理。

  缺點:

  1)無法利用多核的資源,協程本身是個單執行緒,它不能同時將單個cpu的多核用上,協程需要和程序配合才能運用到多cpu上(協程是跑線上程上的);

  2)進行阻塞操作時會阻塞掉整個程式:如io;

       現在,各位讀者應該已經對協程的概念又說了解了,也感受到了協程的魅力了吧!那麼該怎麼使用協程了……

2.3 相關概念

  在Python中, asyncio、tornado和gevent等模組都實現了協程的功能。本篇中主要介紹asyncio。

  在介紹通過asyncio的使用協程之前,首先有必要先介紹一下asyncio中涉及的幾個概念,要想掌握asyncio這這幾個貫穿始終的概念必須好好理解:

  1)event_loop事件迴圈:程式開啟一個無限的迴圈,程式設計師會把一些函式(協程)註冊到事件迴圈上。當滿足事件發生的時候,呼叫相應的協程函式。

  2)coroutine 協程:協程物件,指一個使用async關鍵字定義的函式,它的呼叫不會立即執行函式,而是會返回一個協程物件。協程物件需要註冊到事件迴圈,由事件迴圈呼叫。

  3)future 物件: 代表將來執行或沒有執行的任務的結果。它和task上沒有本質的區別

  4)task 任務:一個協程物件就是一個原生可以掛起的函式,任務則是對協程進一步封裝,其中包含任務的各種狀態。Task 物件是 Future 的子類,它將coroutine和Future聯絡在一起,將 coroutine 封裝成一個 Future 物件。

  5)async/await 關鍵字:python3.5 用於定義協程的關鍵字,async定義一個協程,await用於掛起阻塞的非同步呼叫介面,await就類似於說下面過程阻塞,暫時執行別的協程。await關鍵字添加了一個新的協程到迴圈裡,而不需要明確地新增協程到這個事件迴圈裡。

3 定義協程

  定義協程比定義程序或者執行緒還要簡單,你只需要在普通函式定義時在“def”關鍵字前面加上一個“asyncio”,即可把普通函式定義為一個協程:

async def firstCorouctine(path=‘a.txt’):

    print(‘協程執行開始……’)

    await asyncio.sleep(1)

    print(‘協程執行結束……’)

  如果單單看firstCorouctine(),我想大家都看得出這個函式的功能。有 “async”這個關鍵字在前面修飾之後,這個函式就變成了一個協程——這就是定義協程的方法。所以說,協程某種意義上就是一個函式。我們可以通過asyncio.iscoroutinefunction函式來檢視某個函式到底是不是協程,如果是協程則返回True,否則返回False:

import asyncio

async def firstCorouctine():

print(‘協程執行開始……’)

await asyncio.sleep(1)

print(‘協程執行結束……’)

 

def fun(path):

    print(‘這是一個普通函式’)

 

print(‘firstCorouctine是協程嗎:{}’.format(asyncio.iscoroutinefunction(firstCorouctine)))

print(‘fun是協程嗎:{}’.format(asyncio.iscoroutinefunction(fun)))

  輸出結果:

  firstCorouctine是協程嗎:True

  fun是協程嗎:False

怎麼樣,簡單吧?不過我猜,你心裡還是一團懵,甚至在想這有什麼用呢?請繼續往下看。

4 使用協程

4.1 單個協程

         使用協程須得經過一下幾個步驟:定義協程->(封裝成task->)獲取事件迴圈->將task放到事件迴圈中執行。定義好的協程並不能直接使用,需要將其包裝成為了一個任務(task物件),然後放到事件迴圈中才能被執行。所謂task物件是Future類的一個子類,儲存了協程執行後的狀態,用於未來獲取協程的結果。在上面的步驟中,之所以在封裝task這一個步驟上加上括號,是因為我們也可以選擇直接將協程放到事件迴圈中,事件迴圈會自動幫我們完成這一操作。

所以,從定義好一個協程,到執行一個協程就有不同的方法:

  第一種,通過asyncio 再帶的ensure_future()函式建立task,然後執行

import asyncio

async def firstCorouctine(): # 定義協程

  print(‘協程執行開始……’)

  await asyncio.sleep(1)

  print(‘協程執行結束……’)


coroutine = firstCorouctine() # 將協程賦值給coroutine

task = asyncio.ensure_future(coroutine) # 封裝為task

loop = asyncio.get_event_loop() # 獲取事件迴圈

loop.run_until_complete(task) # 執行

  第二種,直接通過事件迴圈的create_task方法建立task,然後執行:

import asyncio

async def firstCorouctine(): # 定義協程

    print(‘協程執行開始……’)

    await asyncio.sleep(1)

    print(‘協程執行結束……’)

 

coroutine = firstCorouctine() # 將協程賦值給coroutine

loop = asyncio.get_event_loop() # 獲取事件迴圈

task = loop.create_task(coroutine) # 封裝為task

loop.run_until_complete(task) # 執行

  第三種:直接將協程放到事件迴圈中執行。這種方法並不是說不用將協程封裝為task,而是事件迴圈內部會自動幫我們完成這一步驟。

import asyncio

async def firstCorouctine(): # 定義協程

  print(‘協程執行開始……’)

  await asyncio.sleep(1)

  print(‘協程執行結束……’)


coroutine = firstCorouctine() # 將協程賦值給coroutine

loop = asyncio.get_event_loop() # 獲取事件迴圈

loop.run_until_complete(coroutine) # 執行

  當然,無論是上述哪一種方法,最終都需要通過run_until_complete方法去執行我們定義好的協程。run_until_complete 是一個阻塞(blocking)呼叫,直到協程執行結束,它才返回。這一點從函式名中就可以看得出來。

4.2 多協程併發

  協程往往是多個一起應用在事件迴圈裡的,將多個協程加入事件迴圈需要藉助 asyncio.gather 函式或者asyncio.wait函式,兩個函式功能極其相似,不同的是,gather接受的引數是多個協程,而wait接受的是一個協程列表。async.wait會返回兩個值:done和pending,done為已完成的task,pending為超時未完成的task。而async.gather只返回已完成task。

         使用waiter函式:

import asyncio

async def firstCorouctine(n): # 定義協程

    print('協程{}開始執行……'.format(n))

    await asyncio.sleep(1)

    print('協程{}結束執行……'.format(n))

task_list = [

    asyncio.ensure_future(firstCorouctine(1)),

    asyncio.ensure_future(firstCorouctine(2)),

    asyncio.ensure_future(firstCorouctine(3))

]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(task_list)) # 使用wait函式

  輸出結果:

  協程1開始執行……

  協程2開始執行……

  協程3開始執行……

  協程1結束執行……

  協程2結束執行……

  協程3結束執行…… 

  使用gather函式:

import asyncio

async def firstCorouctine(n):  # 定義協程

    print('協程{}開始執行……'.format(n))

    await asyncio.sleep(1)

    print('協程{}結束執行……'.format(n))

 
task1 = asyncio.ensure_future(firstCorouctine(1))

task2 = asyncio.ensure_future(firstCorouctine(2))

task3 = asyncio.ensure_future(firstCorouctine(3))


tasks = asyncio.gather(task1, task2, task3)#如果協程有返回值時,最好賦值給一個tasks,方便回去返回結果

loop = asyncio.get_event_loop()

loop.run_until_complete(tasks)

  輸出結果:

  協程1開始執行……

  協程2開始執行……

  協程3開始執行……

  協程1結束執行……

  協程2結束執行……

  協程3結束執行……

4.3 獲取返回值

  前面的章節中說到,協程本質上來說也是一種函式,既然是函式就可以返回值。那麼,協程執行完後,怎麼獲取它的返回值呢?task是future例項化物件,它封裝有一個result()方法,通過task呼叫result()方法,可以獲取協程的返回值:

import asyncio

async def firstCorouctine(): # 定義協程

await asyncio.sleep(1)

    return ‘1234567890’

coroutine = firstCorouctine() # 將協程賦值給coroutine

task = asyncio.ensure_future(coroutine) # 封裝為task

loop = asyncio.get_event_loop() # 獲取事件迴圈

loop.run_until_complete(task) # 執行

 

return_value = task.result() # 獲取協程返回值

print(‘協程返回的值為:{}’.format(return_value))

  輸出結果:

  協程返回的值為:1234567890

  上面的例子是單個協程是獲取返回值,如果多個協程呢?使用多個協程併發時,將多個task列表傳入事件迴圈中執行,返回的task列表中的每一個task物件就包含了返回值:

import asyncio

async def firstCorouctine(n): # 定義協程

    print('協程{}開始執行……'.format(n))

    await asyncio.sleep(1)

    print('協程{}結束執行……'.format(n))

    return n

task_list = [

    asyncio.ensure_future(firstCorouctine(1)),

    asyncio.ensure_future(firstCorouctine(2)),

    asyncio.ensure_future(firstCorouctine(3))

]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(task_list))

for task in task_list:

    print(task.result())

  輸出結果:

  協程1開始執行……

  協程2開始執行……

  協程3開始執行……

  協程1結束執行……

  協程2結束執行……

  協程3結束執行……

  1

  2

  3

4.4 繫結回撥函式

  在實際應用中,協程執行結束之後並不意味這整個任務就完成了,還需要執行其他函式,且其他函式也會用到協程的返回值,這就涉及到回撥函式。協程中設定回撥函式時需要將future物件(也就是我們建立的task)傳入函式中,不過這個傳參是自動完成的,所以回撥函式必須至少設定一個形參:

import asyncio

async def firstCorouctine(): # 定義協程

    await asyncio.sleep(1)

   

def callBack(future): # 定義一個回撥函式

    print('我是回撥函式,協程返回值為:{}'.format(future.result()))

 

coroutine = firstCorouctine()

task = asyncio.ensure_future(coroutine)

task.add_done_callback(callBack) # 繫結回撥函式

loop = asyncio.get_event_loop()

loop.run_until_complete(task)

  輸出結果:

  我是回撥函式,協程返回值為:1234567890

  如果還需要傳入其他引數,就需要藉助偏函式(functools.partial)來輔助使用了,這時候切記future物件是放在最後的:

import asyncio

import functools

async def firstCorouctine(): # 定義協程

    await asyncio.sleep(1)

    return '1234567890'

 

def callBack(value , future): # 定義一個回撥函式

    print('我是回撥函式,你輸入的第一個引數為:{}'.format(value))

    print('我是回撥函式,協程返回值為:{}'.format(future.result()))

coroutine = firstCorouctine()

task = asyncio.ensure_future(coroutine)

task.add_done_callback(functools.partial(callBack , 123)) # 繫結回撥函式

loop = asyncio.get_event_loop()

loop.run_until_complete(task)

  輸出結果:

  我是回撥函式,你輸入的第一個引數為:123

  我是回撥函式,協程返回值為:1234567890

4.5 協程的巢狀使用

  事件迴圈執行協程時是通過run_until_complete方法,這個方法只接收一個協程或者future物件作為引數。在前面章節中,我們在介紹多協程併發操作時,用的是asyncio.wait函式和asyncio.gather函式,這兩個函式本身也是一個協程,當接收多個協程作為引數時,實際上是在wait(或gather)協程裡面執行了我們傳入的多個協程,然後把結果返回。這就證明,協程是可以巢狀的。我們也可以通過wait和gather來寫我們自己的巢狀協程。

         使用wait函式巢狀:

import asyncio

async def innerCorouctine(n): # 巢狀在裡層的協程

    print('innerCorouctine-{}開始執行……'.format(n))

    await asyncio.sleep(1)

    print('innerCorouctine-{}結束執行……'.format(n))

    return n

async def outerCorouctine():

    print('outerCorouctine開始執行……')

    coroutine1 = innerCorouctine(1)

    coroutine2 = innerCorouctine(2)

    coroutine3 = innerCorouctine(3)

    tasks = [

        asyncio.ensure_future(coroutine1),

        asyncio.ensure_future(coroutine2),

        asyncio.ensure_future(coroutine3)

    ]

     dones, pendings = await asyncio.wait(tasks)

    for task in dones:

        print('協程返回值:{} '.format(task.result()))

    print('outerCorouctine結束行……')

 
loop = asyncio.get_event_loop()

loop.run_until_complete(outerCorouctine())

  輸出結果:

  outerCorouctine開始執行……

  innerCorouctine-1開始執行……

  innerCorouctine-2開始執行……

  innerCorouctine-3開始執行……

  innerCorouctine-1結束執行……

  innerCorouctine-2結束執行……

  innerCorouctine-3結束執行……

  協程返回值:1

  協程返回值:3

  協程返回值:2

  outerCorouctine結束行……

  使用gather方法進行巢狀:

import asyncio

async def innerCorouctine(n): # 巢狀在裡層的協程

    print('innerCorouctine-{}開始執行……'.format(n))

    await asyncio.sleep(1)

    print('innerCorouctine-{}結束執行……'.format(n))

    return n

async def outerCorouctine():

    print('outerCorouctine開始執行……')

    coroutine1 = innerCorouctine(1)

    coroutine2 = innerCorouctine(2)

    coroutine3 = innerCorouctine(3)


    tasks = [

        asyncio.ensure_future(coroutine1),

        asyncio.ensure_future(coroutine2),

        asyncio.ensure_future(coroutine3)

    ]

    tasks = await asyncio.gather(*tasks)

    for task in tasks:

        print('協程返回值:{} '.format(task))

    print('outerCorouctine結束行……')


loop = asyncio.get_event_loop()

loop.run_until_complete(outerCorouctine())

  輸出結果:

  outerCorouctine開始執行……

  innerCorouctine-1開始執行……

  innerCorouctine-2開始執行……

  innerCorouctine-3開始執行……

  innerCorouctine-1結束執行……

  innerCorouctine-2結束執行……

  innerCorouctine-3結束執行……

  協程返回值:1

  協程返回值:2

  協程返回值:3

  outerCorouctine結束行……

  當然,也還有第三種方法進行巢狀,那就是使用run_until_complete函式:

import asyncio

async def innerCorouctine(n): # 巢狀在裡層的協程

    print('innerCorouctine-{}開始執行……'.format(n))

    await asyncio.sleep(1)

    print('innerCorouctine-{}結束執行……'.format(n))

    return n

async def outerCorouctine():

    print('outerCorouctine開始執行……')

    coroutine1 = innerCorouctine(1)

    coroutine2 = innerCorouctine(2)

    coroutine3 = innerCorouctine(3)

    tasks = [

        asyncio.ensure_future(coroutine1),

        asyncio.ensure_future(coroutine2),

        asyncio.ensure_future(coroutine3)

    ]

    for task in asyncio.as_completed(tasks):#使用as_completed函式

        result = await task

        print('協程返回值: {}'.format(result))

loop = asyncio.get_event_loop()

loop.run_until_complete(outerCorouctine())

  輸出結果:

  outerCorouctine開始執行……

  innerCorouctine-1開始執行……

  innerCorouctine-2開始執行……

  innerCorouctine-3開始執行……

  innerCorouctine-1結束執行……

  innerCorouctine-2結束執行……

  innerCorouctine-3結束執行……

  協程返回值: 1

  協程返回值: 2

  協程返回值: 3

5 總結

  本文介紹了協程概念、意義和單執行緒下協程基本使用方法,但對於多執行緒下如何使用協程並未涉及,後續再進行補充。

  參考資料:

  http://python.jobbole.com/87541/

  http://python.jobbole.com/87310/