1. 程式人生 > >第七章|7.4併發程式設計| I/O模型

第七章|7.4併發程式設計| I/O模型

I/O模型

協程是單執行緒下的併發,並不是對效能都有所提升,一定是監測單個執行緒下的多個任務的I/O,遇到I/O不要讓它阻塞,給它自動切換到其他任務去,這樣就能提高單個執行緒下的執行效率。--->>用gevent模組來實現了,gevent是怎麼檢測I/O行為的呢,gevent監測行為,遇到I/O自動切換到其他任務去,併發效果有了效能也提升了。

講I/O模型就是為了你自己去實現一個gevent模式。

同步非同步(提交任務的方式),同步呼叫是提交完任務在原地等著結果拿到結果後再執行下行程式碼;非同步呼叫是提交完就不管了接著往下執行,非同步通常跟回撥機制連用,我提交完一個任務,這個任務執行完後會自動觸發回撥函式執行把結果交給它。

同步不等於阻塞。同步只是提交任務的方式,任務是否阻塞我不管,我就在這等著只要執行完了才往下走;阻塞是遇到I/O了,你自己沒有處理,作業系統會剝奪你使用cpu給別人用。我們要解決的就是一個執行緒下可能要乾的活很多,你不要遇到I/O就卡在原地了,我能夠檢測到它確實是一個I/O行為好切換到其他任務;非阻塞和阻塞正好相反。

遇到I/O為什麼會卡那呢,我們要研究的是套接字I/O行為,實現服務端儘可能多的支援併發、效能提升起來,研究的主要是網路I/O。

服務端套接字accept(等著別人連線,給你發一個數據,會讓你感覺明顯的等),recv(收訊息) ,send(傳訊息,並不會讓你感覺等)都屬於I/O阻塞行為;

recv經歷兩個階段:waite等資料(從自己的快取資料收訊息,從客戶端到服務端,耗時最長);從作業系統快取記憶體copy到應用程式記憶體;accept和recv是一個的都要經歷這兩個階段。send是應用程式把自己的資料copy到作業系統記憶體,本地copy的速度非常快,它只經歷這一個階段,然後接下來由作業系統去幹。

四種I/O模型不同之處就是對這兩個階段的處理不一樣。

 

阻塞I/O

 

recvfrom要經歷wait for data和copy data 兩個階段才算執行完畢,這就就是阻塞I/O。

#阻塞I/O  沒有併發、遇到阻塞就等著
from socket import *
from threading import Thread #可利用多執行緒實現併發,讓主執行緒幹accept的活
def communicate(conn):
    while True:   #通訊迴圈
        try:
            data = conn.recv(1024) #
等待,等著收訊息,客戶端要產生資料 if not data: break conn.send(data.upper()) except ConnectionResetError: break conn.close() server = socket(AF_INET, SOCK_STREAM) server.bind(('127.0.0.1',8080)) server.listen(5) while True: #連線迴圈 print('starting...') conn, addr = server.accept() #wait階段主要卡在這裡;accept等著對方連我,別人給我發訊息才算過去了 print(addr)#客戶端來了 t=Thread(target=communicate,args=(conn,)) #讓主執行緒幹accept的活,每來一個連結發起個執行緒讓它幹通訊的活;沒有解決I/O阻塞,起了多個執行緒各個執行互不影響; t.start() #執行緒池保證機器在健康的狀態下執行; server.close()
View Code
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080)) #發連線

while True:
    msg=input('>>: ').strip() #輸訊息
    if not msg:continue
    client.send(msg.encode('utf-8')) #發訊息
    data=client.recv(1024)
    print(data.decode('utf-8')) #收訊息
client.close()
View Code

非阻塞I/O

單執行緒下的多併發

監測單執行緒下的I/O,幫你自動切換到另外一個任務去執行,wait和copy階段是怎麼處理的呢,跟gevent一樣。

由應用程式發起個系統呼叫recvfrom打算收一個訊息,發給作業系統核心,在阻塞I/O它會讓你一直在原地等著。非阻塞I/O,發給作業系統核心,作業系統會說沒有資料準備好,返回個訊號給你的應用程式,應用程式就可以先去幹其他的活,不用再等了,幹一會其他的活再問問來了沒,這樣迴圈。當作業系統準備好了之後,把資料由系統的緩衝區copy到應用程式。waite階段是你可以利用的可以幹其他的活。

 所以,在非阻塞式IO中,使用者程序其實是需要不斷的主動詢問kernel資料準備好了沒有。

from socket import *

server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8083))
server.listen(5)
server.setblocking(False) #1True是阻塞,False是非阻塞
print('starting...')

rlist=[] #有連線來就不停的加
wlist=[] #send在資料量大的情況下也會阻塞
while True:
    try:
        conn, addr = server.accept() #2問作業系統有沒有資料  ;服務端可以不停的建連結
        rlist.append(conn)  #有連結就建連結,每建立一個連結就放在列表裡
        print(rlist)
    except BlockingIOError: #3捕捉這個異常就是那個作業系統發的訊號,拿到這個異常然後幹其他的活;利用accept那個waite階段
        #print('幹其他的活') #沒有資料來就可以幹其他的活了,

        #收訊息
        del_rlist = [] #
        for conn in rlist: #取出conn去收訊息
            try:
                data=conn.recv(1024)
                if not data: #收空訊息,就跳過
                    del_rlist.append(conn)
                    continue 
                wlist.append((conn,data.upper())) #存放套接字以及準備發的資料
            except BlockingIOError: #捕捉異常,第一個套接字沒有,可能下一個就來了
                continue #跳過
            except Exception: #可能套接字出現異常,客戶端單方面的把連線斷開;
                conn.close() #把麼有用的給回收掉,del_rlist從刪除它,不能在迴圈過程中刪除
                del_rlist.append(conn)#

        #發訊息
        del_wlist=[]
        for item in wlist:
            try:
                conn=item[0]#拿到conn物件
                data=item[1]#拿到資料
                conn.send(data) #緩衝區大小不能無線的大,send在資料量過大的情況下也會阻塞
                del_wlist.append(item)
            except BlockingIOError: #出異常了是記憶體滿了,沒拋異常證明發成功了
                pass

        for item in del_wlist:#發成功了把你刪了 
            wlist.remove(item)

        for conn in del_rlist: #把沒有資料發來的conn給刪除掉
            rlist.remove(conn)


