1. 程式人生 > >深入Asyncio(十一)優雅地開始與結束

深入Asyncio(十一)優雅地開始與結束

關於 adl == blocking 無法 捕獲 連接建立 server got

Startup and Shutdown Graceful

大部分基於asyncio的程序都是需要長期運行、基於網絡的應用,處理這種應用的正確開啟與關閉存在驚人的復雜性。

開啟相對來說更簡單點,常規做法是創建一個task,然後調用loop.run_forever(),就如第三章QuickStart中的例子一樣。

一個例外是當啟動監聽服務器時需要經過兩個階段:

  1. 為服務器的啟動創建一個coroutine,然後調用run_until_complete()來初始化並啟動服務器本身;
  2. 通過調用loop.run_forever()來調用main函數。

通常啟動是很簡單的,碰到上述例外情況,查看官方示例。

關閉就要復雜得多,之前講過run_forever()調用會阻塞主線程,當執行關閉時,會解除阻塞並執行後續代碼,此時就需要:

  1. 收集所有尚未完成的task對象;
  2. 將他們聚集到一個group任務中;
  3. 取消group任務(需要捕捉CancelledError);
  4. 通過run_until_complete()來等待執行完畢。

在這之後關閉才算完成,初學者在寫異步代碼時總是極力擺脫的一些錯誤信息比如task還未等待就被關閉了,主要原因就是遺失了上述步驟中的一個或多個,用個例子來說明。

import asyncio

async def f(delay):
    await asyncio.sleep(delay)

loop = asyncio.get_event_loop()
t1 = loop.create_task(f(1))    # 任務1執行1秒
t2 = loop.create_task(f(2))    # 任務2執行2秒
loop.run_until_complete(t1)    # 只有任務1被執行完成
loop.close()
λ python3 taskwaring.py
Task was destroyed but it is pending!
task: <Task pending coro=<f() running at taskwaring.py:4> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x0312D6D0>()]>>

這個錯誤是說有些任務在loop關閉時還沒完成,這也就是為什麽規範的關閉過程要將所有的task收集到一個task中,取消它們然後在loop關閉之前等待取消完成。

再多看些比QuickStart代碼更細節的例子,這次用官方文檔中的echo服務器代碼作為服務器,通過客戶端代碼來深入學習。

from asyncio import (
    get_event_loop,
    start_server,
    CancelledError,
    StreamReader,
    StreamWriter,
    Task,
    gather
    )

async def echo(reader: StreamReader, writer: StreamWriter):    # 1
    print(‘New connection.‘)
    try:
        while True:    # 2
            data: bytes = await reader.readlines()  # 3
            if data in [b‘‘, b‘quit‘]:
                break
            writer.write(data.upper())  # 4
            await writer.drain()
        print(‘Leaving Connection.‘)
    except CancelledError:  # 5
        writer.write_eof()
        print(‘Cancelled‘)
    finally:
        writer.close()

loop = get_event_loop()
coro = start_server(echo, ‘127.0.0.1‘, 8888, loop=loop)    # 6
server = loop.run_until_complete(coro)  # 7

try:
    loop.run_forever()  # 8
except KeyboardInterrupt:
    print(‘Shutting Down!‘)

server.close()  # 9
loop.run_until_complete(server.wait_closed())   # 10

