1. 程式人生 > >深入tornado中的TCPServer

深入tornado中的TCPServer

efault 特性 else 系統調用 -- 整數 pat 文件描述 acad

1 梳理:

  應用層的下一層是傳輸層,而http協議一般是使用tcp的,所以實現tcp的重要性就不言而喻。

  由於tornado中實現了ioloop這個反應器以及iostream這個對連接的異步讀寫,所以tcp就很容易實現異步。

  在tornado的tcpserver文件中,實現了TCPServer這個類,他是一個單線程的,非阻塞的tcp 服務。

  為了與上層協議(在tornado中就是HTTPServer)交互,TCPServer提供了一個接口:handle_stream, 要求其子類必需實現該方法。

  TCPserver大體上實現了兩種啟動方式:單進程模式以及多進程模式(多進程模式需要Linux環境

)。 因為多進程方式是單進程的復雜版本,所以講了多進程那麽單進程就很好理解了。

下面就開始吧

2 準備知識點

  因為多進程模式需要Linux環境,所以需要對Linux有個基本的了解

  在Linux中,創建一個子進程只需要調用fork()系統調用就可以了,fork調用會返回兩次,子進程返回0,父進程返回子進程的pid。然後子進程和父進程繼續執行fork調用之後的語句,子進程獲得父進程數據空間,堆,棧的完全副本(也就是內存空間是獨立的)。因為fork調用之後經常會執行exec,所以Linux一般采用寫時復制(copy on write),父進程和子進程共享統一數據空間,只有當某個內存區域被修改時,才將該區域復制為副本。

  另外,盡管父進程打開的文件描述符都“復制”到了子進程,但由於父子進程的文件描述符指向同一個文件表項,所以不管是父進程或者是子進程對文件描述符進行修改,都會反映到子進程或者父進程中。所以可以這麽說:父子進程共享文件描述符

技術分享
import os
import socket
import fcntl

def set_close_exec(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFD)
    fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)

a 
= 你好 sk = socket.socket() set_close_exec(sk.fileno()) sk.bind((127.0.0.1, 8888)) sk.listen(1) def start_child(): id = os.fork() if id == 0: print(I am child process (%s) and my parent is %s. % (os.getpid(), os.getppid())) print(a) print(----------, sk.fileno()) return else: print(I (%s) just created a child process (%s). % (os.getpid(), id)) print(haha) start_child() print(done)
可以通過這段代碼簡單測驗一下fork調用的特性

3 要開車了:

tornado多進程模式啟動:

sockets = bind_sockets(8888)
tornado.process.fork_processes(0)
server = TCPServer()
server.add_sockets(sockets)
IOLoop.current().start()

  tornado的多進程處理分為以下幾個步驟:

    1 首先創建套接字,然後綁定並監聽

    2 執行fork調用,創建子進程(默認創建cpu個數的進程)。

      2.5 fork完成後,父進程與子進程就開始分工了,父進程負責管理子進程(包括當子進程異常退出時,重新fork一個子進程;關閉所有子進程),子進程則開始3、4、5步的操作

    3 啟動tcpserver

    4 為所有套接字註冊對應的事件以及處理函數   

    5 運行ioloop這個反應器

實際上也就是:

  每一個進程共享套接字(這實際上是個文件描述符),

  每一個子進程都有一個反應器,

  每一個子進程都在反應器上為相同的套接字註冊了相同的事件以及相同的處理函數。

那麽問題也就來了:

  當某個套接字上要建立連接,實際上每個子進程都能捕獲到該事件並執行對應的處理函數,但到底是哪個子進程要執行該操作呢? 當一個進程處理完了該操作,其他子進程該如何做呢?

我們帶著以上問題開始剖析:

1 首先創建套接字,然後綁定並監聽: sockets = bind_sockets(8888)

bind_sockets()方法位於tornado.netutil文件中,下面來詳細剖析一下該方法:

