1. 程式人生 > >python 協程、I/O模型

python 協程、I/O模型

一、引子 (超哥協程

  併發本質:儲存狀態+切換

  cpu正在執行一個任務,轉而執行另一個任務的情概況:1、是該任務發生了阻塞;2、該任務計算的時間過長或有一個優先順序更高的程式替代了它。

  協程本質上就是一個執行緒,使用程式碼來控制任務的切換。以前執行緒任務的切換是由作業系統控制的,遇到I/O自動切換,現在我們用協程的目的就是較少作業系統切換的開銷(開關執行緒,建立暫存器、堆疊等,在他們之間進行切換等),在我們自己的程式裡面來控制任務的切換。

  

  ps:在介紹程序理論時,提及程序的三種執行狀態,而執行緒才是執行單位,所以也可以將上圖理解為執行緒的三種狀態   

  一:其中第二種情況並不能提升效率,只是為了讓cpu能夠雨露均沾,實現看起來所有任務都被“同時”執行的效果,如果多個任務都是純計算的,這種切換反而會降低效率。

 

二、yield 模擬單執行緒之間的任務切換過程

#1 yiled可以儲存狀態,yield的狀態儲存與作業系統的儲存執行緒狀態很像,但是yield是程式碼級別控制的,更輕量級
#2 send可以把一個函式的結果傳給另外一個函式,以此實現單執行緒內程式之間的切換  

 

import time

def jishi1hao():
    for  i in range(6):
        
print("快快來~~~~") yield #yield 可以記錄任務的執行狀態 time.sleep(1) print("%s號客戶2s搞定"%i) def jishi2hao(): g=jishi1hao() #獲取到生成器 g.__next__() #執行第一段函式,到第一個yield結束 for i in range(5): time.sleep(1) print("%s號技師正在忙!!!"%i) g.__next__() #繼續到下一個yield jishi2hao()
#使用yield可以實現程式的切換執行,但是不能提高效率,只是簡單的程式分段穿插執行
通過yield實現任務切換+儲存現場

  注意:yield可以實現任務的交替執行,但是不能提高程式的執行效率,反而降低了執行效率(切換過程需要時間)。

 

#基於yield併發執行,多工之間來回切換,這就是個簡單的協程的體現,但是他能夠節省I/O時間嗎?不能
import time
def consumer():
    '''任務1:接收資料,處理資料'''
    while True:
        x=yield
        time.sleep(1) #發現什麼?只是進行了切換,但是並沒有節省I/O時間
        print('處理了資料:',x)
def producer():
    '''任務2:生產資料'''
    g=consumer()
    next(g)  #找到了consumer函式的yield位置
    for i in range(3):
        g.send(i)  #給yield傳值,然後再迴圈給下一個yield傳值,並且多了切換的程式,比直接序列執行還多了一些步驟,導致執行效率反而更低了。
        print('傳送了資料:',i)
start=time.time()
#基於yield儲存狀態,實現兩個任務直接來回切換,即併發的效果
#PS:如果每個任務中都加上列印,那麼明顯地看到兩個任務的列印是你一次我一次,即併發執行的.
producer() #我在當前執行緒中只執行了這個函式,但是通過這個函式裡面的send切換了另外一個任務
stop=time.time()

# 序列執行的方式
s_t=time.time()
res=producer()
consumer()
e_t=time.time()
print("yield的時間>>>",stop-start)
print("序列的時間>>>",e_t-s_t)

#結果顯示
#yield的時間>>> 3.0015313625335693
#序列的時間>>> 3.0011236667633057
單純的切換反而會降低執行速度

  對於單執行緒下,我們不可避免程式中出現io操作,但如果我們能在自己的程式中(即使用者程式級別,而非作業系統級別)控制單執行緒下的多個任務能在一個任務遇到io阻塞時就切換到另外一個任務去計算,這樣就保證了該執行緒能夠最大限度地處於就緒態,即隨時都可以被cpu執行的狀態,相當於我們在使用者程式級別將自己的io操作最大限度地隱藏起來,從而可以迷惑作業系統,讓其看到:該執行緒好像是一直在計算,io比較少,從而更多的將cpu的執行許可權分配給我們的執行緒。

  協程的本質就是在單執行緒下,由使用者自己控制一個任務遇到io阻塞了就切換另外一個任務去執行,以此來提升效率。為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案:

#1、可以檢測io操作,在遇到io操作的情況下才發生切換
#2. 可以控制多個任務之間的切換,切換之前將任務的狀態儲存下來,以便重新執行時,可以基於暫停的位置繼續執行。

 

三、協程介紹

  協程:是單執行緒下的併發,又稱微執行緒,纖程。英文名Coroutine。一句話說明什麼是執行緒:協程是一種使用者態的輕量級執行緒,即協程是由使用者程式自己控制排程的。

  需要強調的是:

#1. python的執行緒屬於核心級別的,即由作業系統控制排程(如單執行緒遇到io或執行時間過長就會被迫交出cpu執行許可權,切換其他執行緒執行)
#2. 單執行緒內開啟協程,一旦遇到io,就會從應用程式級別(而非作業系統)控制切換,以此來提升效率(!!!非io操作的切換與效率無關)

  協程優點:

#1. 協程的切換開銷更小,屬於程式級別的切換,作業系統完全感知不到,因而更加輕量級
#2. 單執行緒內就可以實現併發的效果,最大限度地利用cpu

  協程缺點

#1. 協程的本質是單執行緒下,無法利用多核,可以是一個程式開啟多個程序,每個程序內開啟多個執行緒,每個執行緒內開啟協程
#2. 協程指的是單個執行緒,因而一旦協程出現阻塞,將會阻塞整個執行緒

  

總結協程特點:

  1. 必須在只有一個單執行緒裡實現併發
  2. 修改共享資料不需加鎖
  3. 使用者程式裡自己儲存多個控制流的上下文棧
  4. 附加:一個協程遇到IO操作自動切換到其它協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模組(select機制)

四、Greenlet

  

  如果我們在單個執行緒內有20個任務,要想實現在多個任務之間切換,使用yield生成器的方式過於麻煩(需要先得到初始化一次的生成器,然後再呼叫send。。。非常麻煩),而使用greenlet模組可以非常簡單地實現這20個任務直接的切換

  #真正的協程模組就是使用greenlet完成的切換
from greenlet import greenlet

def eat(name):
    print('%s eat 1' %name)  #2
    g2.switch('taibai')   #3
    print('%s eat 2' %name) #6
    g2.switch() #7
def play(name):
    print('%s play 1' %name) #4
    g1.switch()      #5
    print('%s play 2' %name) #8

g1=greenlet(eat)
g2=greenlet(play)

g1.switch('taibai')#可以在第一次switch時傳入引數,以後都不需要  1
greenlet實現協程(不能提高效率)

  單純的切換(在沒有io的情況下或者沒有重複開闢記憶體空間的操作),反而會降低程式的執行速度。

  greenlet只是提供了一種比generator更加便捷的切換方式,當切到一個任務執行時如果遇到io,那就原地阻塞,仍然是沒有解決遇到IO自動切換來提升效率的問題。

 

五、Gevent介紹

  Gevent 是一個第三方庫,可以輕鬆通過gevent實現併發同步或非同步程式設計,在gevent中用到的主要模式是Greenlet, 它是以C擴充套件模組形式接入Python的輕量級協程。 Greenlet全部執行在主程式作業系統程序的內部,但它們被協作式地排程。

  

#用法
g1=gevent.spawn(func,1,2,3,x=4,y=5)建立一個協程物件g1,spawn括號內第一個引數是函式名,如eat,後面可以有多個引數,可以是位置實參或關鍵字實參,都是傳給函式eat的,spawn是非同步提交任務

g2=gevent.spawn(func2)

g1.join() #等待g1結束,上面只是建立協程物件,這個join才是去執行

g2.join() #等待g2結束  有人測試的時候會發現,不寫第二個join也能執行g2,是的,協程幫你切換執行了,但是你會發現,如果g2裡面的任務執行的時間長,但是不寫join的話,就不會執行完等到g2剩下的任務了


gevent.joinall([g1,g2]) #等待列表中的所有任務執行完畢

g1.value#拿到func1的返回值
import gevent
import time

def func1(i):
    print("func1開始")
    gevent.sleep(2)
    print("1111>>>",i)

def func2(i):
    print("func2開始")
    gevent.sleep(2)
    print("2222>>>", i)


if __name__ == '__main__':
    s_t=time.time()
    g1=gevent.spawn(func1,"a")
    g2=gevent.spawn(func2,"b")
    g1.join()
    g2.join()
    e_t=time.time()
    print("gevent耗時>>>",e_t-s_t)
    print("主任務結束")
gevent方法使用和時間測試

 

  from gevent import monkey;monkey.patch_all( )必須放在檔案開頭,表示拾取檔案中的所有的I/O操作。

from gevent import monkey;monkey.patch_all() #必須寫在最上面,這句話後面的所有阻塞全部能夠識別了

import gevent  #直接匯入即可
import time
def eat():
    #print()  
    print('eat food 1')
    time.sleep(2)  #加上mokey就能夠識別到time模組的sleep了
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)  #來回切換,直到一個I/O的時間結束,這裡都是我們個gevent做得,不再是控制不了的作業系統了。
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play_phone)
gevent.joinall([g1,g2])
print('')
monkey使用示例

  gevent中的同步與非同步效率對比

