1. 程式人生 > >python 並發編程之IO 模型

python 並發編程之IO 模型

lse 需要 inux 兩個 檢查 很多 soc while 特點

首先說一下 IO 發生時涉及的對象和步驟。以read 為例,會經歷兩個階段:

1)等待數據準備

2)將數據從內核拷貝到進程中

二,阻塞Io(blocking IO)

在 Linux中 默認情況下所有的socket都是blocking,一個典型的讀操作流程大概如下:

技術分享圖片

所以blocking IO 的特點就是在IO 執行的兩個階段(等待數據和拷貝數據兩個階段)都被block(阻塞)了

幾乎所有的程序員第一次接觸到的網絡編程都是從listen()、send()、recv() 等接口開始的,使用這些接口可以很方便的構建服務器/客戶機的模型。然而大部分的socket接口都是阻塞型的。如下圖

ps:所謂阻塞型接口是指系統調用(一般是IO接口)不返回調用結果並讓當前線程一直阻塞,只有當該系統調用獲得結果或者超時出錯時才返回。

所以一個簡單的解決方案:

#在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每個連接都擁有獨立的線程(或進程),這樣任何一個連接的阻塞都不會影響其他的連接。

該方案的問題是:

#開啟多進程或多線程的方式,在遇到要同時響應成百上千路的連接請求,則無論多線程還是多進程都會嚴重占據系統資源,降低系統對外界響應效率,而且線程與進程本身也更容易進入假死狀態。

改進方案:

#很多程序員可能會考慮使用“線程池”或“連接池”。“線程池”旨在減少創建和銷毀線程的頻率,其維持一定合理數量的線程,並讓空閑的線程重新承擔新的執行任務。“連接池”維持連接的緩存池,盡量重用已有的連接、減少創建和關閉連接的頻率。這兩種技術都可以很好的降低系統開銷,都被廣泛應用很多大型系統,如websphere、tomcat和各種數據庫等。

改進方案其實也存在著問題:

#“線程池”和“連接池”技術也只是在一定程度上緩解了頻繁調用IO接口帶來的資源占用。而且,所謂“池”始終有其上限,當請求大大超過上限時,“池”構成的系統對外界的響應並不比沒有池的時候效果好多少。所以使用“池”必須考慮其面臨的響應規模,並根據響應規模調整“池”的大小。

對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,“線程池”或“連接池”或許可以緩解部分壓力,但是不能解決所有問題。總之,多線程模型可以方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個問題。

三 非阻塞io(non-blocking IO)

Linux 下,可以通過設置socket時期變為non-blocking.當對一個non-blocking socket 執行讀操作時,流程如下;

技術分享圖片

從圖中可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有準備好,那麽它並不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作後,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有準備好,於是用戶就可以在本次到下次再發起read詢問的時間間隔內做其他事情,或者直接再次發送read操作。一旦kernel中的數據準備好了,並且又再次收到了用戶進程的system call,那麽它馬上就將數據拷貝到了用戶內存(這一階段仍然是阻塞的),然後返回。

也就是說非阻塞的recvform系統調用調用之後,進程並沒有被阻塞,內核馬上返回給進程,如果數據還沒準備好,此時會返回一個error。進程在返回之後,可以幹點別的事情,然後再發起recvform系統調用。重復上面的過程,循環往復的進行recvform系統調用。這個過程通常被稱之為輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。需要註意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。

所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel數據準備好了沒有。

#服務端

from socket import *
import   time

s =socket()
s.bind((127.0.0.1,8080))
s.listen(5)
s.setblocking(False)

r_list=[]
w_list=[]
while True:
    try:
        conn,addr= s.accept()
        r_list.append(conn)
    except BlockingIOError:
        print(可以去幹其他活了)
        print(rlist:,len(r_list))

        del_rlist=[]
        for conn in r_list:
            try:
                data =conn.recv(1024)
                if not data:
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list.append((conn,data.upper()))
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_rlist.append(conn)

        del_wlist=[]
        for item in w_list:
            try:
                conn=item[0]
                res=item[1]
                conn.send(res)
                del_wlist.append(item)
            except BlockingIOError:
                continue
            except ConnectionResetError:
                conn.close()
                del_wlist.append(item)


        for conn in del_rlist:
            r_list.remove(conn)

        for  item in del_wlist:
            w_list.remove(item)



#客戶端