tasks = Task.all_tasks()    # 11
group = gather(*tasks, return_exceptions=True)  # 12
group.cancel()
loop.run_until_complete(group)  # 13
loop.close()
  1. 這個協程用於為每個建立的連接創建一個協程,使用了Stream的API;

  2. 為了保持連接,用死循環獲取消息;

  3. 從服務器獲取信息;

  4. 將消息的字符全部大寫返回;

  5. 此處處理退出,進行環境退出的清理工作;

  6. 這裏是程序開始的地方,服務器需要單獨循行,start_server方法返回一個corountine,必須在run_until_complete中執行;

  7. 運行coroutine來啟動TCP服務器;

  8. 現在才開始程序的監聽部分,為連接到服務器的每個TCP生成一個coroutine來執行echo例程函數,唯一能打斷loop的只能是KeyboardInterrupt異常;

  9. 程序運行到這裏的話,關閉操作已經開始,從現在開始要讓服務器停止接受新的連接,第一步是調用server.close();

  10. 第二步是調用server.wait_closed()來關閉那些仍在等待連接建立的socket,仍處於活躍狀態的連接不會受影響;

  11. 開始關閉task,先收集當前所有等待狀態的task;

  12. 將task聚集到一個group中,然後調用cancel方法,此處的return_exceptions參數後面講;

  13. 運行group這個協程。


要註意的一點是,如果在一個coroutine內部捕捉了一個CancelledError,要註意在異常捕捉代碼中不要創建任何coroutine,all_tasks()無法感知在run_until_complete()運行階段創建的任何新任務。

return_exceptions=True參數是幹什麽的?

gather()方法有個默認參數是return_exceptions=False,通過默認設置來關閉異常處理是有問題的,很難直接解釋清楚,可以通過一系列事實來說明:
1. run_until_complete()方法執行Future對象,在關閉期間,執行由gather()方法返回的Future對象;
2. 如果這個Future對象拋出了一個異常,那麽這個異常會繼續向上拋出,導致loop停止;
3. 如果run_until_complete()被用來執行一個group Future對象,任何group內子任務未處理而拋出的異常都會被向上拋出,也包含CancelledError;
4. 如果一部分子任務處理了CancelledError異常,另一部分未處理,則未處理的那部分的異常也會導致loop停止,這意味著loop在所有tasks完成前就停止了;
5. 在關閉loop時,不希望上述特性被觸發,只是想要所有在group中的task盡快執行結束,也不理會某些task是否拋出異常;
6. 使用gather(*, return_exceptions=True)可以讓group將子任務中的異常當作返回值處理,因此不會影響run_until_complete()的執行。

關於捕獲異常不合人意的一點就是某些異常在group內被處理了而沒有被拋出,這對通過結果查找異常、寫logging造成了困難。

import asyncio

async def f(delay):
    await asyncio.sleep(1/delay)    # 傳入值是0就很惡心了
    return delay

loop = asyncio.get_event_loop()
for i in range(10):
    loop.create_task(f(i))
pending = asyncio.Task.all_tasks()
group = asyncio.gather(*pending, return_exceptions=True)
results = loop.run_until_complete(group)
print(f‘Results: {results}‘)
loop.close()

不設置參數的話就會導致異常被向上拋出,然後loop停止並導致其他task無法完成。安全退出是網絡編程最難的問題之一,這對asyncio也是一樣的。

Signals

在上一個例子中演示了如何通過KeyboardInterrupt來退出loop,這個異常有效地結束了run_forever()的阻塞,並允許後續代碼得以執行。

KeyboardInterrupt異常等同於SIGINT信號,在網絡服務中最常用的停止信號其實是SIGTERM,並且也是在UNIX shell環境中使用kill指令發出的默認信號。

在UNIX系統中kill指令其實就是發送信號給進程,不加參數地調用就會發送TERM信號使進程安全退出或被忽視掉,通常這不是個好辦法,因為如果進程沒有退出,kill就會發送KILL信號來強制退出,這會導致你的程序無法可控地結束。

asyncio原生支持處理進程信號,但處理一般信號的復雜度太高(不是針對asyncio),本文不會深入講解,只會挑一些常見信號來舉例。先看下例:

# shell_signal01.py
import asyncio

