1. 程式人生 > >Python之旅11:socket、io多路利用和SocketServer

Python之旅11:socket、io多路利用和SocketServer

 

本章內容:

  • Socket
  • IO多路複用(select)
  • SocketServer 模組(ThreadingTCPServer原始碼剖析)
  • 事件驅動

一、socket

socket通常也稱作"套接字",用於描述IP地址和埠,是一個通訊鏈的控制代碼,應用程式通常通過"套接字"向網路發出請求或者應答網路請求。

socket起源於Unix,而Unix/Linux基本哲學之一就是“一切皆檔案”,對於檔案用【開啟】【讀寫】【關閉】模式來操作。socket就是該模式的一個實現,socket即是一種特殊的檔案,一些socket函式就是對其進行的操作(讀/寫IO、開啟、關閉)

socket和file的區別:

  • file模組是針對某個指定檔案進行【開啟】【讀寫】【關閉】
  • socket模組是針對 伺服器端 和 客戶端Socket 進行【開啟】【讀寫】【關閉】

tcp_socket_server:

# -*- coding: utf-8 -*-

import socket

ip_port = ('127.0.0.1', 9999)

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
sk.bind(ip_port)
sk.listen(5) #也叫半連結池的最大數

while True:
    print('server waiting...')
    conn, addr = sk.accept()

    client_data = conn.recv(1024)
    print(client_data.decode('utf-8'))
    conn.sendall('不要回答,不要回答,不要回答'.encode('utf-8'))

    conn.close()

tcp_socket_client:

# -*- coding: utf-8 -*-

import socket

ip_port = ('127.0.0.1', 9999)

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)
sk.connect(ip_port)

sk.sendall('請求佔領地球'.encode('utf-8'))

server_reply = sk.recv(1024)

print(server_reply.decode('utf-8'))

sk.close()

 

更多功能

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)

引數一:地址簇

  socket.AF_INET IPv4(預設)
  socket.AF_INET6 IPv6

  socket.AF_UNIX 只能夠用於單一的Unix系統程序間通訊

引數二:型別

  socket.SOCK_STREAM  流式socket , for TCP (預設)
  socket.SOCK_DGRAM   資料報式socket , for UDP

  socket.SOCK_RAW 原始套接字,普通的套接字無法處理ICMP、IGMP等網路報文,而SOCK_RAW可以;其次,SOCK_RAW也可以處理特殊的IPv4報文;此外,利用原始套接字,可以通過IP_HDRINCL套接字選項由使用者構造IP頭。
  socket.SOCK_RDM 是一種可靠的UDP形式,即保證交付資料報但不保證順序。SOCK_RAM用來提供對原始協議的低階訪問,在需要執行某些特殊操作時使用,如傳送ICMP報文。SOCK_RAM通常僅限於高階使用者或管理員執行的程式使用。
  socket.SOCK_SEQPACKET 可靠的連續資料包服務

引數三:協議

  0  (預設)與特定的地址家族相關的協議,如果是 0 ,則系統就會根據地址格式和套接類別,自動選擇一個合適的協議

UDP Demo:

import socket
ip_port = ('127.0.0.1',9999)
sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
sk.bind(ip_port)

while True:
    data = sk.recv(1024)
    print (data)




import socket
ip_port = ('127.0.0.1',9999)

sk = socket.socket(socket.AF_INET,socket.SOCK_DGRAM,0)
while True:
    inp = raw_input('資料:').strip()
    if inp == 'exit':
        break
    sk.sendto(inp,ip_port)

sk.close()

sk.bind(address)

  s.bind(address) 將套接字繫結到地址。address地址的格式取決於地址族。在AF_INET下,以元組(host,port)的形式表示地址。

sk.listen(backlog)

  開始監聽傳入連線。backlog指定在拒絕連線之前,可以掛起的最大連線數量。

      backlog等於5,表示核心已經接到了連線請求,但伺服器還沒有呼叫accept進行處理的連線個數最大為5
      這個值不能無限大,因為要在核心中維護連線佇列

sk.setblocking(bool)

  是否阻塞(預設True),如果設定False,那麼accept和recv時一旦無資料,則報錯。

