Python之旅11:socket、io多路利用和SocketServer
本章內容:
- Socket
- IO多路複用(select)
- SocketServer 模組(ThreadingTCPServer原始碼剖析)
-
事件驅動
一、socket
socket通常也稱作"套接字",用於描述IP地址和埠,是一個通訊鏈的控制代碼,應用程式通常通過"套接字"向網路發出請求或者應答網路請求。
socket起源於Unix,而Unix/Linux基本哲學之一就是“一切皆檔案”,對於檔案用【開啟】【讀寫】【關閉】模式來操作。socket就是該模式的一個實現,socket即是一種特殊的檔案,一些socket函式就是對其進行的操作(讀/寫IO、開啟、關閉)
socket和file的區別:
- file模組是針對某個指定檔案進行【開啟】【讀寫】【關閉】
- socket模組是針對 伺服器端 和 客戶端Socket 進行【開啟】【讀寫】【關閉】
tcp_socket_server:
# -*- coding: utf-8 -*- import socket ip_port = ('127.0.0.1', 9999) sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0) sk.bind(ip_port) sk.listen(5) #也叫半連結池的最大數 while True: print('server waiting...') conn, addr = sk.accept() client_data = conn.recv(1024) print(client_data.decode('utf-8')) conn.sendall('不要回答,不要回答,不要回答'.encode('utf-8')) conn.close()
tcp_socket_client:
# -*- coding: utf-8 -*-
import socket
ip_port = ('127.0.0.1', 9999)
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
sk.connect(ip_port)
sk.sendall('請求佔領地球'.encode('utf-8'))
server_reply = sk.recv(1024)
print(server_reply.decode('utf-8'))
sk.close()
更多功能
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
引數一:地址簇
socket.AF_INET IPv4(預設)
socket.AF_INET6 IPv6socket.AF_UNIX 只能夠用於單一的Unix系統程序間通訊
引數二:型別
socket.SOCK_STREAM 流式socket , for TCP (預設)
socket.SOCK_DGRAM 資料報式socket , for UDPsocket.SOCK_RAW 原始套接字,普通的套接字無法處理ICMP、IGMP等網路報文,而SOCK_RAW可以;其次,SOCK_RAW也可以處理特殊的IPv4報文;此外,利用原始套接字,可以通過IP_HDRINCL套接字選項由使用者構造IP頭。
socket.SOCK_RDM 是一種可靠的UDP形式,即保證交付資料報但不保證順序。SOCK_RAM用來提供對原始協議的低階訪問,在需要執行某些特殊操作時使用,如傳送ICMP報文。SOCK_RAM通常僅限於高階使用者或管理員執行的程式使用。
socket.SOCK_SEQPACKET 可靠的連續資料包服務引數三:協議
0 (預設)與特定的地址家族相關的協議,如果是 0 ,則系統就會根據地址格式和套接類別,自動選擇一個合適的協議
UDP Demo:
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
sk.bind(ip_port)
while True:
data = sk.recv(1024)
print (data)
import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
while True:
inp = raw_input('資料:').strip()
if inp == 'exit':
break
sk.sendto(inp,ip_port)
sk.close()
sk.bind(address)
s.bind(address) 將套接字繫結到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址。
sk.listen(backlog)
開始監聽傳入連線。backlog指定在拒絕連線之前,可以掛起的最大連線數量。
backlog等於5,表示核心已經接到了連線請求,但伺服器還沒有呼叫accept進行處理的連線個數最大為5
這個值不能無限大,因為要在核心中維護連線佇列
sk.setblocking(bool)
是否阻塞(預設True),如果設定False,那麼accept和recv時一旦無資料,則報錯。
sk.accept()
接受連線並返回(conn,address),其中conn是新的套接字物件,可以用來接收和傳送資料。address是連線客戶端的地址。
接收TCP 客戶的連線(阻塞式)等待連線的到來
sk.connect(address)
連線到address處的套接字。一般,address的格式為元組(hostname,port),如果連接出錯,返回socket.error錯誤。
sk.connect_ex(address)
同上,只不過會有返回值,連線成功時返回 0 ,連線失敗時候返回編碼,例如:10061
sk.close()
關閉套接字
sk.recv(bufsize[,flag])
接受套接字的資料。資料以字串形式返回,bufsize指定最多可以接收的數量。flag提供有關訊息的其他資訊,通常可以忽略。
sk.recvfrom(bufsize[.flag])
與recv()類似,但返回值是(data,address)。其中data是包含接收資料的字串,address是傳送資料的套接字地址。
sk.send(string[,flag])
將string中的資料傳送到連線的套接字。返回值是要傳送的位元組數量,該數量可能小於string的位元組大小。即:可能未將指定內容全部發送。
sk.sendall(string[,flag])
將string中的資料傳送到連線的套接字,但在返回之前會嘗試傳送所有資料。成功返回None,失敗則丟擲異常。
內部通過遞迴呼叫send,將所有內容傳送出去。
sk.sendto(string[,flag],address)
將資料傳送到套接字,address是形式為(ipaddr,port)的元組,指定遠端地址。返回值是傳送的位元組數。該函式主要用於UDP協議。
sk.settimeout(timeout)
設定套接字操作的超時期,timeout是一個浮點數,單位是秒。值為None表示沒有超時期。一般,超時期應該在剛建立套接字時設定,因為它們可能用於連線的操作(如 client 連線最多等待5s )
sk.getpeername()
返回連線套接字的遠端地址。返回值通常是元組(ipaddr,port)。
sk.getsockname()
返回套接字自己的地址。通常是一個元組(ipaddr,port)
sk.fileno()
套接字的檔案描述符
# 服務端
# -*- coding: utf-8 -*-
import socket
ip_port = ('127.0.0.1',9998)
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,0)
sk.bind(ip_port)
while True:
data, (host, port) = sk.recvfrom(1024)
if data.split()[0].decode('utf-8') == 'exit':
break
print(data.decode('utf-8'), host, port)
sk.sendto(bytes('你好',encoding='utf-8'),(host,port))
#客戶端
# -*- coding: utf-8 -*-
import socket
ip_port = ('127.0.0.1',9998)
sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,0)
while True:
inp = input('資料: ').strip()
if inp == 'exit':
break
sk.sendto(bytes(inp,encoding='utf-8'),ip_port)
data = sk.recvfrom(1024)
print(data)
sk.close()
設定socket
setsockopt()和getsockopt(),一個是設定選項,一個是得到設定。這裡主要使用setsockopt(),setsockopt(level,optname,value),level定義了哪個選項將被使用。通常情況下是SOL_SOCKET,意思是正在使用的socket選項。
optname引數提供使用的特殊選項。關於可用選項的設定,會因為作業系統的不同而有少許不同。如果level選定了SOL_SOCKET,那麼一些常用的選項見下表:
選項 |
意義 |
期望值 |
SO_BINDTODEVICE |
可以使socket只在某個特殊的網路介面(網絡卡)有效。也許不能是移動便攜裝置 |
一個字串給出裝置的名稱或者一個空字串返回預設值 |
SO_BROADCAST |
允許廣播地址傳送和接收資訊包。只對UDP有效。如何傳送和接收廣播資訊包 |
布林型整數 |
SO_DONTROUTE |
禁止通過路由器和閘道器往外發送資訊包。這主要是為了安全而用在乙太網上UDP通訊的一種方法。不管目的地址使用什麼IP地址,都可以防止資料離開本地網路 |
布林型整數 |
SO_KEEPALIVE |
可以使TCP通訊的資訊包保持連續性。這些資訊包可以在沒有資訊傳輸的時候,使通訊的雙方確定連線是保持的 |
布林型整數 |
SO_OOBINLINE |
可以把收到的不正常資料看成是正常的資料,也就是說會通過一個標準的對recv()的呼叫來接收這些資料 |
布林型整數 |
SO_REUSEADDR |
當socket關閉後,本地端用於該socket的埠號立刻就可以被重用。通常來說,只有經過系統定義一段時間後,才能被重用。 |
布林型整數 |
比較常用的用法是,setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) 這裡value設定為1,表示將SO_REUSEADDR標記為TRUE,作業系統會在伺服器socket被關閉或伺服器程序終止後馬上釋放該伺服器的埠,否則作業系統會保留幾分鐘該埠(一定要放在bind()之前)。
# -*- coding: utf-8 -*-
import socket
ip_port = ('127.0.0.1', 9999)
sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
sk.bind(ip_port)
解決粘包的問題:
socket_server_tcp:主要命令使用了(subprocess、struct.pack)
from socket import *
import subprocess
import struct
ip_port=('127.0.0.1',8080)
back_log=5
buffer_size=1024
tcp_server=socket(AF_INET,SOCK_STREAM)
tcp_server.bind(ip_port)
tcp_server.listen(back_log)
while True:
conn,addr=tcp_server.accept()
print('新的客戶端連結',addr)
while True:
#收
try:
cmd=conn.recv(buffer_size)
if not cmd:break
print('收到客戶端的命令',cmd)
#執行命令,得到命令的執行結果cmd_res,通過管道把命令執行的結果重定向到三個標準裡面而不是螢幕輸出來
res=subprocess.Popen(cmd.decode('utf-8'),shell=True,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE,
stdin=subprocess.PIPE)
err=res.stderr.read() #命令執行錯誤
if err:
cmd_res=err
else:
cmd_res=res.stdout.read()#命令執行成功
#發
if not cmd_res: #防止當執行../時沒有返回值時會卡住
cmd_res='執行成功'.encode('gbk')
length=len(cmd_res)
data_length=struct.pack('i',length)#i代表整形,4外位元組長度,直接生成位元組型別的值
conn.send(data_length)
conn.send(cmd_res)
except Exception as e:
print(e)
break
socket_client_tcp:
from socket import *
import struct
from functools import partial
ip_port=('127.0.0.1',8080)
back_log=5
buffer_size=1024
tcp_client=socket(AF_INET,SOCK_STREAM)
tcp_client.connect(ip_port)
while True:
cmd=input('>>: ').strip()
if not cmd:continue
if cmd == 'quit':break
tcp_client.send(cmd.encode('utf-8'))
#解決粘包
length_data=tcp_client.recv(4)
length=struct.unpack('i',length_data)[0]
recv_size=0
recv_msg=b''
while recv_size < length:
recv_msg += tcp_client.recv(buffer_size)
recv_size=len(recv_msg) #1024
print('命令的執行結果是 ',recv_msg.decode('gbk'))
tcp_client.close()
二、IO多路複用
I/O多路複用指:通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程式進行相應的讀寫操作。
Linux
Linux中的 select,poll,epoll 都是IO多路複用的機制。
|
Python
Python中有一個select模組,其中提供了:select、poll、epoll三個方法,分別呼叫系統的 select,poll,epoll 從而實現IO多路複用。
|
注意:網路操作、檔案操作、終端操作等均屬於IO操作,對於windows只支援Socket操作,其他系統支援其他IO操作,但是無法檢測 普通檔案操作 自動上次讀取是否已經變化。
對於select方法:
|
利用select監聽終端操作例項
# serve.py
# 用select 實現監聽多個埠,並實現讀寫分離
import select
import socket
sk = socket.socket()
sk.bind(('127.0.0.1', 8000,))
sk.listen(5)
inputs = [sk]
outputs = []
message_dict = {} # 儲存每個客戶端接受到的資訊
while True:
r_list, w_list, e_list = select.select(inputs, outputs, [], 1)
print(len(inputs))
for sk_or_conn in r_list:
if sk_or_conn == sk: # 一旦有客戶端連線,sk發生變化
conn, addr = sk.accept()
inputs.append(conn) # 將客戶端的conn新增到監聽列表
message_dict[conn] = [] # 以客戶端的conn為key,生成一個新列表來儲存接受到的資訊
# conn.sendall(bytes('hello', encoding='utf-8'))
else:
try:
ret = sk_or_conn.recv(1024) # 監聽的列表裡面如果有客戶端傳送資訊,
except Exception as ex:
inputs.remove(sk_or_conn) # 如果客戶端斷開的話,從監聽的列表裡面移除
else:
data = str(ret, encoding='utf-8')
message_dict[sk_or_conn].append(data) # 將監聽的資訊放到字典裡
outputs.append(sk_or_conn) # 放到outputs裡面
# sk_or_conn.sendall(bytes(data+'hello', encoding='utf-8'))
for conn in w_list: # 單獨實現寫的操作
message = message_dict[conn][0]
conn.sendall(bytes(message+'hello', encoding='utf-8'))
del message_dict[conn][0]
outputs.remove(conn)
#client1.py
import socket
client_socket = socket.socket()
client_socket.connect(('127.0.0.1', 8000,))
while True:
inp = input('>>>')
client_socket.sendall(bytes(inp, encoding='utf-8'))
data = str(client_socket.recv(1024), encoding='utf-8')
print(data)
client_socket.close()
#client.py2
import socket
client_socket = socket.socket()
client_socket.connect(('127.0.0.1', 8000,))
while True:
inp = input('>>>')
client_socket.sendall(bytes(inp, encoding='utf-8'))
data = str(client_socket.recv(1024), encoding='utf-8')
print(data)
client_socket.close()
事件機制的select呼叫:
伺服器端:
class TCPServer:
def __init__(self, server, server_address, inputs, outputs, message_queues):
# Create a TCP/IP
self.server = server
self.server.setblocking(False)
# Bind the socket to the port
self.server_address = server_address
print('starting up on %s port %s' % self.server_address)
self.server.bind(self.server_address)
# Listen for incoming connections
self.server.listen(5)
# Sockets from which we expect to read
self.inputs = inputs
# Sockets to which we expect to write
# 處理要傳送的訊息
self.outputs = outputs
# Outgoing message queues (socket: Queue)
self.message_queues = message_queues
def handler_recever(self, readable):
# Handle inputs
# 迴圈判斷是否有客戶端連線進來, 當有客戶端連線進來時select 將觸發
for s in readable:
# 判斷當前觸發的是不是服務端物件, 當觸發的物件是服務端物件時,說明有新客戶端連線進來了
# 表示有新使用者來連線
if s is self.server:
# A "readable" socket is ready to accept a connection
connection, client_address = s.accept()
self.client_address = client_address
print('connection from', client_address)
# this is connection not server
connection.setblocking(0)
# 將客戶端物件也加入到監聽的列表中, 當客戶端傳送訊息時 select 將觸發
self.inputs.append(connection)
# Give the connection a queue for data we want to send
# 為連線的客戶端單獨建立一個訊息佇列,用來儲存客戶端傳送的訊息
self.message_queues[connection] = queue.Queue()
else:
# 有老使用者發訊息, 處理接受
# 由於客戶端連線進來時服務端接收客戶端連線請求,將客戶端加入到了監聽列表中(input_list), 客戶端傳送訊息將觸發
# 所以判斷是否是客戶端物件觸發
data = s.recv(1024)
# 客戶端未斷開
if data != b'':
# A readable client socket has data
# 可讀的客戶機套接字具有資料
print('received "%s" from %s' % (data, s.getpeername()))
# 將收到的訊息放入到相對應的socket客戶端的訊息佇列中
self.message_queues[s].put(data)
# Add output channel for response
# 將需要進行回覆操作socket放到output 列表中, 讓select監聽
if s not in self.outputs:
self.outputs.append(s)
else:
# 客戶端斷開了連線, 將客戶端的監聽從input列表中移除
# Interpret empty result as closed connection
print('closing ', s.getpeername()) # 獲取客戶端的socket資訊
# 停止監聽連線上的輸入
# Stop listening for input on the connection
if s in self.outputs:
self.outputs.remove(s)
self.inputs.remove(s)
s.close()
# Remove message queue
# 移除對應socket客戶端物件的訊息佇列
del self.message_queues[s]
return "got it"
def handler_send(self, writable):
# Handle outputs
# 如果現在沒有客戶端請求, 也沒有客戶端傳送訊息時, 開始對傳送訊息列表進行處理, 是否需要傳送訊息
# 儲存哪個客戶端傳送過訊息
for s in writable:
try:
# 如果訊息佇列中有訊息,從訊息佇列中獲取要傳送的訊息
message_queue = self.message_queues.get(s)
send_data = ''
if message_queue is not None:
send_data = message_queue.get_nowait()
except queue.Empty:
# 客戶端連線斷開了
self.outputs.remove(s)
else:
# print "sending %s to %s " % (send_data, s.getpeername)
# print "send something"
if message_queue is not None:
s.send(send_data)
else:
print("client has closed")
# del message_queues[s]
# writable.remove(s)
# print "Client %s disconnected" % (client_address)
return "got it"
def handler_exception(self, exceptional):
# # Handle "exceptional conditions"
# 處理異常的情況
for s in exceptional:
print('exception condition on', s.getpeername())
# Stop listening for input on the connection
self.inputs.remove(s)
if s in self.outputs:
self.outputs.remove(s)
s.close()
# Remove message queue
del self.message_queues[s]
return "got it"
def event_loop(tcpserver, inputs, outputs):
while inputs:
# Wait for at least one of the sockets to be ready for processing 等待至少一個套接字準備好進行處理
print('waiting for the next event')
# 開始select 監聽, 對input_list 中的伺服器端server 進行監聽
# 當socket呼叫send, recv等函式時, 就會再次呼叫此函式, 這時返回的第二個引數就會有值
readable, writable, exceptional = select.select(inputs, outputs, inputs)
readable, writable, exceptional = select.select(inputs, outputs, inputs)
if readable is not None:
tcp_recever = tcpserver.handler_recever(readable)
if tcp_recever == 'got it':
print("server have received")
if writable is not None:
tcp_send = tcpserver.handler_send(writable)
if tcp_send == 'got it':
print("server have send")
if exceptional is not None:
tcp_exception = tcpserver.handler_exception(exceptional)
if tcp_exception == 'got it':
print("server have exception")
sleep(0.8)
if __name__ == '__main__':
server_address = ('localhost', 8090)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
inputs = [server]
outputs = []
message_queues = {}
tcpserver = TCPServer(server, server_address, inputs, outputs, message_queues)
# 開啟事件迴圈
event_loop(tcpserver, inputs, outputs)
客戶端:
import socket
messages = ['This is the message ', 'It will be sent ', 'in parts ', ]
server_address = ('localhost', 8090)
# Create aTCP/IP socket
socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET, socket.SOCK_STREAM), ]
# Connect thesocket to the port where the server is listening
print('connecting to %s port %s' % server_address)
# 連線到伺服器
for s in socks:
s.connect(server_address)
for index, message in enumerate(messages):
# Send messages on both sockets
for s in socks:
print('%s: sending "%s"' % (s.getsockname(), message + str(index)))
send_data = message + str(index)
s.sendall(bytes(send_data, encoding='utf-8'))
# Read responses on both sockets
for s in socks:
data = s.recv(1024)
print('%s: received "%s"' % (s.getsockname(), data))
if data != "":
print('closingsocket', s.getsockname())
s.close()
三、SocketServer模組
SocketServer內部使用 IO多路複用 以及 “多執行緒” 和 “多程序” ,從而實現併發處理多個客戶端請求的Socket服務端。即:每個客戶端請求連線到伺服器時,Socket服務端都會在伺服器是建立一個“執行緒”或者“程序” 專門負責處理當前客戶端的所有請求。
繼承關係(全都要通過例項化s,也就是繼承的順序開始查詢對應的方法):
ThreadingTCPServer
ThreadingTCPServer實現的Soket伺服器內部會為每個client建立一個 “執行緒”,該執行緒用來和客戶端進行互動。
1、ThreadingTCPServer基礎
使用ThreadingTCPServer:
- 建立一個繼承自 SocketServer.BaseRequestHandler 的類
- 類中必須定義一個名稱為 handle 的方法
- 啟動ThreadingTCPServer
服務端socketserver:
import socketserver
'''
def __init__(self, request, client_address, server):
self.request = request
self.client_address = client_address
self.server = server
self.setup()
try:
self.handle()
finally:
self.finish()
'''
class MyServer(socketserver.BaseRequestHandler):
def handle(self): #最後一步看基類的init方法,裡面呼叫handle
print('conn is: ',self.request) #conn
print('addr is: ',self.client_address) #addr
while True:
try:
#收訊息
data=self.request.recv(1024)
if not data:break
print('收到客戶端的訊息是',data,self.client_address)
#發訊息
self.request.sendall(data.upper())
except Exception as e:
print(e)
break
if __name__ == '__main__':
s=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyServer) #多執行緒
# s=socketserver.ForkingTCPServer(('127.0.0.1',8080),MyServer) #多程序
# self.server_address = server_address
# self.RequestHandlerClass = RequestHandlerClass
print(s.server_address)
print(s.RequestHandlerClass)
print(MyServer)
print(s.socket)
print(s.server_address)
s.serve_forever() #主要看這個self._handle_request_noblock(),然後根據繼承關係找
客戶端socketclient版(可以建立多個客戶端來連線):
from socket import *
ip_port=('127.0.0.1',8080)
back_log=5
buffer_size=1024
tcp_client=socket(AF_INET,SOCK_STREAM)
tcp_client.connect(ip_port)
while True:
msg=input('>>: ').strip()
if not msg:continue
if msg == 'quit':break
tcp_client.send(msg.encode('utf-8'))
data=tcp_client.recv(buffer_size)
print('收到服務端發來的訊息:',data.decode('utf-8'))
tcp_client.close()
例項:
伺服器端:
# -*- coding: utf-8 -*-
import socketserver
class MyServer(socketserver.BaseRequestHandler):
def handle(self):
print(self.request,self.client_address,self.server)
conn = self.request
conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.'.encode('utf-8'))
Flag = True
while Flag:
data = conn.recv(1024).decode('utf-8')
if data == 'exit':
Flag = False
elif data == '0':
conn.sendall('通過可能會被錄音.balabala一大推'.encode('utf-8'))
else:
conn.sendall('請重新輸入.'.encode('utf-8'))
if __name__ == '__main__':
server = socketserver.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
server.serve_forever()
客戶端:
# -*- coding: utf-8 -*-
import socket
ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)
while True:
data = sk.recv(1024)
print ('receive:',data.decode('utf-8'))
inp = input('please input:')
sk.sendall(inp.encode('utf-8'))
if inp == 'exit':
break
sk.close()
SocketServer的ThreadingTCPServer之所以可以同時處理請求得益於 select 和 Threading 兩個東西,其實本質上就是在伺服器端為每一個客戶端建立一個執行緒,當前執行緒用來處理對應客戶端的請求,所以,可以支援同時n個客戶端連結(長連線)。
ForkingTCPServer
ForkingTCPServer和ThreadingTCPServer的使用和執行流程基本一致,只不過在內部分別為請求者建立 “執行緒” 和 “程序”。
基本使用:
伺服器端:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer
class MyServer(SocketServer.BaseRequestHandler):
def handle(self):
# print self.request,self.client_address,self.server
conn = self.request
conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
Flag = True
while Flag:
data = conn.recv(1024)
if data == 'exit':
Flag = False
elif data == '0':
conn.sendall('通過可能會被錄音.balabala一大推')
else:
conn.sendall('請重新輸入.')
if __name__ == '__main__':
server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyServer)
server.serve_forever()
客戶端:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import socket
ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)
while True:
data = sk.recv(1024)
print 'receive:',data
inp = raw_input('please input:')
sk.sendall(inp)
if inp == 'exit':
break
sk.close()
以上ForkingTCPServer只是將 ThreadingTCPServer 例項中的程式碼:
erver = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyRequestHandler)
變更為:
server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyRequestHandler)
SocketServer的ThreadingTCPServer之所以可以同時處理請求得益於 select 和 os.fork 兩個東西,其實本質上就是在伺服器端為每一個客戶端建立一個程序,當前新建立的程序用來處理對應客戶端的請求,所以,可以支援同時n個客戶端連結(長連線)。
原始碼剖析參考 ThreadingTCPServer
四、事件驅動
簡而言之,事件驅動分為二個部分:第一,註冊事件;第二,觸發事件。
自定義事件驅動框架,命名為:“弒君者”:
事件驅動框架:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# event_drive.py
event_list = []
def run():
for event in event_list:
obj = event()
obj.execute()
class BaseHandler(object):
"""
使用者必須繼承該類,從而規範所有類的方法(類似於介面的功能)
"""
def execute(self):
raise Exception('you must overwrite execute')
程式設計師使用“弒君者框架”:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from source import event_drive
class MyHandler(event_drive.BaseHandler):
def execute(self):
print 'event-drive execute MyHandler'
event_drive.event_list.append(MyHandler)
event_drive.run()
如上述程式碼,事件驅動只不過是框架規定了執行順序,程式設計師在使用框架時,可以向原執行順序中註冊“事件”,從而在框架執行時可以出發已註冊的“事件”。
基於事件驅動Socket
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import protocol
from twisted.internet import reactor
class Echo(protocol.Protocol):
def dataReceived(self, data):
self.transport.write(data)
def main():
factory = protocol.ServerFactory()
factory.protocol = Echo
reactor.listenTCP(8000,factory)
reactor.run()
if __name__ == '__main__':
main()
程式執行流程:
- 執行服務端程式
- 建立Protocol的派生類Echo
- 建立ServerFactory物件,並將Echo類封裝到其protocol欄位中
- 執行reactor的 listenTCP 方法,內部使用 tcp.Port 建立socket server物件,並將該物件新增到了 reactor的set型別的欄位 _read 中
- 執行reactor的 run 方法,內部執行 while 迴圈,並通過 select 來監視 _read 中檔案描述符是否有變化,迴圈中...
- 客戶端請求到達
- 執行reactor的 _doReadOrWrite 方法,其內部通過反射呼叫 tcp.Port 類的 doRead 方法,內部 accept 客戶端連線並建立Server物件例項(用於封裝客戶端socket資訊)和 建立 Echo 物件例項(用於處理請求) ,然後呼叫 Echo 物件例項的 makeConnection 方法,建立連線。
- 執行 tcp.Server 類的 doRead 方法,讀取資料,
- 執行 tcp.Server 類的 _dataReceived 方法,如果讀取資料內容為空(關閉連結),否則,出發 Echo 的 dataReceived 方法
- 執行 Echo 的 dataReceived 方法
從原始碼可以看出,上述例項本質上使用了事件驅動的方法 和 IO多路複用的機制來進行Socket的處理。
非同步IO操作:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from twisted.internet import reactor, protocol
from twisted.web.client import getPage
from twisted.internet import reactor
import time
class Echo(protocol.Protocol):
def dataReceived(self, data):
deferred1 = getPage('http://cnblogs.com')
deferred1.addCallback(self.printContents)
deferred2 = getPage('http://baidu.com')
deferred2.addCallback(self.printContents)
for i in range(2):
time.sleep(1)
print 'execute ',i
def execute(self,data):
self.transport.write(data)
def printContents(self,content):
print len(content),content[0:100],time.time()
def main():
factory = protocol.ServerFactory()
factory.protocol = Echo
reactor.listenTCP(8000,factory)
reactor.run()
if __name__ == '__main__':
main()