1. 程式人生 > >網絡編程進階:並發編程之協程、IO模型

網絡編程進階:並發編程之協程、IO模型

ror 異步 kernel nec 加鎖 處理 完全 OS com

協程:

基於單線程實現並發,即只用一個主線程(此時可利用的CPU只有一個)情況下實現並發; 並發的本質:切換+保存狀態

CPU正在運行一個任務,會在兩種情況下切走去執行其他任務(切換有操作系統強制控制),一種情況是該任務發生了阻塞,另一種是該任務計算的時間過長或有一個優先級更高的程序替代了它

技術分享圖片

在介紹進程理論時,提及進程的三種執行狀態,而線程才是執行單位,所以也可以將上圖理解為線程的三種狀態

如果多個任務都是純計算的,上圖的情況2並不能提升效率,因為只是讓CPU來回切,這樣看起來所有任務都被“同時”執行的效果,此時這種切換反而會降低效率;

yield本身就是一種在單線程下可以保存任務運行狀態的方法,其特點如下:

  1. yield可以保存狀態,yield的狀態保存於操作系統的保存線程狀態很像,但是yield是代碼級別控制的,更輕量級

  2. send可以把一個函數的結果傳遞給另外一個函數,以此實現單線程內程序之間的切換;yield並不能實現遇到io切換

在任務1遇到io情況下,切到另外一個任務去執行,這樣就可以利用任務1阻塞的時間完成其他任務的計算,效率的提升就在此處

對於單線程下,我們不可避免程序中出現io操作,如果我們能在自己的程序中(即用戶程序級別,而非操作系統級別)控制單線程下的多個任務能在一個任務遇到io阻塞時就切換到另一個任務去計算,這樣就保證了該線程能最大限度的處於就緒狀態,即隨時都可以被CPU執行的狀態,相當於我們在用戶程序級別將自己的io操作最大限度的隱藏起來,從而可以“迷惑”操作系統,讓其看到:該線程好像是一直在計算,io比較少,從而更多的將CPU的執行權限分配給我們的線程(程序執行效率高就是該軟件能夠過得更多的CPU執行權限)

協程的本質就是在單線程下,由用戶自己控制一個任務遇到io阻塞了就切換到另外一個任務去執行,以此提升效率;為了實現它,我們需要找尋一種可以同時滿足以下條件的解決方案:

  1. 可以控制多個任務之間的切換,切換之前將任務的狀態保存下來,以便重新運行時,可以基於暫停的位置繼續執行

  2. 作為1的補充:可以檢測io操作,在遇到io操作的情況下才發生切換

協程:是單線程下的並發,又稱微線程,纖程(Coroutine);協程是一種用戶態的輕量級纖程,即協程是用戶程序由自己控制調度的

需要註意的是:

  1. python的線程屬於內核級別的,即有操作系統控制調度(如單線程遇到io或執行時間過長就會被迫交出CPU執行權限,切換到其他線程運行)

  2. 單線程內開啟協程,一遇到io,就會從應用程序級別(而非操作系統級別)控制切換,以此來提升效率(註意:非io操作的切換與效率無關)

對比操作系統控制線程的切換,用戶在單線程內控制協程的切換:

優點:

  1. 協程的切換開銷更小,屬於程序級別的切換,操作系統完全感知不到,因而更加輕量級

  2. 單線程內就可以實現並發的效果,最大限度的利用CPU

缺點:

  1. 協程的本質還是單線程,無法利用多核優勢,可以是一個程序開啟多個進程,每個進程內開啟多個線程,每個線程內開啟協程;

  2. 協程指的是單個線程,因而一旦協程出現阻塞,將會阻塞整個線程

協程特點總結:

  1. 必須在只有一個單線程裏實現並發

  2. 修改共享數據不需要加鎖

  3. 用戶程序裏自己保存多個控制流的上下文棧

  4. 附加: 一個協程遇到io操作自動切換到其他協程(如何實現檢測IO,yield、greenlet都無法實現,就用到了gevent模塊(select機制))

協程greenlet:

from greenlet import greenlet
import time

def eat(name):
    print("%s is eating 1"%name)
    time.sleep(3)
    g2.switch("neo")   # g2 再去 switch
    print("%s is eating 2"%name)
    g2.switch("alex")  # 再 switch 到 play

def play(name):
    print("%s play 1"%name)
    g1.switch("neo")  # 再 switch 到 eat
    print("%s play 2"%name)

g1 = greenlet(eat)   # greenlet() 只能傳入函數名,不要傳函數參數
g2 = greenlet(play)

g1.switch("neo")  # 利用 g1.switch() 啟動程序  # 此時需要傳入函數的參數