from gevent import spawn,joinall,monkey;monkey.patch_all()

import time
def task(pid):
    """
    Some non-deterministic task
    """
    time.sleep(0.5)
    print('Task %s done' % pid)


def synchronous():#同步提交任務,序列,一次出來一個
    for i in range(10):
        task(i)

def asynchronous():#非同步提交任務
    g_l=[spawn(task,i) for i in range(10)]
    joinall(g_l)

if __name__ == '__main__':
    print('Synchronous:')
    synchronous()

    print('Asynchronous:')
    asynchronous()
gevent 同步和非同步

 

六、協程的應用

  爬蟲

from gevent import monkey;monkey.patch_all()
import gevent
import requests
import time

def get_page(url):
    print('GET: %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        print('%d bytes received from %s' %(len(response.text),url))
if __name__ == '__main__':

start_time=time.time()
gevent.joinall([
gevent.spawn(get_page,'https://www.zhihu.com/'),
gevent.spawn(get_page,'https://www.yahoo.com/'),
gevent.spawn(get_page,'https://github.com/'),
])
stop_time=time.time()
print('run time is %s' %(stop_time-start_time))
 

 

七、I/O模型簡介(超哥IO模型

  Stevens在文章中一共比較了五種IO Model:
  * blocking IO          阻塞IO
  * nonblocking IO    非阻塞IO
  * IO multiplexing    IO多路複用
  * signal driven IO   訊號驅動IO(不常見,不講)
  * asynchronous IO 非同步IO

   再說一下IO發生時涉及的物件和步驟。對於一個network IO (這裡我們以read、recv舉例),它會涉及到兩個系統物件,一個是呼叫這個IO的process (or thread),另一個就是系統核心(kernel)。當一個read/recv讀資料的操作發生時,該操作會經歷兩個階段:

#1)等待資料準備 (Waiting for the data to be ready)
#2)將資料從核心拷貝到程序中(Copying the data from the kernel to the process)
#1、輸入操作:read、readv、recv、recvfrom、recvmsg共5個函式,如果會阻塞狀態,則會經歷wait data和copy data兩個階段,如果設定為非阻塞則在wait 不到data時丟擲異常

#2、輸出操作:write、writev、send、sendto、sendmsg共5個函式,在傳送緩衝區滿了會阻塞在原地,如果設定為非阻塞,則會丟擲異常

#3、接收外來連結:accept,與輸入操作類似

#4、發起外出連結:connect,與輸出操作類似
網路常見的阻塞函式

  (1)、阻塞I/O(blocking IO)

  上圖分析:兩個阻塞階段

  

  當用戶程序呼叫了recvfrom這個系統呼叫,kernel就開始了IO的第一個階段:準備資料。對於network io來說,很多時候資料在一開始還沒有到達(比如,還沒有收到一個完整的UDP包),這個時候kernel就要等待足夠的資料到來。

  而在使用者程序這邊,整個程序會被阻塞。當kernel一直等到資料準備好了,它就會將資料從kernel中拷貝到使用者記憶體,然後kernel返回結果,使用者程序才解除block的狀態,重新執行起來。

  

 

  (2)非阻塞 IO (設定socket 變成non-blocking)

   從圖中可以看出,當用戶程序發出read操作時,如果kernel中的資料還沒有準備好,那麼它並不會block使用者程序,而是立刻返回一個error。從使用者程序角度講 ,它發起一個read操作後,並不需要等待,而是馬上就得到了一個結果。使用者程序判斷結果是一個error時,它就知道資料還沒有準備好,於是使用者就可以在本次到下次再發起read詢問的時間間隔內做其他事情,或者直接再次傳送read操作。一旦kernel中的資料準備好了,並且又再次收到了使用者程序的system call,那麼它馬上就將資料拷貝到了使用者記憶體(這一階段仍然是阻塞的),然後返回。

  也就是說非阻塞的recvform系統呼叫呼叫之後,程序並沒有被阻塞,核心馬上返回給程序,如果資料還沒準備好,此時會返回一個error。程序在返回之後,可以乾點別的事情,然後再發起recvform系統呼叫。重複上面的過程,迴圈往復的進行recvform系統呼叫。這個過程通常被稱之為輪詢。輪詢檢查核心資料,直到資料準備好,再拷貝資料到程序,進行資料處理。需要注意,拷貝資料整個過程,程序仍然是屬於阻塞的狀態。

  所以,在非阻塞式IO中,使用者程序其實是需要不斷的主動詢問kernel資料準備好了沒有。

# 服務端
import socket
import time

server=socket.socket()
server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
server.bind(('127.0.0.1',8083))
server.listen(5)

server.setblocking(False) #設定不阻塞
r_list=[]  #用來儲存所有來請求server端的conn連線
w_list={}  #用來儲存所有已經有了請求資料的conn的請求資料

while 1:
    try:
        conn,addr=server.accept() #不阻塞,會報錯
        r_list.append(conn)  #為了將連線儲存起來,不然下次迴圈的時候,上一次的連線就沒有了
    except BlockingIOError:
        # 強調強調強調:!!!非阻塞IO的精髓在於完全沒有阻塞!!!
        # time.sleep(0.5) # 開啟該行註釋純屬為了方便檢視效果
        print('在做其他的事情')
        print('rlist: ',len(r_list))
        print('wlist: ',len(w_list))


        # 遍歷讀列表,依次取出套接字讀取內容
        del_rlist=[] #用來儲存刪除的conn連線
        for conn in r_list:
            try:
                data=conn.recv(1024) #不阻塞,會報錯
                if not data: #當一個客戶端暴力關閉的時候,會一直接收b'',別忘了判斷一下資料
                    conn.close()
                    del_rlist.append(conn)
                    continue
                w_list[conn]=data.upper()
            except BlockingIOError: # 沒有收成功,則繼續檢索下一個套接字的接收
                continue
            except ConnectionResetError: # 當前套接字出異常,則關閉,然後加入刪除列表,等待被清除
                conn.close()
                del_rlist.append(conn)


        # 遍歷寫列表,依次取出套接字傳送內容
        del_wlist=[]
        for conn,data in w_list.items():
            try:
                conn.send(data)
                del_wlist.append(conn)
            except BlockingIOError:
                continue


        # 清理無用的套接字,無需再監聽它們的IO操作
        for conn in del_rlist:
            r_list.remove(conn)
        #del_rlist.clear() #清空列表中儲存的已經刪除的內容
        for conn in del_wlist:
            w_list.pop(conn)
        #del_wlist.clear()

#客戶端
import socket
import os
import time
import threading
client=socket.socket()
client.connect(('127.0.0.1',8083))

while 1:
    res=('%s hello' %os.getpid()).encode('utf-8')
    client.send(res)
    data=client.recv(1024)

    print(data.decode('utf-8'))


##多執行緒的客戶端請求版本
# def func():
#     sk = socket.socket()
#     sk.connect(('127.0.0.1',9000))
#     sk.send(b'hello')
#     time.sleep(1)
#     print(sk.recv(1024))
#     sk.close()
#
# for i in range(20):
#     threading.Thread(target=func).start()
非阻塞IO示例

  雖然我們上面的程式碼通過設定非阻塞,規避了IO操作,但是非阻塞IO模型絕不被推薦。

  我們不能否定其優點:能夠在等待任務完成的時間裡幹其他活了(包括提交其他任務,也就是 “後臺” 可以有多個任務在“”同時“”執行)。

 

  (3)多路複用IO(IO multiplexing) (重點)

    多路複用:採用了代理(select)模式,把所有的需要監控的物件傳遞給selct監控,select監控到了被監控物件有動作,就返回,執行相應的任務。

  IO multiplexing這個詞可能有點陌生,但是如果我說select/epoll,大概就都能明白了。有些地方也稱這種IO方式為事件驅動IO(event driven IO)。我們都知道,select/epoll的好處就在於單個process就可以同時處理多個網路連線的IO。它的基本原理就是select/epoll這個function會不斷的輪詢所負責的所有socket,當某個socket有資料到達了,就通知使用者程序。它的流程如圖:

  當用戶程序呼叫了select,那麼整個程序會被block,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的資料準備好了,select就會返回。這個時候使用者程序再呼叫read操作,將資料從kernel拷貝到使用者程序。

   python中的select模組: 

import select

fd_r_list, fd_w_list, fd_e_list = select.select(rlist, wlist, xlist, [timeout])

引數: 可接受四個引數(前三個必須)
    rlist: wait until ready for reading  #等待讀的物件,你需要監聽的需要獲取資料的物件列表
    wlist: wait until ready for writing  #等待寫的物件,你需要寫一些內容的時候,input等等,也就是說我會迴圈他看看是否有需要傳送的訊息,如果有我取出這個物件的訊息併發送出去,一般用不到,這裡我們也給一個[]。
    xlist: wait for an “exceptional condition”  #等待異常的物件,一些額外的情況,一般用不到,但是必須傳,那麼我們就給他一個[]。
    timeout: 超時時間
    當超時時間 = n(正整數)時,那麼如果監聽的控制代碼均無任何變化,則select會阻塞n秒,之後返回三個空列表,如果監聽的控制代碼有變化,則直接執行。
返回值:三個列表與上面的三個引數列表是對應的
  select方法用來監視檔案描述符(當檔案描述符條件不滿足時,select會阻塞),當某個檔案描述符狀態改變後,會返回三個列表
    1、當引數1 序列中的fd滿足“可讀”條件時,則獲取發生變化的fd並新增到fd_r_list中
    2、當引數2 序列中含有fd時,則將該序列中所有的fd新增到 fd_w_list中
    3、當引數3 序列中的fd發生錯誤時,則將該發生錯誤的fd新增到 fd_e_list中
    4、當超時時間為空,則select會一直阻塞,直到監聽的控制代碼發生變化

  結論: select的優勢在於可以處理多個連線,不適用於單個連線  

#服務端
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1',8093))
server.listen(5)
# 設定為非阻塞
server.setblocking(False)

# 初始化將服務端socket物件加入監聽列表,後面還要動態新增一些conn連線物件,當accept的時候sk就有感應,當recv的時候conn就有動靜
rlist=[server,]
rdata = {}  #存放客戶端傳送過來的訊息

wlist=[]  #等待寫物件
wdata={}  #存放要返回給客戶端的訊息

print('預備!監聽!!!')
count = 0 #寫著計數用的,為了看實驗效果用的,沒用
while True:
    # 開始 select 監聽,對rlist中的服務端server進行監聽,select函式阻塞程序,直到rlist中的套接字被觸發(在此例中,套接字接收到客戶端發來的握手訊號,從而變得可讀,滿足select函式的“可讀”條件),被觸發的(有動靜的)套接字(伺服器套接字)返回給了rl這個返回值裡面;
    rl,wl,xl=select.select(rlist,wlist,[],0.5)
    print('%s 次數>>'%(count),wl)
    count = count + 1
    # 對rl進行迴圈判斷是否有客戶端連線進來,當有客戶端連線進來時select將觸發
    for sock in rl:
        # 判斷當前觸發的是不是socket物件, 當觸發的物件是socket物件時,說明有新客戶端accept連線進來了
        if sock == server:
            # 接收客戶端的連線, 獲取客戶端物件和客戶端地址資訊
            conn,addr=sock.accept()
            #把新的客戶端連線加入到監聽列表中,當客戶端的連線有接收訊息的時候,select將被觸發,會知道這個連線有動靜,有訊息,那麼返回給rl這個返回值列表裡面。
            rlist.append(conn)
        else:
            # 由於客戶端連線進來時socket接收客戶端連線請求,將客戶端連線加入到了監聽列表中(rlist),客戶端傳送訊息的時候這個連線將觸發
            # 所以判斷是否是客戶端連線物件觸發
            try:
                data=sock.recv(1024)
                #沒有資料的時候,我們將這個連線關閉掉,並從監聽列表中移除
                if not data:
                    sock.close()
                    rlist.remove(sock)
                    continue
                print("received {0} from client {1}".format(data.decode(), sock))
                #將接受到的客戶端的訊息儲存下來
                rdata[sock] = data.decode()

                #將客戶端連線物件和這個物件接收到的訊息加工成返回訊息,並新增到wdata這個字典裡面
                wdata[sock]=data.upper()
                #需要給這個客戶端回覆訊息的時候,我們將這個連線新增到wlist寫監聽列表中
                wlist.append(sock)
            #如果這個連接出錯了,客戶端暴力斷開了(注意,我還沒有接收他的訊息,或者接收他的訊息的過程中出錯了)
            except Exception:
                #關閉這個連線
                sock.close()
                #在監聽列表中將他移除,因為不管什麼原因,它畢竟是斷開了,沒必要再監聽它了
                rlist.remove(sock)
    # 如果現在沒有客戶端請求連線,也沒有客戶端傳送訊息時,開始對傳送訊息列表進行處理,是否需要傳送訊息
    for sock in wl:
        sock.send(wdata[sock])
        wlist.remove(sock)
        wdata.pop(sock)

    # #將一次select監聽列表中有接收資料的conn物件所接收到的訊息列印一下
    # for k,v in rdata.items():
    #     print(k,'發來的訊息是:',v)
    # #清空接收到的訊息
    # rdata.clear()

---------------------------------------
#客戶端
from socket import *

client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8093))


while True:
    msg=input('>>: ').strip()
    if not msg:continue
    client.send(msg.encode('utf-8'))
    data=client.recv(1024)
    print(data.decode('utf-8'))

client.close()
select 網路IO模型程式碼

  

select做得事情和第二階段的阻塞沒有關係,就是從核心態將資料拷貝到使用者態的阻塞,始終幫你做得監聽的工作,幫你節省了一些第一階段阻塞的時間。

   IO多路複用的機制:

    select機制: Windows、Linux

    poll機制    : Linux    #和lselect監聽機制一樣,但是對監聽列表裡面的數量沒有限制,select預設限制是1024個,但是他們兩個都是作業系統輪詢每一個被監聽的檔案描述符(如果數量很大,其實效率不太好),看是否有可讀操作。

    epoll機制  : Linux    #它的監聽機制和上面兩個不同,他給每一個監聽的物件綁定了一個回撥函式,你這個物件有訊息,那麼觸發回撥函式給使用者,使用者就進行系統呼叫來拷貝資料,並不是輪詢監聽所有的被監聽物件,這樣的效率高很多。