async def main():   # 這裏是應用的主體部分,簡單的用一個死循環來表示程序運行
    while True:
        print(‘<Your app is running>‘)
        await asyncio.sleep(1)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.create_task(main())    # 這裏與前幾個例子一樣,將coroutine添加到loop中
    try:
        loop.run_forever()
    except KeyboardInterrupt:   # 在本例中,只有Ctrl-C會終止loop,然後像前例中進行善後工作
        print(‘<Got signal: SIGINT, shutting down.>‘)
    tasks = asyncio.Task.all_tasks()
    group = asyncio.gather(*tasks, return_exceptions=True)
    group.cancel()
    loop.run_until_complete(group)
    loop.close()

這些很簡單,下面思考一些復雜的功能:
1. 產品需要將SIGINT和SIGTERM都當作停止信號;
2. 需要在應用的main()中處理CancelledError,並且處理異常的代碼也需要一小段時間來運行(例如有一堆網絡連接需要關閉);
3. 應用多次接收停止信號不會出現異常,在接收到一次停止信號後,後續的信號都不作處理。

asyncio提供了足夠粒度的API來處理這些場景。

# shell_signal02.py
import asyncio
from signal import SIGINT, SIGTERM    # 從標準庫中導入信號值

async def main():
    try:
        while True:
            print(‘<Your app is running>‘)
            await asyncio.sleep(1)
    except asyncio.CancelledError:  # 1
        for i in range(3):
            print(‘<Your app is shtting down...>‘)
            await asyncio.sleep(1)

def handler(sig):   # 2
    loop.stop()    # 3
    print(f‘Got signal: {sig}, shtting down.‘)
    loop.remove_signal_handler(SIGTERM)    # 4
    loop.add_signal_handler(SIGINT, lambda: None)   # 5


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    for sig in (SIGINT, SIGTERM):   # 6
        loop.add_signal_handler(sig, handler, sig)
    loop.create_task(main())
    loop.run_forever()
    tasks = asyncio.Task.all_tasks()
    group = asyncio.gather(*tasks, return_exceptions=True)
    group.cancel()
    loop.run_until_complete(group)
    loop.close()
  1. 現在在coroutine內部處理停止業務,在調用group.cancel()時收到取消信號,在處理關閉loop的run_until_complete階段,main將繼續運行一段時間;

  2. 這是收到信號後的回調函數,它通過add_signal_handler()修改了loop的配置;

  3. 在回調函數開始執行時,首先要停止loop,這使得關閉業務代碼開始執行;

  4. 此時已經開始停止代碼業務,因此移除SIGTERM來忽視後續的停止信號,否則會使停止代碼業務也被終止;

  5. 原理與上面類似,但SIGINT不能簡單地remove,因為KeyboardInterrupt默認是SIGINT信號的handler,需要將SIGINT的handler置空;

  6. 在這裏配置信號的回調函數,都指向handler,因此配置了SIGINT的handler,會覆蓋掉默認的KeyboardInterrupt。


在關閉過程中等待Executor執行

在QuickStart中有一段代碼使用了阻塞的sleep()調用,當時說明了一個情況即如果該阻塞調用耗時比loop的執行耗時長時會發生什麽,現在來討論,先放結論,如果不進行人工幹預將會得到一系列errors。

import time
import asyncio

async def main():
    print(f‘{time.ctime()} Hello!‘)
    await asyncio.sleep(1.0)
    print(f‘{time.ctime()} Goodbye!‘)
    loop.stop()


def blocking():
    time.sleep(1.5)
    print(f"{time.ctime()} Hello from a thread!")


loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_in_executor(None, blocking)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()
λ python3 quickstart.py
Sun Sep 30 14:11:57 2018 Hello!
Sun Sep 30 14:11:58 2018 Goodbye!
Sun Sep 30 14:11:59 2018 Hello from a thread!
exception calling callback for <Future at 0x36cff70 state=finished returned NoneType>
Traceback (most recent call last):
    ...
    raise RuntimeError(‘Event loop is closed‘)
RuntimeError: Event loop is closed

來看下背後發生了什麽,run_in_executor()返回的是Future而不是Task,這說明它不能被asyncio.Task.all_tasks()感知,所以後續的run_until_complete()也就不會等待這個Future執行完畢。

