1. 程式人生 > >Python標準模塊--asyncio

Python標準模塊--asyncio

saas

1 模塊簡介

asyncio模塊作為一個臨時的庫,在Python 3.4版本中加入。這意味著,asyncio模塊可能做不到向後兼容甚至在後續的Python版本中被刪除。根據Python官方文檔,asyncio通過coroutines、sockets和其它資源上的多路復用IO訪問、運行網絡客戶端和服務端以及其它相關的原始服務等提供了一種單線程並發應用的架構。本文並不能覆蓋所有關於asyncio模塊的技術點,但是你可以學到如何去使用這個模塊,以及為什麽它是有用的。

如果你在一些較老的Python版本中需要一些類似於asyncio模塊的技術,你可以看Twisted或者gevent。

2 模塊使用

2.1 定義

asyncio模塊提供了一種關於事件循環的框架。事件循環就是等待一些任務發生,然後執行相應的事件。它也會處理例如IO操作或者系統事件。asyncio實際中有好幾種循環實現方式。模塊默認使用的方式是其所運行的操作系統上最有效的方式。如果你願意,你也可以顯式地選擇其它事件循環方式。一個事件循環就是當事件A發生時,函數B共同起作用。

設想這樣一個場景,服務器等待用戶訪問並請求一些資源,例如網頁。如果這個網站不是非常知名的網站,這個服務器將會在很長的時間內處於空閑狀態。但是,一旦某個時間用戶點擊了這個網站,服務器就需要作出響應。這個響應就是事件處理。當一個用戶下載網頁,服務器將會去檢查並調用一個或者多個事件句柄。一旦這些事件句柄完成相應的處理,它們需要將控制交回給事件循環。為了在Python中完成這個任務,asyncio使用協程。

協程是一個特殊的函數,可以將控制交回給它的調用函數,但是並不丟失它的狀態。協程是一個消費者函數,並且是生成器的擴展。協程相比線程最大的優勢就是執行協程時不需要占用太多內存。你需要註意的是,當你調用一個協程函數,它並沒有真正執行。相反,它將會返回一個協程對象,你可以將這個協程對象傳遞給事件循環,然後可以立即或者稍後執行它。

當你在使用asyncio模塊時,另一個你可能會執行的是future。future就是一個可以表示還沒有結束的任務結果的對象。你的事件循環可以觀察future對象並等待它們結束。當一個future結束時,它被設置為已完成。asyncio模塊也支持鎖和信號。

本文最後一部分,我將會提到Task。Task是協程的一個框架,是Future的一個子類。你可以在事件循環中對Task進行調度。

2.2 async和await

async和await是Python 3.5中新添加的關鍵詞,用來定義一個原生的協程,以便於和基於協程的生成器相區別。如果你想了解更多關於async和await的知識,你可以去閱讀PEP 492。

在Python 3.4中,你可以按照如下方式創建一個協程,

import [email protected] my_foo():
    yield from func()

這個裝飾器在Python 3.5中依然有效,但是模塊的類型有所更新,協程函數可以告訴你正在交互的是不是一個原生的協程。從Python 3.5開始,你可以使用async def這種語法來定義一個協程函數,所以上述函數可以按照如下方式定義,

import asyncioasync def my_coro():
    await func()

當你以這種方式定義一個協程函數,你不能在函數內部使用yield。取而代之,你必須使用return或者await語句,用於將返回值返回給調用者。你需要註意的是,關鍵字await只能在async def函數中使用。

關鍵字async和await可以認為是異步編程中的接口。asyncio模塊就是一個可以將async/await用於異步編程的框架。實際上,有一個叫做curio的項目證實了這個概念,那就是它單獨實現了在後臺使用async/await的事件循環。

2.3 協程示例

盡管上述的描述可以讓你獲得很多關於協程如何工作的背景知識,有時候,你僅僅想看到一些示例,這樣你就可以切身感受到它的語法形式,以及如何將這些代碼組合在一起。考慮到這一點,讓我們以一個簡單的示例開始把。

一個非常常見的任務就是你想完整的下載一個文件,這個文件可能來源於內部資源或者互聯網。當然你想要下載的文件可能不止一個。讓我們創建兩個協程來完成這個任務。

