1. 程式人生 > >IO模型和協程

IO模型和協程

一、 IO模型

五種IO模型:

blocking IO :阻塞IO

nonblocking IO 非阻塞IO

IO multiplexing  IO多路複用

signal driven IO 訊號驅動IO

asynchronous IO 非同步IO

對於一個network IO,它會涉及到兩個系統物件,一個是呼叫這個IO的process(or thread),另一個就是系統核心。當一個read/recv讀資料的操作發生時,該操作會經歷兩個階段:

1, 等待資料準備

2, 將資料從核心拷貝到程序中

補充:

1, 輸入操作:read,readv,recv,recvfrom,recvmsg共5個函式,如果會阻塞狀態,則會經歷wait data和copy data兩個階段,如果設定為非阻塞則在wait不到data時丟擲異常

2, 輸出操作:write,writev,send,sendto,sendmsg共5個函式,在傳送緩衝區滿了會阻塞在原地,如果設定為非阻塞,則會丟擲異常

3, 接收外來連結:accept,與輸入操作類似

4, 發起外出連結:connect,與操作類似

 

二、阻塞IO

回顧同步/非同步/阻塞/非阻塞:

同步:提交一個任務之後要等待這個任務執行完畢

非同步:只管提交任務,不等待這個任務執行完畢就可以去做其他的事情

阻塞:recv,recvfrom,accept,執行緒階段 執行狀態>>>阻塞狀態>>>就緒

非阻塞:沒有阻塞狀態

 

在一個執行緒的IO模型中,我們recv的地方阻塞,我們就開啟多執行緒,但是不管你開啟多少個執行緒,這個recv的時間是不是沒有被規避掉,不管是多執行緒還是多程序都沒有規避掉IO這個時間

 

實際上,除非特別指定,幾乎所有的IO介面(包括socket介面)都是阻塞的。這給網路程式設計帶來了一個很大的問題,如在呼叫recv(1024)的同時,執行緒將被阻塞,在此期間,執行緒將無法執行任何運算或響應任何的網路請求

一個簡單的解決方案:

在服務端使用多執行緒(或多程序)。多執行緒(或多程序)的目的是讓每個連線都擁有獨立的執行緒(或程序),這樣任何一個連線的阻塞都不會影響其他的連線。

該方案的問題是:

開啟多執行緒或都執行緒的方式,在遇到要同時響應成百上千的連線請求,則無論多執行緒還是多程序都會佔用嚴重的系統資源,降低系統對外界相應效率,而且執行緒與程序本身也更容易進入假死狀態

改進方案:

很多程式設計師可能會考慮使用“執行緒池”或“程序池”。“執行緒池”旨在減少建立和銷燬執行緒的頻率,其維持一定合理數量的執行緒,並讓空閒的執行緒重新承擔新的執行任務。“連線池”維持連線的快取池,儘量重用已有的連線、減少建立和關閉連線的頻率。這兩種技術都可以很好的降低系統開銷,都被廣泛應用很大型系統,如websphere、tomcat和各種資料庫等

改進後方案其實也存在著問題:

“執行緒池”和“連線池”技術也只是在一定程度上緩解了頻繁呼叫IO介面帶來的資源佔用。而且,所謂“池”始終有其上限,當請求大大超過上限時,“池”構成的系統對外界的響應並不比沒有池的時候效果好多少。所以使用“池”必須其面臨的響應規模,並根據規模調整“池”的大小

 

三、非阻塞IO

缺點:

1, 迴圈呼叫recv()將大幅推高cpu佔用率,這也是我們在程式碼中留一句time.sleep(2)的原因,否則在低配主機下極容易出現卡機情況

2, 任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read一次,而任務可能在兩次輪詢之間的任意時間完成。這會導致整體資料吞吐量的降低。

import time
import socket

server = socket.socket()

ip_port = ('127.0.0.1',8001)
server.bind(ip_port)

server.listen()
server.setblocking(False)
conn_list = []

while 1:
    while 1:
        try:
            conn,addr = server.accept()
            conn_list.append(conn)
            break
        except BlockingIOError:
            time.sleep(0.1)
            print('此時還沒有人鏈接我')

    for sock in conn_list:
        print(sock)
        while 1:
            try:
                from_client_msg = sock.recv(1024)
                print(from_client_msg.decode('utf-8'))
                sock.send(b'hello')
                break
            except BlockingIOError:
                print('還沒有任何的訊息啊')
非阻塞iO模型服務端

 

import socket
client = socket.socket()

client.connect(('127.0.0.1',8001))
while 1:
    to_server_msg = input('我想對你說>>>>')
    client.send(to_server_msg.encode('utf-8'))
    from_server_msg = client.recv(1024)
    print(from_server_msg.decode('utf-8'))
非阻塞IO模型客戶端

四、多路複用IO

結論:select的優勢在於可以處理多個連線,不適用於單個連線

IO多路複用的機制:

Select:windows 、linux

Poll機制:linux和select監聽機制一樣,但是對監聽列表裡面的數量沒有限制,select預設限制是1024個,但是他們兩個都是作業系統輪詢每一個被監聽的檔案描述符(如果數量很大,其實效率不太好),卡是否有可讀操作

Epoll:linux它的監聽機制和上面兩個不同,他給每一個監聽的物件綁定了一個回撥函式,你這個物件有訊息,那麼觸發回撥函式給使用者,使用者就進行系統呼叫來拷貝資料,並不是輪詢監聽所有的被監聽物件,這樣的效率高很多。

 

import select
import socket

server = socket.socket()
server.bind(('127.0.0.1',8001))
rlist = [server,]
server.listen()
while 1:
    print('11111')
    rl,wl,el = select.select(rlist,[],[])#建立rl物件,監聽
    print(222222)
    print('server物件>>>',server)
    print(rl)   #rl物件其實跟server物件(內容)一致
    for sock in rl: #當rl有值的時候,迴圈列表
        if sock == server: #值與server相同
            conn,addr = sock.accept() #建立連線
            rlist.append(conn) #把管道資訊加入列表
        else:
            from_client_msg = sock.recv(1024)#conn
            print(from_client_msg.decode('utf-8')) #列印接收
IO多路複用服務端

 

import socket

client = socket.socket()
client.connect(('127.0.0.1',8001))

to_server_msg = input('發給服務端的訊息:')
client.send(to_server_msg.encode('utf-8'))
# from_server_msg = client.recv(1024)
# print(from_server_msg.decode('utf-8'))
io多路複用客戶端

 

五、非同步IO

 

from gevent import monkey;monkey.patch_all()
import time
import gevent

def func1(n):
    print('xxxxxx',n)
    # gevent.sleep(2)
    time.sleep(2)
    print('cccccc',n)

def func2(m):
    print('111111',m)
    # gevent.sleep(2)
    time.sleep(2)
    print('222222',m)

start_time = time.time()
g1 = gevent.spawn(func1,'alex')
g2 = gevent.spawn(func2,'德瑪西亞')
# g1.join() #
# g2.join()
gevent.joinall([g1,g2])
end_time = time.time()
print(end_time - start_time)

print('程式碼結束')