論事件驅動與多路IO復用
阿新 • • 發佈:2017-12-02
監聽 sea [] env tar 才有 可能 exception tput
通常,我們寫服務器處理模型的程序時,有以下幾種模型:
- (1)每收到一個請求,創建一個新的進程,來處理該請求;
- (2)每收到一個請求,創建一個新的線程,來處理該請求;
- (3)每收到一個請求,放入一個事件列表,讓主進程通過非阻塞I/O方式來處理請求
上面的幾種方式,各有千秋,
第(1)中方法,由於創建新的進程的開銷比較大,所以,會導致服務器性能比較差,但實現比較簡單。
第(2)種方式,由於要涉及到線程的同步,有可能會面臨死鎖等問題。
第(3)種方式,在寫應用程序代碼時,邏輯比前面兩種都復雜。
綜合考慮各方面因素,一般普遍認為第(3)種方式是大多數網絡服務器采用的方式
事件驅動編程是一種編程範式,這裏程序的執行流由外部事件來決定。它的特點是包含一個事件循環,當外部事件發生時使用回調機制來觸發相應的處理。另外兩種常見的編程範式是(單線程)同步以及多線程編程。
更多姿勢
http://www.cnblogs.com/alex3714/articles/5248247.html
I/O操作由操作系統完成,遇到IO操作,主進程交給操作系統處理IO,處理完成通過回調函數告訴主進程。
IO多路復用
番外篇
http://www.cnblogs.com/alex3714/articles/5876749.html
select、poll、epoll、本質為IO多路復用。
select 版本
簡單版:
import select import socket import queue server = socket.socket() server.bind(("localhost", 9990)) server.listen(1000) server.setblocking(False) # 不阻塞 msg_dic = {} inputs = [server,] outputs = [] while True: readable, writeable, exceptional = select.select(inputs, outputs, inputs) print(readable, writeable, exceptional) for r in readable: if r is server: conn, addr = server.accept() print("來了個新連接", addr) inputs.append(conn) else: try: data = r.recv(1024) print("收到的數據", data) conn.send(data) except ConnectionResetError: print("[%s]客戶端斷開了" % r) inputs.remove(r)
高級裝逼版
import select import socket import queue server = socket.socket() server.setblocking(0) server_addr = (‘localhost‘, 10000) print(‘starting up on %s port %s‘ % server_addr) server.bind(server_addr) server.listen(5) inputs = [server, ] # 自己也要監測呀,因為server本身也是個fd outputs = [] message_queues = {} while True: print("waiting for next event...") readable, writeable, exeptional = select.select(inputs, outputs, inputs) # 如果沒有任何fd就緒,那程序就會一直阻塞在這裏 for s in readable: # 每個s就是一個socket if s is server: # 別忘記,上面我們server自己也當做一個fd放在了inputs列表裏,傳給了select,如果這個s是server,代表server這個fd就緒了, # 就是有活動了, 什麽情況下它才有活動? 當然 是有新連接進來的時候 呀 # 新連接進來了,接受這個連接 conn, client_addr = s.accept() print("new connection from", client_addr) conn.setblocking(0) inputs.append(conn) # 為了不阻塞整個程序,我們不會立刻在這裏開始接收客戶端發來的數據, 把它放到inputs裏, 下一次loop時,這個新連接 # 就會被交給select去監聽,如果這個連接的客戶端發來了數據 ,那這個連接的fd在server端就會變成就續的,select就會把這個連接返回,返回到 # readable 列表裏,然後你就可以loop readable列表,取出這個連接,開始接收數據了, 下面就是這麽幹 的 message_queues[conn] = queue.Queue() # 接收到客戶端的數據後,不立刻返回 ,暫存在隊列裏,以後發送 else: # s不是server的話,那就只能是一個 與客戶端建立的連接的fd了 # 客戶端的數據過來了,在這接收 data = s.recv(1024) if data: print("收到來自[%s]的數據:" % s.getpeername()[0], data) message_queues[s].put(data) # 收到的數據先放到queue裏,一會返回給客戶端 if s not in outputs: outputs.append(s) # 為了不影響處理與其它客戶端的連接 , 這裏不立刻返回數據給客戶端 else: # 如果收不到data代表什麽呢? 代表客戶端斷開了呀 print("客戶端斷開了",s) if s in outputs: outputs.remove(s) # 清理已斷開的連接 inputs.remove(s) # 清理已斷開的連接 del message_queues[s] # 清理已斷開的連接 for s in writeable: try: next_msg = message_queues[s].get_nowait() except queue.Empty: print("client [%s]" %s.getpeername()[0], "queue is empty..") outputs.remove(s) else: print("sending msg to [%s]" % s.getpeername()[0], next_msg) s.send(next_msg.upper()) for s in exeptional: print("handling exception for ", s.getpeername()) inputs.remove(s) if s in outputs: outputs.remove(s) s.close() del message_queues[s]
secoketor模塊
import selectors
import socket
sel = selectors.DefaultSelector()
def accept(sock, mask):
conn, addr = sock.accept() # Should be ready
print("accepted", conn, "from", addr, mask)
conn.setblocking(False)
sel.register(conn, selectors.EVENT_READ, read) # 新連接註冊read回調函數
def read(conn, mask):
data = conn.recv(1024)
if data:
print("echoing", repr(data), "to", conn)
conn.send(data)
else:
print("closing", conn)
sel.unregister(conn)
conn.close()
sock = socket.socket()
sock.bind(("localhost", 10000))
sock.listen(100)
sock.setblocking(False)
sel.register(sock, selectors.EVENT_READ, accept) # 來新連接調accept
while True:
events = sel.select() # 默認阻塞,有活動連接就返回活動的連接列表 epoll/select
for key, mask in events:
callback = key.data # accept
callback(key.fileobj, mask) # key.fileobj= 文件句柄
客戶端
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# __author__:JasonLIN
import socket
import sys
# send_buf_size = 8192 # 設置發送緩沖域大小
# get_buf_size = 8192 # 設置接收緩沖域大小
messages = [b‘This is the message. ‘,
b‘It will be sent ‘,
b‘in parts.‘,
]
server_address = (‘192.168.31.102‘, 10000)
# Create a TCP/IP socket
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(10000)]
# Connect the socket to the port where the server is listening
print(‘connecting to %s port %s‘ % server_address)
for s in socks:
# s.setsockopt(socket.SOL_TCP, socket.TCP_NODELAY, 1)
# s.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, send_buf_size)
# s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, get_buf_size)
# s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.connect(server_address)
for message in messages:
# Send messages on both sockets
for s in socks:
print(‘%s: sending "%s"‘ % (s.getsockname(), message))
s.send(message)
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print(‘%s: received "%s"‘ % (s.getsockname(), data))
if not data:
print(sys.stderr, ‘closing socket‘, s.getsockname())
論事件驅動與多路IO復用