1. 程式人生 > >python全棧開發從入門到放棄之socket並發編程之IO模型

python全棧開發從入門到放棄之socket並發編程之IO模型

map 超時 sting mon recv style 好的 exceptio 得到

一 IO模型介紹

同步(synchronous) IO和異步(asynchronous) IO,阻塞(blocking) IO和非阻塞(non-blocking)IO分別是什麽,到底有什麽區別?這個問題其實不同的人給出的答案都可能不同,比如wiki,就認為asynchronous IO和non-blocking IO是一個東西。這其實是因為不同的人的知識背景不同,並且在討論這個問題的時候上下文(context)也不相同。所以,為了更好的回答這個問題,我先限定一下本文的上下文。

Stevens在文章中一共比較了五種IO Model:
* blocking IO
* nonblocking IO
* IO multiplexing
* signal driven IO
* asynchronous IO
由signal driven IO(信號驅動IO)在實際中並不常用,所以主要介紹其余四種IO Model。

再說一下IO發生時涉及的對象和步驟。對於一個network IO (這裏我們以read舉例),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另一個就是系統內核(kernel)。當一個read操作發生時,該操作會經歷兩個階段:

#1)等待數據準備 (Waiting for the data to be ready)
#2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process)

記住這兩點很重要,因為這些IO模型的區別就是在兩個階段上各有不同的情況。

二 阻塞IO(blocking IO)

技術分享

當用戶進程調用了recvfrom這個系統調用,kernel就開始了IO的第一個階段:準備數據。對於network io來說,很多時候數據在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的數據到來。

而在用戶進程這邊,整個進程會被阻塞。當kernel一直等到數據準備好了,它就會將數據從kernel中拷貝到用戶內存,然後kernel返回結果,用戶進程才解除block的狀態,重新運行起來。
所以,blocking IO的特點就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。

幾乎所有的程序員第一次接觸到的網絡編程都是從listen()、send()、recv() 等接口開始的,使用這些接口可以很方便的構建服務器/客戶機的模型。然而大部分的socket接口都是阻塞型的。如下圖

ps:所謂阻塞型接口是指系統調用(一般是IO接口)不返回調用結果並讓當前線程一直阻塞,只有當該系統調用獲得結果或者超時出錯時才返回。

技術分享

實際上,除非特別指定,幾乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。這給網絡編程帶來了一個很大的問題,如在調用recv(1024)的同時,線程將被阻塞,在此期間,線程將無法執行任何運算或響應任何的網絡請求。

一個簡單的解決方案:

#在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每個連接都擁有獨立的線程(或進程),這樣任何一個連接的阻塞都不會影響其他的連接。

該方案的問題是:
#開啟多進程或都線程的方式,在遇到要同時響應成百上千路的連接請求,則無論多線程還是多進程都會嚴重占據系統資源,降低系統對外界響應效率,而且線程與進程本身也更容易進入假死狀態。

改進方案:

#很多程序員可能會考慮使用“線程池”或“連接池”。“線程池”旨在減少創建和銷毀線程的頻率,其維持一定合理數量的線程,並讓空閑的線程重新承擔新的執行任務。“連接池”維持連接的緩存池,盡量重用已有的連接、減少創建和關閉連接的頻率。這兩種技術都可以很好的降低系統開銷,都被廣泛應用很多大型系統,如websphere、tomcat和各種數據庫等。

改進後方案其實也存在著問題:

#“線程池”和“連接池”技術也只是在一定程度上緩解了頻繁調用IO接口帶來的資源占用。而且,所謂“池”始終有其上限,當請求大大超過上限時,“池”構成的系統對外界的響應並不比沒有池的時候效果好多少。所以使用“池”必須考慮其面臨的響應規模,並根據響應規模調整“池”的大小。
對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,“線程池”或“連接池”或許可以緩解部分壓力,但是不能解決所有問題。總之,多線程模型可以方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個問題。
技術分享
服務端:
from socket import *
server=socket(AF_INET,SOCK_STREAM)
server.bind((127.0.0.1,8080))
server.listen(5)
print(starting...)
while True:
    conn,addr=server.accept()       #這裏一直需要等待用戶連接,就是阻塞

    print(addr)
    while True:
        try:
            data=conn.recv(1024)                    #這裏等待用戶傳輸字節過來,也是阻塞
            if not data:break
            conn.send(data.upper())
        except Exception:
            break
    conn.close()

