
Understanding Event-Driven Programming in Python (the Event Loop)

  Two years ago, when I was learning Python, the only concurrency models I knew were multiprocessing and multithreading. Both are scheduled by the operating system rather than by the programmer, and their drawbacks are obvious: context switching and creation overhead are both costly. Later I heard about Python coroutines (user-level threads), which can be scheduled by the programmer. Although lightweight, they still follow the same basic idea: use multiple workers so that one blocking worker does not stall everything. Then I came across Tornado and learned about asynchronous programming in Python; Tornado bills itself as a single-threaded, asynchronous, high-performance web server. At the time I kept wondering: if it is single-threaded, how can it be so fast? Was it doing some optimization at the TCP layer, or exploiting some TCP multiplexing trick? Eventually I learned about the event loop inside Tornado, and through it I finally understood event-driven programming, the real face of Python's so-called asynchronous programming.

  To understand event-driven programming in Python, you must first understand Python coroutines, yield, and IO multiplexing (the select/poll/epoll trio). We also need to look at two separate scenarios: acting as a server and acting as a client. On the server side, the heart of the event loop is IO multiplexing (look it up if you are not familiar with it): every incoming client request is handled through an IO-multiplexing call. In my view this is why Python's asynchronous programming is not truly asynchronous: after select returns the ready events, they are still processed one by one in a loop, so if any one handler blocks, it blocks all the others.
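The server-side idea can be sketched with the stdlib `selectors` module (which wraps epoll/kqueue/select depending on the platform). This is a minimal illustration of readiness-based dispatch, not Tornado's actual code; the `socketpair` here stands in for a client connection:

```python
import selectors
import socket

sel = selectors.DefaultSelector()
left, right = socket.socketpair()  # a connected pair standing in for a client
received = []

def on_readable(conn):
    # handler invoked when `conn` has data ready to read
    received.append(conn.recv(1024))

# register `right` for read-readiness, attaching the handler as data
sel.register(right, selectors.EVENT_READ, on_readable)

left.sendall(b"hello")            # the "client" writes; `right` becomes readable
events = sel.select(timeout=1.0)  # block until a registered fd is ready
for key, mask in events:
    key.data(key.fileobj)         # dispatch to the registered handler, one by one

sel.unregister(right)
left.close()
right.close()
print(received)  # [b'hello']
```

Note that the dispatch loop at the bottom is exactly the weak point described above: handlers run sequentially, so one slow handler delays all the others.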

Here is the relevant part of Tornado's source. Just follow the two short comments marking where the loop waits for events and where it dispatches them; ignore the other details for now, they are too complicated.

while True:
    # Prevent IO event starvation by delaying new callbacks
    # to the next iteration of the event loop.
    with self._callback_lock:
        callbacks = self._callbacks
        self._callbacks = []

    # Add any timeouts that have come due to the callback list.
    # Do not run anything until we have determined which ones
    # are ready, so timeouts that call add_timeout cannot
    # schedule anything in this iteration.
    due_timeouts = []
    if self._timeouts:
        now = self.time()
        while self._timeouts:
            if self._timeouts[0].callback is None:
                # The timeout was cancelled.  Note that the
                # cancellation check is repeated below for timeouts
                # that are cancelled by another timeout or callback.
                heapq.heappop(self._timeouts)
                self._cancellations -= 1
            elif self._timeouts[0].deadline <= now:
                due_timeouts.append(heapq.heappop(self._timeouts))
            else:
                break
        if (self._cancellations > 512
                and self._cancellations > (len(self._timeouts) >> 1)):
            # Clean up the timeout queue when it gets large and it's
            # more than half cancellations.
            self._cancellations = 0
            self._timeouts = [x for x in self._timeouts
                              if x.callback is not None]
            heapq.heapify(self._timeouts)

    for callback in callbacks:
        self._run_callback(callback)
    for timeout in due_timeouts:
        if timeout.callback is not None:
            self._run_callback(timeout.callback)
    # Closures may be holding on to a lot of memory, so allow
    # them to be freed before we go into our poll wait.
    callbacks = callback = due_timeouts = timeout = None

    if self._callbacks:
        # If any callbacks or timeouts called add_callback,
        # we don't want to wait in poll() before we run them.
        poll_timeout = 0.0
    elif self._timeouts:
        # If there are any timeouts, schedule the first one.
        # Use self.time() instead of 'now' to account for time
        # spent running callbacks.
        poll_timeout = self._timeouts[0].deadline - self.time()
        poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
    else:
        # No timeouts and no callbacks, so use the default.
        poll_timeout = _POLL_TIMEOUT

    if not self._running:
        break

    if self._blocking_signal_threshold is not None:
        # clear alarm so it doesn't fire while poll is waiting for
        # events.
        signal.setitimer(signal.ITIMER_REAL, 0, 0)

    try:
        # wait for ready events
        event_pairs = self._impl.poll(poll_timeout)
    except Exception as e:
        # Depending on python version and IOLoop implementation,
        # different exception types may be thrown and there are
        # two ways EINTR might be signaled:
        # * e.errno == errno.EINTR
        # * e.args is like (errno.EINTR, 'Interrupted system call')
        if errno_from_exception(e) == errno.EINTR:
            continue
        else:
            raise

    if self._blocking_signal_threshold is not None:
        signal.setitimer(signal.ITIMER_REAL,
                         self._blocking_signal_threshold, 0)

    # Pop one fd at a time from the set of pending fds and run
    # its handler. Since that handler may perform actions on
    # other file descriptors, there may be reentrant calls to
    # this IOLoop that update self._events
    self._events.update(event_pairs)
    # dispatch loop: handle each ready event in turn
    while self._events:
        fd, events = self._events.popitem()
        try:
            fd_obj, handler_func = self._handlers[fd]
            handler_func(fd_obj, events)
        except (OSError, IOError) as e:
            if errno_from_exception(e) == errno.EPIPE:
                # Happens when the client closes the connection
                pass
            else:
                self.handle_callback_exception(self._handlers.get(fd))
        except Exception:
            self.handle_callback_exception(self._handlers.get(fd))
    fd_obj = handler_func = None

On the client side, IO multiplexing is actually not used. In Tornado, you register callback functions, and at the start of each iteration the ioloop walks the callbacks list and runs them. run_sync, for example, simply takes the user-supplied function, registers it as a callback via an internal run function, and executes it when the loop comes around. So if the user's function is a generator-based coroutine, it must be wrapped with gen.coroutine to drive it; that is essentially what gen.coroutine is for.
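As a toy illustration of that last point, here is how a driver can pump a generator-based coroutine with send(). This is NOT Tornado's real gen.coroutine: the yielded callables stand in for Futures, and `drive` plays the role of the loop that resumes the coroutine once each "future" has a result.

```python
def drive(gen):
    try:
        value = gen.send(None)        # prime the generator
        while True:
            result = value()          # "resolve" the yielded future
            value = gen.send(result)  # resume the coroutine with the result
    except StopIteration as stop:
        return stop.value             # the coroutine's return value

def fetch():
    a = yield (lambda: 1 + 1)    # stand-in for an async operation
    b = yield (lambda: a + 10)   # a later step that uses the first result
    return a + b

print(drive(fetch()))  # 14
```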

The code above already showed how callbacks are executed:

for callback in callbacks:
    self._run_callback(callback)

To sum up: on the server side, the core of the event loop is IO multiplexing; on the client side, the core is deferring execution with Future objects and driving coroutines with send, with no IO multiplexing involved. Client IO is still optimized, though: in Tornado's TCPClient the socket is set to non-blocking (see that class's implementation), while aiohttp goes further and applies IO multiplexing to client IO as well, for better performance. So whether on the client or the server side, the event loop is not truly asynchronous at its core, and blocking is always a potential problem. Among the frameworks I know that use IO multiplexing, most additionally hand time-consuming operations off to a thread pool, which greatly increases concurrency and quietly achieves an asynchronous effect.
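The thread-pool pattern mentioned in the summary can be sketched with the stdlib concurrent.futures module. The loop thread stays free while blocking work runs in the pool; `blocking_task` and the pool size here are illustrative, not any framework's API:

```python
import concurrent.futures
import time

def blocking_task(n):
    time.sleep(0.01)   # stands in for blocking IO or heavy computation
    return n * n

# submit blocking work to worker threads instead of running it inline,
# so a single-threaded event loop would not be stalled by it
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(blocking_task, n) for n in range(5)]
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9, 16]
```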