1. 程式人生 > >python並發學習總結

python並發學習總結

也會 不定 啟動 enc 兩個 star 嘗試 到你 target

目錄

  • 一、理解操作系統
  • 二、任務類型
  • 三、Socket模塊
  • 四、一個簡單的C/S程序
  • 五、使用阻塞IO實現並發
    • 方案一:阻塞IO+多進程
    • 方案二:阻塞IO+多線程
    • 阻塞IO模型的思考和總結
  • 六、使用非阻塞IO實現並發
    • 方案一:非阻塞IO+Try+輪詢
    • 方案二:非阻塞IO+Select代理輪詢
      • select函數接口說明
      • 關於輪詢效率的思考
    • 方案三:非阻塞IO+Selectors+回調函數+事件循環(待後續補充)
    • 方案四:非阻塞IO+協程+回調函數+事件循環(待後續補充)
    • 非阻塞IO的思考和總結(待後續補充)
  • 七、關於同步/異步,阻塞IO/非阻塞IO的區別和思考

一、理解操作系統

操作系統(OS

)統管了計算機的所有硬件,並負責為應用程序分配和回收硬件資源。
硬件資源總是有限的,而應用程序對資源的欲望都是貪婪的。
當多個應用程序發生硬件資源爭奪時,OS負責出面調度,保證多任務的資源分配以保證系統穩定執行。
只有CPU可以執行代碼,所以應用程序(任務)執行前,必須申請到CPU資源,同一時刻,一個CPU只能執行一個任務代碼。
計算機的CPU數量(資源方)遠遠小於需要執行的任務數(需求方),操作系統將CPU的資源按照時間片劃分,並根據任務類型分配,各任務輪流使用CPU
CPU的執行/切換速度非常快,對於用戶而言,多任務看上去就像同時執行一樣,此稱為並發。

如下是串行和並發的對比:
技術分享圖片


計算機的內存、硬盤、網卡、屏幕、鍵盤等硬件提供了數據交換的場所。
OS

提供了IO接口以實現數據交換,數據交換的過程一般不需要CPU的參與。
IO接口有兩種類型:
1、阻塞型IO
發生IO(數據交換)的時候,調用線程無法向下執行剩余代碼,意圖占用CPU但不執行任何代碼,單線程阻塞型IO自身無法支持並發
2、非阻塞型IO
發生IO(數據交換)的時候,調用線程可以向下執行剩余代碼,單線程非阻塞型IO自身可以支持並發

如下是阻塞型IO和非阻塞型IO的對比:
技術分享圖片

二、任務類型

根據一個任務執行期間占用CPU的比例來劃分,有兩種類型:
1、CPU密集型
絕大部分時間都是占用CPU並執行代碼,比如科學計算任務
2、IO密集型
絕大部分時間都未占用CPU,而是在發生IO操作,比如網絡服務

三、Socket模塊

OS提供了阻塞IO和非阻塞IO兩種類型的接口,應用程序可以自行選擇。
Socket模塊封裝了兩種接口,Socket模塊提供的函數默認是阻塞IO類型。
用戶可以選擇手工切換至非阻塞IO類型,使用socketobj.setblocking(False)切換至非阻塞IO模式。
下面將通過一個簡單的例子程序來記錄對並發的學習思考及總結。

四、一個簡單的C/S程序

客戶端:循環接收用戶的輸入,並發送給服務器。從服務器接收反饋並打印至屏幕。
服務器:將接收到的用戶輸入,變成大寫並返回給客戶端。

客戶端代碼固定,主要思考服務器端的代碼。
一般我們會這樣寫服務端代碼:

# 服務器端
import socket