server.close()




客戶端:
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1,8080))

while True:
    msg=input(>>: ).strip()
    if not msg:continue
    client.send(msg.encode(utf-8))
    data=client.recv(1024)
    print(data.decode("utf-8"))
IO模型例子

三 非阻塞IO(non-blocking IO)

技術分享

從圖中可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有準備好,那麽它並不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作後,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有準備好,於是用戶就可以在本次到下次再發起read詢問的時間間隔內做其他事情,或者直接再次發送read操作。一旦kernel中的數據準備好了,並且又再次收到了用戶進程的system call,那麽它馬上就將數據拷貝到了用戶內存(這一階段仍然是阻塞的),然後返回。

也就是說非阻塞的recvform系統調用調用之後,進程並沒有被阻塞,內核馬上返回給進程,如果數據還沒準備好,此時會返回一個error。進程在返回之後,可以幹點別的事情,然後再發起recvform系統調用。重復上面的過程,循環往復的進行recvform系統調用。這個過程通常被稱之為輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。需要註意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel數據準備好了沒有。

服務端:
from socket import *
import time
server=socket(AF_INET,SOCK_STREAM)
server.bind((127.0.0.1,8800))
server.listen(5)
server.setblocking(False)
print(starting...)
conn_l=[]
del_l=[]

while True:
    try:                                                 #異常處理機制
        print(conn_l)                                
        conn,addr=server.accept()             #接收server的連接  
        conn_l.append(conn)                      #把連接放到conn_l列表中
    except BlockingIOError:                     #如果沒有連接則會報錯,但異常處理機制會處理掉這個異常,相當於不停的去問系統的有沒有數據準備著
        for conn in conn_l:               #如果列表中有連接進行遍歷,
            try:
                data=conn.recv(1024)    #遍歷的來接收數據字節
                conn.send(data.upper())        並返回回去
            except BlockingIOError:     #當沒有數據時會報錯這個錯誤,所以還需要這個異常處理機制
                pass                  #可以什麽都不用寫,會不停的問有沒有數據
            except ConnectionResetError:      #當客戶端直接斷開的時,會報錯,在添加一個異常處理機制
                del_l.append(conn)                #把這個連接的conn放到新建的del_l列表中
        for obj in del_l:                            #把del_l列表中  進行遍歷
            obj.close()                                #通過obj關閉掉這個錯誤的連接
            conn_l.remove(obj)                   #然後這個在刪除掉列表中的
        del_l=[]                                       #在重新定義del_l列表表示清空了

但是非阻塞IO模型絕不被推薦。

我們不能否則其優點:能夠在等待任務完成的時間裏幹其他活了(包括提交其他任務,也就是 “後臺” 可以有多個任務在“”同時“”執行)。

但是也難掩其缺點:

#1. 循環調用recv()將大幅度推高CPU占用率;這也是我們在代碼中留一句time.sleep(2)的原因,否則在低配主機下極容易出現卡機情況
#2. 任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成。這會導致整體數據吞吐量的降低。

此外,在這個方案中recv()更多的是起到檢測“操作是否完成”的作用,實際操作系統提供了更為高效的檢測“操作是否完成“作用的接口,例如select()多路復用模式,可以一次檢測多個連接是否活躍。

四 多路復用IO(IO multiplexing)

IO multiplexing這個詞可能有點陌生,但是如果我說select/epoll,大概就都能明白了。有些地方也稱這種IO方式為事件驅動IO(event driven IO)。我們都知道,select/epoll的好處就在於單個process就可以同時處理多個網絡連接的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有數據到達了,就通知用戶進程。它的流程如圖:

技術分享

當用戶進程調用了select,那麽整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上還更差一些。因為這裏需要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。

強調:

1. 如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。

2. 在多路復用模型中,對於每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。

結論: select的優勢在於可以處理多個連接,不適用於單個連接

服務端:
from socket import *
import time
import select
server=socket(AF_INET,SOCK_STREAM)
server.bind((127.0.0.1,8809))
server.listen(5)
server.setblocking(False)
print(starting...)
reads_l=[server,]
while True:
    r_l,_,_=select.select(reads_l,[],[])
    print(r_l)
    for obj in r_l:
        if obj == server:
            conn,addr=obj.accept() #obj=server
            reads_l.append(conn)
        else:
            data=obj.recv(1024) #obj=conn
            obj.send(data.upper())