sk.accept()

  接受連線並返回(conn,address),其中conn是新的套接字物件,可以用來接收和傳送資料。address是連線客戶端的地址。

  接收TCP 客戶的連線(阻塞式)等待連線的到來

sk.connect(address)

  連線到address處的套接字。一般,address的格式為元組(hostname,port),如果連接出錯,返回socket.error錯誤。

sk.connect_ex(address)

  同上,只不過會有返回值,連線成功時返回 0 ,連線失敗時候返回編碼,例如:10061

sk.close()

  關閉套接字

sk.recv(bufsize[,flag])

  接受套接字的資料。資料以字串形式返回,bufsize指定最多可以接收的數量。flag提供有關訊息的其他資訊,通常可以忽略。

sk.recvfrom(bufsize[.flag])

  與recv()類似,但返回值是(data,address)。其中data是包含接收資料的字串,address是傳送資料的套接字地址。

sk.send(string[,flag])

  將string中的資料傳送到連線的套接字。返回值是要傳送的位元組數量,該數量可能小於string的位元組大小。即:可能未將指定內容全部發送。

sk.sendall(string[,flag])

  將string中的資料傳送到連線的套接字,但在返回之前會嘗試傳送所有資料。成功返回None,失敗則丟擲異常。

      內部通過遞迴呼叫send,將所有內容傳送出去。

sk.sendto(string[,flag],address)

  將資料傳送到套接字,address是形式為(ipaddr,port)的元組,指定遠端地址。返回值是傳送的位元組數。該函式主要用於UDP協議。

sk.settimeout(timeout)

  設定套接字操作的超時期,timeout是一個浮點數,單位是秒。值為None表示沒有超時期。一般,超時期應該在剛建立套接字時設定,因為它們可能用於連線的操作(如 client 連線最多等待5s )

sk.getpeername()

  返回連線套接字的遠端地址。返回值通常是元組(ipaddr,port)。

sk.getsockname()

  返回套接字自己的地址。通常是一個元組(ipaddr,port)

sk.fileno()

  套接字的檔案描述符

# 服務端
# -*- coding: utf-8 -*-

import socket

ip_port = ('127.0.0.1',9998)

sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,0)
sk.bind(ip_port)

while True:
    data, (host, port) = sk.recvfrom(1024)
    if data.split()[0].decode('utf-8') == 'exit':
        break
    print(data.decode('utf-8'), host, port)
    sk.sendto(bytes('你好',encoding='utf-8'),(host,port))



#客戶端
# -*- coding: utf-8 -*-

import socket

ip_port = ('127.0.0.1',9998)

sk = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,0)
while True:
    inp = input('資料: ').strip()
    if inp == 'exit':
        break
    sk.sendto(bytes(inp,encoding='utf-8'),ip_port)
    data = sk.recvfrom(1024)
    print(data)

sk.close()

設定socket 

  setsockopt()和getsockopt(),一個是設定選項,一個是得到設定。這裡主要使用setsockopt(),setsockopt(level,optname,value),level定義了哪個選項將被使用。通常情況下是SOL_SOCKET,意思是正在使用的socket選項。

optname引數提供使用的特殊選項。關於可用選項的設定,會因為作業系統的不同而有少許不同。如果level選定了SOL_SOCKET,那麼一些常用的選項見下表:

選項

意義

期望值

SO_BINDTODEVICE

可以使socket只在某個特殊的網路介面(網絡卡)有效。也許不能是移動便攜裝置

一個字串給出裝置的名稱或者一個空字串返回預設值

SO_BROADCAST

允許廣播地址傳送和接收資訊包。只對UDP有效。如何傳送和接收廣播資訊包

布林型整數

SO_DONTROUTE

禁止通過路由器和閘道器往外發送資訊包。這主要是為了安全而用在乙太網上UDP通訊的一種方法。不管目的地址使用什麼IP地址,都可以防止資料離開本地網路

布林型整數

SO_KEEPALIVE

可以使TCP通訊的資訊包保持連續性。這些資訊包可以在沒有資訊傳輸的時候,使通訊的雙方確定連線是保持的

布林型整數

SO_OOBINLINE

可以把收到的不正常資料看成是正常的資料,也就是說會通過一個標準的對recv()的呼叫來接收這些資料

布林型整數

SO_REUSEADDR