addr = (‘127.0.0.1‘, 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.listen(5)
print(‘監聽中...‘)

while True:  # 鏈接循環
    conn, client = server.accept()
    print(f‘一個客戶端上線 -> {client}‘)

    while True:  # 消息循環
        try:
            request = conn.recv(1024)
            if not request:
                break
            print(f"request: {request.decode(‘utf-8‘)}")
            conn.send(request.upper())

        except ConnectionResetError as why:
            print(f‘客戶端丟失,原因是: {why}‘)
            break

    conn.close()

客戶端代碼保持不變:

# 客戶端
import socket

addr = (‘127.0.0.1‘, 8080)
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(addr)
print(f‘服務器{addr}連接成功‘)

while True:  # 消息循環
    inp = input(‘>>>‘).strip()
    if not inp: continue

    try:
        client.send(inp.encode(‘utf-8‘))
        response = client.recv(1024)
        print(response.decode(‘utf-8‘))

    except ConnectionResetError as why:
        print(f‘服務端丟失,原因是: {why}‘)
        break

client.close()

這種形式的編碼我稱為:單線程+阻塞IO+循環串行,有如下幾個特點:
1、編碼簡單,模型簡潔,可讀性強
2、串行提供服務,用戶使用服務器必須一個一個排隊

單一線程的阻塞IO模型是無法支持並發的,如果要支持並發,有如下兩類解決方案。

五、使用阻塞IO實現並發

單線程阻塞IO,本質上是無法實現並發的。因為一旦發生IO阻塞,線程就會阻塞,下方代碼不會繼續執行。如果要使用單線程阻塞IO來實現並發,需要增加線程數目或者進程數目,當某一個線程/進程發生阻塞的時候,由OS調度至另一個線程/進程執行。


方案一:阻塞IO+多進程

服務器端代碼
import socket
from multiprocessing import Process

def task(conn):
    """通信循環處理函數"""

    while True:
        try:
            request = conn.recv(1024)
            if not request:
                break
            print(f"request: {request.decode(‘utf-8‘)}")
            conn.send(request.upper())

        except ConnectionResetError as why:
            print(f‘客戶端丟失,原因是: {why}‘)
            break

if __name__ == ‘__main__‘:  # windows下需要把新建進程寫到main中,不然會報錯
    addr = (‘127.0.0.1‘, 8080)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(addr)
    server.listen(5)
    print(‘監聽中...‘)

    while True:
        conn, client = server.accept()
        print(f‘一個客戶端上線 -> {client}‘)

        p = Process(target=task, args=(conn,))  # 開啟子進程處理與用戶的消息循環
        p.start()

將服務器對用戶的消息循環操作封裝到進程中,單進程依然會發生阻塞
進程之間的調度交由OS負責(重要)
進程太重,創建和銷毀進程都需要比較大的開銷,此外,一臺設備所能涵蓋的進程數量非常有限(一般就幾百左右)。
進程之間的切換開銷也不小。
當進程數小於等於CPU核心數的時候,可以實現真正的並行,當進程數大於CPU核心的時候,依然以並發執行。


方案二:阻塞IO+多線程

服務器端代碼
import socket
from threading import Thread

def task(conn):
    """通信循環處理函數"""

    while True:
        try:
            request = conn.recv(1024)
            if not request:
                break
            print(f"request: {request.decode(‘utf-8‘)}")
            conn.send(request.upper())

        except ConnectionResetError as why:
            print(f‘客戶端丟失,原因是: {why}‘)
            break

if __name__ == ‘__main__‘:
    addr = (‘127.0.0.1‘, 8080)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(addr)
    server.listen(5)
    print(‘監聽中...‘)

    while True:
        conn, client = server.accept()
        print(f‘一個客戶端上線 -> {client}‘)

        t = Thread(target=task, args=(conn,))  # 啟動多線程處理與用戶的消息循環
        t.start()

將服務器對用戶的操作封裝到線程中,單線程中依然會發生IO阻塞。
線程之間的調度交由OS負責(重要)。
線程較輕,創建和銷毀的開銷都比較小,但是線程數量也不會太大,一臺設備一般能容納幾百至上千的線程。
註意:因為CPython的GIL的存在,使用CPython編寫的多線程代碼,只能使用一個CPU核心,換句話說,使用官方的解釋器執行Python多線程代碼,無法並行(單進程中)。
線程之間的切換開銷比較小。
實際上,多線程的最大問題並不是並發數太少,而是數據安全問題。
線程之間共享同一進程的數據,在頻繁發生IO操作的過程中,難免需要修改共享數據,這就需要增加額外的處理,當線程數量大量增加時,如何妥善處理數據安全的問題就會變成主要困難。


阻塞IO模型的思考和總結

1、多線程和多進程都是基於阻塞IO模式提供的並發,兩者編程模型比較簡單,可讀性也很高。
2、如果使用多線程/進程的方案來提供並發,當線程/進程數量不斷增大時,系統穩定性將會下降。雖然可以使用線程/進程池來提供一定的優化,但超過一定數量之後,池子發揮的效果也會越來越小。所以,兩者都無法支持超大規模的並發(如C10M及以上)。
3、線程/進程切換都交由OS調度,調度策略依據OS的算法,應用程序無法主動控制,無法針對任務的特性做一些必要的調度算法調整。
4、編碼思維直接、易理解,學習曲線平緩。
5、多線程/進程的方案可以理解為單純的增加資源,如果要想支持超大規模的並發,單純的增加資源的行為並不合理(資源不可能無限或者總得考慮成本以及效率,而且數量越大,原有的缺點就會越凸顯)。
6、另一種解決方案的核心思路是:改變IO模型。

六、使用非阻塞IO實現並發

單線程非阻塞IO模型,本身就直接支持並發,為啥?請回頭看看阻塞IO和非阻塞IO的流程圖片。
非阻塞IO接口的核心是:調用線程一旦向OS發起IO調用,OS就直接返回結果,因此,調用線程不會被阻塞而可以執行下方代碼。不過也正因為不會阻塞,調用線程無法判斷立即返回的結果是不是期望結果,所以調用線程需要增加額外的操作對返回結果進行判斷,正因為這一點,就增加了編程難度(增加的難度可不是一點啊)。

對立即返回的結果進行判斷的方案有兩種:

  1. 輪詢
    線程定期/不定期主動發起查詢和判斷
  2. 回調函數+事件循環
    線程在發起IO時註冊回調函數,然後統一處理事件循環

註意:非阻塞IO實現並發有多種解決方案,編程模型的可讀性都不高,有些方案的編程思維甚至晦澀、難以理解、且編碼困難。


方案一:非阻塞IO+Try+輪詢

服務器端代碼
import socket

addr = (‘127.0.0.1‘, 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.setblocking(False)
server.listen(5)
print(‘監聽中...‘)

# 需要執行接收的conn對象放入此列表
recv_list = []

# 需要發送數據的conn對象和數據放入此列表
send_list = []

# 執行鏈接循環
while True:
    try:
        conn, client = server.accept()
        # 執行成功,說明返回值是conn,client
        print(f‘一個客戶端上線 -> {client}‘)
        # 將成功鏈接的conn放入列表,當accept發生錯誤的時候執行conn的消息接收操作
        recv_list.append(conn)

    except BlockingIOError:
        # 執行accept不成功,意味著當前未有任何連接
        # 在下一次執行accept之前,可以執行其他的任務(消息接收操作)

        # 無法對處於遍歷期間的接收列表執行remove操作,使用臨時列表存儲需要刪除的conn對象
        del_recv_list = []

        # 對已經成功鏈接的conn列表執行接收操作
        for conn in recv_list:
            # 對每一個conn對象,執行recv獲取request
            try:
                # recv也是非阻塞
                request = conn.recv(1024)
                # 執行成功,就要處理request
                if not request:
                    # 當前conn鏈接已經失效
                    conn.close()
                    # 不再接收此conn鏈接的消息,將失效conn加入刪除列表
                    del_recv_list.append(conn)
                    # 當前conn處理完畢,切換下一個
                    continue
                # request有消息,處理,然後需要加入發送列表中
                response = request.upper()
                # 發送列表需要存放元組,發送conn和發送的數據
                send_list.append((conn, response))

            except BlockingIOError:
                # 當前conn的數據還沒有準備好,處理下一個conn
                continue
            except ConnectionResetError:
                # 當前conn失效,不再接收此conn消息
                conn.close()
                del_recv_list.append(conn)

        # 無法處理發送列表遍歷期間的remove,使用臨時列表
        del_send_list = []

        # 接收列表全部處理完畢,準備處理發送列表
        for item in send_list:
            conn = item[0]
            response = item[1]

            # 執行發送
            try:
                conn.send(response)
                # 發送成功,就應該從發送列表中移除此項目
                del_send_list.append(item)

            except BlockingIOError:
                # 發送緩沖區有可能已經滿了,留待下次發送處理
                continue
            except ConnectionResetError:
                # 鏈接失效
                conn.close()
                del_recv_list.append(conn)
                del_send_list.append(item)

        # 刪除接收列表中已經失效的conn對象
        for conn in del_recv_list:
            recv_list.remove(conn)

        # 刪除發送列表中已經發送或者不需要發送的對象
        for item in del_send_list:
            send_list.remove(item)

服務器使用單線程實現了並發。
對於accept接收到的多個conn對象,加入列表,並通過遍歷讀取列表、發送列表來提供多用戶訪問。

單線程中的Socket模塊提供的IO函數都被設置成:非阻塞IO類型。
增加了額外操作:對非阻塞調用立即返回的結果,使用了Try來判斷是否為期望值。
因為不知道何時返回的結果是期望值,所以需要不停的發起調用,並通過Try來判斷,即,輪詢。
兩次輪詢期間,線程可以執行其他任務。但是模型中也只是不停的發起輪詢,並沒有利用好這些時間。

編碼模型復雜,難理解。

優化:此模型中的主動輪詢的工作由程序負責,其實可以交由OS代為操作。這樣的話,應用程序就不需要編寫輪詢的部分,可以更聚焦於業務邏輯(upper()的部分),Python提供了Select模塊以處理應用程序的輪詢工作。


方案二:非阻塞IO+Select代理輪詢

服務器端代碼
import socket
import select

addr = (‘127.0.0.1‘, 8080)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(addr)
server.setblocking(False)
server.listen(5)
print(‘監聽中...‘)

# 最開始的server對象需要被監聽,一旦可讀,說明可以執行accept
read_list = [server,]

# 需要監聽的寫列表,一旦wl中可寫對象處理完send,應該將它也從此列表中刪除
write_list = []

# 用於臨時存放某一個sock對象需要發送的數據
data_dic = {}

# 不停的發起select查詢
while True:

    # 發起select查詢,嘗試得到可以操作的socket對象
    rl, wl, xl = select.select(read_list, write_list, [], 1)

    # 操作可讀列表
    for sock in rl:
        # 如果可讀列表中的對象是server,意味著有鏈接,則server可執行accept
        if sock is server:
            # 執行accept一定不會報錯,所以不需要try
            conn, client = sock.accept()
            # 一旦獲得conn,就需要將此conn加入可讀列表
            read_list.append(conn)
        else:
            # 說明可讀的對象是普通的conn對象,執行recv時要處理鏈接失效問題
            try:
                request = sock.recv(1024)

            except (ConnectionResetError, ConnectionAbortedError):
                # 此鏈接失效
                sock.close()
                read_list.remove(sock)
            else:
                # 還需要繼續判斷request的內容
                if not request:
                    # 說明此conn鏈接失效
                    sock.close()
                    # 不再監控此conn
                    read_list.remove(sock)
                    continue
                # 處理請求
                response = request.upper()
                # 加入發送列表
                write_list.append(sock)
                # 保存發送的數據
                data_dic[sock] = response

    # 操作可寫列表
    for sock in wl:
        # 執行發送操作,send也會出錯
        try:
            sock.send(data_dic[sock])
            # 發送完畢後,需要移除發送列表
            write_list.remove(sock)
            # 需要移除發送數據
            data_dic.pop(sock)

        except (ConnectionResetError, ConnectionAbortedError):
            # 此鏈接失效
            sock.close()
            read_list.remove(sock)
            write_list.remove(sock)

服務器使用單線程實現了並發。
使用了Select模塊之後,應用程序不再需要編寫主動輪詢的代碼,而是將此部分工作交由Select模塊的select函數代為處理。
應用程序只需要遍歷select函數返回的可操作socket列表,並處理相關業務邏輯即可。
雖然應用程序將輪詢工作甩給了select,自己不用編寫代碼。不過select函數的底層接口效率不高,使用epoll接口可以提升效率,此接口被封裝在Selectors模塊中。
此外,select函數是一個阻塞IO,在並發數很少的時候,線程大部分時間會阻塞在select函數上。所以select函數應該適用於隨時隨刻都有socket準備好、大規模並發的場景。
編碼困難,模型難理解。


select函數接口說明


def select(rlist, wlist, xlist, timeout=None): # real signature unknown; restored from __doc__
    """
    select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist)
    
    Wait until one or more file descriptors are ready for some kind of I/O.
    The first three arguments are sequences of file descriptors to be waited for:
    rlist -- wait until ready for reading
    wlist -- wait until ready for writing
    xlist -- wait for an ``exceptional condition‘‘
    If only one kind of condition is required, pass [] for the other lists.
    A file descriptor is either a socket or file object, or a small integer
    gotten from a fileno() method call on one of those.
    
    The optional 4th argument specifies a timeout in seconds; it may be
    a floating point number to specify fractions of seconds.  If it is absent
    or None, the call will never time out.
    
    The return value is a tuple of three lists corresponding to the first three
    arguments; each contains the subset of the corresponding file descriptors
    that are ready.
    
    *** IMPORTANT NOTICE ***
    On Windows, only sockets are supported; on Unix, all file
    descriptors can be used.
    """
    pass
  1. 輸入4個參數(3位置,1默認),返回3個值
  2. select函數是阻塞IO,函數的返回必須等到至少1個文件描述符準備就緒
  3. 位置參數rlist/wlist/xlist分為是:需要監控的讀列表/寫列表/例外列表(第3參數暫不理解)
  4. windows下,列表中只能放socket對象,unix下,可以放任何文件描述符
  5. 第4參數如果是None(默認),則會永久阻塞,否則按照給定的值(單位是秒)發生超時,可以使用小數如0.5秒
  6. 返回值是3個列表,裏面涵蓋的是可以操作的文件描述符對象

關於輪詢效率的思考

輪詢操作,效率不高。
輪詢的工作視角是:發起者定期/不定期主動發起詢問,如果數據沒有準備好,就繼續發起詢問。如果數據準備好了,發起者就處理這些數據。
假設,調用者在第35次主動輪詢的時候發現數據準備好了,那麽意味著前34次主動輪詢的操作是沒有任何收益的。
調用者要想知道數據是否就緒,就要主動詢問,而主動詢問的效率又比較低。
這個矛盾的核心關鍵在於:如何得知數據準備就緒這件事呢?

使用回調函數+事件循環
此種方案中,調用者不會主動發起輪詢,而是被動的等待IO操作完成,並由OS向調用者發起準備就緒的事件通知。

方案三:非阻塞IO+Selectors+回調函數+事件循環(待後續補充)

pass

方案四:非阻塞IO+協程+回調函數+事件循環(待後續補充)

pass

非阻塞IO的思考和總結(待後續補充)

  1. 如果將一個IO密集型任務的IO模型設置為非阻塞,則此任務類型將會從IO密集型逐漸轉變為CPU密集型。
  2. 非阻塞IO的編程模型比較困難,可讀性較差,模型理解困難
    pass

七、關於同步/異步,阻塞IO/非阻塞IO的區別和思考

  1. 阻塞IO和非阻塞IO指的是OS提供的兩種IO接口,區別在於調用時是否立即返回。
  2. 同步和異步指的是兩個任務之間的執行模型
    同步:兩個任務關聯性大,任務相互依賴,對任務執行的前後順序有一定要求
    異步:兩個任務關聯性小,任務可以相互獨立,任務執行順序沒有要求
  3. 網上有很多關於同步阻塞、同步非阻塞、異步阻塞、異步非阻塞的各種理解,站在不同的角度,理解都不一樣。我覺得應該把同步/異步劃為一類,用於描述任務執行模型,而把阻塞/非阻塞IO劃為一類,用於描述IO調用模型。

如下是我根據網上的各種解釋,結合自己的思考給出的一個關於同步/異步簡單的例子:

  1. 同步
    第一天,晚飯時間到了,你餓了,你走到你老婆面前說:老婆,我餓了,快點做飯!你老婆回答:好的,我去做飯。
    你跟著老婆走到廚房,你老婆花了30分鐘的時間給你做飯。這期間,你就站在身邊,啥也不幹,就這樣註視著她,你老婆問你:你站這幹嘛?你說:我要等你做完飯再走。30分鐘後,你吃到了晚飯。

  2. 異步+輪詢
    第二天,晚飯時間到了,你餓了,你大喊:老婆,我餓了,快點做飯!你老婆回答:好的,我去做飯。
    你老婆花了30分鐘的時間給你做飯,但是你不再跟著你老婆走到廚房。這期間,你在客廳看電視,不過你實在餓得不行了,於是你每過5分鐘,就跑到廚房詢問:老婆,飯做好了沒?你老婆回答:還要一會。30分鐘後,你吃到了晚飯。

  3. 異步+事件通知
    第三天,晚飯時間到了,你餓了,你大喊:老婆,我餓了,快點做飯!你老婆回答:好的,我去做飯。
    你老婆花了30分鐘的時間給你做飯,你也不再跟著你老婆走到廚房。這期間,你在客廳看電視,你知道你老婆在做飯,你也不會去催她,專心看電視。30分鐘後,你老婆喊你:飯做好了。最後你吃到了晚飯。

python並發學習總結