技術分享
def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=_DEFAULT_BACKLOG, flags=None, reuse_port=False):
    """Creates listening sockets bound to the given port and address.

        Returns a list of socket objects (multiple sockets are returned if
        the given address maps to multiple IP addresses, which is most common
        for mixed IPv4 and IPv6 use).

        Address may be either an IP address or hostname.  If it‘s a hostname,
        the server will listen on all IP addresses associated with the
        name.  Address may be an empty string or None to listen on all
        available interfaces.  Family may be set to either `socket.AF_INET`
        or `socket.AF_INET6` to restrict to IPv4 or IPv6 addresses, otherwise
        both will be used if available.

        The ``backlog`` argument has the same meaning as for
        `socket.listen() <socket.socket.listen>`.

        ``flags`` is a bitmask of AI_* flags to `~socket.getaddrinfo`, like
        ``socket.AI_PASSIVE | socket.AI_NUMERICHOST``.

        ``reuse_port`` option sets ``SO_REUSEPORT`` option for every socket
        in the list. If your platform doesn‘t support this option ValueError will
        be raised.
    """
    if reuse_port and not hasattr(socket, "SO_REUSEPORT"):
        raise ValueError("the platform doesn‘t support SO_REUSEPORT")

    sockets = []
    if address == "":
        address = None
    # address family參數指定調用者期待返回的套接口地址結構的類型。它的值包括四種:AF_UNIX,AF_INET,AF_INET6和AF_UNSPEC。
    # AF_UNIX用於同一臺機器上的進程間通信
    # 如果指定AF_INET,那麽函數就不能返回任何IPV6相關的地址信息;如果僅指定了AF_INET6,則就不能返回任何IPV4地址信息。
    # AF_UNSPEC則意味著函數返回的是適用於指定主機名和服務名且適合任何協議族的地址。
    # 如果某個主機既有AAAA記錄(IPV6)地址,同時又有A記錄(IPV4)地址,那麽AAAA記錄將作為sockaddr_in6結構返回,而A記錄則作為sockaddr_in結構返回
    if not socket.has_ipv6 and family == socket.AF_UNSPEC: # 如果系統不支持ipv6
        family = socket.AF_INET
    if flags is None:
        flags = socket.AI_PASSIVE
    bound_port = None
    for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM, 0, flags)):
        af, socktype, proto, canonname, sockaddr = res
        if (sys.platform == darwin and address == localhost and af == socket.AF_INET6 and sockaddr[3] != 0):
            # Mac OS X在“localhost”的getaddrinfo結果中包含一個鏈接本地地址fe80 :: 1%lo0。 
            # 但是,防火墻不了解這是一個本地地址,並且會提示訪問。 所以跳過這些地址。
            continue
        try:
            sock = socket.socket(af, socktype, proto)
        except socket.error as e:
            # 如果協議不支持該地址
            if errno_from_exception(e) == errno.EAFNOSUPPORT:
                continue
            raise
        # 為 fd 設置 FD_CLOEXEC 標識
        set_close_exec(sock.fileno())
        if os.name != nt: # 非windows
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if reuse_port:
            sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        if af == socket.AF_INET6:
            # On linux, ipv6 sockets accept ipv4 too by default,
            # but this makes it impossible to bind to both
            # 0.0.0.0 in ipv4 and :: in ipv6.  On other systems,
            # separate sockets *must* be used to listen for both ipv4
            # and ipv6.  For consistency, always disable ipv4 on our
            # ipv6 sockets and use a separate ipv4 socket when needed.
            #
            # Python 2.x on windows doesn‘t have IPPROTO_IPV6.
            if hasattr(socket, "IPPROTO_IPV6"):
                sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

        # 自動端口分配,端口=None
        # 應該綁定在IPv4和IPv6地址上的同一個端口上
        host, requested_port = sockaddr[:2]
        if requested_port == 0 and bound_port is not None:
            sockaddr = tuple([host, bound_port] + list(sockaddr[2:]))
        # 設置socket為非阻塞
        sock.setblocking(0)    
        sock.bind(sockaddr)
        bound_port = sock.getsockname()[1]
        sock.listen(backlog)
        sockets.append(sock)
    return sockets
View Code

2 與 2.5: tornado.process.fork_processes(0)