當socket關閉後,本地端用於該socket的埠號立刻就可以被重用。通常來說,只有經過系統定義一段時間後,才能被重用。

布林型整數

 

比較常用的用法是,setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) 這裡value設定為1,表示將SO_REUSEADDR標記為TRUE,作業系統會在伺服器socket被關閉或伺服器程序終止後馬上釋放該伺服器的埠,否則作業系統會保留幾分鐘該埠(一定要放在bind()之前)。

# -*- coding: utf-8 -*-

import socket

ip_port = ('127.0.0.1', 9999)

sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM,0)

setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) 

sk.bind(ip_port)

解決粘包的問題:

socket_server_tcp:主要命令使用了(subprocess、struct.pack

from socket import *
import subprocess
import struct
ip_port=('127.0.0.1',8080)
back_log=5
buffer_size=1024

tcp_server=socket(AF_INET,SOCK_STREAM)
tcp_server.bind(ip_port)
tcp_server.listen(back_log)

while True:
    conn,addr=tcp_server.accept()
    print('新的客戶端連結',addr)
    while True:
        #收
        try:
            cmd=conn.recv(buffer_size)
            if not cmd:break
            print('收到客戶端的命令',cmd)

            #執行命令,得到命令的執行結果cmd_res,通過管道把命令執行的結果重定向到三個標準裡面而不是螢幕輸出來
            res=subprocess.Popen(cmd.decode('utf-8'),shell=True,
                                 stderr=subprocess.PIPE,
                                 stdout=subprocess.PIPE,
                                 stdin=subprocess.PIPE)
            err=res.stderr.read() #命令執行錯誤
            if err:
                cmd_res=err
            else:
                cmd_res=res.stdout.read()#命令執行成功

            #發
            if not cmd_res: #防止當執行../時沒有返回值時會卡住
                cmd_res='執行成功'.encode('gbk')

            length=len(cmd_res)

            data_length=struct.pack('i',length)#i代表整形,4外位元組長度,直接生成位元組型別的值
            conn.send(data_length)
            conn.send(cmd_res)
        except Exception as e:
            print(e)
            break

socket_client_tcp:

from socket import *
import struct
from functools import partial
ip_port=('127.0.0.1',8080)
back_log=5
buffer_size=1024

tcp_client=socket(AF_INET,SOCK_STREAM)
tcp_client.connect(ip_port)

while True:
    cmd=input('>>: ').strip()
    if not cmd:continue
    if cmd == 'quit':break

    tcp_client.send(cmd.encode('utf-8'))


    #解決粘包
    length_data=tcp_client.recv(4)
    length=struct.unpack('i',length_data)[0]

    recv_size=0
    recv_msg=b''
    while recv_size < length:
        recv_msg += tcp_client.recv(buffer_size)
        recv_size=len(recv_msg) #1024


    print('命令的執行結果是 ',recv_msg.decode('gbk'))
tcp_client.close()

 

二、IO多路複用

I/O多路複用指:通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程式進行相應的讀寫操作。

Linux

Linux中的 select,poll,epoll 都是IO多路複用的機制。

select

 

select最早於1983年出現在4.2BSD中,它通過一個select()系統呼叫來監視多個檔案描述符的陣列,當select()返回後,該陣列中就緒的檔案描述符便會被核心修改標誌位,使得程序可以獲得這些檔案描述符從而進行後續的讀寫操作。

select目前幾乎在所有的平臺上支援,其良好跨平臺支援也是它的一個優點,事實上從現在看來,這也是它所剩不多的優點之一。

select的一個缺點在於單個程序能夠監視的檔案描述符的數量存在最大限制,在Linux上一般為1024,不過可以通過修改巨集定義甚至重新編譯核心的方式提升這一限制。

另外,select()所維護的儲存大量檔案描述符的資料結構,隨著檔案描述符數量的增大,其複製的開銷也線性增長。同時,由於網路響應時間的延遲使得大量TCP連線處於非活躍狀態,但呼叫select()會對所有socket進行一次線性掃描,所以這也浪費了一定的開銷。

 

poll

 

poll在1986年誕生於System V Release 3,它和select在本質上沒有多大差別,但是poll沒有最大檔案描述符數量的限制。

poll和select同樣存在一個缺點就是,包含大量檔案描述符的陣列被整體複製於使用者態和核心的地址空間之間,而不論這些檔案描述符是否就緒,它的開銷隨著檔案描述符數量的增加而線性增大。

另外,select()和poll()將就緒的檔案描述符告訴程序後,如果程序沒有對其進行IO操作,那麼下次呼叫select()和poll()的時候將再次報告這些檔案描述符,所以它們一般不會丟失就緒的訊息,這種方式稱為水平觸發(Level Triggered)。

 

epoll

 

直到Linux2.6才出現了由核心直接支援的實現方法,那就是epoll,它幾乎具備了之前所說的一切優點,被公認為Linux2.6下效能最好的多路I/O就緒通知方法。

epoll可以同時支援水平觸發和邊緣觸發(Edge Triggered,只告訴程序哪些檔案描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那麼它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的效能要更高一些,但是程式碼實現相當複雜。

epoll同樣只告知那些就緒的檔案描述符,而且當我們呼叫epoll_wait()獲得就緒檔案描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的檔案描述符即可,這裡也使用了記憶體對映(mmap)技術,這樣便徹底省掉了這些檔案描述符在系統呼叫時複製的開銷。

另一個本質的改進在於epoll採用基於事件的就緒通知方式。在select/poll中,程序只有在呼叫一定的方法後,核心才對所有監視的檔案描述符進行掃描,而epoll事先通過epoll_ctl()來註冊一個檔案描述符,一旦基於某個檔案描述符就緒時,核心會採用類似callback的回撥機制,迅速啟用這個檔案描述符,當程序呼叫epoll_wait()時便得到通知。

Python

Python中有一個select模組,其中提供了:select、poll、epoll三個方法,分別呼叫系統的 select,poll,epoll 從而實現IO多路複用。

Windows Python:

    提供: select

Mac Python:

    提供: select

Linux Python:

    提供: select、poll、epoll

注意:網路操作、檔案操作、終端操作等均屬於IO操作,對於windows只支援Socket操作,其他系統支援其他IO操作,但是無法檢測 普通檔案操作 自動上次讀取是否已經變化。

對於select方法:

控制代碼列表11, 控制代碼列表22, 控制代碼列表33 = select.select(控制代碼序列1, 控制代碼序列2, 控制代碼序列3, 超時時間)

 

引數: 可接受四個引數(前三個必須)

返回值:三個列表

 

select方法用來監視檔案控制代碼,如果控制代碼發生變化,則獲取該控制代碼。

1、當 引數1 序列中的控制代碼發生可讀時(accetp和read),則獲取發生變化的控制代碼並新增到 返回值1 序列中

2、當 引數2 序列中含有控制代碼時,則將該序列中所有的控制代碼新增到 返回值2 序列中

3、當 引數3 序列中的控制代碼發生錯誤時,則將該發生錯誤的控制代碼新增到 返回值3 序列中

4、當 超時時間 未設定,則select會一直阻塞,直到監聽的控制代碼發生變化

   當 超時時間 = 1時,那麼如果監聽的控制代碼均無任何變化,則select會阻塞 1 秒,之後返回三個空列表,如果監聽的控制代碼有變化,則直接執行。

利用select監聽終端操作例項

# serve.py
# 用select 實現監聽多個埠,並實現讀寫分離
 
 
import  select
import socket
 
sk = socket.socket()
sk.bind(('127.0.0.1', 8000,))
sk.listen(5)
 
inputs = [sk]
outputs = []
message_dict = {}    # 儲存每個客戶端接受到的資訊
 
while True:
    r_list, w_list, e_list = select.select(inputs, outputs, [], 1)
    print(len(inputs))
    for sk_or_conn in r_list:
        if sk_or_conn == sk:          # 一旦有客戶端連線,sk發生變化
            conn, addr = sk.accept()
            inputs.append(conn)         # 將客戶端的conn新增到監聽列表
            message_dict[conn] = []      # 以客戶端的conn為key,生成一個新列表來儲存接受到的資訊
            # conn.sendall(bytes('hello', encoding='utf-8'))
        else:
            try:
                ret = sk_or_conn.recv(1024) # 監聽的列表裡面如果有客戶端傳送資訊,
 
            except Exception as ex:
                inputs.remove(sk_or_conn)   # 如果客戶端斷開的話,從監聽的列表裡面移除
            else:
                data = str(ret, encoding='utf-8')
                message_dict[sk_or_conn].append(data)  # 將監聽的資訊放到字典裡
                outputs.append(sk_or_conn)         # 放到outputs裡面
                # sk_or_conn.sendall(bytes(data+'hello', encoding='utf-8'))
 
    for conn in w_list:                    # 單獨實現寫的操作
        message = message_dict[conn][0]
        conn.sendall(bytes(message+'hello', encoding='utf-8'))
        del message_dict[conn][0]
        outputs.remove(conn)
 
 
#client1.py
 
import socket
client_socket = socket.socket()
client_socket.connect(('127.0.0.1', 8000,))
while True:
    inp = input('>>>')
    client_socket.sendall(bytes(inp, encoding='utf-8'))
    data = str(client_socket.recv(1024), encoding='utf-8')
    print(data)
client_socket.close()
 
 
#client.py2
 
import socket
client_socket = socket.socket()
client_socket.connect(('127.0.0.1', 8000,))
while True:
    inp = input('>>>')
    client_socket.sendall(bytes(inp, encoding='utf-8'))
    data = str(client_socket.recv(1024), encoding='utf-8')
    print(data)
client_socket.close()

事件機制的select呼叫:

 伺服器端:


class TCPServer:
    def __init__(self, server, server_address, inputs, outputs, message_queues):
        # Create a TCP/IP
        self.server = server
        self.server.setblocking(False)

        # Bind the socket to the port
        self.server_address = server_address
        print('starting up on %s port %s' % self.server_address)
        self.server.bind(self.server_address)

        # Listen for incoming connections
        self.server.listen(5)

        # Sockets from which we expect to read
        self.inputs = inputs

        # Sockets to which we expect to write
        # 處理要傳送的訊息
        self.outputs = outputs
        # Outgoing message queues (socket: Queue)
        self.message_queues = message_queues

    def handler_recever(self, readable):
        # Handle inputs
        # 迴圈判斷是否有客戶端連線進來, 當有客戶端連線進來時select 將觸發
        for s in readable:
            # 判斷當前觸發的是不是服務端物件, 當觸發的物件是服務端物件時,說明有新客戶端連線進來了
            # 表示有新使用者來連線
            if s is self.server:
                # A "readable" socket is ready to accept a connection
                connection, client_address = s.accept()
                self.client_address = client_address
                print('connection from', client_address)
                # this is connection not server
                connection.setblocking(0)
                # 將客戶端物件也加入到監聽的列表中, 當客戶端傳送訊息時 select 將觸發
                self.inputs.append(connection)

                # Give the connection a queue for data we want to send
                # 為連線的客戶端單獨建立一個訊息佇列,用來儲存客戶端傳送的訊息
                self.message_queues[connection] = queue.Queue()
            else:
                # 有老使用者發訊息, 處理接受
                # 由於客戶端連線進來時服務端接收客戶端連線請求,將客戶端加入到了監聽列表中(input_list), 客戶端傳送訊息將觸發
                # 所以判斷是否是客戶端物件觸發
                data = s.recv(1024)
                # 客戶端未斷開
                if data != b'':
                    # A readable client socket has data
                    # 可讀的客戶機套接字具有資料
                    print('received "%s" from %s' % (data, s.getpeername()))
                    # 將收到的訊息放入到相對應的socket客戶端的訊息佇列中
                    self.message_queues[s].put(data)
                    # Add output channel for response
                    # 將需要進行回覆操作socket放到output 列表中, 讓select監聽
                    if s not in self.outputs:
                        self.outputs.append(s)
                else:
                    # 客戶端斷開了連線, 將客戶端的監聽從input列表中移除
                    # Interpret empty result as closed connection
                    print('closing ', s.getpeername())  # 獲取客戶端的socket資訊
                    # 停止監聽連線上的輸入
                    # Stop listening for input on the connection
                    if s in self.outputs:
                        self.outputs.remove(s)
                    self.inputs.remove(s)
                    s.close()

                    # Remove message queue
                    # 移除對應socket客戶端物件的訊息佇列
                    del self.message_queues[s]
            return "got it"

    def handler_send(self, writable):
        # Handle outputs
        # 如果現在沒有客戶端請求, 也沒有客戶端傳送訊息時, 開始對傳送訊息列表進行處理, 是否需要傳送訊息
        # 儲存哪個客戶端傳送過訊息
        for s in writable:
            try:
                # 如果訊息佇列中有訊息,從訊息佇列中獲取要傳送的訊息
                message_queue = self.message_queues.get(s)
                send_data = ''
                if message_queue is not None:
                    send_data = message_queue.get_nowait()
            except queue.Empty:
                # 客戶端連線斷開了
                self.outputs.remove(s)
            else:
                # print "sending %s to %s " % (send_data, s.getpeername)
                # print "send something"
                if message_queue is not None:
                    s.send(send_data)
                else:
                    print("client has closed")
                # del message_queues[s]
                # writable.remove(s)
                # print "Client %s disconnected" % (client_address)
            return "got it"

    def handler_exception(self, exceptional):
        # # Handle "exceptional conditions"
        # 處理異常的情況
        for s in exceptional:
            print('exception condition on', s.getpeername())
            # Stop listening for input on the connection
            self.inputs.remove(s)
            if s in self.outputs:
                self.outputs.remove(s)
            s.close()

            # Remove message queue
            del self.message_queues[s]
            return "got it"


def event_loop(tcpserver, inputs, outputs):
    while inputs:
        # Wait for at least one of the sockets to be ready for processing 等待至少一個套接字準備好進行處理
        print('waiting for the next event')
        # 開始select 監聽, 對input_list 中的伺服器端server 進行監聽
        # 當socket呼叫send, recv等函式時, 就會再次呼叫此函式, 這時返回的第二個引數就會有值
        readable, writable, exceptional = select.select(inputs, outputs, inputs)
        readable, writable, exceptional = select.select(inputs, outputs, inputs)
        if readable is not None:
            tcp_recever = tcpserver.handler_recever(readable)
            if tcp_recever == 'got it':
                print("server have received")
        if writable is not None:
            tcp_send = tcpserver.handler_send(writable)
            if tcp_send == 'got it':
                print("server have send")
        if exceptional is not None:
            tcp_exception = tcpserver.handler_exception(exceptional)
            if tcp_exception == 'got it':
                print("server have exception")

        sleep(0.8)


if __name__ == '__main__':
    server_address = ('localhost', 8090)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    inputs = [server]
    outputs = []
    message_queues = {}
    tcpserver = TCPServer(server, server_address, inputs, outputs, message_queues)
    # 開啟事件迴圈
    event_loop(tcpserver, inputs, outputs)

客戶端:

import socket


messages = ['This is the message ', 'It will be sent ', 'in parts ', ]

server_address = ('localhost', 8090)

# Create aTCP/IP socket

socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM), socket.socket(socket.AF_INET,  socket.SOCK_STREAM), ]

