1. 程式人生 > >【經典】5種IO模型 | IO多路複用

【經典】5種IO模型 | IO多路複用

上篇回顧:靜態伺服器+壓測

3.2.概念篇

1.同步與非同步

同步是指一個任務的完成需要依賴另外一個任務時,只有等待被依賴的任務完成後,依賴的任務才能算完成。

非同步是指不需要等待被依賴的任務完成,只是通知被依賴的任務要完成什麼工作。然後繼續執行下面程式碼邏輯,只要自己完成了整個任務就算完成了(非同步一般使用狀態、通知和回撥)

PS:專案裡面一般是這樣的:(個人經驗)

  1. 同步架構:一般都是和錢相關的需求,需要實時返回的業務
  2. 非同步架構:更多是對寫要求比較高時的場景(同步變非同步)
    • 讀一般都是實時返回,程式碼一般都是await xxx()
  3. 想象個情景就清楚了:
    • 非同步:現在使用者寫了篇文章,可以非同步操作,就算沒真正寫到資料庫也可以返回:發表成功(大不了失敗提示一下)
    • 同步:使用者獲取訂單資訊,你如果非同步就會這樣了:提示下獲取成功,然後一片空白...使用者不解除安裝就怪了...

2.阻塞與非阻塞

阻塞是指呼叫結果返回之前,當前執行緒會被掛起,一直處於等待訊息通知,不能夠執行其他業務(大部分程式碼都是這樣的)

非阻塞是指在不能立刻得到結果之前,該函式不會阻塞當前執行緒,而會立刻返回(繼續執行下面程式碼,或者重試機制走起)

PS:專案裡面重試機制為啥一般都是3次?

  1. 第一次重試,兩臺PC掛了也是有可能的
  2. 第二次重試,負載均衡分配的三臺機器同時掛的可能性不是很大,這時候就有可能是網路有點擁堵了
  3. 最後一次重試,再失敗就沒意義了,日記寫起來,再重試網路負擔就加大了,得不償失了

3.五種IO模型

對於一次IO訪問,資料會先被拷貝到核心的緩衝區中,然後才會從核心的緩衝區拷貝到應用程式的地址空間。需要經歷兩個階段:

  1. 準備資料
  2. 將資料從核心緩衝區拷貝到程序地址空間

由於存在這兩個階段,Linux產生了下面五種IO模型(以socket為例

  1. 阻塞式IO:
    • 當用戶程序呼叫了recvfrom等阻塞方法時,核心進入IO的第1個階段:準備資料(核心需要等待足夠的資料再拷貝)這個過程需要等待,使用者程序會被阻塞,等核心將資料準備好,然後拷貝到使用者地址空間,核心返回結果,使用者程序才從阻塞態進入就緒態
    • Linux中預設情況下所有的socket都是阻塞的
  2. 非阻塞式IO:
    • 當用戶程序發出read操作時,如果kernel中的資料還沒有準備好,那麼它並不會block使用者程序,而是立刻返回一個error
    • 使用者程序判斷結果是一個error時,它就知道資料還沒有準備好,於是它可以再次傳送read操作
    • 一旦kernel中的資料準備好了,並且又再次收到了使用者程序的system call,那麼它馬上就將資料拷貝到了使用者記憶體,然後返回
    • 非阻塞IO模式下使用者程序需要不斷地詢問核心的資料準備好了沒有
  3. IO多路複用
    • 通過一種機制,一個程序可以監視多個檔案描述符(套接字描述符)一旦某個檔案描述符就緒(一般是讀就緒或者寫就緒),能夠通知程式進行相應的讀寫操作(這樣就不需要每個使用者程序不斷的詢問核心資料準備好了沒)
    • 常用的IO多路複用方式有selectpollepoll
  4. 訊號驅動IO:
    • 核心檔案描述符就緒後,通過訊號通知使用者程序,使用者程序再通過系統呼叫讀取資料。
    • 此方式屬於同步IO(實際讀取資料到使用者程序快取的工作仍然是由使用者程序自己負責的)
  5. 非同步IOPOSIXaio_系列函式)
    • 使用者程序發起read操作之後,立刻就可以開始去做其它的事。核心收到一個非同步IO read之後,會立刻返回,不會阻塞使用者程序。
    • 核心會等待資料準備完成,然後將資料拷貝到使用者記憶體,當這一切都完成之後,核心會給使用者程序傳送一個signal告訴它read操作完成了

4.Unix圖示

貼一下Unix程式設計裡面的圖:

**非阻塞IO**

2.非阻塞IO

**IO複用**

3.IO複用

**訊號IO**

4.訊號IO

**非同步AIO**

5.非同步AIO

3.3.IO多路複用

開始之前咱們通過非阻塞IO引入一下:(來個簡單例子socket.setblocking(False))

import time
import socket