def fork_processes(num_processes, max_restarts=100):
    # 第一個參數表示啟動多少個子進程,第二個參數表示當子進程由於某些情況結束,父進程可以重新啟動一個子進程的次數
    global _task_id
    assert _task_id is None
    # 默認生成的子進程的個數等於cpu個數
    if num_processes is None or num_processes <= 0:
        num_processes = cpu_count()
    # 不允許在未完成創建子進程之前啟動ioloop
    if ioloop.IOLoop.initialized(): 
        raise RuntimeError("Cannot run in multiple processes: IOLoop instance "
                           "has already been initialized. You cannot call "
                           "IOLoop.instance() before calling start_processes()")
    gen_log.info("Starting %d processes", num_processes)
    children = {}

    
    def start_child(i):
        pid = os.fork()
        if pid == 0: # 表示子進程
            # child process
            _reseed_random()
            global _task_id
            _task_id = i
            return i  # 子進程會直接退出該函數(也就是start_child)的執行,並返回i的值
        else:
            # 父進程則會將子進程pid與子進程對應的i值進行映射,然後返回None
            children[pid] = i
            return None

    for i in range(num_processes):
        id = start_child(i)
        # 因為子進程繼承了父進程的執行流,並且子進程不應fork子進程,而是應該由父進程進行對子進程的創建和管理等操作
        # 所以子進程的執行流需要跳出fork_processes這個函數
        if id is not None:
            return id
            
    num_restarts = 0
    # 只有父進程的執行流才能到達這裏,父進程的執行流會一直處於該循環中,直到tornado服務主動關閉
    while children:
        try:
            # 等待任何一個子進程結束,返回一個tuple,包括子進程的進程ID和退出狀態信息
            pid, status = os.wait()
        except OSError as e:
            # 當阻塞於某個慢系統調用的一個進程捕獲某個信號且相應信號處理函數返回時,該系統調用可能返回一個EINTR錯誤
            if errno_from_exception(e) == errno.EINTR: 
                continue
            raise
        if pid not in children:
            continue
        id = children.pop(pid)
        if os.WIFSIGNALED(status): # 如果進程由於信號而退出,則返回True,否則返回False
            gen_log.warning("child %d (pid %d) killed by signal %d, restarting",
                            id, pid, os.WTERMSIG(status))
        elif os.WEXITSTATUS(status) != 0: # 如果WIFEXITED(status)返回True,則返回一個整數,該整數是exit()調用的參數。否則返回值是未定義的
            gen_log.warning("child %d (pid %d) exited with status %d, restarting",
                            id, pid, os.WEXITSTATUS(status))
        else:
            gen_log.info("child %d (pid %d) exited normally", id, pid)
            continue
        num_restarts += 1
        if num_restarts > max_restarts:
            raise RuntimeError("Too many child restarts, giving up")
        # 新啟動一個子進程
        new_id = start_child(id)  
        # 保證子進程執行流離開fork_processes函數
        if new_id is not None:
            return new_id
    # 當所有的子進程都完全退出,這時候我們需要結束父進程
    # 如果我們僅僅是結束fork_processes函數的執行,那麽父進程的執行流可能會啟動ioloop
    sys.exit(0)

3 啟動tcpserver: server = TCPServer()

這一步很簡單,僅僅是一些初始化的操作

技術分享
def __init__(self, io_loop=None, ssl_options=None, max_buffer_size=None, read_chunk_size=None):
        self.io_loop = io_loop
        self.ssl_options = ssl_options
        self._sockets = {}  # fd -> socket object    用來存儲文件描述符與socket對象的映射關系
        self._pending_sockets = []
        self._started = False
        self.max_buffer_size = max_buffer_size    # 最大緩沖長度
        self.read_chunk_size = read_chunk_size    # 每次讀的chunk大小

        # 校驗ssl選項. 
        if self.ssl_options is not None and isinstance(self.ssl_options, dict):
            # Only certfile is required: it can contain both keys
            if certfile not in self.ssl_options:
                raise KeyError(missing key "certfile" in ssl_options)

            if not os.path.exists(self.ssl_options[certfile]):
                raise ValueError(certfile "%s" does not exist % self.ssl_options[certfile])
            if (keyfile in self.ssl_options and not os.path.exists(self.ssl_options[keyfile])):
                raise ValueError(keyfile "%s" does not exist % self.ssl_options[keyfile])
View Code

4 為所有套接字註冊對應的事件以及處理函數: server.add_sockets(sockets)