# Connect thesocket to the port where the server is listening

print('connecting to %s port %s' % server_address)
# 連線到伺服器
for s in socks:
    s.connect(server_address)

for index, message in enumerate(messages):
    # Send messages on both sockets
    for s in socks:
        print('%s: sending "%s"' % (s.getsockname(), message + str(index)))
        send_data = message + str(index)
        s.sendall(bytes(send_data, encoding='utf-8'))
    # Read responses on both sockets

for s in socks:
    data = s.recv(1024)
    print('%s: received "%s"' % (s.getsockname(), data))
    if data != "":
        print('closingsocket', s.getsockname())
        s.close()

 

三、SocketServer模組

SocketServer內部使用 IO多路複用 以及 “多執行緒” 和 “多程序” ,從而實現併發處理多個客戶端請求的Socket服務端。即:每個客戶端請求連線到伺服器時,Socket服務端都會在伺服器是建立一個“執行緒”或者“程序” 專門負責處理當前客戶端的所有請求。

繼承關係(全都要通過例項化s,也就是繼承的順序開始查詢對應的方法):

ThreadingTCPServer

ThreadingTCPServer實現的Soket伺服器內部會為每個client建立一個 “執行緒”,該執行緒用來和客戶端進行互動。

1、ThreadingTCPServer基礎

