1. 程式人生 > >非阻塞的socket

非阻塞的socket

非阻塞的socket

同步和非同步的概念描述的是使用者執行緒與核心的互動方式:同步是指使用者執行緒發起IO請求後需要等待或者輪詢核心IO操作完成後才能繼續執行;而非同步是指使用者執行緒發起IO請求後仍繼續執行,當核心IO操作完成後會通知使用者執行緒,或者呼叫使用者執行緒註冊的回撥函式。
阻塞和非阻塞的概念描述的是使用者執行緒呼叫核心IO操作的方式:阻塞是指IO操作需要徹底完成後才返回到使用者空間;而非阻塞是指IO操作被呼叫後立即返回給使用者一個狀態值,無需等到IO操作徹底完成。

有三種io方式
blocking IO: 發起IO操作後阻塞當前執行緒直到IO結束,標準的同步IO,如預設行為的posix read和write。

non-blocking IO: 發起IO操作後不阻塞,使用者可阻塞等待多個IO操作同時結束。non-blocking也是一種同步IO:“批量的同步”。如linux下的poll,select, epoll,BSD下的kqueue。

asynchronous IO: 發起IO操作後不阻塞,使用者得遞一個回撥待IO結束後被呼叫。如windows下的OVERLAPPED + IOCP。linux的native AIO只對檔案有效。

1.非阻塞IO

from socket import *
import time

#用來儲存所有的新連結socket
g_socket_list = list
() def mian(): server_socket = socket(AF_INET,SOCK_STREAM) server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,True) server_socket.bind(('',7890)) server_socket.listen(128) server_socket.setblocking(False) while True: # time.sleep(0.1) try: newClientInfo =
server_socket.accept() except Exception as result: print(result) else: print('一個新的客戶端到來',str(newClientInfo)) newClientInfo[0].setblocking(False)#切換為非阻塞 g_socket_list.append(newClientInfo) for clien_socket,client_addr in g_socket_list: try: recvData = clien_socket.recv(1024) if recvData: print('客戶端已關閉',client_addr) clien_socket.close() g_socket_list.remove((clien_socket,client_addr)) except Exception as result: print(result) pass print(g_socket_list) if __name__ == '__main__': mian()

簡單來說,程式一直輪詢io埠,如果沒有響應不會發生阻塞等待有資料來領,而是丟擲異常,跳過,繼續輪詢。

設定非阻塞scoket在server_socket.setblocking(False)

非阻塞acept,非阻塞connect,非阻塞write,非阻塞read等也是一樣

單程序非阻塞的服務端實現

import time
from socket import *
import sys
import re
class WSGIServer(object):
    def __init__(self,port):
        self.server_socket = socket(AF_INET,SOCK_STREAM)
        self.server_socket.setsockopt(SOL_SOCKET,SO_REUSEADDR,True)
        self.server_socket.setblocking(False)
        self.server_socket.bind(('',port))
        self.server_socket.listen(128)
        self.socket_list = []
        self.root = '/Users/keith/Downloads/wurenjizhineng/www.aspku.com'
    def run(self):
        while True:
            try:
                new_socket,new_addr = self.server_socket.accept()
            except Exception as e:
                print('-----1------',e)
            else:
                new_socket.setblocking(False)
                self.socket_list.append(new_socket)
            #socket輪詢了~
            for client_socket in self.socket_list:
                try:
                    recv_data = client_socket.recv(1024).decode('utf-8')
                except Exception as e:
                    print('------2------',e)
                else:
                    if recv_data:
                        self.deal_with_request(recv_data,client_socket)
                    else:
                        client_socket.close()
                        self.socket_list.remove(client_socket)
            print(self.socket_list)
    #處理請求
    def deal_with_request(self,recv_data,client_socket):
        if not recv_data:
            return
        request_lines =recv_data.splitlines()
        for i ,line in enumerate(request_lines):
            print(i,line)

        #提取請求的檔案(index.html)
        #GET /a/b/c/d/e/index.html HTTP/1.1
        ret =re.match(r'[^/]+([^ ])+',request_lines[0])
        if ret:
            print('正則提取資料:',ret.group(1))
            file_name = ret.group(1)
            if file_name == '/':
                file_name = '/index.html'
        try:
            f = open(self.root+file_name,'rb')
        except:
            response_body = 'file not found,請輸入正確的url'
            response_line = 'HTTP/1.1 404\r\n'
            response_header = 'Content-Type: text/html;charset=utf-8\r\n'
            response_data = (response_line+response_line+'\r\n'+response_body).encode('utf-8')
            #返回404
            client_socket.send(response_data)
        else:
            content = f.read()
            f.close()
            response_body = content
            response_line = 'HTTP/1.1 200 OK\r\n'
            response_header = 'Content-Type: text/html;charset=utf-8\r\n\r\n'
            response_data = (response_line+response_header).encode('utf-8')+response_body
            #傳送資料
            client_socket.send(response_data)