server.close()
View Code
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8083))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()
View Code

但是非阻塞IO模型絕不被推薦。

我們不能否則其優點:能夠在等待任務完成的時間裡幹其他活了(包括提交其他任務,也就是 “後臺” 可以有多個任務在“同時”執行)。

但是也難掩其缺點: 

1. 迴圈呼叫recv()將大幅度推高CPU佔用率;這也是我們在程式碼中留一句time.sleep(2)的原因,否則在低配主機下極容易出現卡機情況
2. 任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成。
這會導致整體資料吞吐量的降低。

此外,在這個方案中recv()更多的是起到檢測“操作是否完成”的作用,實際作業系統提供了更為高效的檢測“操作是否完成“作用的介面,例如select()多路複用模式,可以一次檢測多個連線是否活躍。  

多路複用I/O模型

IO multiplexing這個詞可能有點陌生,但是如果我說select/epoll,大概就都能明白了。有些地方也稱這種IO方式為事件驅動IO。

直接發select(它是多路複用模型的一種,相當於一箇中介,套接字有沒有準備好由中介去問了,問作業系統資料有沒有準備好,沒有準備好它就在那等著;等到套接字資料準備好之後由服務端去發起個系統呼叫,是沒有waite階段的。)

多路複用經歷了3個階段,多了一箇中間互動的階段,效能還不如阻塞I/O,針對一個套接字;select高效能在於它可以監測多個套接字,而阻塞I/O只能有一個。

服務端的套接字有幾類?conn,addr=server.accept()建連結;還有一個是建好連結的那個conn,負責recv和send操作。

 

 

from socket import *
import select

server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8083))
server.listen(5)
server.setblocking(False)
print('starting...')

rlist=[server,] #專門存收訊息的 server調accept用來收;還有一個種是建完連結之後才能拿到的conn套接字;初始狀態就是放server,後來可以放conn
wlist=[] #存發的資料
wdata={}

while True:
    rl,wl,xl=select.select(rlist,wlist,[],0.5) #去問作業系統套接字準備好沒有;[]表示出異常的列表;0.5表示每隔0.5s去問一次;
    print('rl',rl)  #rlist會不斷存conn和server一堆;wlist存一堆conn,緩衝區一旦沒滿證明就可以發了
    print('wl',wl)  #wl是可以往緩衝區send值的這種套接字

    for sock in rl: #讀列表裡邊是幹accept和recv的活
        if sock == server: #幹連結的活
            conn,addr=sock.accept() #拿到conn的活
            rlist.append(conn)
        else:
            try:
                data=sock.recv(1024) #拿到資料,加到那個列表裡邊
                if not data:  #針對linux系統
                    sock.close()
                    rlist.remove(sock) #這個套接字不要監測了
                    continue
                wlist.append(sock) #把套接字加進去,還有它配套的資料
                wdata[sock]=data.upper()
            except Exception:#有可能客戶端單方面把連結斷開
                sock.close()
                rlist.remove(sock) ##rlist沒有必要監測這個套接字了

    for sock in wl: #只能執行send操作
        data=wdata[sock]
        sock.send(data) #傳資料
        wlist.remove(sock)#傳完之後就把它給刪了
        wdata.pop(sock)

server.close()
View Code
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8083))

while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()
View Code

該模型的優點:

相比其他模型,使用select() 的事件驅動模型只用單執行緒(程序)執行,佔用資源少,不消耗太多 CPU,同時能夠為多客戶端提供服務。
如果試圖建立一個簡單的事件驅動的伺服器程式,這個模型有一定的參考價值。

該模型的缺點:

首先select()介面並不是實現“事件驅動”的最好選擇。因為當需要探測的控制代碼值較大時,select()介面本身需要消耗大量時間去輪詢各個控制代碼。
很多作業系統提供了更為高效的介面,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。
如果需要實現更高效的伺服器程式,類似epoll這樣的介面更被推薦。遺憾的是不同的作業系統特供的epoll介面有很大差異,
所以使用類似於epoll的介面實現具有較好跨平臺能力的伺服器會比較困難。
其次,該模型將事件探測和事件響應夾雜在一起,一旦事件響應的執行體龐大,則對整個模型是災難性的。

非同步I/O模型

 首先還是發一個系統呼叫,傳送給作業系統核心之後,立馬返回,返回之後就可以幹其他活了。經歷了waite和copy階段,最後把資料直接丟給它。向作業系統說我要一個數據,資料請求發出去之後,作業系統你給我準備好資料,準備好之後給我送過來,這就是一種非同步的方式。比阻塞I/O高,發起請求就直接返回了,沒有經歷waite和copy階段

那是作業系統乾的活;比起非阻塞I/O也高了,不用迴圈的去問了,它是什麼時候好什麼時候給我送回來。