1. 程式人生 > >真正的 Tornado 非同步非阻塞

真正的 Tornado 非同步非阻塞

原文出處https://hexiangyu.me/posts/15

其中 Tornado 的定義是 Web 框架和非同步網路庫,其中他具備有非同步非阻塞能力,能解決他兩個框架請求阻塞的問題,在需要併發能力時候就應該使用 Tornado。

但是在實際使用過程中很容易把 Tornado 使用成非同步阻塞框架,這樣對比其他兩大框架沒有任何優勢而言,本文就如何實現真正的非同步非阻塞記錄。

以下使用的 Python 版本為 2.7.13
平臺為 Macbook Pro 2016

使用 gen.coroutine 非同步程式設計


在 Tornado 中兩個裝飾器:

  • tornado.web.asynchronous
  • tornado.gen.coroutine

asynchronous 裝飾器是讓請求變成長連線的方式,必須手動呼叫 self.finish() 才會響應

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        # bad 
        self.write("Hello, world")

asynchronous 裝飾器不會自動呼叫self.finish() ,如果沒有沒有指定結束,該長連線會一直保持直到 pending 狀態。
沒有呼叫self.finish()


所以正確是使用方式是使用了 asynchronous 需要手動 finish

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()

coroutine 裝飾器是指定改請求為協程模式,說明白點就是能使用 yield 配合 Tornado 編寫非同步程式。

Tronado 為協程實現了一套自己的協議,不能使用 Python 普通的生成器。

在使用協程模式程式設計之前要知道如何編寫 Tornado 中的非同步函式,Tornado 提供了多種的非同步編寫形式:回撥、Future、協程等,其中以協程模式最是簡單和用的最多。

編寫一個基於協程的非同步函式同樣需要 coroutine 裝飾器

@gen.coroutine
def sleep(self):
    yield gen.sleep(10)
    raise gen.Return([1, 2, 3, 4, 5])

這就是一個非同步函式,Tornado 的協程非同步函式有兩個特點:

  • 需要使用 coroutine 裝飾器
  • 返回值需要使用 raise gen.Return() 當做異常丟擲

返回值作為異常丟擲是因為在 Python 3.2 之前生成器是不允許有返回值的。

使用過 Python 生成器應該知道,想要啟動生成器的話必須手動執行 next() 方法才行,所以這裡的 coroutine 裝飾器的其中一個作用就是在呼叫這個非同步函式時候自動執行生成器。

使用 coroutine 方式有個很明顯是缺點就是嚴重依賴第三方庫的實現,如果庫本身不支援 Tornado 的非同步操作再怎麼使用協程也是白搭依然會是阻塞的,放個例子感受一下。

import time
import logging
import tornado.ioloop
import tornado.web
import tornado.options
from tornado import gen

tornado.options.parse_command_line()

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()


class NoBlockingHnadler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        yield gen.sleep(10)
        self.write('Blocking Request')


class BlockingHnadler(tornado.web.RequestHandler):
    def get(self):
        time.sleep(10)
        self.write('Blocking Request')

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/block", BlockingHnadler),
        (r"/noblock", NoBlockingHnadler),
    ], autoreload=True)

if __name__ == "__main__":
    app = make_app()
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()

為了顯示更明顯設定了 10 秒

當我們使用 yield gen.sleep(10) 這個非同步的 sleep 時候其他請求是不阻塞的。
非阻塞效果圖
當使用 time.sleep(10) 時候會阻塞其他的請求。
阻塞效果圖
這裡的非同步非阻塞是針對另一請求來說的,本次的請求該是阻塞的仍然是阻塞的。

gen.coroutine 在 Tornado 3.1 後會自動呼叫 self.finish() 結束請求,可以不使用 asynchronous 裝飾器。

所以這種實現非同步非阻塞的方式需要依賴大量的基於 Tornado 協議的非同步庫,使用上比較侷限,好在還是有一些可以用的非同步庫

基於執行緒的非同步程式設計


使用 gen.coroutine 裝飾器編寫非同步函式,如果庫本身不支援非同步,那麼響應任然是阻塞的。