import asyncioimport osimport urllib.request async def download_coroutine(url):
    request = urllib.request.urlopen(url)
    filename = os.path.basename(url)    with open(filename,"wb") as file_handle:        while True:
            chunk = request.read(1024)            if not chunk:                break
            file_handle.write(chunk)
        msg = "Finished downloading {filename}".format(filename = filename)        return msgasync def main(urls):
    coroutines = [download_coroutine(url) for url in urls]
    completed,pending = awit asyncio.wait(coroutines)    for item in completed:
        print(item.result())if __name__ == "__main__":
    urls = ["http://www.irs.gov/pub/irs-pdf/f1040.pdf",            "http://www.irs.gov/pub/irs-pdf/f1040a.pdf",            "http://www.irs.gov/pub/irs-pdf/f1040ez.pdf",            "http://www.irs.gov/pub/irs-pdf/f1040es.pdf",            "http://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
    event_loop = asyncio.get_event_loop()    try:
        event_loop.run_until_complete(main(urls))    finally:
        event_loop.close()

這段代碼中,我們引入了我們需要的模塊,然後通過async語法創建了第一個協程。這個協程叫做download_coroutine,它使用Python的urllib模塊下載傳遞給它的任何URL地址。當它完成任務時,它將會返回一條相應的信息。

另一個協程就是我們的主協程。它基本上就是獲取一個包含一個或者多個URL地址的列表,然後將它們加入隊列。我們使用asyncio的wait函數用於等待協程的結束。當然,為了啟動這些協程,它們需要被加入到事件循環中。我們在代碼段中最後的地方做了這個處理,我們先獲取一個事件循環,然後調用它的run_until_complete的方法。你將會註意到,我們將主協程傳入事件循環中。這個會先運行主協程,主協程將第二個協程加入到隊列中,並讓它們運行。這就是有名的鏈協程。

2.4 調度調用

你也可以通過異步事件循環來調度調用常規函數。我們看的第一個方法是call_soon。方法call_soon基本上就是盡可能的調用你的回調或者事件句柄。它的工作機制類似於先進先出隊列,所以如果一些回調需要一段時間來處理任務,其它的回調就會相應的延遲,直到先前的回調結束。讓我們來看一個示例。

import asyncioimport functoolsdef event_handler(loop,stop = False):
    print("Event handler called")    if stop:
        print("Stopping the loop")
        loop.stop()if __name__ == "__main__":
    loop = asyncio.get_event_loop()    try:
        loop.call_soon(functools.partial(event_handler,loop))
        print("Starting event loop")
        loop.call_soon(functools.partial(event_handler,loop,stop = True))

        loop.run_forever()    finally:
        print("closing event loop")
        loop.close()

由於asyncio的函數不接受關鍵字,但是如果我們需要將關鍵字傳入事件句柄中,那麽我們就需要使用functools模塊了。無論何時被調用,我們定義的常規函數將會在標準輸出上打印一些文字信息。如果你偶然將這個函數的stop變量設置為True,它將會停止事件循環。

第一次我們調用它時,我們沒有停止事件循環。第二次我們調用它時,我們停止了事件循環。我們停止事件循環的原因是我們將它放入run_forever,這個將時間循環設置為無限循環。一旦循環停止,我們就可以將它關閉。如果你運行這段代碼,你得到的輸出如下所示,

Starting event loopEvent handler calledEvent handler called
Stopping the loopclosing event loop

還有一個相關的函數是call_soon_threadsafe,顧名思義,它與call_soon的工作機制相似,但是它是線程安全的。

如果你想延遲一段時間再調用,你可以使用call_later函數。在這個示例中,我們可以將call_soon函數按照如下方式修改,

loop.call_later(1,event_handler,loop)

這個將會延遲調用我們的事件句柄1秒鐘,然後才會去調用它,並將循環作為第一個參數傳入。

如果你想在未來一個指定的時間調度,你需要獲取循環的時間,而不是計算機的時間,你可以按照如下方式操作,

current_time = loop.time()

一旦你這樣做,你可以使用call_at函數,然後將你想調用事件句柄的時間傳遞給它。讓我們來看看我們想在5分鐘之後調用我們的事件句柄,下面就是你如何操作的,

loop.call_at(current_time + 300,event_handler,loop)

在這個示例中,我們使用我們獲取的當前時間,然後加上300秒鐘或者5分鐘。通過這個操作,我們延遲調用事件循環5分鐘。

2.5 任務

Task是Future的一個子類,也是協程的一個框架。Task可以讓你記錄到任務結束處理的時間。由於任務是Future類型,其它的協程可以等待一個任務,當任務處理完畢時你也可以獲取到它的結果。讓我們看一個簡單的示例。

import asyncioimport timeasync def my_task(seconds):
    print("This task is take {} seconds to cpmplete".format(seconds))
    time.sleep(seconds)    return "task finished"if __name__ == "__main__":
    my_event_loop = asyncio.get_event_loop()    try:
        print("task creation started")
        task_obj = my_event_loop.create_task(my_task(seconds = 2))
        my_event_loop.run_until_complete(task_obj)    finally:
        my_event_loop.close()

    print("The task‘s result was :{}".format(task_obj.result()))

在這裏,我們創建一個異步函數,它接受秒數,也是它將會運行的時間。這個模仿了一個長時間運行的任務。然後我們創建了我們的事件循環,並且通過事件循環對象的create_task函數創建了一個任務對象。函數create_task接受我們想要轉換為任務的函數。然後我們運行事件循環,直到任務完成。在最後,一旦任務結束,我們就獲得任務的結果。

通過任務的cancel方法,任務也可以很容易被取消。當你想結束一個任務,調用它就可以了。當一個任務在等待另一個操作時被取消,這個任務將會報出CancelError錯誤。

2.6 總結

到這裏,你應該已經了解如何利用asyncio庫進行工作了。asyncio庫是非常強大的,它允許你去做很多酷並且有意思的任務。你可以查看http://asyncio.org/,該網站包含了很多使用asyncio的項目,可以獲取到很多關於如何使用asyncio庫的靈感。當然,Python官方文檔也是一個很好的開始asyncio之旅的地方。


Python標準模塊--asyncio