1. 程式人生 > >Python併發程式設計之協程/非同步IO

Python併發程式設計之協程/非同步IO

協程與非同步IO

引言

隨著node.js的盛行,相信大家今年多多少少都聽到了非同步程式設計這個概念。Python社群雖然對於非同步程式設計的支援相比其他語言稍顯遲緩,但是也在Python3.4中加入了asyncio,在Python3.5上又提供了async/await語法層面的支援,剛正式釋出的Python3.6中asynico也已經由臨時版改為了穩定版。下面我們就基於Python3.4+來了解一下非同步程式設計的概念以及asyncio的用法。

什麼是協程

通常在Python中我們進行併發程式設計一般都是使用多執行緒或者多程序來實現的,對於計算型任務由於GIL的存在我們通常使用多程序來實現,而對與IO型任務我們可以通過執行緒排程來讓執行緒在執行IO任務時讓出GIL,從而實現表面上的併發。

其實對於IO型任務我們還有一種選擇就是協程,協程是執行在單執行緒當中的“併發”,協程相比多執行緒一大優勢就是省去了多執行緒之間的切換開銷,獲得了更大的執行效率。Python中的asyncio也是基於協程來進行實現的。在進入asyncio之前我們先來了解一下Python中怎麼通過生成器進行協程來實現併發。

example1

我們先來看一個簡單的例子來了解一下什麼是協程(coroutine),對生成器不瞭解的朋友建議先看一下Stackoverflow上面的這篇高票回答

Python
123456789 >>>defcoroutine():...reply=yield'hello'...yieldreply...>>>c=coroutine()>>>next(c)'hello'>>>c.send('world')'world'

example2

下面這個程式我們要實現的功能就是模擬多個學生同時向一個老師提交作業,按照傳統的話我們或許要採用多執行緒/多程序,但是這裡我們可以採用生成器來實現協程用來模擬併發。

如果下面這個程式讀起來有點困難,可以直接跳到後面部分,並不影響閱讀,等你理解協程的本質,回過頭來看就很簡單了。

Python
123456789101112131415161718 fromcollectionsimportdequedefstudent(name,homeworks):forhomework inhomeworks.items():yield(name,homework[0],homework[1])# 學生"生成"作業給老師classTeacher(object):def__init__(self,students):self.students=deque(students)defhandle(self):"""老師處理學生作業"""whilelen(self.students):student=self.students.pop()try:homework=next(student)print('handling',homework[0],homework[1],homework[2])exceptStopIteration:passelse:self.students.appendleft(student)

下面我們來呼叫一下這個程式。

Python
12345 Teacher([student('Student1',{'math':'1+1=2','cs':'operating system'}),student('Student2',{'math':'2+2=4','cs':'computer graphics'}),student('Student3',{'math':'3+3=5','cs':'compiler construction'})]).handle()

這是輸出結果,我們僅僅只用了一個簡單的生成器就實現了併發(concurrence),注意不是並行(parallel),因為我們的程式僅僅是執行在一個單執行緒當中。

Python
123456 handling Student3 cs compilerconstructionhandling Student2 cs computer graphicshandling Student1 cs operating systemhandling Student3 math3+3=5handling Student2 math2+2=4handling Student1 math1+1=2

##使用asyncio模組實現協程

從Python3.4開始asyncio模組加入到了標準庫,通過asyncio我們可以輕鬆實現協程來完成非同步IO操作。

解釋一下下面這段程式碼,我們創造了一個協程display_date(num, loop),然後它使用關鍵字yield from來等待協程asyncio.sleep(2)的返回結果。而在這等待的2s之間它會讓出CPU的執行權,直到asyncio.sleep(2)返回結果。

Python
123456789101112131415 # coroutine.pyimportasyncioimportdatetime@asyncio.coroutine# 宣告一個協程defdisplay_date(num,loop):end_time=loop.time()+10.0whileTrue:print("Loop: {} Time: {}".format(num,datetime.datetime.now()))if(loop.time()+1.0)>=end_time:breakyieldfromasyncio.sleep(2)# 阻塞直到協程sleep(2)返回結果loop=asyncio.get_event_loop()# 獲取一個event_looptasks=[display_date(1,loop),display_date(2,loop)]loop.run_until_complete(asyncio.gather(*tasks))# "阻塞"直到所有的tasks完成loop.close()

下面是執行結果,注意到併發的效果沒有,程式從開始到結束只用大約10s,而在這裡我們並沒有使用任何的多執行緒/多程序程式碼。在實際專案中你可以將asyncio.sleep(secends)替換成相應的IO任務,比如資料庫/磁碟檔案讀寫等操作。

Python
12345678910111213 ziwenxie::~»python coroutine.pyLoop:1Time:2016-12-1916:06:46.515329Loop:2Time:2016-12-1916:06:46.515446Loop:1Time:2016-12-1916:06:48.517613Loop:2Time:2016-12-1916:06:48.517724Loop:1Time:2016-12-1916:06:50.520005Loop:2Time:2016-12-1916:06:50.520169Loop:1Time:2016-12-1916:06:52.522452Loop:2Time:2016-12-1916:06:52.522567Loop:1Time:2016-12-1916:06:54.524889Loop:2Time:2016-12-1916:06:54.525031Loop:1Time:2016-12-1916:06:56.527713Loop:2Time:2016-12-1916:06:56.528102

在Python3.5中為我們提供更直接的對協程的支援,引入了async/await關鍵字,上面的程式碼我們可以這樣改寫,使用async代替了@asyncio.coroutine,使用了await代替了yield from,這樣我們的程式碼變得更加簡潔可讀。

Python
12345678910111213 importasyncioimportdatetimeasync defdisplay_date(num,loop):# 宣告一個協程end_time=loop.time()+10.0whileTrue:print("Loop: {} Time: {}".format(num,datetime.datetime.now()))if(loop.time()+1.0)>=end_time:breakawait asyncio.sleep(2)# 等同於yield fromloop=asyncio.get_event_loop()# 獲取一個event_looptasks=[display_date(1,loop),display_date(2,loop)]loop.run_until_complete(asyncio.gather(*tasks))# "阻塞"直到所有的tasks完成loop.close()

asyncio模組詳解

開啟事件迴圈有兩種方法,一種方法就是通過呼叫run_until_complete,另外一種就是呼叫run_forever。run_until_complete內建add_done_callback,使用run_forever的好處是可以通過自己自定義add_done_callback,具體差異請看下面兩個例子。

run_until_complete()

Python
1234567891011 importasyncioasync defslow_operation(future):await asyncio.sleep(1)future.set_result('Future is done!')loop=asyncio.get_event_loop()future=asyncio.Future()asyncio.ensure_future(slow_operation(future))print(loop.is_running())# Falseloop.run_until_complete(future)print(future.result())loop.close()

run_forever()

run_forever相比run_until_complete的優勢是添加了一個add_done_callback,可以讓我們在task(future)完成的時候呼叫相應的方法進行後續處理。

Python
123456789101112131415 importasyncioasync defslow_operation(future):await asyncio.sleep(1)future.set_result('Future is done!')defgot_result(future):print(future.result())loop.stop()loop=asyncio.get_event_loop()future=asyncio.Future()asyncio.ensure_future(slow_operation(future))future.add_done_c