1. 程式人生 > >tornado非同步原理(1)--非同步事件

tornado非同步原理(1)--非同步事件

tornado非同步原理

tornado有四類非同步事件:立即事件,定時器非同步事件,io非同步事件,Future非同步事件。

tornado 的ioloop管理所有的非同步事件,並在適當的時機呼叫非同步事件的回掉函式。

四類非同步事件均在ioloop的start函式中排程。

立即事件:

場景:當前函式執行完後,下次ioloop排程時直接排程某函式
用法:ioloop.add_callback(callback, *args, **kwargs)
原理:立即事件全部存放在ioloop._callbacks中,IOLoop每次迴圈都會呼叫這些立即事件的回撥函式

def start(self):
    
while True: ncallbacks = len(self._callbacks) #self._callbacks用於存放所有的立即事件 due_timeouts = [] if self._timeouts: now = self.time() while self._timeouts: if self._timeouts[0].callback is None: heapq.heappop(self._timeouts) self._cancellations
-= 1 elif self._timeouts[0].deadline <= now: due_timeouts.append(heapq.heappop(self._timeouts)) else: break for i in range(ncallbacks): self._run_callback(self._callbacks.popleft()) #迴圈呼叫所有的立即事件的回撥函式 for
timeout in due_timeouts: if timeout.callback is not None: self._run_callback(timeout.callback) if self._callbacks: #如果在上面呼叫回撥函式的過程中,又添加了新的立即事件,則將等待IO事件的時間設定為0,以便及時呼叫新的立即事件 poll_timeout = 0.0 elif self._timeouts: poll_timeout = self._timeouts[0].deadline - self.time() poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT)) else: poll_timeout = _POLL_TIMEOUT event_pairs = self._impl.poll(poll_timeout) self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() fd_obj, handler_func = self._handlers[fd] handler_func(fd_obj, events)

 

定時器非同步事件:
場景:使用者希望在某一段時間後執行某函式
用法:ioloop.call_at(when, callback, *args, **kwargs), ioloop.call_later(delay, callback, *args, **kwargs)
原理:定時器事件存放在ioloop._timeouts中,IOLoop每次迴圈開始都會找出所有已經超時的定時器,並呼叫對應的回撥函式

def start(self):
    while True:
        ncallbacks = len(self._callbacks)
        due_timeouts = [] #用於存放超時的事件
        if self._timeouts: #self._timeouts用於存放所有定時器事件
            now = self.time()
            while self._timeouts:
                if self._timeouts[0].callback is None: #如果定時器事件沒有回掉函式,則說明已經取消,直接丟棄
                    heapq.heappop(self._timeouts) #heapq是一個數據結構,它保證heapq[0]永遠是最小的一個元素
                    self._cancellations -= 1
                elif self._timeouts[0].deadline <= now: #如果定時器已經超時,則取出並新增至due_timeouts中
                    due_timeouts.append(heapq.heappop(self._timeouts))
                else: #因為heapq的特性,如果執行到這一步,說明剩下事件都沒有超時,退出迴圈
                    break
        for i in range(ncallbacks):
            self._run_callback(self._callbacks.popleft())
        for timeout in due_timeouts:
            if timeout.callback is not None:
                self._run_callback(timeout.callback) #迴圈呼叫所有已超時定時器事件的回撥函式

        if self._callbacks:
            poll_timeout = 0.0
        elif self._timeouts:   #根據最小定時器事件的時間設定等待IO事件的時間
            poll_timeout = self._timeouts[0].deadline - self.time()
            poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
        else:
            poll_timeout = _POLL_TIMEOUT

        event_pairs = self._impl.poll(poll_timeout)
        self._events.update(event_pairs)
        while self._events:
            fd, events = self._events.popitem()
            fd_obj, handler_func = self._handlers[fd]
            handler_func(fd_obj, events)

 

IO非同步事件:
場景:等待某個檔案描述符的某個事件,如TCPserver等待socket的READ事件
用法:ioloop.add_handler(fd, callback, events)
原理:所有的檔案描述符全部存放在ioloop._impl中,windows平臺下_impl是tornado.platform.select.SelectIOLoop物件
在linux平臺下_impl是tornado.platform.epoll.EPollIOLoop物件,作用都是同時監聽多個檔案描述符

def start(self):
    while True:
        ncallbacks = len(self._callbacks) 
        due_timeouts = [] 
        if self._timeouts: 
            now = self.time()
            while self._timeouts:
                if self._timeouts[0].callback is None: 
                    heapq.heappop(self._timeouts) 
                    self._cancellations -= 1
                elif self._timeouts[0].deadline <= now: 
                    due_timeouts.append(heapq.heappop(self._timeouts))
                else: 
                    break
        for i in range(ncallbacks):
            self._run_callback(self._callbacks.popleft()) 
        for timeout in due_timeouts:
            if timeout.callback is not None:
                self._run_callback(timeout.callback) 

        if self._callbacks: 
            poll_timeout = 0.0
        elif self._timeouts:   
            poll_timeout = self._timeouts[0].deadline - self.time()
            poll_timeout = max(0, min(poll_timeout, _POLL_TIMEOUT))
        else:
            poll_timeout = _POLL_TIMEOUT

        event_pairs = self._impl.poll(poll_timeout) #監聽所有檔案描述符
        self._events.update(event_pairs) 
        while self._events:
            fd, events = self._events.popitem()
            fd_obj, handler_func = self._handlers[fd]
            handler_func(fd_obj, events)  #迴圈呼叫所有檔案描述符對應的回撥函式

 

Future非同步事件:
場景:等待某個非同步事件結束後執行回掉函式
用法:ioloop.add_future(future, callback), future.add_done_callback(callback)
原理:非同步事件結束後呼叫Future.set_result(),當執行set_result時將future所有的回掉函式新增為ioloop的立即事件

class Future(object):
    def set_result(self, result):
        self._result = result
        self._set_done()
        
    def _set_done(self):
        self._done = True
        if self._callbacks:
            from tornado.ioloop import IOLoop
            loop = IOLoop.current()
            for cb in self._callbacks:
                loop.add_callback(cb, self) #將所有的回掉函式設定為ioloop的立即事件
            self._callbacks = None