客戶端:
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect((127.0.0.1,8809))

while True:
    msg=input(>>: ).strip()
    if not msg:continue
    client.send(msg.encode(utf-8))
    data=client.recv(1024)
    print(data.decode("utf-8"))

select監聽fd變化的過程分析:

#用戶進程創建socket對象,拷貝監聽的fd到內核空間,每一個fd會對應一張系統文件表,內核空間的fd響應到數據後,就會發送信號給用戶進程數據已到;
#用戶進程再發送系統調用,比如(accept)將內核空間的數據copy到用戶空間,同時作為接受數據端內核空間的數據清除,這樣重新監聽時fd再有新的數據又可以響應到了(發送端因為基於TCP協議所以需要收到應答後才會清除)。
該模型的優點:
#相比其他模型,使用select() 的事件驅動模型只用單線程(進程)執行,占用資源少,不消耗太多 CPU,同時能夠為多客戶端提供服務。如果試圖建立一個簡單的事件驅動的服務器程序,這個模型有一定的參考價值。

該模型的缺點:

#首先select()接口並不是實現“事件驅動”的最好選擇。因為當需要探測的句柄值較大時,select()接口本身需要消耗大量時間去輪詢各個句柄。很多操作系統提供了更為高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要實現更高效的服務器程序,類似epoll這樣的接口更被推薦。遺憾的是不同的操作系統特供的epoll接口有很大差異,所以使用類似於epoll的接口實現具有較好跨平臺能力的服務器會比較困難。
#其次,該模型將事件探測和事件響應夾雜在一起,一旦事件響應的執行體龐大,則對整個模型是災難性的。

五 異步IO(Asynchronous I/O)
技術分享

用戶進程發起read操作之後,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對用戶進程產生任何block。然後,kernel會等待數據準備完成,然後將數據拷貝到用戶內存,當這一切都完成之後,kernel會給用戶進程發送一個signal,告訴它read操作完成了。

六 IO模型比較分析

到目前為止,已經將四個IO Model都介紹完了。現在回過頭來回答最初的那幾個問題:blocking和non-blocking的區別在哪,synchronous IO和asynchronous IO的區別在哪。
先回答最簡單的這個:blocking vs non-blocking。前面的介紹中其實已經很明確的說明了這兩者的區別。調用blocking IO會一直block住對應的進程直到操作完成,而non-blocking IO在kernel還準備數據的情況下會立刻返回。

再說明synchronous IO和asynchronous IO的區別之前,需要先給出兩者的定義。Stevens給出的定義(其實是POSIX的定義)是這樣子的:
A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
An asynchronous I/O operation does not cause the requesting process to be blocked;
兩者的區別就在於synchronous IO做”IO operation”的時候會將process阻塞。按照這個定義,四個IO模型可以分為兩大類,之前所述的blocking IO,non-blocking IO,IO multiplexing都屬於synchronous IO這一類,而 asynchronous I/O後一類 。

有人可能會說,non-blocking IO並沒有被block啊。這裏有個非常“狡猾”的地方,定義中所指的”IO operation”是指真實的IO操作,就是例子中的recvfrom這個system call。non-blocking IO在執行recvfrom這個system call的時候,如果kernel的數據沒有準備好,這時候不會block進程。但是,當kernel中數據準備好的時候,recvfrom會將數據從kernel拷貝到用戶內存中,這個時候進程是被block了,在這段時間內,進程是被block的。而asynchronous IO則不一樣,當進程發起IO 操作之後,就直接返回再也不理睬了,直到kernel發送一個信號,告訴進程說IO完成。在這整個過程中,進程完全沒有被block。

各個IO Model的比較如圖所示:技術分享

經過上面的介紹,會發現non-blocking IO和asynchronous IO的區別還是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,但是它仍然要求進程去主動的check,並且當數據準備完成以後,也需要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則完全不同。它就像是用戶進程將整個IO操作交給了他人(kernel)完成,然後他人做完後發信號通知。在此期間,用戶進程不需要去檢查IO操作的狀態,也不需要主動的去拷貝數據。

七 selectors模塊