def select(socket_addr_list):
    for client_socket, client_addr in socket_addr_list:
        try:
            data = client_socket.recv(2048)
            if data:
                print(f"[來自{client_addr}的訊息:]\n")
                print(data.decode("utf-8"))
                client_socket.send(
                    b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                )
            else:
                # 沒有訊息是觸發異常,空訊息是斷開連線
                client_socket.close()  # 關閉客戶端連線
                socket_addr_list.remove((client_socket, client_addr))
                print(f"[客戶端{client_addr}已斷開連線,當前連線數:{len(socket_addr_list)}]")
        except Exception:
            pass

def main():
    # 存放客戶端集合
    socket_addr_list = list()

    with socket.socket() as tcp_server:
        # 防止埠繫結的設定
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        tcp_server.setblocking(False)  # 服務端非阻塞
        while True:
            try:
                client_socket, client_addr = tcp_server.accept()
                client_socket.setblocking(False)  # 客戶端非阻塞
                socket_addr_list.append((client_socket, client_addr))
            except Exception:
                pass
            else:
                print(f"[來自{client_addr}的連線,當前連線數:{len(socket_addr_list)}]")
            # 防止客戶端斷開後出錯
            if socket_addr_list:
                # 輪詢檢視客戶端有沒有訊息
                select(socket_addr_list)  # 引用傳參
                time.sleep(0.01)

if __name__ == "__main__":
    main()

輸出:
3.nowait.gif

可以思考下:

  1. 為什麼Server也要設定為非阻塞?
    • PS:一個執行緒裡面只能有一個死迴圈,現在程式需要兩個死迴圈,so ==> 放一起咯
  2. 斷開連線怎麼判斷?
    • PS:沒有訊息是觸發異常,空訊息是斷開連線
  3. client_socket為什麼不用dict存放?
    • PS:dict在迴圈的過程中,del會引發異常

1.Select

select和上面的有點類似,就是輪詢的過程交給了作業系統:

kernel會“監視”所有select負責的socket,當任何一個socket中的資料準備好了,select就會返回。這個時候使用者程序再呼叫read操作,將資料從kernel拷貝到使用者程序

來個和上面等同的案例:

import select
import socket

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()
        socket_info_dict = dict()
        socket_list = [tcp_server]  # 監測列表
        while True:
            # 劣勢:select列表數量有限制
            read_list, write_list, error_list = select.select(
                socket_list, [], [])
            for item in read_list:
                # 服務端迎接新的連線
                if item == tcp_server:
                    client_socket, client_address = item.accept()
                    socket_list.append(client_socket)
                    socket_info_dict[client_socket] = client_address
                    print(f"[{client_address}已連線,當前連線數:{len(socket_list)-1}]")
                # 客戶端發來
                else:
                    data = item.recv(2048)
                    if data:
                        print(data.decode("utf-8"))
                        item.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        item.close()
                        socket_list.remove(item)
                        info = socket_info_dict[item]
                        print(f"[{info}已斷開,當前連線數:{len(socket_list)-1}]")

if __name__ == "__main__":
    main()

輸出和上面一樣

擴充套件說明:

select 函式監視的檔案描述符分3類,分別是writefdsreadfds、和exceptfds。呼叫後select函式會阻塞,直到有描述符就緒函式返回(有資料可讀、可寫、或者有except)或者超時(timeout指定等待時間,如果立即返回設為null即可)

select的一個缺點在於單個程序能夠監視的檔案描述符的數量存在最大限制,在Linux上一般為1024(64位=>2048)

然後Poll就出現了,就是把上限給去掉了,本質並沒變,還是使用的輪詢

2.EPoll

epoll在核心2.6中提出(Linux獨有),使用一個檔案描述符管理多個描述符,將使用者關心的檔案描述符的事件存放到核心的一個事件表中,採用監聽回撥的機制,這樣在使用者空間和核心空間的copy只需一次,避免再次遍歷就緒的檔案描述符列表

先來看個案例吧:(輸出和上面一樣)

import socket
import select

def main():
    with socket.socket() as tcp_server:
        tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcp_server.bind(('', 8080))
        tcp_server.listen()

        # epoll是linux獨有的
        epoll = select.epoll()
        # tcp_server註冊到epoll中
        epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)

        # key-value
        fd_socket_dict = dict()

        # 回撥需要自己處理
        while True:
            # 返回可讀寫的socket fd 集合
            poll_list = epoll.poll()
            for fd, event in poll_list:
                # 伺服器的socket
                if fd == tcp_server.fileno():
                    client_socket, client_addr = tcp_server.accept()
                    fd = client_socket.fileno()
                    fd_socket_dict[fd] = (client_socket, client_addr)
                    # 把客戶端註冊進epoll中
                    epoll.register(fd, select.EPOLLIN | select.EPOLLET)
                else:  # 客戶端
                    client_socket, client_addr = fd_socket_dict[fd]
                    data = client_socket.recv(2048)
                    print(
                        f"[來自{client_addr}的訊息,當前連線數:{len(fd_socket_dict)}]\n")
                    if data:
                        print(data.decode("utf-8"))
                        client_socket.send(
                            b"HTTP/1.1 200 ok\r\nContent-Type: text/html;charset=utf-8\r\n\r\n<h1>Web Server Test</h1>"
                        )
                    else:
                        del fd_socket_dict[fd]
                        print(
                            f"[{client_addr}已離線,當前連線數:{len(fd_socket_dict)}]\n"
                        )
                        # 從epoll中登出
                        epoll.unregister(fd)
                        client_socket.close()