有三個解決思路,都經過了不同程度的權衡,下面逐個過一遍,從不同視角觀察事件loop的內涵,思考在程序中相互調用的所有coroutine、線程、子進程的生命周期管理。

第一個思路,將executor放到coroutine中並以此建立一個task。

# OPTION-A
import time
import asyncio

async def main():
    print(f‘{time.ctime()} Hello!‘)
    await asyncio.sleep(1.0)
    print(f‘{time.ctime()} Goodbye!‘)
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

async def run_blocking():  # 1
    await loop.run_in_executor(None, blocking)

loop = asyncio.get_event_loop()
loop.create_task(main())
loop.create_task(run_blocking())  # 2
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=False)
loop.run_until_complete(group)
loop.close()
  1. 這個想法是run_in_executor返回的Future而不是task,雖然無法用all_tasks()捕獲,但可以用await等待一個Future,所以用一個新的coroutine來await在executor中的阻塞調用,這個新的coroutine將被作為task添加到loop;

  2. 就像運行main一樣將這個coroutine添加到loop中。


上述代碼看起來不錯,除了不能執行任務取消。可以發現代碼中少了group.cancel(),倘若加回來又會得到Event loop is closed錯誤,甚至不能在run_blocking()中處理CancelledError以便重新await Future,無論做什麽該task都會被取消,但executor會將其內部的sleep執行完。

第二個思路,收集尚未完成的task,僅取消它們,但在調用run_until_complete()之前要將run_in_executor()生成的Future添加進去。

# OPTION-B
import time
import asyncio

async def main():
    print(f‘{time.ctime()} Hello!‘)
    await asyncio.sleep(1.0)
    print(f‘{time.ctime()} Goodbye!‘)
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

loop = asyncio.get_event_loop()
loop.create_task(main())
future = loop.run_in_executor(None, blocking)   # 1
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)   # 2
group_tasks = asyncio.gather(*tasks, return_exceptions=True)
group_tasks.cancel()    # 取消tasks
group = asyncio.gather(group_task, future)  # 3
loop.run_until_complete(group)
loop.close()
  1. 記錄返回的Future;

  2. 此處loop已停止,先獲得所有task,註意這裏面沒有executor的Future;

  3. 創建了一個新的group來合並tasks和Future,在這種情況下executor也能正常退出,而tasks仍然通過正常的cancel來取消。


這個解決辦法在關閉時比較友好,但仍然有缺陷。通常來說,在整個程序中通過某種方式收集所有的executor返回的Future對象,然後與tasks合並,然後等待執行完成,這十分不方便,雖然有效,但還有更好的解決辦法。

# OPTION-C
import time
import asyncio
from concurrent.futures import ThreadPoolExecutor as Executor

async def main():
    print(f‘{time.ctime()} Hello!‘)
    await asyncio.sleep(1.0)
    print(f‘{time.ctime()} Goodbye!‘)
    loop.stop()

def blocking():
    time.sleep(2.0)
    print(f"{time.ctime()} Hello from a thread!")

loop = asyncio.get_event_loop()
executor = Executor()   # 1
loop.set_default_executor(executor)    # 2
loop.create_task(main())
future = loop.run_in_executor(None, blocking)   # 3
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
loop.run_until_complete(group)
executor.shutdown(wait=True)    # 4
loop.close()
  1. 建立自己的executor實例;

  2. 將其設定為loop的默認executor;

  3. 像以前一樣;

  4. 明確地在loop關閉前等待executor的所有Future執行完,這可以避免"Event loop is closed"這樣的錯誤信息,能這樣做是因為獲得了使用executor的權限,而asyncio默認的executor沒有開放相應的接口調用。

現在可以在任何地方調用run_in_executor(),並且程序可以優雅地退出了。

深入Asyncio(十一)優雅地開始與結束