1. 程式人生 > >Flask解析(二):Flask-Sqlalchemy與多執行緒、多程序

Flask解析(二):Flask-Sqlalchemy與多執行緒、多程序

原創作者:flowell,轉載請標明出處:https://www.cnblogs.com/flowell/p/multiprocessing_flask_sqlalchemy.html


 

 

Sqlalchemy

  flask-sqlalchemy的session是執行緒安全的,但在多程序環境下,要確保派生子程序時,父程序不存在任何的資料庫連線,可以通過呼叫db.get_engine(app=app).dispose()來手動銷燬已經建立的engine,然後再派生子程序。


 

  最近線上的專案總是會報出資料庫連線相關的錯誤,比如“Command out of Sync”,“Mysql server has gone away”,“Lost databse connection”,“Package sequence out of order”等等,最終解決下來,發現以上錯誤可以分為兩種,一種是和連線丟失有關的,一種是和連線被多個執行緒(程序)同時使用了有關。

 

  我們專案基於flask,有多執行緒的場景,也有多程序的場景。orm用的是flask的拓展flask-sqlalchemy。flask-sqlalchemy的使用必須基於flask的app例項,也就是說要在app上下文中才能使用flask-sqlalchemy,所以在某些離線(非web)場景下,我們也用到了原生的Sqlalchemy。

 

  原生的Sqlalchemy的使用方式是

engine = create_engine(db_url)
Session = sessionmaker(bind=engine)
session = Session()
session.query(xxx)

  首先要建立一個engine,engine顧名思義就是和資料庫連線的引擎。在實際發起查詢前,是不會建立任何connection的。建立engine時可以通過指定poolclass引數來指定engine使用的連線池。預設是QueuePool,也可以設定為NullPool(不使用連線池)。為了方便理解,可以把engine視為管理連線池的物件。

 

  sqlalchemy中session和我們平時資料庫裡說的session是兩個不同的概念,在平時資料庫中,session的生命週期從連線上資料庫開始,到斷開和資料庫的連線位置。但是sqlalchemy中的session更多的是一種管理連線的物件,它從連線池取出一個連線,使用連線,然後釋放連線,而自身也跟隨著銷燬。sqlalchemy中的Connection物件是管理真正資料庫連線的物件,真正的資料庫連線在sqlalchemy中是DBAPI。

 

  預設地,如果不傳入poolclass,則使用QueuePool(具有一定數量的連線池),如果不指定pool_recycle引數,則預設資料庫連線不會重新整理。也就是說連線如果不適用,則一直不去重新整理它。但是問題來了,在Mysql中,輸入“show variables like "%timeout%"; ” ,可以看到有一個waittimeout,還有interacttimeout,預設值為28800(8小時),這兩個值代表著,如果8個小時內某個資料庫連線都不和mysql聯絡,那麼就會斷掉這個連線。所以,8個小時過去了,Mysql把連線斷掉了,但是sqlalchemy客戶端這邊卻還保持著這個連線。當某個時候該連線從連線池被取出使用時,就會丟擲“Mysql server has gone away”等連線丟失的資訊。

 

  解決這個問題的辦法很簡單,只要傳入pool_recycle引數即可。特別地,在flask-sqlalchemy中不會出現這種問題,因為falsk-sqlalchemy拓展自動地幫我們注入了pool_recycle引數,預設為7200秒。

 

def apply_driver_hacks(self, app, sa_url, options):
        """This method is called before engine creation and used to inject
        driver specific hacks into the options.  The `options` parameter is
        a dictionary of keyword arguments that will then be used to call
        the :func:`sqlalchemy.create_engine` function.
        The default implementation provides some saner defaults for things
        like pool sizes for MySQL and sqlite.  Also it injects the setting of
        `SQLALCHEMY_NATIVE_UNICODE`.
        """
        if sa_url.drivername.startswith('mysql'):
            sa_url.query.setdefault('charset', 'utf8')
            if sa_url.drivername != 'mysql+gaerdbms':
                options.setdefault('pool_size', 10)
                options.setdefault('pool_recycle', 7200)  # 預設7200秒重新整理連線
        elif sa_url.drivername == 'sqlite':
            pool_size = options.get('pool_size')
            detected_in_memory = False
            if sa_url.database in (None, '', ':memory:'):
                detected_in_memory = True
                from sqlalchemy.pool import StaticPool
                options['poolclass'] = StaticPool
                if 'connect_args' not in options:
                    options['connect_args'] = {}
                options['connect_args']['check_same_thread'] = False

                # we go to memory and the pool size was explicitly set
                # to 0 which is fail.  Let the user know that
                if pool_size == 0:
                    raise RuntimeError('SQLite in memory database with an '
                                       'empty queue not possible due to data '
                                       'loss.')
            # if pool size is None or explicitly set to 0 we assume the
            # user did not want a queue for this sqlite connection and
            # hook in the null pool.
            elif not pool_size:
                from sqlalchemy.pool import NullPool
                options['poolclass'] = NullPool

            # if it's not an in memory database we make the path absolute.
            if not detected_in_memory:
                sa_url.database = os.path.join(app.root_path, sa_url.database)

        unu = app.config['SQLALCHEMY_NATIVE_UNICODE']
        if unu is None:
            unu = self.use_native_unicode
        if not unu:
            options['use_native_unicode'] = False

        if app.config['SQLALCHEMY_NATIVE_UNICODE'] is not None:
            warnings.warn(
                "The 'SQLALCHEMY_NATIVE_UNICODE' config option is deprecated and will be removed in"
                " v3.0.  Use 'SQLALCHEMY_ENGINE_OPTIONS' instead.",
                DeprecationWarning
            )
        if not self.use_native_unicode:
            warnings.warn(
                "'use_native_unicode' is deprecated and will be removed in v3.0."
                "  Use the 'engine_options' parameter instead.",
                DeprecationWarning
            )

  

  sessionmaker是Session定製方法,我們把engine傳入sessionmaker中,就可以得到一個session工廠,通過工廠來生產真正的session物件。但是這種生產出來的session是執行緒不安全的,sqlalchemy提供了scoped_session來幫助我們生產執行緒安全的session,原理類似於Local,就是代理session,通過執行緒的id來找到真正屬於本執行緒的session。

 

  flask-sqlalchemy就是使用了scoped_session來保證執行緒安全,具體的程式碼可以在Sqlalchemy中看到,構造session時,使用了scoped_session。

 