技術分享
IO復用:為了解釋這個名詞,首先來理解下復用這個概念,復用也就是共用的意思,這樣理解還是有些抽象,為此,咱們來理解下復用在通信領域的使用,在通信領域中為了充分利用網絡連接的物理介質,往往在同一條網絡鏈路上采用時分復用或頻分復用的技術使其在同一鏈路上傳輸多路信號,到這裏我們就基本上理解了復用的含義,即公用某個“介質”來盡可能多的做同一類(性質)的事,那IO復用的“介質”是什麽呢?為此我們首先來看看服務器編程的模型,客戶端發來的請求服務端會產生一個進程來對其進行服務,每當來一個客戶請求就產生一個進程來服務,然而進程不可能無限制的產生,因此為了解決大量客戶端訪問的問題,引入了IO復用技術,即:一個進程可以同時對多個客戶請求進行服務。也就是說IO復用的“介質”是進程(準確的說復用的是select和poll,因為進程也是靠調用select和poll來實現的),復用一個進程(select和poll)來對多個IO進行服務,雖然客戶端發來的IO是並發的但是IO所需的讀寫數據多數情況下是沒有準備好的,因此就可以利用一個函數(select和poll)來監聽IO所需的這些數據的狀態,一旦IO有數據可以進行讀寫了,進程就來對這樣的IO進行服務。

  

理解完IO復用後,我們在來看下實現IO復用中的三個API(select、poll和epoll)的區別和聯系

select,poll,epoll都是IO多路復用的機制,I/O多路復用就是通過一種機制,可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知應用程序進行相應的讀寫操作。但select,poll,epoll本質上都是同步I/O,因為他們都需要在讀寫事件就緒後自己負責進行讀寫,也就是說這個讀寫過程是阻塞的,而異步I/O則無需自己負責進行讀寫,異步I/O的實現會負責把數據從內核拷貝到用戶空間。三者的原型如下所示:

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);



 1.select的第一個參數nfds為fdset集合中最大描述符值加1,fdset是一個位數組,其大小限制為__FD_SETSIZE(1024),位數組的每一位代表其對應的描述符是否需要被檢查。第二三四參數表示需要關註讀、寫、錯誤事件的文件描述符位數組,這些參數既是輸入參數也是輸出參數,可能會被內核修改用於標示哪些描述符上發生了關註的事件,所以每次調用select前都需要重新初始化fdset。timeout參數為超時時間,該結構會被內核修改,其值為超時剩余的時間。

 select的調用步驟如下:

(1)使用copy_from_user從用戶空間拷貝fdset到內核空間

(2)註冊回調函數__pollwait

(3)遍歷所有fd,調用其對應的poll方法(對於socket,這個poll方法是sock_poll,sock_poll根據情況會調用到tcp_poll,udp_poll或者datagram_poll)

(4)以tcp_poll為例,其核心實現就是__pollwait,也就是上面註冊的回調函數。

(5)__pollwait的主要工作就是把current(當前進程)掛到設備的等待隊列中,不同的設備有不同的等待隊列,對於tcp_poll 來說,其等待隊列是sk->sk_sleep(註意把進程掛到等待隊列中並不代表進程已經睡眠了)。在設備收到一條消息(網絡設備)或填寫完文件數 據(磁盤設備)後,會喚醒設備等待隊列上睡眠的進程,這時current便被喚醒了。

(6)poll方法返回時會返回一個描述讀寫操作是否就緒的mask掩碼,根據這個mask掩碼給fd_set賦值。

(7)如果遍歷完所有的fd,還沒有返回一個可讀寫的mask掩碼,則會調用schedule_timeout是調用select的進程(也就是 current)進入睡眠。當設備驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的進程。如果超過一定的超時時間(schedule_timeout 指定),還是沒人喚醒,則調用select的進程會重新被喚醒獲得CPU,進而重新遍歷fd,判斷有沒有就緒的fd。

(8)把fd_set從內核空間拷貝到用戶空間。

總結下select的幾大缺點:

(1)每次調用select,都需要把fd集合從用戶態拷貝到內核態,這個開銷在fd很多時會很大

(2)同時每次調用select都需要在內核遍歷傳遞進來的所有fd,這個開銷在fd很多時也很大

(3)select支持的文件描述符數量太小了,默認是1024

 

2.  poll與select不同,通過一個pollfd數組向內核傳遞需要關註的事件,故沒有描述符個數的限制,pollfd中的events字段和revents分別用於標示關註的事件和發生的事件,故pollfd數組只需要被初始化一次。

 poll的實現機制與select類似,其對應內核中的sys_poll,只不過poll向內核傳遞pollfd數組,然後對pollfd中的每個描述符進行poll,相比處理fdset來說,poll效率更高。poll返回後,需要對pollfd中的每個元素檢查其revents值,來得指事件是否發生。

 