"""
greenlet 並不能檢測到I/O阻塞
"""

運行結果:

技術分享圖片

gevent協程:

from gevent import monkey;monkey.patch_all()  # 合並成一行,專門用於打標記
import gevent
import time

def eat(name):
    print("%s is eating 1"%name)
    # gevent.sleep(3)  # gevent.sleep()和 time.sleep()效果一樣
    time.sleep(3)
    print("%s is eating 2"%name)

def play(name):
    print("%s play 1"%name)
    # gevent.sleep(4)
    time.sleep(4)
    print("%s play 2"%name)

start = time.time()

g1 = gevent.spawn(eat,"neo")   # 提交任務  #  spawn()第一個參數寫任務名,後面直接參數就行(位置參數或關鍵字參數都可以)
g2 = gevent.spawn(play,"alice")  # gevent.spawn()是異步提交任務

g1.join()
g2.join()   # 保證上面提交的兩個任務都執行完畢了  # 協程是單線程的,需要再線程結束前等待g1和g2,要不然g1和g2還沒起來,“主線程”就結束了,此時g1和g2也就不會再執行了
# g1.join()和g2.join()可以合並成:
# gevent.joinall([g1,g2])
"""
執行過程分析:
g1先起來,執行了第一個print,然後遇到了IO阻塞(gevent.sleep(3)),然後立馬就切到了 g2 提交的 play任務,
執行 play中的第一個print,接著又遇到了IO阻塞(gevent.sleep(4)),然後就又切到了 g1的eat任務,此時g1的eat還是處於阻塞狀態,接著就在兩個任務之間來回切
直到 g1的eat 又處於就緒狀態,打印 eat的第2個print;執行完 eat之後,g2的play還處於阻塞狀態,然後等其阻塞結束後執行 play的第2個print;
通過這種方式 gevent幫你監測了多個任務之間的IO阻塞(遇到IO阻塞就切走)
"""

stop = time.time()
print(stop-start)

"""
執行時間大約是阻塞最長的時間,如果這兩個任務改成串行,則執行時間就是 3+4 = 7秒多
"""

"""
gevent.sleep() 不等同於 time.sleep(),因為gevent只能識別它自己模擬的阻塞(所以,如果把gevent.sleep()改成了time.sleep(),則整個程序就還是串行)
如果想實現gevent識別所有的阻塞,就需要用到gevent模塊下的monkey,monkey下面有一個功能是 patch_all(),
monkey.patch_all()的作用是把下面凡是涉及到IO操作的都會幫你打一個標記,從而能被gevent識別
但凡需要gevent模塊監測一個IO操作,就需要在你整個文件的開頭導入 monkey,並且做一個 monkey.patch_all()的操作(一定要在整個文件的開頭寫,這樣文件下面多有的模塊也好、功能也好,都會被做上標記)
"""

運行結果:

技術分享圖片

gevent應用場景:單線程下多個任務是IO密集型(因為計算密集型gevent並不能提高效率)

基於gevent模塊實現並發的套接字通信

服務端:

from gevent import monkey,spawn;monkey.patch_all()
from socket import *

def comm(conn):
    while True:
        try:
            data = conn.recv(1024)
            if not data: break
            conn.send(data.upper())
        except ConnectionResetError:
            conn.close()
            break

def server(ip,port):
    server = socket(AF_INET,SOCK_STREAM)
    server.bind((ip,port))
    server.listen(5)

    while True:
        conn,client_addr = server.accept()
        spawn(comm,conn)
        # 這個地方就不需要再寫 .join(),因為程序能走到這一步說明server這個協程肯定已經起來了,這時線程就已經進入了while True死循環,那麽comm這個協程就肯定也能起來

    server.close()

if __name__ == "__main__":
    g = spawn(server,"127.0.0.1",8080)
    g.join()  # 這個地方需要寫 join(),從而保證server函數能真正起來

服務端:

from socket import *
from threading import Thread,current_thread


def client():
    client = socket(AF_INET,SOCK_STREAM)
    client.connect(("127.0.0.1",8080))
    while True:
        client.send(("hello %s"%current_thread().getName()).encode("utf-8"))

        data = client.recv(1024)
        print(data.decode("utf-8"))

    conn.close()

if __name__ == "__main__":
    for i in range(500):  # 模擬並發的效果
        t = Thread(target=client)
        t.start()  # 500個線程能很快起來

IO模型:

同步調用不等於阻塞(同步是:提交完後不管有沒有阻塞,不管是IO密集還是計算密集,我都在原地等著)

IO模型介紹:

