python並發編程之IO模型,
了解新知識之前需要知道的一些知識
同步(synchronous):一個進程在執行某個任務時,另外一個進程必須等待其執行完畢,才能繼續執行
#所謂同步,就是在發出一個功能調用時,在沒有得到結果之前,該調用就不會返回。按照這個定義,
其實絕大多數函數都是同步調用。但是一般而言,我們在說同步、異步的時候,
特指那些需要其他部件協作或者需要一定時間完成的任務。 #舉例: #1. multiprocessing.Pool下的apply #發起同步調用後,就在原地等著任務結束,
根本不考慮任務是在計算還是在io阻塞,總之就是一股腦地等任務結束 #2. concurrent.futures.ProcessPoolExecutor().submit(func,).result()#3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()
異步(asynchronous):
#異步的概念和同步相對。當一個異步功能調用發出後,調用者不能立刻得到結果。
當該異步功能完成後,通過狀態、通知或回調來通知調用者。如果異步功能用狀態來通知,
那麽調用者就需要每隔一定時間檢查一次,效率就很低(有些初學多線程編程的人,總喜歡用一個循環去檢查某個變量的值,這其實是一 種很嚴重的錯誤)。
如果是使用通知的方式,效率則很高,因為異步功能幾乎不需要做額外的操作。至於回調函數,其實和通知沒太多區別。 #舉例: #1. multiprocessing.Pool().apply_async() #發起異步調用後,並不會等待任務結束才返回,相反,
會立即獲取一個臨時結果(並不是最終的結果,可能是封裝好的一個對象)。#2. concurrent.futures.ProcessPoolExecutor(3).submit(func,) #3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)
阻塞(blocking):
#阻塞調用是指調用結果返回之前,當前線程會被掛起(如遇到io操作)。函數只有在得到結果之後才會
將阻塞的線程激活。有人也許會把阻塞調用和同步調用等同起來,實際上他是不同的。對於同步調用來說,
很多時候當前線程還是激活的,只是從邏輯上當前函數沒有返回而已。 #舉例: #1. 同步調用:apply一個累計1億次的任務,該調用會一直等待,直到任務返回結果為止,
但並未阻塞住(即便是被搶走cpu的執行權限,那也是處於就緒態);#2. 阻塞調用:當socket工作在阻塞模式的時候,如果沒有數據的情況下調用recv函數,
則當前線程就會被掛起,直到有數據為止。
非阻塞(non-blocking):
#非阻塞和阻塞的概念相對應,指在不能立刻得到結果之前也會立刻返回,同時該函數不會阻塞當前線程。
小結:
#1. 同步與異步針對的是函數/任務的調用方式:同步就是當一個進程發起一個函數(任務)調用的時候,一直等到函數(任務)完成,而進程繼續處於激活狀態。而異步情況下是當一個進程發起一個函數(任務)調用的時候,不會等函數返回,而是繼續往下執行當,函數返回的時候通過狀態、通知、事件等方式通知進程任務完成。 #2. 阻塞與非阻塞針對的是進程或線程:阻塞是當請求不能滿足的時候就將進程掛起,而非阻塞則不會阻塞當前進程
一、IO模型介紹
IO發生時涉及的對象和步驟。對於一個網絡IO(network IO),它會涉及到兩個系統對象,一個是調用這個IO的process (or thread),另一個就是系統內核(kernel)。當一個read操作發生時,該操作會經歷兩個階段:
#1)等待數據準備 (Waiting for the data to be ready)
#2)將數據從內核拷貝到進程中(Copying the data from the kernel to the process)
記住這兩點很重要,因為這些IO模型的區別就是在兩個階段上各有不同的情況。
二、阻塞IO (blocking IO)
阻塞IO(blocking IO)的特點:就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了。
實際上,除非特別指定,幾乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。這給網絡編程帶來了一個很大的問題,如在調用recv(1024)的同時,線程將被阻塞,在此期間,線程將無法執行任何運算或響應任何的網絡請求。
1 from socket import * 2 server = socket(AF_INET,SOCK_STREAM) 3 server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 4 server.bind((‘127.0.0.1‘,8080)) 5 server.listen(5) 6 print(‘start runnig...‘) 7 while True: 8 conn,addr = server.accept() #IO操作 在這accept的時候不能幹recv的活 9 print(addr) 10 while True: 11 try: 12 data = conn.recv(1024) #IO操作 13 conn.send(data.upper()) 14 except Exception: 15 break 16 conn.close() 17 server.close() 18 19 # 我們以前寫的這個就是阻塞的IO模型:一旦阻塞了就在那卡著 20 # 直到等到數據已經到了操作系統,操作系統再從內核拷貝給應用程序 21 # 阻塞IO在那兩個階段全都阻塞住了服務端
1 from socket import * 2 client = socket(AF_INET,SOCK_STREAM) 3 client.connect((‘127.0.0.1‘,8080)) 4 while True: 5 cmd = input(‘>>:‘).strip() 6 if not cmd:continue 7 client.send(cmd.encode(‘utf-8‘)) 8 data = client.recv(1024) 9 print(‘接受的是:%s‘%data.decode(‘utf-8‘)) 10 client.close()客戶端
一個簡單的解決方案:
#在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每個連接都擁有獨立的線程(或進程),
這樣任何一個連接的阻塞都不會影響其他的連接。
該方案的問題是:
#開啟多進程或都線程的方式,在遇到要同時響應成百上千路的連接請求,
則無論多線程還是多進程都會嚴重占據系統資源,降低系統對外界響應效率,
而且線程與進程本身也更容易進入假死狀態。
改進方案:
很多程序員可能會考慮使用“線程池”或“連接池”。
“線程池”旨在減少創建和銷毀線程的頻率,其維持一定合理數量的線程,並讓空閑的線程重新承擔新的執行任務
。“連接池”維持連接的緩存池,盡量重用已有的連接、減少創建和關閉連接的頻率。這兩種技術都可以很好
的降低系統開銷,都被廣泛應用很多大型系統,如websphere、tomcat和各種數據庫等。
改進後方案其實也存在著問題:
#“線程池”和“連接池”技術也只是在一定程度上緩解了頻繁調用IO接口帶來的資源占用。而且,
所謂“池”始終有其上限,當請求大大超過上限時,“池”構成的系統對外界的響應並不比沒有池的時候效果
好多少。所以使用“池”必須考慮其面臨的響應規模,並根據響應規模調整“池”的大小。
對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,“線程池”或“連接池”或許可以緩解部分壓力,但是不能解決所有問題。總之,多線程模型可以方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個問題。
三、非阻塞IO (nonblocking IO)
多線程,多進程,進程池,線程池都可以實現並發,但是仍然沒有解決IO問題
那麽下面我們來了解一下非阻塞IO
從圖中可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有準備好,那麽它並不會block用戶進程,而是立刻返回一個error。從用戶進程角度講 ,它發起一個read操作後,並不需要等待,而是馬上就得到了一個結果。用戶進程判斷結果是一個error時,它就知道數據還沒有準備好,於是用戶就可以在本次到下次再發起read詢問的時間間隔內做其他事情,或者直接再次發送read操作。一旦kernel中的數據準備好了,並且又再次收到了用戶進程的system call,那麽它馬上就將數據拷貝到了用戶內存(這一階段仍然是阻塞的),然後返回。
也就是說非阻塞的recvform系統調用調用之後,進程並沒有被阻塞,內核馬上返回給進程,如果數據還沒準備好,此時會返回一個error。進程在返回之後,可以幹點別的事情,然後再發起recvform系統調用。重復上面的過程,循環往復的進行recvform系統調用。這個過程通常被稱之為輪詢。輪詢檢查內核數據,直到數據準備好,再拷貝數據到進程,進行數據處理。需要註意,拷貝數據整個過程,進程仍然是屬於阻塞的狀態。
所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問kernel數據準備好了沒有。
server.setblocking()#默認是True
server.setblocking(False) #False的話就成非阻塞了,這只是對於socket套接字來說的
所以,在非阻塞式IO中,用戶進程其實是需要不斷的主動詢問內核數據準備好了沒有。
wait data 等數據的這個階段是不阻塞的
copy data 這個階段還是要阻塞的
服務端
1 #這種程序雖說解決了單線程並發,但是大大的占用了cpu 2 from socket import * 3 import time 4 severt = socket(AF_INET,SOCK_STREAM) 5 severt.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 6 severt.bind((‘127.0.0.1‘,8080)) 7 severt.listen(5) 8 severt.setblocking(False) #默認是True (如果是False,套接字裏的一些阻塞操作都變成非阻塞的) 9 print(‘startting....‘) 10 conn_l = [] 11 del_l =[] 12 while True: 13 try: 14 print(conn_l) 15 conn,addr = severt.accept() #收不到數據的時候才出異常 16 print(conn) 17 conn_l.append(conn) 18 except BlockingIOError: #吧收不到數據的那段時間利用起來(利用他收不到 19 #數據的時候,才幹下面的for循環) 20 for conn in conn_l: 21 try: 22 data = conn.recv(1024) 23 conn.send(data.upper()) 24 except BlockingIOError: 25 pass 26 except ConnectionResetError: #端開鏈接的錯誤(如果突然斷開鏈接,會報錯 27 #就先添加到列表裏面去,完了吧鏈接給清除了) 28 del_l.append(conn) 29 for obj in del_l: 30 obj.close() 31 conn_l.remove(obj) 32 del_l.clear()
客戶端
1 from socket import * 2 client = socket(AF_INET,SOCK_STREAM) 3 client.connect((‘127.0.0.1‘,8080)) 4 while True: 5 cmd = input(‘>>:‘).strip() 6 if not cmd:continue 7 client.send(cmd.encode(‘utf-8‘)) 8 data = client.recv(1024) 9 print(data.decode(‘utf-8‘))
對服務端的說明:如果客戶端斷開連接的時候,就會發生ConnectionResetError
所以我們的處理一下這個異常。就如上邊的服務端所示
但是非阻塞IO模型絕不被推薦。
非阻塞IO模型優點:能夠在等待任務完成的時間裏幹其他活了(包括提交其他任務,也就是 “後臺” 可以有多個任務在“”同時“”執行)。
非阻塞IO模型缺點:
1. 循環調用recv()將大幅度推高CPU占用率;這也是我們在代碼中留一句time.sleep(2)的原因,否則在低配主機下極容易出現卡機情況
2. 任務完成的響應延遲增大了,因為每過一段時間才去輪詢一次read操作,而任務可能在兩次輪詢之間的任意時間完成。這會導致整體數據吞吐量的降低。
四、多路復用IO (IO multiplexing)
當用戶進程調用了select,那麽整個進程會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。
這個圖和blocking IO的圖其實並沒有太大的不同,事實上還更差一些。因為這裏需要使用兩個系統調用(select和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。
強調:
1. 如果處理的連接數不是很高的話,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延遲還更大。select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。
2. 在多路復用模型中,對於每一個socket,一般都設置成為non-blocking,但是,如上圖所示,整個用戶的process其實是一直被block的。只不過process是被select這個函數block,而不是被socket IO給block。
結論: select的優勢在於可以處理多個連接,不適用於單個連接
1 #服務端 2 from socket import * 3 import select 4 5 s=socket(AF_INET,SOCK_STREAM) 6 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 7 s.bind((‘127.0.0.1‘,8081)) 8 s.listen(5) 9 s.setblocking(False) #設置socket的接口為非阻塞 10 read_l=[s,] 11 while True: 12 r_l,w_l,x_l=select.select(read_l,[],[]) 13 print(r_l) 14 for ready_obj in r_l: 15 if ready_obj == s: 16 conn,addr=ready_obj.accept() #此時的ready_obj等於s 17 read_l.append(conn) 18 else: 19 try: 20 data=ready_obj.recv(1024) #此時的ready_obj等於conn 21 if not data: 22 read_l.remove(ready_obj) 23 continue 24 ready_obj.send(data.upper()) 25 except ConnectionResetError: 26 read_l.remove(ready_obj) 27 28 #客戶端 29 from socket import * 30 c=socket(AF_INET,SOCK_STREAM) 31 c.connect((‘127.0.0.1‘,8081)) 32 33 while True: 34 msg=input(‘>>: ‘) 35 if not msg:continue 36 c.send(msg.encode(‘utf-8‘)) 37 data=c.recv(1024) 38 print(data.decode(‘utf-8‘)) 39 40 select IO模型select IO 模塊
select監聽fd變化的過程分析:
#用戶進程創建socket對象,拷貝監聽的fd到內核空間,每一個fd會對應一張系統文件表,
內核空間的fd響應到數據後,就會發送信號給用戶進程數據已到; #用戶進程再發送系統調用,比如(accept)將內核空間的數據copy到用戶空間,
同時作為接受數據端內核空間的數據清除,這樣重新監聽時fd再有新的數據又可以響應到了
(發送端因為基於TCP協議所以需要收到應答後才會清除)。
select模塊的優點
#相比其他模型,使用select() 的事件驅動模型只用單線程(進程)執行,占用資源少,不消耗太多 CPU,
同時能夠為多客戶端提供服務。如果試圖建立一個簡單的事件驅動的服務器程序,這個模型有一定的參考價值。
select模塊的缺點
#首先select()接口並不是實現“事件驅動”的最好選擇。因為當需要探測的句柄值較大時,
select()接口本身需要消耗大量時間去輪詢各個句柄。很多操作系統提供了更為高效的接口,
如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。
如果需要實現更高效的服務器程序,類似epoll這樣的接口更被推薦。
遺憾的是不同的操作系統特供的epoll接口有很大差異,所以使用類似於epoll的接口實現
具有較好跨平臺能力的服務器會比較困難。 #其次,該模型將事件探測和事件響應夾雜在一起,一旦事件響應的執行體龐大,則對整個模型是災難性的。
五、異步IO(asynchronous IO)
用戶進程發起read操作之後,立刻就可以開始去做其它的事。而另一方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對用戶進程產生任何block。然後,kernel會等待數據準備完成,然後將數據拷貝到用戶內存,當這一切都完成之後,kernel會給用戶進程發送一個signal,告訴它read操作完成了。
六、IO模型比較分析
經過上面的介紹,會發現non-blocking IO和asynchronous IO的區別還是很明顯的。在non-blocking IO中,雖然進程大部分時間都不會被block,但是它仍然要求進程去主動的check,並且當數據準備完成以後,也需要進程主動的再次調用recvfrom來將數據拷貝到用戶內存。而asynchronous IO則完全不同。它就像是用戶進程將整個IO操作交給了他人(kernel)完成,然後他人做完後發信號通知。在此期間,用戶進程不需要去檢查IO操作的狀態,也不需要主動的去拷貝數據。
七、selsectors模塊
這三種IO多路復用模型在不同的平臺有著不同的支持,而epoll在windows下就不支持,好在我們有selectors模塊,幫我們默認選擇當前平臺下最合適的
1 #服務端 2 from socket import * 3 import selectors 4 5 sel=selectors.DefaultSelector() 6 def accept(server_fileobj,mask): 7 conn,addr=server_fileobj.accept() 8 sel.register(conn,selectors.EVENT_READ,read) 9 10 def read(conn,mask): 11 try: 12 data=conn.recv(1024) 13 if not data: 14 print(‘closing‘,conn) 15 sel.unregister(conn) 16 conn.close() 17 return 18 conn.send(data.upper()+b‘_SB‘) 19 except Exception: 20 print(‘closing‘, conn) 21 sel.unregister(conn) 22 conn.close() 23 24 25 26 server_fileobj=socket(AF_INET,SOCK_STREAM) 27 server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 28 server_fileobj.bind((‘127.0.0.1‘,8088)) 29 server_fileobj.listen(5) 30 server_fileobj.setblocking(False) #設置socket的接口為非阻塞 31 sel.register(server_fileobj,selectors.EVENT_READ,accept) #相當於網select的讀列表裏append了一個文件句柄server_fileobj,並且綁定了一個回調函數accept 32 33 while True: 34 events=sel.select() #檢測所有的fileobj,是否有完成wait data的 35 for sel_obj,mask in events: 36 callback=sel_obj.data #callback=accpet 37 callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1) 38 39 #客戶端 40 from socket import * 41 c=socket(AF_INET,SOCK_STREAM) 42 c.connect((‘127.0.0.1‘,8088)) 43 44 while True: 45 msg=input(‘>>: ‘) 46 if not msg:continue 47 c.send(msg.encode(‘utf-8‘)) 48 data=c.recv(1024) 49 print(data.decode(‘utf-8‘))selectors
python並發編程之IO模型,