from socket import *
import  os
client =socket()
client.connect((127.0.0.1,8080))

while True:
    data=%s say hello%os.getpid()
    client.send(data.encode(utf-8))
    res=client.recv(1024)
    print(res.decode(utf-8))

但是非阻塞IO 模型決不被推薦。

我們不能否認其優點:能夠在等待任務完成的時間裏幹其他活了(包括提交其他任務,也就是‘後臺’可以有多個任務在‘同時’執行)

但是也難掩其缺點:

1:循環調用recv() 將大幅度提到CPU占用率,這也是我們在代碼中留一句time.sleep(2)de 原因,否則在低配主機下極容易出現卡機極容易出現卡機情況。
2:。任務完成的響應延遲增大了, 因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成, 這回導致整體數據吞吐量的降低。

多路復用IO (IOmultiplexing)

IO multiplexing 也叫select/epoll, 他的好處就在單個process 就可以同時出處理多個網絡連接的io.

基本原理就是select/epoll.這個function 會不斷的輪詢負責所有的socket,當某個 socket 有數據到達了,就通知用戶進程。特的流程如圖:

技術分享圖片

當用戶進程調用了select,name整個進程會被block,而同時,select 會檢測所有的它所負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程在調用read操作, 將數據從kernel 拷貝到用戶進程。

強調:

1、如果處理的連接數不是很高的話,使用selec/epoll 的web server 不一定比使用multi_threading+blocking IO 的web server 性能更好,可能延尺

還更大。select/epool的優勢並不是對單個連接能處理的更快, 而是在於能處理更多的連接。

2、在多路復用模型中,對於每一個socket,一般都設置成為non-blocking,但是如果整個用戶的process其實是一致被block的,值不過process是被select這個函數block ,而不 是被被socket io給block.

結論:

select的優勢在於可以處理多個連接,不適用與單個連接

#服務端
from socket import  *
import  select

s=socket
s.bind((127.0.0.1,8080))
s.listen(5)
s.setblocking(False)

r_list=[s,]
w_list=[]
w_data={}
while True:
    print(被檢測r_list: ,len(r_list))
    print(被檢測w_list: ,len(w_list))
    rl,wl,xl=select.select(r_list,w_list,[],)

    for r in rl:
        if r==s:
            conn,addr=r.accept()
            r_list.append(conn)
        else:
            try:
                data = r.recv(1024)
                if not data:
                    r.close()
                    r_list.remove(r)
                    continue

                w_list.append(r)
                w_data[r]=data.upper()
            except ConnectionResetError:
                r.close()
                r_list.remove(r)
                continue
    for w in wl:
        w.send(w_data[w])
        w_list.remove(w)
        w_data.pop(w)




#客戶端



from socket import  *
import os,

client=socket()
client.connect((127.0.0.1,8080))
while True:
    data ="%s say hello "%os.getpid()
    client.send(data.encode(utf-8))
    res=client.recv(1024)
    print(res.deccode(utf-8))

該模型優點:

相比其他模型,使用select()的事件驅動模型是用單線程(進程)執行,占用資源少,不消耗太多cpu,同時能夠為多客戶端提供服務,如果視圖建立一個簡單的事件驅動的服務器程序, 這個模型有一定的參考價值。

該模型的缺點

首先select()接口並不是事先‘事件驅動’的最好選擇。因為當需要探測的句柄值較大時,select()接口本身需要消耗大量時間去輪詢各個句柄。很多操作系統提供了更為高效的接口,如linuxt提供了epoll。。等等。如果需要實現更高效的服務器程序, 類似epoll 這樣的接口更被推薦, 遺憾的是不同的操作系統提供的epoll接口有很大的差異。所以使用類似於epoll 的接口實現據歐較好的跨平臺能力的服務器會比較困難。

其次,該模型將事件探測和事件響應夾雜在一起,一旦事件響應的執行體龐大,則對整個模型是災難性的。

異步IO(Asynchronous I/O)

Linux 下的asynchronous IO 其實用的不多,從內核2.6版本才開始引入,先看他的流程:

技術分享圖片

用戶進程發起 read 操作之後,立刻就可以開始去做其他事。而另一方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對用戶進程產生任何block。然後,kernel會等待數據準備完成,然後將數據拷貝到用戶內存,當這一切都完成之後,kernel會給用戶進程發送一個signal,告訴它read操作完成了。

python 並發編程之IO 模型