為了更好的了解IO模型,需要先了解:同步(synchronous)IO、異步(asynchronous)IO、阻塞(blocking)IO、非阻塞(non-blocking)IO

以下討論的是linux環境下的netword IO,共5中IO Model:

1. blocking IO

2. nonblocking IO

3. IO multiplexing

4. signal driven IO

5. asynchronous IO

附:由於signal driven IO(信號驅動IO)在實際中並不常見,所以主要介紹其余四種IO Model

再說一下IO發生時涉及的對象和步驟。對於一個network IO(這裏我們以read為例),它會涉及到兩個系統對象,一個是調用這個IO 的process(or thread),另一個是系統內核(kernel)。當一個read操作發生時,該操作會經歷兩個階段:

1. 等待數據準備;

2. 將數據從內核拷貝到進程(線程)中

上述IO模型的區別就是在這兩個階段上各有不同

阻塞IO(blocking IO)

在Linux中,默認情況下所有的socket都是blocking,一個典型的讀操作流程如下:

技術分享圖片

blocking IO的特點就是在IO執行的兩個階段(等待數據和拷貝數據兩個階段)都被block了

所謂阻塞型IO是指系統調用(一般是IO接口)不返回調用結果並讓當前線程一直阻塞;只有當該系統調用獲得結果或者超時出錯時才會返回

除非特別指定,幾乎所有的IO接口(包括socket接口)都是阻塞型的

解決方法之多線程、多進程:

  在服務端使用多進程(多線程),讓每個鏈接都擁有獨立的進程(線程),這樣任何一個鏈接的阻塞都不會影響其他的鏈接;但該方法的問題是:在遇到要同時相應成百上千的鏈接請求時會嚴重占據系統資源,降低系統對外界響應效率,進程線程本身也容易進入假死狀態

改進方案之線程池、進程池:

  線程池(進程池)旨在減少創建和銷毀線程的頻率,維持一定合理數量的線程,並讓空閑的線程重新擔任起新的執行任務。“連接池”維持鏈接的緩存池,盡量重用自己的鏈接、減少創建和關閉鏈接的頻率。雖然“線程池”和連接池在一定程度上緩解了頻繁調用IO接口帶來的資源占用,但“池”使用有其上限,當請求大大超過上限時,“池”構成的系統對外界的響應並不比沒有池的效果好;所以“線程池”或“鏈接池”可以緩解部分壓力,但不能解決所有問題。

多線程模型可以高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,可以用非阻塞接口來嘗試解決這個問題

非阻塞IO

Linux下,可以通過設置socket使其變成non-blocking。 當對一個non-blocking socket執行讀操作時,流程如下:

技術分享圖片

可以看出,當用戶進程發出read操作時,如果kernel中的數據還沒有準備好,那麽它不會block用戶進程,而是立刻返回一個error。從用戶進程角度講,它發起一個read操作後,並不需要等待,而是馬上就得到了一個結果;用戶進程判斷結果是一個error時,它就知道數據還沒有準備好,於是用戶就可以在本次到下次再發起read詢問的時間間隔內做其他事情,或者直接再次發送read操作;一旦kernel中的數據準備好了,並且又再次收到了用戶進程的system call,那麽它馬上就將數據拷貝到了用戶內存(這一階段任然是阻塞的),然後返回。(註意:拷貝數據整個過程,進程仍然是屬於阻塞的狀態)

所以,在非阻塞IO中,用戶進程其實是需要不斷的主動詢問kernel數據準備好沒有

非阻塞IO示例:

# 服務端

