python 協程、I/O模型
一、引子 (超哥協程)
併發本質:儲存狀態+切換
cpu正在執行一個任務,轉而執行另一個任務的情概況:1、是該任務發生了阻塞;2、該任務計算的時間過長或有一個優先順序更高的程式替代了它。
協程本質上就是一個執行緒,使用程式碼來控制任務的切換。以前執行緒任務的切換是由作業系統控制的,遇到I/O自動切換,現在我們用協程的目的就是較少作業系統切換的開銷(開關執行緒,建立暫存器、堆疊等,在他們之間進行切換等),在我們自己的程式裡面來控制任務的切換。
ps:在介紹程序理論時,提及程序的三種執行狀態,而執行緒才是執行單位,所以也可以將上圖理解為執行緒的三種狀態
一:其中第二種情況並不能提升效率,只是為了讓cpu能夠雨露均沾,實現看起來所有任務都被“同時”執行的效果,如果多個任務都是純計算的,這種切換反而會降低效率。
二、yield 模擬單執行緒之間的任務切換過程
#1 yiled可以儲存狀態,yield的狀態儲存與作業系統的儲存執行緒狀態很像,但是yield是程式碼級別控制的,更輕量級 #2 send可以把一個函式的結果傳給另外一個函式,以此實現單執行緒內程式之間的切換
import time def jishi1hao(): for i in range(6):通過yield實現任務切換+儲存現場print("快快來~~~~") yield #yield 可以記錄任務的執行狀態 time.sleep(1) print("%s號客戶2s搞定"%i) def jishi2hao(): g=jishi1hao() #獲取到生成器 g.__next__() #執行第一段函式,到第一個yield結束 for i in range(5): time.sleep(1) print("%s號技師正在忙!!!"%i) g.__next__() #繼續到下一個yield jishi2hao()#使用yield可以實現程式的切換執行,但是不能提高效率,只是簡單的程式分段穿插執行
注意:yield可以實現任務的交替執行,但是不能提高程式的執行效率,反而降低了執行效率(切換過程需要時間)。
#基於yield併發執行,多工之間來回切換,這就是個簡單的協程的體現,但是他能夠節省I/O時間嗎?不能 import time def consumer(): '''任務1:接收資料,處理資料''' while True: x=yield time.sleep(1) #發現什麼?只是進行了切換,但是並沒有節省I/O時間 print('處理了資料:',x) def producer(): '''任務2:生產資料''' g=consumer() next(g) #找到了consumer函式的yield位置 for i in range(3): g.send(i) #給yield傳值,然後再迴圈給下一個yield傳值,並且多了切換的程式,比直接序列執行還多了一些步驟,導致執行效率反而更低了。 print('傳送了資料:',i) start=time.time() #基於yield儲存狀態,實現兩個任務直接來回切換,即併發的效果 #PS:如果每個任務中都加上列印,那麼明顯地看到兩個任務的列印是你一次我一次,即併發執行的. producer() #我在當前執行緒中只執行了這個函式,但是通過這個函式裡面的send切換了另外一個任務 stop=time.time() # 序列執行的方式 s_t=time.time() res=producer() consumer() e_t=time.time() print("yield的時間>>>",stop-start) print("序列的時間>>>",e_t-s_t) #結果顯示 #yield的時間>>> 3.0015313625335693 #序列的時間>>> 3.0011236667633057單純的切換反而會降低執行速度
對於單執行緒下,我們不可避免程式中出現io操作,但如果我們能在自己的程式中(即使用者程式級別,而非作業系統級別)控制單執行緒下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該執行緒能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在使用者程式級別將自己的io操作最大限度地隱藏起來,從而可以迷惑作業系統,讓其看到:該執行緒好像是一直在計算,io比較少,從而更多的將cpu的執行許可權分配給我們的執行緒。
協程的本質就是在單執行緒下,由使用者自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案:
#1、可以檢測io操作,在遇到io操作的情況下才發生切換 #2. 可以控制多個任務之間的切換,切換之前將任務的狀態儲存下來,以便重新執行時,可以基於暫停的位置繼續執行。
三、協程介紹
協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的。
需要強調的是:
#1. python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他執行緒執行) #2. 單執行緒內開啟協程,一旦遇到io,就會從應用程式級別(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)
協程優點:
#1. 協程的切換開銷更小,屬於程式級別的切換,作業系統完全感知不到,因而更加輕量級 #2. 單執行緒內就可以實現併發的效果,最大限度地利用cpu
協程缺點
#1. 協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程 #2. 協程指的是單個執行緒,因而一旦協程出現阻塞,將會阻塞整個執行緒
總結協程特點:
- 必須在只有一個單執行緒裡實現併發
- 修改共享資料不需加鎖
- 使用者程式裡自己儲存多個控制流的上下文棧
- 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模組(select機制)
四、Greenlet
如果我們在單個執行緒內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然後再呼叫send。。。非常麻煩),而使用greenlet模組可以非常簡單地實現這20個任務直接的切換
#真正的協程模組就是使用greenlet完成的切換 from greenlet import greenlet def eat(name): print('%s eat 1' %name) #2 g2.switch('taibai') #3 print('%s eat 2' %name) #6 g2.switch() #7 def play(name): print('%s play 1' %name) #4 g1.switch() #5 print('%s play 2' %name) #8 g1=greenlet(eat) g2=greenlet(play) g1.switch('taibai')#可以在第一次switch時傳入引數,以後都不需要 1greenlet實現協程(不能提高效率)
單純的切換(在沒有io的情況下或者沒有重複開闢記憶體空間的操作),反而會降低程式的執行速度。
greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。
五、Gevent介紹
Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程。
#用法 g1=gevent.spawn(func,1,2,3,x=4,y=5)建立一個協程物件g1,spawn括號內第一個引數是函式名,如eat,後面可以有多個引數,可以是位置實參或關鍵字實參,都是傳給函式eat的,spawn是非同步提交任務 g2=gevent.spawn(func2) g1.join() #等待g1結束,上面只是建立協程物件,這個join才是去執行 g2.join() #等待g2結束 有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,但是你會發現,如果g2裡面的任務執行的時間長,但是不寫join的話,就不會執行完等到g2剩下的任務了 gevent.joinall([g1,g2]) #等待列表中的所有任務執行完畢 g1.value#拿到func1的返回值
import gevent import time def func1(i): print("func1開始") gevent.sleep(2) print("1111>>>",i) def func2(i): print("func2開始") gevent.sleep(2) print("2222>>>", i) if __name__ == '__main__': s_t=time.time() g1=gevent.spawn(func1,"a") g2=gevent.spawn(func2,"b") g1.join() g2.join() e_t=time.time() print("gevent耗時>>>",e_t-s_t) print("主任務結束")gevent方法使用和時間測試
from gevent import monkey;monkey.patch_all( )必須放在檔案開頭,表示拾取檔案中的所有的I/O操作。
from gevent import monkey;monkey.patch_all() #必須寫在最上面,這句話後面的所有阻塞全部能夠識別了 import gevent #直接匯入即可 import time def eat(): #print() print('eat food 1') time.sleep(2) #加上mokey就能夠識別到time模組的sleep了 print('eat food 2') def play(): print('play 1') time.sleep(1) #來回切換,直到一個I/O的時間結束,這裡都是我們個gevent做得,不再是控制不了的作業系統了。 print('play 2') g1=gevent.spawn(eat) g2=gevent.spawn(play_phone) gevent.joinall([g1,g2]) print('主')monkey使用示例
gevent中的同步與非同步效率對比
from gevent import spawn,joinall,monkey;monkey.patch_all() import time def task(pid): """ Some non-deterministic task """ time.sleep(0.5) print('Task %s done' % pid) def synchronous():#同步提交任務,序列,一次出來一個 for i in range(10): task(i) def asynchronous():#非同步提交任務 g_l=[spawn(task,i) for i in range(10)] joinall(g_l) if __name__ == '__main__': print('Synchronous:') synchronous() print('Asynchronous:') asynchronous()gevent 同步和非同步
六、協程的應用
爬蟲
from gevent import monkey;monkey.patch_all() import gevent import requests import time def get_page(url): print('GET: %s' %url) response=requests.get(url) if response.status_code == 200: print('%d bytes received from %s' %(len(response.text),url))
if __name__ == '__main__':
start_time=time.time()
gevent.joinall([
gevent.spawn(get_page,'https://www.zhihu.com/'),
gevent.spawn(get_page,'https://www.yahoo.com/'),
gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
七、I/O模型簡介(超哥IO模型)
Stevens在文章中一共比較了五種IO Model:
* blocking IO 阻塞IO
* nonblocking IO 非阻塞IO
* IO multiplexing IO多路複用
* signal driven IO 訊號驅動IO(不常見,不講)
* asynchronous IO 非同步IO
再說一下IO發生時涉及的物件和步驟。對於一個network IO (這裡我們以read、recv舉例),它會涉及到兩個系統物件,一個是呼叫這個IO的process (or thread),另一個就是系統核心(kernel)。當一個read/recv讀資料的操作發生時,該操作會經歷兩個階段:
#1)等待資料準備 (Waiting for the data to be ready) #2)將資料從核心拷貝到程序中(Copying the data from the kernel to the process)
#1、輸入操作:read、readv、recv、recvfrom、recvmsg共5個函式,如果會阻塞狀態,則會經歷wait data和copy data兩個階段,如果設定為非阻塞則在wait 不到data時丟擲異常 #2、輸出操作:write、writev、send、sendto、sendmsg共5個函式,在傳送緩衝區滿了會阻塞在原地,如果設定為非阻塞,則會丟擲異常 #3、接收外來連結:accept,與輸入操作類似 #4、發起外出連結:connect,與輸出操作類似網路常見的阻塞函式
(1)、阻塞I/O(blocking IO)
上圖分析:兩個阻塞階段
當用戶程序呼叫了recvfrom這個系統呼叫,kernel就開始了IO的第一個階段:準備資料。對於network io來說,很多時候資料在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的資料到來。
而在使用者程序這邊,整個程序會被阻塞。當kernel一直等到資料準備好了,它就會將資料從kernel中拷貝到使用者記憶體,然後kernel返回結果,使用者程序才解除block的狀態,重新執行起來。
(2)非阻塞 IO (設定socket 變成non-blocking)
從圖中可以看出,當用戶程序發出read操作時,如果kernel中的資料還沒有準備好,那麼它並不會block使用者程序,而是立刻返回一個error。從使用者程序角度講 ,它發起一個read操作後,並不需要等待,而是馬上就得到了一個結果。使用者程序判斷結果是一個error時,它就知道資料還沒有準備好,於是使用者就可以在本次到下次再發起read詢問的時間間隔內做其他事情,或者直接再次傳送read操作。一旦kernel中的資料準備好了,並且又再次收到了使用者程序的system call,那麼它馬上就將資料拷貝到了使用者記憶體(這一階段仍然是阻塞的),然後返回。
也就是說非阻塞的recvform系統呼叫呼叫之後,程序並沒有被阻塞,核心馬上返回給程序,如果資料還沒準備好,此時會返回一個error。程序在返回之後,可以乾點別的事情,然後再發起recvform系統呼叫。重複上面的過程,迴圈往復的進行recvform系統呼叫。這個過程通常被稱之為輪詢。輪詢檢查核心資料,直到資料準備好,再拷貝資料到程序,進行資料處理。需要注意,拷貝資料整個過程,程序仍然是屬於阻塞的狀態。
所以,在非阻塞式IO中,使用者程序其實是需要不斷的主動詢問kernel資料準備好了沒有。
# 服務端 import socket import time server=socket.socket() server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(('127.0.0.1',8083)) server.listen(5) server.setblocking(False) #設定不阻塞 r_list=[] #用來儲存所有來請求server端的conn連線 w_list={} #用來儲存所有已經有了請求資料的conn的請求資料 while 1: try: conn,addr=server.accept() #不阻塞,會報錯 r_list.append(conn) #為了將連線儲存起來,不然下次迴圈的時候,上一次的連線就沒有了 except BlockingIOError: # 強調強調強調:!!!非阻塞IO的精髓在於完全沒有阻塞!!! # time.sleep(0.5) # 開啟該行註釋純屬為了方便檢視效果 print('在做其他的事情') print('rlist: ',len(r_list)) print('wlist: ',len(w_list)) # 遍歷讀列表,依次取出套接字讀取內容 del_rlist=[] #用來儲存刪除的conn連線 for conn in r_list: try: data=conn.recv(1024) #不阻塞,會報錯 if not data: #當一個客戶端暴力關閉的時候,會一直接收b'',別忘了判斷一下資料 conn.close() del_rlist.append(conn) continue w_list[conn]=data.upper() except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收 continue except ConnectionResetError: # 當前套接字出異常,則關閉,然後加入刪除列表,等待被清除 conn.close() del_rlist.append(conn) # 遍歷寫列表,依次取出套接字傳送內容 del_wlist=[] for conn,data in w_list.items(): try: conn.send(data) del_wlist.append(conn) except BlockingIOError: continue # 清理無用的套接字,無需再監聽它們的IO操作 for conn in del_rlist: r_list.remove(conn) #del_rlist.clear() #清空列表中儲存的已經刪除的內容 for conn in del_wlist: w_list.pop(conn) #del_wlist.clear() #客戶端 import socket import os import time import threading client=socket.socket() client.connect(('127.0.0.1',8083)) while 1: res=('%s hello' %os.getpid()).encode('utf-8') client.send(res) data=client.recv(1024) print(data.decode('utf-8')) ##多執行緒的客戶端請求版本 # def func(): # sk = socket.socket() # sk.connect(('127.0.0.1',9000)) # sk.send(b'hello') # time.sleep(1) # print(sk.recv(1024)) # sk.close() # # for i in range(20): # threading.Thread(target=func).start()非阻塞IO示例
雖然我們上面的程式碼通過設定非阻塞,規避了IO操作,但是非阻塞IO模型絕不被推薦。
我們不能否定其優點:能夠在等待任務完成的時間裡幹其他活了(包括提交其他任務,也就是 “後臺” 可以有多個任務在“”同時“”執行)。
(3)多路複用IO(IO multiplexing) (重點)
多路複用:採用了代理(select)模式,把所有的需要監控的物件傳遞給selct監控,select監控到了被監控物件有動作,就返回,執行相應的任務。
IO multiplexing這個詞可能有點陌生,但是如果我說select/epoll,大概就都能明白了。有些地方也稱這種IO方式為事件驅動IO(event driven IO)。我們都知道,select/epoll的好處就在於單個process就可以同時處理多個網路連線的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有資料到達了,就通知使用者程序。它的流程如圖:
當用戶程序呼叫了select,那麼整個程序會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的資料準備好了,select就會返回。這個時候使用者程序再呼叫read操作,將資料從kernel拷貝到使用者程序。
python中的select模組:
import select fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout]) 引數: 可接受四個引數(前三個必須) rlist: wait until ready for reading #等待讀的物件,你需要監聽的需要獲取資料的物件列表 wlist: wait until ready for writing #等待寫的物件,你需要寫一些內容的時候,input等等,也就是說我會迴圈他看看是否有需要傳送的訊息,如果有我取出這個物件的訊息併發送出去,一般用不到,這裡我們也給一個[]。 xlist: wait for an “exceptional condition” #等待異常的物件,一些額外的情況,一般用不到,但是必須傳,那麼我們就給他一個[]。 timeout: 超時時間 當超時時間 = n(正整數)時,那麼如果監聽的控制代碼均無任何變化,則select會阻塞n秒,之後返回三個空列表,如果監聽的控制代碼有變化,則直接執行。 返回值:三個列表與上面的三個引數列表是對應的 select方法用來監視檔案描述符(當檔案描述符條件不滿足時,select會阻塞),當某個檔案描述符狀態改變後,會返回三個列表 1、當引數1 序列中的fd滿足“可讀”條件時,則獲取發生變化的fd並新增到fd_r_list中 2、當引數2 序列中含有fd時,則將該序列中所有的fd新增到 fd_w_list中 3、當引數3 序列中的fd發生錯誤時,則將該發生錯誤的fd新增到 fd_e_list中 4、當超時時間為空,則select會一直阻塞,直到監聽的控制代碼發生變化
結論: select的優勢在於可以處理多個連線,不適用於單個連線
#服務端 from socket import * import select server = socket(AF_INET, SOCK_STREAM) server.bind(('127.0.0.1',8093)) server.listen(5) # 設定為非阻塞 server.setblocking(False) # 初始化將服務端socket物件加入監聽列表,後面還要動態新增一些conn連線物件,當accept的時候sk就有感應,當recv的時候conn就有動靜 rlist=[server,] rdata = {} #存放客戶端傳送過來的訊息 wlist=[] #等待寫物件 wdata={} #存放要返回給客戶端的訊息 print('預備!監聽!!!') count = 0 #寫著計數用的,為了看實驗效果用的,沒用 while True: # 開始 select 監聽,對rlist中的服務端server進行監聽,select函式阻塞程序,直到rlist中的套接字被觸發(在此例中,套接字接收到客戶端發來的握手訊號,從而變得可讀,滿足select函式的“可讀”條件),被觸發的(有動靜的)套接字(伺服器套接字)返回給了rl這個返回值裡面; rl,wl,xl=select.select(rlist,wlist,[],0.5) print('%s 次數>>'%(count),wl) count = count + 1 # 對rl進行迴圈判斷是否有客戶端連線進來,當有客戶端連線進來時select將觸發 for sock in rl: # 判斷當前觸發的是不是socket物件, 當觸發的物件是socket物件時,說明有新客戶端accept連線進來了 if sock == server: # 接收客戶端的連線, 獲取客戶端物件和客戶端地址資訊 conn,addr=sock.accept() #把新的客戶端連線加入到監聽列表中,當客戶端的連線有接收訊息的時候,select將被觸發,會知道這個連線有動靜,有訊息,那麼返回給rl這個返回值列表裡面。 rlist.append(conn) else: # 由於客戶端連線進來時socket接收客戶端連線請求,將客戶端連線加入到了監聽列表中(rlist),客戶端傳送訊息的時候這個連線將觸發 # 所以判斷是否是客戶端連線物件觸發 try: data=sock.recv(1024) #沒有資料的時候,我們將這個連線關閉掉,並從監聽列表中移除 if not data: sock.close() rlist.remove(sock) continue print("received {0} from client {1}".format(data.decode(), sock)) #將接受到的客戶端的訊息儲存下來 rdata[sock] = data.decode() #將客戶端連線物件和這個物件接收到的訊息加工成返回訊息,並新增到wdata這個字典裡面 wdata[sock]=data.upper() #需要給這個客戶端回覆訊息的時候,我們將這個連線新增到wlist寫監聽列表中 wlist.append(sock) #如果這個連接出錯了,客戶端暴力斷開了(注意,我還沒有接收他的訊息,或者接收他的訊息的過程中出錯了) except Exception: #關閉這個連線 sock.close() #在監聽列表中將他移除,因為不管什麼原因,它畢竟是斷開了,沒必要再監聽它了 rlist.remove(sock) # 如果現在沒有客戶端請求連線,也沒有客戶端傳送訊息時,開始對傳送訊息列表進行處理,是否需要傳送訊息 for sock in wl: sock.send(wdata[sock]) wlist.remove(sock) wdata.pop(sock) # #將一次select監聽列表中有接收資料的conn物件所接收到的訊息列印一下 # for k,v in rdata.items(): # print(k,'發來的訊息是:',v) # #清空接收到的訊息 # rdata.clear() --------------------------------------- #客戶端 from socket import * client=socket(AF_INET,SOCK_STREAM) client.connect(('127.0.0.1',8093)) while True: msg=input('>>: ').strip() if not msg:continue client.send(msg.encode('utf-8')) data=client.recv(1024) print(data.decode('utf-8')) client.close()select 網路IO模型程式碼
select做得事情和第二階段的阻塞沒有關係,就是從核心態將資料拷貝到使用者態的阻塞,始終幫你做得監聽的工作,幫你節省了一些第一階段阻塞的時間。
IO多路複用的機制:
select機制: Windows、Linux
poll機制 : Linux #和lselect監聽機制一樣,但是對監聽列表裡面的數量沒有限制,select預設限制是1024個,但是他們兩個都是作業系統輪詢每一個被監聽的檔案描述符(如果數量很大,其實效率不太好),看是否有可讀操作。
epoll機制 : Linux #它的監聽機制和上面兩個不同,他給每一個監聽的物件綁定了一個回撥函式,你這個物件有訊息,那麼觸發回撥函式給使用者,使用者就進行系統呼叫來拷貝資料,並不是輪詢監聽所有的被監聽物件,這樣的效率高很多。