使用ThreadingTCPServer:

  • 建立一個繼承自 SocketServer.BaseRequestHandler 的類
  • 類中必須定義一個名稱為 handle 的方法
  • 啟動ThreadingTCPServer
服務端socketserver:
import socketserver


'''
    def __init__(self, request, client_address, server):
        self.request = request
        self.client_address = client_address
        self.server = server
        self.setup()
        try:
            self.handle()
        finally:
            self.finish()

'''

class MyServer(socketserver.BaseRequestHandler):

    def handle(self): #最後一步看基類的init方法,裡面呼叫handle
        print('conn is: ',self.request)   #conn
        print('addr is: ',self.client_address) #addr

        while True:
            try:
            #收訊息
                data=self.request.recv(1024)
                if not data:break
                print('收到客戶端的訊息是',data,self.client_address)

                #發訊息
                self.request.sendall(data.upper())

            except Exception as e:
                print(e)
                break

if __name__ == '__main__':
    s=socketserver.ThreadingTCPServer(('127.0.0.1',8080),MyServer) #多執行緒
    # s=socketserver.ForkingTCPServer(('127.0.0.1',8080),MyServer) #多程序

    # self.server_address = server_address
    # self.RequestHandlerClass = RequestHandlerClass
    print(s.server_address)
    print(s.RequestHandlerClass)
    print(MyServer)
    print(s.socket)
    print(s.server_address)
   
    s.serve_forever() #主要看這個self._handle_request_noblock(),然後根據繼承關係找
