1. 程式人生 > >Python epoll程式設計:實現一個ioloop

Python epoll程式設計:實現一個ioloop

簡介

Python標準庫的select模組提供了IO多路複用支援,包括select、poll、epoll。當需要處理大量空閒連線時,epoll能顯著提升效能。

關於IO多路複用技術,可以參考IO多路複用技術
epoll廣泛應用於高併發架構中,tornado、eventlet等都支援epoll。

ioloop

ioloop作為tornado四個主要組成部分之一,提供了一個非同步網路庫。在看過一些tornado原始碼後發現要實現一個ioloop的核心並不複雜。

#!/usr/bin/env python
# encoding: utf-8

import select


class IOLoop(object):
    """Minimal epoll-based event loop.

    Handlers are registered per file descriptor and are invoked from
    ``start`` as ``handler(fd_obj, events)`` whenever epoll reports
    activity on that descriptor.
    """

    # Error conditions are OR-ed into every registration so handlers are
    # also woken up when the descriptor fails or hangs up.
    ERROR = select.EPOLLERR | select.EPOLLHUP
    READ = select.EPOLLIN
    WRITE = select.EPOLLOUT

    def __init__(self):
        self.impl = select.epoll()
        # fd -> event mask: events polled but not yet dispatched.
        self._events = {}
        # fd -> (original fd object, handler callable).
        self._handlers = {}

    def split_fd(self, fd):
        """Return ``(integer_fd, original_object)`` for *fd*.

        *fd* may be an object exposing ``fileno()`` or a plain descriptor;
        in the latter case it is returned unchanged in both positions.
        """
        try:
            return fd.fileno(), fd
        except AttributeError:
            return fd, fd

    def add_handler(self, fd, handler, events):
        """Register *handler* for *events* (plus ``ERROR``) on *fd*."""
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, handler)
        self.impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, handler, events):
        """Replace the handler and event mask of an already registered *fd*."""
        fd, obj = self.split_fd(fd)
        self._handlers[fd] = (obj, handler)
        self.impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        """Forget *fd*: drop its handler, pending events and epoll entry.

        Unregistering is best-effort: a descriptor that is unknown to
        epoll, already closed, or invalid is silently ignored.
        """
        fd, obj = self.split_fd(fd)
        self._handlers.pop(fd, None)
        self._events.pop(fd, None)
        try:
            self.impl.unregister(fd)
        except (IOError, OSError, ValueError):
            # Narrower than a bare ``except`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed here.
            pass

    def start(self):
        """Run the loop forever, dispatching ready descriptors.

        Polls epoll with a 2 second timeout. Exceptions raised while
        looking up or running a handler are printed and the loop goes on.
        A poll interrupted by a signal (EINTR) is retried; any other poll
        failure is raised to the caller instead of spinning forever.
        """
        # Function-scope import: the module header only imports ``select``.
        import errno

        while True:
            try:
                event_pairs = self.impl.poll(2)
            except (IOError, OSError) as e:
                if getattr(e, "errno", None) == errno.EINTR:
                    continue
                raise
            self._events.update(event_pairs)
            # popitem() rather than iteration: handlers may call
            # remove_handler(), which mutates self._events.
            while self._events:
                fd, events = self._events.popitem()
                try:
                    fd_obj, handler_func = self._handlers[fd]
                    handler_func(fd_obj, events)
                except Exception as e:
                    print(e)

IOLoop提供了三個方法add_handler、update_handler、remove_handler,分別對應epoll的註冊、更新、移除操作。start提供一個主迴圈,查詢epoll觸發條件,執行對應的handler。不到60行程式碼就基本實現了一個簡單的IOLoop。

應用舉例

#!/usr/bin/env python
# encoding: utf-8

import functools
import socket

from ioloop import IOLoop

# Module-level event loop instance used by the handler functions below.
io_loop = IOLoop()


def connection_ready(sock, fd, events):
    """Accept every connection currently pending on listening socket *sock*.

    Each accepted connection is switched to non-blocking mode and passed
    to ``handler_connection``. The function returns once ``accept``
    reports that nothing is pending (EAGAIN / EWOULDBLOCK); other socket
    errors propagate to the caller.

    Args:
        sock: the non-blocking listening socket.
        fd: descriptor value supplied by the event loop (unused).
        events: epoll event mask supplied by the event loop (unused).
    """
    # Function-scope import so this block does not depend on the file header.
    import errno

    print("call connect ready")
    while True:
        try:
            connection, addr = sock.accept()
        except socket.error as e:
            code = e.args[0] if e.args else None
            if code in (errno.EAGAIN, errno.EWOULDBLOCK):
                # Backlog drained: normal way out of the loop, not an error.
                return
            if code == errno.ECONNABORTED:
                # The peer went away while queued; try the next one.
                continue
            raise
        connection.setblocking(0)
        handler_connection(connection, addr)


