1. 程式人生 > >【Python】使用socketserver建立一個非同步TCP伺服器

【Python】使用socketserver建立一個非同步TCP伺服器

概述

這篇文章是講解如何使用socketserver建立一個非同步TCP伺服器,其中Python版本為3.5.1。

socketserver主要的類

socketserver模組中的類主要有以下幾個:
1、BaseServer 包含伺服器的核心功能與混合類(mix-in)的鉤子功能。這個類主要用於派生,不要直接生成這個類的類物件,可以考慮使用TCPServer和UDPServer類。
2、TCPServer:基本的網路同步TCP伺服器
3、UDPServer:基本的網路同步UDP伺服器
4、ForkingMixIn:實現了核心的程序化功能,用於與伺服器類進行混合(mix-in),以提供一些非同步特性。不要直接生成這個類的物件。
5、ThreadingMixIn:實現了核心的執行緒化功能,用於與伺服器類進行混合(mix-in),以提供一些非同步特性。不要直接生成這個類的物件。
6、ForkingTCPServer:ForkingMixIn與TCPServer的組合
7、ForkingUDPServer:ForkingMixIn與UDPServer的組合
8、BaseRequestHandler:基本的請求處理類
9、StreamRequestHandler:TCP請求處理類的一個實現
10、DataStreamRequestHandler:UDP請求處理類的一個實現

BaseRequestHandler類

BaseRequestHandler類的例項h可以實現以下方法:

1、h.handle() 呼叫該方法執行實際的請求操作。呼叫該函式可以不帶任何引數,但是幾個例項變數包含有用的值。h.request包含請求,h.client_address包含客戶端地址,h.server包含呼叫處理程式的例項。對於TCP之類的資料流服務,h.request屬性是套接字物件。對於資料報服務,它是包含收到資料的位元組字串。

2、h.setup() 該方法在handle()之前呼叫。預設情況下,它不執行任何操作。如果希望伺服器實現更多連線設定(如建立SSL連線),可以在這裡實現。

3、h.finish() 呼叫本方法可以在執行完handle()之後執行清除操作。預設情況下,它不執行任何操作。如果setup()和handle()方法都不生成異常,則無需呼叫該方法。

官方例程

首先上官方給出的例程

<span style="font-size:14px;">import socket
import threading
import socketserver

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def handle(self):
        data = str(self.request.recv(1024), 'ascii')
        cur_thread = threading.current_thread()
        response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
        self.request.sendall(response)

class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    pass

def client(ip, port, message):
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((ip, port))
        sock.sendall(bytes(message, 'ascii'))
        response = str(sock.recv(1024), 'ascii')
        print("Received: {}".format(response))

if __name__ == "__main__":
    # Port 0 means to select an arbitrary unused port
    HOST, PORT = "localhost", 0

    server = ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler)
    ip, port = server.server_address

    # Start a thread with the server -- that thread will then start one
    # more thread for each request
    server_thread = threading.Thread(target=server.serve_forever)
    # Exit the server thread when the main thread terminates
    server_thread.daemon = True
    server_thread.start()
    print("Server loop running in thread:", server_thread.name)

    client(ip, port, "Hello World 1")
    client(ip, port, "Hello World 2")
    client(ip, port, "Hello World 3")

    server.shutdown()
    server.server_close()</span>

client函式是建立一個客戶端,可以不用管它。
主要部分是在於主函式,ThreadedTCPServer類和ThreadedTCPRequestHandler類。ThreadedTCPServer類繼承了BaseRequestHandler類,ThreadedTCPRequestHandler繼承了ThreadingMixIn和TCPServer

正常輸入如下:

$ python ThreadedTCPServer.py
Server loop running in thread: Thread-1
Received: Thread-2: Hello World 1
Received: Thread-3: Hello World 2
Received: Thread-4: Hello World 3

增加功能

上面部分主要是講解官方的例程,下面這一部分是博主自己增加的功能。

1、獲取客戶端的ip和port

如果想在TCP建立連線後列印「<ip>:<port> is connect!」資訊出來,並獲取客戶端的ip地址和埠資訊,可以在ThreadedTCPRequestHandler類裡面改寫setup函式。
client_addr = []

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def setup(self):
        ip = self.client_address[0].strip()     # 獲取客戶端的ip
        port = self.client_address[1]           # 獲取客戶端的port
        print(ip+":"+str(port)+" is connect!")
        client_addr.append(self.client_address) # 儲存到佇列中

    def handle(self):
        data = str(self.request.recv(1024), 'ascii')
        cur_thread = threading.current_thread()
        response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
        self.request.sendall(response)
在主函式中新增下面語句,即可打印出連線過的客戶端資訊:
print("\nclient_addr:"+str(client_addr))

2、保持TCP長連線