客戶端socketclient版(可以建立多個客戶端來連線):
from socket import *
ip_port=('127.0.0.1',8080)
back_log=5
buffer_size=1024

tcp_client=socket(AF_INET,SOCK_STREAM)
tcp_client.connect(ip_port)

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    if msg == 'quit':break

    tcp_client.send(msg.encode('utf-8'))

    data=tcp_client.recv(buffer_size)
    print('收到服務端發來的訊息:',data.decode('utf-8'))

tcp_client.close()

 

 

 

例項:

伺服器端:

# -*- coding: utf-8 -*-

import socketserver

class MyServer(socketserver.BaseRequestHandler):

    def handle(self):
        print(self.request,self.client_address,self.server)
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.'.encode('utf-8'))
        Flag = True
        while Flag:
            data = conn.recv(1024).decode('utf-8')
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('通過可能會被錄音.balabala一大推'.encode('utf-8'))
            else:
                conn.sendall('請重新輸入.'.encode('utf-8'))


if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()

客戶端:

# -*- coding: utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print ('receive:',data.decode('utf-8'))
    inp = input('please input:')
    sk.sendall(inp.encode('utf-8'))
    if inp == 'exit':
        break

sk.close()

SocketServer的ThreadingTCPServer之所以可以同時處理請求得益於 select 和 Threading 兩個東西,其實本質上就是在伺服器端為每一個客戶端建立一個執行緒,當前執行緒用來處理對應客戶端的請求,所以,可以支援同時n個客戶端連結(長連線)。