def make_sock(host="", port=8080, backlog=128):
    """Create a non-blocking TCP listening socket.

    Args:
        host: address to bind to; the empty string binds all interfaces.
        port: TCP port to bind to.
        backlog: value passed to ``listen``.

    Returns:
        The bound, listening, non-blocking socket with SO_REUSEADDR set.

    Raises:
        Whatever ``setsockopt``/``bind``/``listen`` raise; the socket is
        closed before the exception propagates so it does not leak.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setblocking(0)
        sock.bind((host, port))
        sock.listen(backlog)
    except Exception:
        sock.close()
        raise
    return sock


def handler_connection(conn, addr):
    """Start watching a freshly accepted connection for readability.

    Registers ``handler_read`` (with *conn* pre-bound) on the shared
    ``io_loop`` for READ events. *addr* is accepted but not used.
    """
    on_readable = functools.partial(handler_read, conn)
    descriptor = conn.fileno()
    io_loop.add_handler(descriptor, on_readable, io_loop.READ)


def handler_read(conn, fd, event):
    """Read one chunk from *conn*, then wait for it to become writable.

    Up to 1024 bytes are received and printed, after which the
    connection's registration is switched to ``handler_write`` on WRITE.
    If the peer has closed the connection (empty read) or ``recv`` fails,
    the handler is removed and the socket closed instead, so a dead
    descriptor does not stay registered with the loop.

    Args:
        conn: the non-blocking client socket.
        fd: descriptor value supplied by the event loop (unused).
        event: epoll event mask supplied by the event loop (unused).
    """
    try:
        msg = conn.recv(1024)
    except socket.error:
        # Unregister while fileno() is still valid, then close.
        io_loop.remove_handler(conn.fileno())
        conn.close()
        raise
    if not msg:
        # Empty read: the peer closed its end, nothing to answer.
        io_loop.remove_handler(conn.fileno())
        conn.close()
        return
    print("get msg %s" % msg)
    callback = functools.partial(handler_write, conn)
    io_loop.update_handler(conn.fileno(), callback, io_loop.WRITE)


def handler_write(conn, fd, event):
    """Send the fixed HTTP response on *conn* and tear the connection down.

    The handler is unregistered BEFORE the socket is closed, because
    ``conn.fileno()`` is no longer usable once the socket is closed.
    Cleanup runs even if ``send`` raises.

    Args:
        conn: the non-blocking client socket.
        fd: descriptor value supplied by the event loop (unused).
        event: epoll event mask supplied by the event loop (unused).
    """
    # Bytes literal: identical to the plain literal on Python 2 and
    # accepted by socket.send on Python 3.
    response = b"""HTTP/1.0 200 OK\r
Date: Mon, 1 Jan 1996 01:01:01 GMT\r
Content-Type: text/plain\r
Content-Length: 13\r
\r
Hello, world!"""

    try:
        conn.send(response)
    finally:
        io_loop.remove_handler(conn.fileno())
        conn.close()


if __name__ == "__main__":
    # Script entry point: create the listening socket, route its READ
    # events to connection_ready, then run the event loop.
    sock = make_sock()
    callback = functools.partial(connection_ready, sock)
    io_loop.add_handler(
        fd=sock.fileno(),
        handler=callback,
        events=io_loop.READ,
    )
    io_loop.start()

測試

ab -c 1000 -n 10000結果:

Document Path:          /
Document Length:        13 bytes

Concurrency Level:      1000
Time taken for tests:   1.096 seconds
Complete requests:      10000
Failed requests:        0
Write errors:           0
Total transferred:      1140000 bytes
HTML transferred:       130000 bytes
Requests per second:    9125.30 [#/sec] (mean)
Time per request:       109.585 [ms] (mean)
Time per request:       0.110 [ms] (mean, across all concurrent requests)
Transfer rate:          1015.90 [Kbytes/sec] received

Connection Times (ms)
              min  mean[+/-sd] median   max
Connect:        0    3  26.9      1    1001
Processing:     6   18  15.1     13     361
Waiting:        1   17  15.1     13     361
Total:          7   21  32.0     14    1017

參考