def main():
    if len(sys.argv) == 2:
        port = sys.argv[1]
        if port.isdigit():
            port = int(port)
    else:
        print('按照格式輸入')
        return
    print('伺服器用的埠是',port)
    http_server = WSGIServer(port)
    http_server.run()

if __name__ == '__main__':
    main()


2.epool非阻塞實現伺服器,IO多路複用

就是我們常說的select,poll,epoll,有些地方也稱這種IO方式為event driven IO。
select/epoll的好處就在於單個process就可以同時處理多個網路連線的IO。

它的基本原理就是select,poll,epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有資料到達了,就通知使用者程序。

I/O 多路複用的特點:

通過一種機制使一個程序能同時等待多個檔案描述符,而這些檔案描述符(套接字描述符)其中的任意一個進入讀就緒狀態,epoll()函式就可以返回。 所以, IO多路複用,本質上不會有併發的功能,因為任何時候還是隻有一個程序或執行緒進行工作,它之所以能提高效率是因為select\epoll 把進來的socket放到他們的 ‘監視’ 列表裡面,當任何socket有可讀可寫資料立馬處理,那如果select\epoll 手裡同時檢測著很多socket, 一有動靜馬上返回給程序處理,總比一個一個socket過來,阻塞等待,處理高效率。

當然也可以多執行緒/多程序方式,一個連線過來開一個程序/執行緒處理,這樣消耗的記憶體和程序切換頁會耗掉更多的系統資源。 所以我們可以結合IO多路複用和多程序/多執行緒 來高效能併發,IO複用負責提高接受socket的通知效率,收到請求後,交給程序池/執行緒池來處理邏輯。

import socket
import select

# 建立套接字
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# 設定可以重複使用繫結的資訊
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,1)

# 繫結本機資訊
s.bind(("",7788))

# 變為被動
s.listen(10)

# 建立一個epoll物件
epoll = select.epoll()

# 測試,用來列印套接字對應的檔案描述符
# print(s.fileno())
# print(select.EPOLLIN|select.EPOLLET)

# 註冊事件到epoll中
# epoll.register(fd[, eventmask])
# 注意,如果fd已經註冊過,則會發生異常
# 將建立的套接字新增到epoll的事件監聽中
epoll.register(s.fileno(), select.EPOLLIN|select.EPOLLET)

connections = {}
addresses = {}

# 迴圈等待客戶端的到來或者對方傳送資料
while True:

    # epoll 進行 fd 掃描的地方 -- 未指定超時時間則為阻塞等待
    epoll_list = epoll.poll()

    # 對事件進行判斷
    for fd, events in epoll_list:

        # print fd
        # print events

        # 如果是socket建立的套接字被啟用
        if fd == s.fileno():
            new_socket, new_addr = s.accept()

            print('有新的客戶端到來%s' % str(new_addr))

            # 將 conn 和 addr 資訊分別儲存起來
            connections[new_socket.fileno()] = new_socket
            addresses[new_socket.fileno()] = new_addr

            # 向 epoll 中註冊 新socket 的 可讀 事件
            epoll.register(new_socket.fileno(), select.EPOLLIN|select.EPOLLET)

        # 如果是客戶端傳送資料
        elif events == select.EPOLLIN:
            # 從啟用 fd 上接收
            recvData = connections[fd].recv(1024).decode("utf-8")

            if recvData:
                print('recv:%s' % recvData)
            else:
                # 從 epoll 中移除該 連線 fd
                epoll.unregister(fd)

                # server 側主動關閉該 連線 fd
                connections[fd].close()
                print("%s---offline---" % str(addresses[fd]))
                del connections[fd]
                del addresses[fd]