ForkingTCPServer

ForkingTCPServer和ThreadingTCPServer的使用和執行流程基本一致,只不過在內部分別為請求者建立 “執行緒”  和 “程序”。

基本使用:

    伺服器端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import SocketServer

class MyServer(SocketServer.BaseRequestHandler):

    def handle(self):
        # print self.request,self.client_address,self.server
        conn = self.request
        conn.sendall('歡迎致電 10086,請輸入1xxx,0轉人工服務.')
        Flag = True
        while Flag:
            data = conn.recv(1024)
            if data == 'exit':
                Flag = False
            elif data == '0':
                conn.sendall('通過可能會被錄音.balabala一大推')
            else:
                conn.sendall('請重新輸入.')


if __name__ == '__main__':
    server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyServer)
    server.serve_forever()

 客戶端:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

import socket


ip_port = ('127.0.0.1',8009)
sk = socket.socket()
sk.connect(ip_port)
sk.settimeout(5)

while True:
    data = sk.recv(1024)
    print 'receive:',data
    inp = raw_input('please input:')
    sk.sendall(inp)
    if inp == 'exit':
        break

sk.close()

以上ForkingTCPServer只是將 ThreadingTCPServer 例項中的程式碼:

erver = SocketServer.ThreadingTCPServer(('127.0.0.1',8009),MyRequestHandler)
變更為:
server = SocketServer.ForkingTCPServer(('127.0.0.1',8009),MyRequestHandler)