3.直到Linux2.6才出現了由內核直接支持的實現方法,那就是epoll,被公認為Linux2.6下性能最好的多路I/O就緒通知方法。epoll可以同時支持水平觸發和邊緣觸發(Edge Triggered,只告訴進程哪些文件描述符剛剛變為就緒狀態,它只說一遍,如果我們沒有采取行動,那麽它將不會再次告知,這種方式稱為邊緣觸發),理論上邊緣觸發的性能要更高一些,但是代碼實現相當復雜。epoll同樣只告知那些就緒的文件描述符,而且當我們調用epoll_wait()獲得就緒文件描述符時,返回的不是實際的描述符,而是一個代表就緒描述符數量的值,你只需要去epoll指定的一個數組中依次取得相應數量的文件描述符即可,這裏也使用了內存映射(mmap)技術,這樣便徹底省掉了這些文件描述符在系統調用時復制的開銷。另一個本質的改進在於epoll采用基於事件的就緒通知方式。在select/poll中,進程只有在調用一定的方法後,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epoll_ctl()來註冊一個文件描述符,一旦基於某個文件描述符就緒時,內核會采用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epoll_wait()時便得到通知。

 

epoll既然是對select和poll的改進,就應該能避免上述的三個缺點。那epoll都是怎麽解決的呢?在此之前,我們先看一下epoll 和select和poll的調用接口上的不同,select和poll都只提供了一個函數——select或者poll函數。而epoll提供了三個函 數,epoll_create,epoll_ctl和epoll_wait,epoll_create是創建一個epoll句柄;epoll_ctl是註 冊要監聽的事件類型;epoll_wait則是等待事件的產生。

  對於第一個缺點,epoll的解決方案在epoll_ctl函數中。每次註冊新的事件到epoll句柄中時(在epoll_ctl中指定 EPOLL_CTL_ADD),會把所有的fd拷貝進內核,而不是在epoll_wait的時候重復拷貝。epoll保證了每個fd在整個過程中只會拷貝 一次。

  對於第二個缺點,epoll的解決方案不像select或poll一樣每次都把current輪流加入fd對應的設備等待隊列中,而只在 epoll_ctl時把current掛一遍(這一遍必不可少)並為每個fd指定一個回調函數,當設備就緒,喚醒等待隊列上的等待者時,就會調用這個回調 函數,而這個回調函數會把就緒的fd加入一個就緒鏈表)。epoll_wait的工作實際上就是在這個就緒鏈表中查看有沒有就緒的fd(利用 schedule_timeout()實現睡一會,判斷一會的效果,和select實現中的第7步是類似的)。

  對於第三個缺點,epoll沒有這個限制,它所支持的FD上限是最大可以打開文件的數目,這個數字一般遠大於2048,舉個例子, 在1GB內存的機器上大約是10萬左右,具體數目可以cat /proc/sys/fs/file-max察看,一般來說這個數目和系統內存關系很大。

總結:

(1)select,poll實現需要自己不斷輪詢所有fd集合,直到設備就緒,期間可能要睡眠和喚醒多次交替。而epoll其實也需要調用 epoll_wait不斷輪詢就緒鏈表,期間也可能多次睡眠和喚醒交替,但是它是設備就緒時,調用回調函數,把就緒fd放入就緒鏈表中,並喚醒在 epoll_wait中進入睡眠的進程。雖然都要睡眠和交替,但是select和poll在“醒著”的時候要遍歷整個fd集合,而epoll在“醒著”的 時候只要判斷一下就緒鏈表是否為空就行了,這節省了大量的CPU時間,這就是回調機制帶來的性能提升。

(2)select,poll每次調用都要把fd集合從用戶態往內核態拷貝一次,並且要把current往設備等待隊列中掛一次,而epoll只要 一次拷貝,而且把current往等待隊列上掛也只掛一次(在epoll_wait的開始,註意這裏的等待隊列並不是設備等待隊列,只是一個epoll內 部定義的等待隊列),這也能節省不少的開銷。
View Code

這三種IO多路復用模型在不同的平臺有著不同的支持,而epoll在windows下就不支持,好在我們有selectors模塊,幫我們默認選擇當前平臺下最合適的

python全棧開發從入門到放棄之socket並發編程之IO模型