if __name__ == "__main__":
    main()

擴充套件:epoll的兩種工作模式

LT(level trigger,水平觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程式,應用程式可以不立即處理該事件。下次呼叫epoll_wait時,會再次響應應用程式並通知此事件。LT模式是預設的工作模式。
LT模式同時支援阻塞和非阻塞socket。

ET(edge trigger,邊緣觸發)模式:當epoll_wait檢測到描述符就緒,將此事件通知應用程式,應用程式必須立即處理該事件。如果不處理,下次呼叫epoll_wait時,不會再次響應應用程式並通知此事件。
ET是高速工作方式,只支援非阻塞socket(ET模式減少了epoll事件被重複觸發的次數,因此效率要比LT模式高)

Code提煉一下

  1. 例項化物件:epoll = select.epoll()
  2. 註冊物件:epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
  3. 登出物件:epoll.unregister(fd)

PS:epoll不一定比Select效能高,一般都是分場景的:

  1. 高併發下,連線活躍度不高時:epoll比Select效能高(eg:web請求,頁面隨時關閉)
  2. 併發不高,連線活躍度比較高:Select更合適(eg:小遊戲)
  3. Select是win和linux通用的,而epoll只有linux有

其實IO多路複用還有一個kqueue,和epoll類似,下面的通用寫法中有包含


3.通用寫法(Selector

一般來說:Linux下使用epoll,Win下使用select(IO多路複用會這個通用的即可)

先看看Python原始碼:

# 選擇級別:epoll|kqueue|devpoll > poll > select
if 'KqueueSelector' in globals():
    DefaultSelector = KqueueSelector
elif 'EpollSelector' in globals():
    DefaultSelector = EpollSelector
elif 'DevpollSelector' in globals():
    DefaultSelector = DevpollSelector
elif 'PollSelector' in globals():
    DefaultSelector = PollSelector
else:
    DefaultSelector = SelectSelector

實戰案例:(可讀和可寫可以不分開)

import socket
import selectors

# Linux下使用epoll,Win下使用select
Selector = selectors.DefaultSelector()

class Task(object):
    def __init__(self):
        # 存放客戶端fd和socket鍵值對
        self.fd_socket_dict = dict()

    def run(self):
        self.server = socket.socket()
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind(('', 8080))
        self.server.listen()
        # 把Server註冊到epoll
        Selector.register(self.server.fileno(), selectors.EVENT_READ,
                          self.connected)

    def connected(self, key):
        """客戶端連線時處理"""
        client_socket, client_address = self.server.accept()
        fd = client_socket.fileno()
        self.fd_socket_dict[fd] = (client_socket, client_address)
        # 註冊一個客戶端讀的事件(服務端去讀訊息)
        Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)
        print(f"{client_address}已連線,當前連線數:{len(self.fd_socket_dict)}")

    def call_back_reads(self, key):
        """客戶端可讀時處理"""
        # 一個fd只能註冊一次,監測可寫的時候需要把可讀給登出
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        print(f"[來自{client_address}的訊息:]\n")
        data = client_socket.recv(2048)
        if data:
            print(data.decode("utf-8"))
            # 註冊一個客戶端寫的事件(服務端去發訊息)
            Selector.register(key.fd, selectors.EVENT_WRITE,
                              self.call_back_writes)
        else:
            client_socket.close()
            del self.fd_socket_dict[key.fd]
            print(f"{client_address}已斷開,當前連線數:{len(self.fd_socket_dict)}")

    def call_back_writes(self, key):
        """客戶端可寫時處理"""
        Selector.unregister(key.fd)
        client_socket, client_address = self.fd_socket_dict[key.fd]
        client_socket.send(b"ok")
        Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)

def main():
    t = Task()
    t.run()
    while True:
        ready = Selector.select()
        for key, obj in ready:
            # 需要自己回撥
            call_back = key.data
            call_back(key)

if __name__ == "__main__":
    main()

Code提煉一下

  1. 例項化物件:Selector = selectors.DefaultSelector()
  2. 註冊物件:
    • Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
    • Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
  3. 登出物件:Selector.unregister(key.fd)
  4. 注意一下:一個fd只能註冊一次,監測可寫的時候需要把可讀給登出(反之一樣)

業餘拓展:

select, iocp, epoll,kqueue及各種I/O複用機制
https://blog.csdn.net/shallwake/article/details/5265287

kqueue用法簡介
http://www.cnblogs.com/luminocean/p/5631336.html

下級預估:協程篇 or 網路深入篇