首先來看add_sockets()方法

def add_sockets(self, sockets):
        if self.io_loop is None:
            self.io_loop = IOLoop.current()    # 獲取IOLoop實例對象

        for sock in sockets:
            self._sockets[sock.fileno()] = sock
            add_accept_handler(sock, self._handle_connection, io_loop=self.io_loop)

其中調用了add_accept_handler()方法,並將自身的_handle_connection()方法作為參數傳入(在這裏我們不講解這個方法,因為涉及的東西有點多)。

def add_accept_handler(sock, callback, io_loop=None):
    if io_loop is None: # 獲取IOLoop實例對象
        io_loop = IOLoop.current()

    def accept_handler(fd, events):
        # 我們處理回調時可能會有許多的連接等待建立; 為了防止其他任務的饑餓,我們必須限制我們一次接受的連接數。 
        # 理想情況下,我們接受在處理回調過程中等待的連接數,但此可能會對負載產生不利影響。 
        # 相反,我們使用listen backlog作為我們可以合理接受的連接數的。
        for i in xrange(_DEFAULT_BACKLOG): # _DEFAULT_BACKLOG默認為128
            try:
                connection, address = sock.accept()
            except socket.error as e:
                # _ERRNO_WOULDBLOCK 表示我們已經接受了所有可用的連接。
                if errno_from_exception(e) in _ERRNO_WOULDBLOCK:
                    return
                # ECONNABORTED表示有一個連接,在他處於等待被服務端accept的時候主動關閉了。
                if errno_from_exception(e) == errno.ECONNABORTED:
                    continue
                raise
            callback(connection, address)
    io_loop.add_handler(sock, accept_handler, IOLoop.READ) # 為socket註冊handler:當發生READ事件時運行accept_handler函數。

5 運行ioloop這個反應器: IOLoop.current().start()

這一步在之前的文章中已經介紹了,詳細請看:這裏

這裏只簡單的講解一下start()方法:

def start(self):
        try:
            while True:    
                callbacks = self._callbacks
                self._callbacks = []
                due_timeouts = []
                # 將時間已到的定時任務放置到due_timeouts中,過程省略
                for callback in callbacks:          # 執行callback
                    self._run_callback(callback)
                for timeout in due_timeouts:        # 執行定時任務
                    if timeout.callback is not None:
                        self._run_callback(timeout.callback)       
                callbacks = callback = due_timeouts = timeout = None    # 釋放內存
                # 根據情況設置poll_timeout的值,過程省略
                if not self._running:    # 終止ioloop運行時,在執行完了callback後結束循環
                    breaktry:
                    event_pairs = self._impl.poll(poll_timeout)
                except Exception as e:
                    if errno_from_exception(e) == errno.EINTR:  # 系統調用被信號處理函數中斷,進行下一次循環
                        continue
                    else:
                        raise 
                self._events.update(event_pairs)
                while self._events: 
                    fd, events = self._events.popitem()             # 獲取一個fd以及對應事件
                    try:
                        fd_obj, handler_func = self._handlers[fd]   # 獲取該fd對應的事件處理函數
                        handler_func(fd_obj, events)                # 運行該事件處理函數
                    except (OSError, IOError) as e:         
                        if errno_from_exception(e) == errno.EPIPE:     # 當客戶端關閉連接時會產生EPIPE錯誤                         
                            pass
                        # 其他異常處理已經省略
                fd_obj = handler_func = None       # 釋放內存空間    

另外的:

  TCPServer還有幾個其他的方法,但很多調用了bind_sockets, add_accept_handler, fork_processes這幾個方法,當理解了這幾個方法後TCPServer的其他方法就顯得很簡單了。所以本文並不做討論

參考

  os模塊: http://www.cnblogs.com/now-fighting/p/3534185.html

  Linux異常: http://blog.csdn.net/a8039974/article/details/25830705

  Linux多進程: UNIX環境高級編程

  tornado多進程分析: http://www.nowamagic.net/academy/detail/13321081

    http://strawhatfy.github.io/2015/10/14/tornado.tcpserver/

    https://www.linuxzen.com/tornado-duo-jin-cheng-shi-xian-fen-xi.html

  

深入tornado中的TCPServer