官方例程中是建立了TCP連線後就馬上斷開,如果想建立長連線,可以在handle函式中新增while迴圈,同時修改程式碼為:先判斷緩衝區是否有資料,有資料才進行響應;改寫finish函式,可以看到finish的資訊並沒有打印出來。如果註釋掉while迴圈語句,可以看到finish的資訊會打印出來。
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def setup(self):
        ip = self.client_address[0].strip()     # 獲取客戶端的ip
        port = self.client_address[1]           # 獲取客戶端的port
        print(ip+":"+str(port)+" is connect!")
        client_addr.append(self.client_address) # 儲存到佇列中

    def handle(self):
        while True: # while迴圈
            data = str(self.request.recv(1024), 'ascii')
            if data:    # 判斷是否接收到資料
                cur_thread = threading.current_thread()
                response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
                self.request.sendall(response)

    def finish(self):
        print("client is disconnect!")
感謝評論區歇業的漁夫的建議,while True 迴圈建立長連線的方式非常佔用CPU資源,最好在迴圈裡面增加一個time.sleep(0.1)的休眠。

3、伺服器給客戶端傳送請求

現在的例程是在ThreadedTCPRequestHandler類裡面呼叫self.request.sendall方法來給客戶端傳送資料,而且只能被動傳送資料,如果我想主動給客戶端傳送資料,又該怎麼辦呢?下面是實現伺服器主動給客戶端傳送請求的功能。 TCP連線想要傳送資料,只要找到相關的方法直接呼叫即可,於是我對ThreadedTCPServer這個類的例項server的方法找了好久,也沒有找到傳送的方法。後來我查資料注意到了一句話:「對於TCP之類的資料流服務,h.request屬性是套接字物件。」我覺得我可以這樣做:使用這個套接字物件傳送資料。經過嘗試後,驗證成功。下面只放上核心程式碼:
client_addr = []
client_socket = []

class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    def setup(self):
        ip = self.client_address[0].strip()     # 獲取客戶端的ip
        port = self.client_address[1]           # 獲取客戶端的port
        print(ip+":"+str(port)+" is connect!")
        client_addr.append(self.client_address) # 儲存到佇列中
        client_socket.append(self.request)      # 儲存套接字socket

    def handle(self):
        while True: # while迴圈
            data = str(self.request.recv(1024), 'ascii')
            if data:    # 判斷是否接收到資料
                cur_thread = threading.current_thread()
                response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
                self.request.sendall(response)

    def finish(self):
        print("client is disconnect!")
        client_addr.remove(self.client_address)
        client_socket.remove(self.request)

之後在主函式中通過client_socket佇列呼叫sendall或sendto方法即可。例如我在主函式這樣寫(已經註釋掉client函式呼叫):
    message = bytes("clientTest\n", "ascii")
    while True:
        time.sleep(2)
        if client_addr:
            client_socket[0].sendall(message)

修改伺服器ip地址為空及埠為8080,使用socket除錯工具連線該伺服器,即可每隔2s接收到「clientTest」字串。

4、伺服器接收客戶端資料超時後斷開

下面繼續新增新的功能,假設客戶端每隔一段時間傳送資料給伺服器(心跳包),如果在一定時間內伺服器沒有接受到心跳包,表明客戶端已經斷開了連線,這個時候伺服器可以主動斷開客戶端的連線了。那麼我們在原有的程式碼增加此功能。實際上,只需要修改ThreadedTCPRequestHandler類即可。
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):

    ip = ""
    port = 0
    timeOut = 6     # 設定超時時間變數

    def setup(self):
        self.ip = self.client_address[0].strip()     # 獲取客戶端的ip
        self.port = self.client_address[1]           # 獲取客戶端的port
        self.request.settimeout(self.timeOut)        # 對socket設定超時時間
        print(self.ip+":"+str(self.port)+"連線到伺服器!")
        client_addr.append(self.client_address) # 儲存到佇列中
        client_socket.append(self.request)      # 儲存套接字socket

    def handle(self):
        while True: # while迴圈
            try:
                data = str(self.request.recv(1024), 'ascii')
            except socket.timeout:  # 如果接收超時會丟擲socket.timeout異常
                print(self.ip+":"+str(self.port)+"接收超時!即將斷開連線!")
                break       # 記得跳出while迴圈

            if data:    # 判斷是否接收到資料
                cur_thread = threading.current_thread()
                response = bytes("{}: {}".format(cur_thread.name, data), 'ascii')
                self.request.sendall(response)

    def finish(self):
        print(self.ip+":"+str(self.port)+"斷開連線!")
        client_addr.remove(self.client_address)
        client_socket.remove(self.request)

使用socket除錯工具連線該伺服器後,不傳送任何資料,過了6秒鐘後,伺服器端主要列印如下資料: 192.168.10.53:26408連線到伺服器!
192.168.10.53:26408接收超時!即將斷開連線!
192.168.10.53:26408斷開連線!