SocketServer的ThreadingTCPServer之所以可以同時處理請求得益於 select 和 os.fork 兩個東西,其實本質上就是在伺服器端為每一個客戶端建立一個程序,當前新建立的程序用來處理對應客戶端的請求,所以,可以支援同時n個客戶端連結(長連線)。

原始碼剖析參考 ThreadingTCPServer

 

四、事件驅動

簡而言之,事件驅動分為二個部分:第一,註冊事件;第二,觸發事件。

自定義事件驅動框架,命名為:“弒君者”:

事件驅動框架:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

# event_drive.py

event_list = []


def run():
    for event in event_list:
        obj = event()
        obj.execute()


class BaseHandler(object):
    """
    使用者必須繼承該類,從而規範所有類的方法(類似於介面的功能)
    """
    def execute(self):
        raise Exception('you must overwrite execute')

程式設計師使用“弒君者框架”:

 

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from source import event_drive


class MyHandler(event_drive.BaseHandler):

    def execute(self):
        print 'event-drive execute MyHandler'


event_drive.event_list.append(MyHandler)
event_drive.run()

如上述程式碼,事件驅動只不過是框架規定了執行順序,程式設計師在使用框架時,可以向原執行順序中註冊“事件”,從而在框架執行時可以出發已註冊的“事件”。

基於事件驅動Socket

#!/usr/bin/env python
# -*- coding:utf-8 -*-
 
from twisted.internet import protocol
from twisted.internet import reactor
 
class Echo(protocol.Protocol):
    def dataReceived(self, data):
        self.transport.write(data)
 
def main():
    factory = protocol.ServerFactory()
    factory.protocol = Echo
 
    reactor.listenTCP(8000,factory)
    reactor.run()
 
if __name__ == '__main__':
    main()

 

程式執行流程:

  • 執行服務端程式
  • 建立Protocol的派生類Echo
  • 建立ServerFactory物件,並將Echo類封裝到其protocol欄位中
  • 執行reactor的 listenTCP 方法,內部使用 tcp.Port 建立socket server物件,並將該物件新增到了 reactor的set型別的欄位 _read 中
  • 執行reactor的 run 方法,內部執行 while 迴圈,並通過 select 來監視 _read 中檔案描述符是否有變化,迴圈中...
  • 客戶端請求到達
  • 執行reactor的 _doReadOrWrite 方法,其內部通過反射呼叫 tcp.Port 類的 doRead 方法,內部 accept 客戶端連線並建立Server物件例項(用於封裝客戶端socket資訊)和 建立 Echo 物件例項(用於處理請求) ,然後呼叫 Echo 物件例項的 makeConnection 方法,建立連線。
  • 執行 tcp.Server 類的 doRead 方法,讀取資料,
  • 執行 tcp.Server 類的 _dataReceived 方法,如果讀取資料內容為空(關閉連結),否則,出發 Echo 的 dataReceived 方法
  • 執行 Echo 的 dataReceived 方法

從原始碼可以看出,上述例項本質上使用了事件驅動的方法 和 IO多路複用的機制來進行Socket的處理。

非同步IO操作:

#!/usr/bin/env python
# -*- coding:utf-8 -*-

from twisted.internet import reactor, protocol
from twisted.web.client import getPage
from twisted.internet import reactor
import time

class Echo(protocol.Protocol):

    def dataReceived(self, data):
        deferred1 = getPage('http://cnblogs.com')
        deferred1.addCallback(self.printContents)

        deferred2 = getPage('http://baidu.com')
        deferred2.addCallback(self.printContents)

        for i in range(2):
            time.sleep(1)
            print 'execute ',i


    def execute(self,data):
        self.transport.write(data)

    def printContents(self,content):
        print len(content),content[0:100],time.time()

def main():

    factory = protocol.ServerFactory()
    factory.protocol = Echo

    reactor.listenTCP(8000,factory)
    reactor.run()

if __name__ == '__main__':
    main()