#
socket套接字中的 accept,recv和send是阻塞IO,非阻塞IO模式中需要對這三個地方做處理 from socket import * server = socket(AF_INET,SOCK_STREAM) server.bind(("127.0.0.1",8080)) server.listen(5) server.setblocking(False) # Set the socket to blocking (flag is true) or non-blocking (false). # 設置socket是阻塞型還是非阻塞型;默認flag是True(阻塞型 conn_list = [] del_conn = [] while True: try: # 建鏈接 conn,client_addr = server.accept() conn_list.append(conn) print(conn_list) except BlockingIOError: # 如果程序在accept處遇到了IO阻塞,就執行 except try: # 接收數據、並準備發送數據的任務 send_list = [] for conn in conn_list: try: data = conn.recv(1024) # send也是IO,也有發生阻塞的可能性(例如發送數據過大但內存已不足),所以也需要解決這個阻塞問題 if not data: # linux 系統 del_conn.append(conn) continue send_list.append((conn,data.upper())) # 把要發送的數據及其相應的conn打包成元祖的形式添加到 send_list except BlockingIOError: # 如果某個conn在recv處遇到了IO阻塞,就跳過它繼續執行後面的conn continue except ConnectionResetError: # 如果某個conn當方面斷開了鏈接,需要把這個conn回收,並且把它從conn_list中remove掉 # conn.close() # 叠代對象在循環的過程中不能改變叠代對象的結構,所以此時不能把conn從conn_list中直接remove del_conn.append(conn) # 把這個conn添加到要刪除的列表中 # 發消息的任務 del_sent_list = [] for item in send_list: # 解決send的IO阻塞 try: conn = item[0] content = item[1] conn.send(content) del_sent_list.append(item) # 發送成功後,需要把這個元祖(conn,content)從 send_list中刪除 except BlockingIOError: # 如果沒發送成功 pass # 循環的最後一行代碼,可用pass代替continue # 刪除已發送的信息 for item in del_sent_list: send_list.remove(item) # 刪除當方面斷掉的客戶端連接 for conn in del_conn: # 此時從connlist中刪除這個conn conn_list.remove(conn) except: pass """ 服務端此時就一個線程, 實現了自己去監測程序中的IO,遇到IO就切到其他任務去運行,這與gevent實現的原理類似 """

# 客戶端
from socket import *

client = socket(AF_INET,SOCK_STREAM)
client.connect(("127.0.0.1",8080))

while True:
msg = input(">>>").strip()
if not msg:continue
client.send(msg.encode("utf-8"))

data = client.recv(1024)
print(data.decode("utf-8"))
 

但是非阻塞IO絕不推薦使用

原因: 1. 請求無法立即響應

    2. 客戶端沒有阻塞但卻在一直運行著,這就是一個死循環,程序一直處於就緒狀態,CPU占用率高,但程序卻不是一直在工作,而是在做大量無用的“詢問”,問操作系統“數據有沒有準備好”

多路復用IO(IO multiplexing)

IO multiplexing 也稱為 事件驅動IO(event driven IO);select/epoll的好處就在於單個process就可以同時處理多個網絡連接的IO。它的基本原理是select/epoll這個function會不斷的輪詢所負責的所有的socket,當某個socket有數據到大了,就通知用戶進程,如下圖所示:

技術分享圖片

當用戶進程調用了select,那麽整個進程會被block,同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據準備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從kernel拷貝到用戶進程。這個圖和blocking IO的圖其實沒有太大的不同,事實上還更差,因為這裏需要使用兩個系統調用(select 和recvfrom),而blocking IO只調用了一個系統調用(recvfrom)。但是,用select的優勢在於它可以同時處理多個connection。

在多路復用模型中,對於每一個socket,一般都設置成為 non-blocking,但是,如上圖所示,整個用戶的process其實一直被block的,只不過process是被select這個函數block,而不是被socket IO給block

結論: select的優勢在於處理多個連接,不適用於單個連接

select網絡IO模型示例(還沒完全理解):

import select
from socket import *

server = socket(AF_INET,SOCK_STREAM)
server.bind(("127.0.0.1",8080))
server.listen(5)
server.setblocking(False)

rlist = [server,]  # 用於存放收消息的套接字
w1ist = []        # 用於存放發消息的套接字
wdata = {}  # 用於存放將要發送的消息

while True:
    rl,wl,xl = select.select(rlist,w1ist,[],3)  # 監測select代理的套接字
    print("rl",rl)
    print("wl",wl)

    for sock in rl:
        if sock == server:
            conn,addr = sock.accept()
            rlist.append(conn)   # 把conn這個套接字添加到 rlist這個收消息的套接字列表中
        else:
            try: # 捕捉客戶端單方面斷開連接的異常
                data = sock.recv(1024)
                if not data:  # linux系統
                    sock.close()
                    rlist.remove(sock)
                    continue
                w1ist.append(sock)
                wdata[sock] = data.upper()
            except Exception:
                sock.close()
                rlist.remove(sock)

    for sock in w1ist:
        data = wdata[sock]
        sock.send(data)
        wdata.pop(sock)  # 發完之後要把消息從wdata中刪除
        w1ist.remove(sock)

server.close()

異步IO(Asynchronous IO):

Linux下的 asynchronous IO其實用的不多;它的流程如下:

技術分享圖片

用戶進程發起read操作之後,立刻就開始去做其他的事。另一方面,從kernel的角度,當它受到一個asynchronous read之後,首先它會立刻返回,所以不會對用戶進程產生任何block,然後,kernel會等待數據準備完成,然後將數據拷貝到用戶內存,當這一切都完成之後,kernel會給用戶進程發一個signal,告訴它read操作已經完成了

網絡編程進階:並發編程之協程、IO模型