def create_scoped_session(self, options=None):
        """Create a :class:`~sqlalchemy.orm.scoping.scoped_session`
        on the factory from :meth:`create_session`.
        An extra key ``'scopefunc'`` can be set on the ``options`` dict to
        specify a custom scope function.  If it's not provided, Flask's app
        context stack identity is used. This will ensure that sessions are
        created and removed with the request/response cycle, and should be fine
        in most cases.
        :param options: dict of keyword arguments passed to session class  in
            ``create_session``
        """

        if options is None:
            options = {}

        scopefunc = options.pop('scopefunc', _app_ctx_stack.__ident_func__)
        options.setdefault('query_cls', self.Query)
        return orm.scoped_session(
            self.create_session(options), scopefunc=scopefunc
        )

    def create_session(self, options):
        """Create the session factory used by :meth:`create_scoped_session`.
        The factory **must** return an object that SQLAlchemy recognizes as a session,
        or registering session events may raise an exception.
        Valid factories include a :class:`~sqlalchemy.orm.session.Session`
        class or a :class:`~sqlalchemy.orm.session.sessionmaker`.
        The default implementation creates a ``sessionmaker`` for :class:`SignallingSession`.
        :param options: dict of keyword arguments passed to session class
        """

        return orm.sessionmaker(class_=SignallingSession, db=self, **options)

  

多程序和資料庫連線

  多程序環境下,要注意和資料庫連線相關的操作。


 

  說到多程序,python裡最常用的就是multiprocessing。multiprocessing在windows下和linux的表現有所區別,在此只討論linux下的表現。linux下多程序通過fork()來派生,要理解我下面說的必須先弄懂fork()是什麼東西。粗略地說,每個程序都有自己的一個空間,稱為程序空間,每個程序的程序空間都是獨立的,程序與程序之間互不干擾。fork()的作用,就是將一個程序的程序空間,完完全全地copy一份,copy出來的就是子程序了,所以我們說子程序和父程序有著一模一樣的地址空間。地址空間就是程序執行的空間,這空間裡會有程序已經開啟的檔案描述符,檔案描述符會間接地指向程序已經開啟的檔案。也就是說,fork()之後,父程序,子程序會有相同的檔案描述符,指向相同的一個檔案。為什麼?因為檔案是存在硬盤裡的,fork()時copy的記憶體中的程序空間,並沒有把檔案也copy一份。這就導致了,父程序,子程序,同時指向同一個檔案,他們任意一個都可以對這個檔案進行操作。這和本文說的資料庫有啥關係?順著這個思路想,資料庫連線是不是一個TCP連線?TCP連線是不是一個socket?socket在linux下是什麼,就是一個檔案。所以說,如果父程序在fork()之前打開了資料庫連線,那麼子程序也會擁有這個開啟的連線。

 

  兩個程序同時寫一個連線會導致資料混亂,所以會出現“Command out of sync”的錯誤,兩個程序同時讀一個連線,會導致一個程序讀到了,另一個沒讀到,就是“No result”。一個程序關閉了連線,另一個程序並不知道,它試圖去操作連線時,就會出現“Lost database connection”的錯誤。

 

  在此討論的場景是,父程序在派生子程序之前,父程序擁有已開啟的資料庫連線。派生出子程序之後,子程序也就擁有了相應的連線。如果在fork()之前父程序沒有開啟資料庫連線,那麼也不用擔心這個問題。比如Celery使用的prefork池,雖然是多程序模型,但是celery在派子程序前時不會開啟資料庫連線的,所以不用擔心在celery任務中會出現資料庫連線混亂的問題。

 

   我做的專案裡的多程序的場景之一就是使用tornado來跑web應用,在派生多個web應用例項時,確保此前建立的資料庫連線被銷燬。

 

app = Flask()
db = Sqlalchemy()
db.init_app(app)
...
...
db.get_engine(app=app).dispose()  # 先銷燬已有的engine,確保父程序沒有資料庫連線
...
...
fork()    # 派生子程序

# 例如
tornado.start()  # 啟動多個web例項程序