在 Tornado 中有個裝飾器能使用 ThreadPoolExecutor 來讓阻塞過程程式設計非阻塞,其原理是在 Tornado 本身這個執行緒之外另外啟動一個執行緒來執行阻塞的程式,從而讓 Tornado 變得阻塞。

futures 在 Python3 是標準庫,但是在 Python2 中需要手動安裝
pip install futuimport time

import logging
import tornado.ioloop
import tornado.web
import tornado.options
from tornado import gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor

tornado.options.parse_command_line()

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()


class NoBlockingHnadler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(4)

    @run_on_executor
    def sleep(self, second):
        time.sleep(second)
        return second

    @gen.coroutine
    def get(self):
        second = yield self.sleep(5)
        self.write('noBlocking Request: {}'.format(second))

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/noblock", NoBlockingHnadler),
    ], autoreload=True)

if __name__ == "__main__":
    app = make_app()
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()

ThreadPoolExecutor 是對標準庫中的 threading 的高度封裝,利用執行緒的方式讓阻塞函式非同步化,解決了很多庫是不支援非同步的問題。
這裡寫圖片描述
但是與之而來的問題是,如果大量使用執行緒化的非同步函式做一些高負載的活動,會導致該 Tornado 程序效能低下響應緩慢,這只是從一個問題到了另一個問題而已。

所以在處理一些小負載的工作,是能起到很好的效果,讓 Tornado 非同步非阻塞的跑起來。

但是明明知道這個函式中做的是高負載的工作,那麼你應該採用另一種方式,使用 Tornado 結合 Celery 來實現非同步非阻塞。

基於 Celery 的非同步程式設計

Celery 是一個簡單、靈活且可靠的,處理大量訊息的分散式系統,專注於實時處理的任務佇列,同時也支援任務排程。
Celery 並不是唯一選擇,你可選擇其他的任務佇列來實現,但是 Celery 是 Python 所編寫,能很快的上手,同時 Celery 提供了優雅的介面,易於與 Python Web 框架整合等特點。

與 Tornado 的配合可以使用 tornado-celery ,該包已經把 Celery 封裝到 Tornado 中,可以直接使用。

實際測試中,由於 tornado-celery 很久沒有更新,導致請求會一直阻塞,不會返回

解決辦法是:

把 celery 降級到 3.1 pip install celery==3.1
把 pika 降級到 0.9.14 pip install pika==0.9.14

import time
import logging
import tornado.ioloop
import tornado.web
import tornado.options
from tornado import gen

import tcelery, tasks

tornado.options.parse_command_line()
tcelery.setup_nonblocking_producer()


class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()


class CeleryHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        response = yield gen.Task(tasks.sleep.apply_async, args=[5])
        self.write('CeleryBlocking Request: {}'.format(response.result))


def make_app(): 
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/celery-block", CeleryHandler),
    ], autoreload=True)

if __name__ == "__main__":
    app = make_app()
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()
import os
import time
from celery import Celery
from tornado import gen

celery = Celery("tasks", broker="amqp://")
celery.conf.CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'amqp')

@celery.task
def sleep(seconds):
    time.sleep(float(seconds))
    return seconds

if __name__ == "__main__":
    celery.start()

這裡寫圖片描述
Celery 的 Worker 執行在另一個程序中,獨立於 Tornado 程序,不會影響 Tornado 執行效率,在處理複雜任務時候比程序模式更有效率。

總結


方法 優點 缺點 可用性
gen.coroutine 簡單、優雅 需要非同步庫支援 ★★☆☆☆
執行緒 簡單 可能會影響效能 ★★★☆☆
Celery 效能好 操作複雜、版本低 ★★★☆☆

目前沒有找到最佳的非同步非阻塞的程式設計模式,可用的非同步庫比較侷限,只有經常用的,個人編寫非同步庫比較困難。

推薦使用執行緒和 Celery 的模式進行非同步程式設計,輕量級的放線上程中執行,複雜的放在 Celery 中執行。當然如果有非同步庫使用那最好不過了。

Python 3 中可以把 Tornado 設定為 asyncio 的模式,這樣就使用 相容 asyncio 模式的庫,這應該是日後的方向。

Reference