併發程式設計之IO模型
一、阻塞IO(blocking IO)
from concurrent.futures import ThreadPoolExecutor import socket server = socket.socket() # 重用埠 server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(("127.0.0.1",9999)) server.listen(5) # 執行緒池 pool = ThreadPoolExecutor(3) def data_handler(conn): print("一個新連線..") while True: data = conn.recv(1024) conn.send(data.upper()) while True: conn,addr = server.accept() # 切到處理資料的任務去執行 pool.submit(data_handler,conn) 執行緒池阻塞IO服務端
import socket c = socket.socket() c.connect(("127.0.0.1",9999)) while True: msg = input(">>>:") if not msg:continue c.send(msg.encode("utf-8")) data = c.recv(1024) print(data.decode("utf-8")) 執行緒池阻塞IO客服端
二、非阻塞IO(non-blocking IO)
from concurrent.futures import ThreadPoolExecutor import socket server = socket.socket() # 重用埠 server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(("192.168.11.210",9999)) server.listen(5) # 設定是否為阻塞 預設阻塞 server.setblocking(False) def data_handler(conn): print("一個新連線..") while True: data = conn.recv(1024) conn.send(data.upper()) # 已連線的客戶端 clients = [] # 需要傳送的資料 send_datas = [] # 已經發送完的 需要刪除的資料 del_datas = [] # 待關閉的客戶端 closed_cs = [] import time while True: try: conn,addr = server.accept() # 切到處理資料的任務去執行 # 程式碼走到這裡才算是連線成功 # 把連線成功的客戶端存起來 clients.append(conn) except BlockingIOError: # print("沒有可以處理的連線 就幹別的活兒") #要處理的是已經連線成功的客戶端 # 接收資料 for c in clients: try: data = c.recv(1024) if not data: # 對方關閉了連線 c.close() # 從客戶端列表中刪除它 closed_cs.append(c) continue print("收到%s" % data.decode("utf-8")) # 現在非阻塞 send直接往快取賽 如果快取滿了 肯定有錯誤需要單獨處理髮送 # c.send(data.upper()) send_datas.append((c,data)) except BlockingIOError: pass except ConnectionResetError: # 對方關閉了連線 c.close() # 從客戶端列表中刪除它 closed_cs.append(c) # 處理髮送資料 for data in send_datas: try: data[0].send(data[1].upper()) # 傳送成功需要刪除 不能直接刪除 # send_datas.remove(data) del_datas.append(data) except BlockingIOError: continue except ConnectionResetError: # 客戶端連線需要刪除 data[0].close() closed_cs.append(data[0]) # 等待發送的資料需要刪除 del_datas.append(data) # 刪除無用的資料 for d in del_datas: #從待發送的列表中刪除 send_datas.remove(d) del_datas.clear() for c in closed_cs: clients.remove(c) closed_cs.clear() 伺服器
import socket c = socket.socket() c.connect(("127.0.0.1",9999)) while True: msg = input(">>>:") if not msg:continue c.send(msg.encode("utf-8")) data = c.recv(1024) print(data.decode("utf-8")) 客服端
li = [1,2,3,4,5,6] def mytlist_iter(): for i in range(len(li)): yield li[i] for j in mytlist_iter(): if j == 5: li.append(1000) d = {"a":1,"b":2} for k in d: if k == "a": d.pop(k) 迭代期間不能修改被迭代的物件
三、多路複用IO(IO multiplexing)
from concurrent.futures import ThreadPoolExecutor import socket import select # select 幫你從一堆連線中找出來需要被處理的連線 server = socket.socket() # 重用埠 server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1) server.bind(("192.168.11.210",9999)) server.listen(5) # 設定是否為阻塞 預設阻塞 # server.setblocking(False) def data_handler(conn): print("一個新連線..") while True: data = conn.recv(1024) conn.send(data.upper()) # 需要檢測的 是否可讀取的列表(recv就是一個讀取操作) rlist = [server,] # 需要檢測的 是否寫入的列表(send就是寫入操作) wlist = [] # 需要傳送的資料 目前是因為 我們要把接收的資料在發回去 所以搞了這個東西 正常沒有這種需求 # 目前客戶端與伺服器端 互動 是必須客戶端傳送資料 伺服器端才能返回資料正常沒有這種需求 dic = {} while True: # 用於檢測需要處理的連線 需要不斷檢測 所以迴圈 # rl目前可讀的客戶端列表wl目前可寫的客戶端列表 rl,wl,xl = select.select(rlist,wlist,[]) # select預設阻塞 阻塞到任意一個連線可以被處理 print(len(rl)) # 處理可讀的socket for c in rl: # 無論是客戶端還是伺服器只要可讀就會執行到這裡 if c == server: # 接收客戶端的連線請求 (一個讀操作) conn,addr = c.accept() # 將新連線也交給select來檢測 rlist.append(conn) else:# 不是伺服器 就是客戶端 客戶端可讀 可以執行recv try: data = c.recv(1024) if not data: c.close() rlist.remove(c) print("%s 傳送 %s" % (c,data.decode("utf-8"))) # 給客戶端傳送資料 前要保證目前可以傳送 將客戶端加入檢測列表 wlist.append(c)# 正常開發中 不可能必須客戶端傳送資料過來後 才能 給客戶端傳送 # 所以這個新增到檢測列表的操作 應該建立連線後立即執行 # 要傳送的資料 dic[c] = data except ConnectionResetError: # 客戶端關閉連線 c.close() rlist.remove(c) # 處理可寫的socket for c in wl: print(c) try: c.send(dic[c].upper()) # 刪除資料 dic.pop(c) # 從檢測列表中刪除已傳送完成的客戶端 wlist.remove(c) except ConnectionResetError: c.close() # 關閉連線 dic.pop(c) # 刪除要傳送的資料 wlist.remove(c) # 從待檢測的列表中刪除 except BlockingIOError:#可能快取滿了 發不了 pass 伺服器
import socket c = socket.socket() c.connect(("192.168.11.210",9999)) while True: msg = input(">>>:") if not msg:continue c.send(msg.encode("utf-8")) data = c.recv(1024) print(data.decode("utf-8")) 客戶端
四、非同步IO(Asynchronous I/O)
import asyncio asyncio.coroutine() from concurrent.futures importThreadPoolExecutor def task(): print("read start") with open(r"D:\python視訊存放目錄\上海python全棧4期\day40\多路複用,降低CPU佔用\伺服器.py",encoding="utf-8") as f: text = f.read() # f.write() print("read end") return text def fin(f): print("fin") print(f.result()) pool = ThreadPoolExecutor(1) future = pool.submit(task) future.add_done_callback(fin) print("主 over") # 這種方式看起來像是非同步IO 但是對於子執行緒而言不是 # 在子執行緒中 執行read 是阻塞的 以為CPU必須切走 但是不能保證切到當前程式的其他執行緒 # 想要的效果就是 在執行read 是不阻塞 還能幹其他活誰能實現只有協程 # asyncio 內部是使用的是協程 模擬非同步IO
conclusion:
網路 IO模型
一、阻塞IO模型
多執行緒 多程序 執行緒池 程序池 全是阻塞IO
二、非阻塞IO
協程是一種非阻塞IO
1.setblocking(False) 將阻塞修改為非阻塞
2.一旦是非阻塞 在執行accept recv send 就會立馬嘗試讀寫資料 一旦資料沒準備好就拋異常
3.捕獲異常
4.如果沒有異常說明資料準備好了 直接處理
5.捕獲到異常 那就做別的事情
可以實現單執行緒併發的效果 會大量佔用CPU資源
三、多路複用
將所有連線交給select來管理 管什麼? 管哪個連線可以被處理
作為處理任務的一方事情變少了 不需要重複不斷的問作業系統拿資料 而是等待select返回需要處理的連線,等待則意味著select是阻塞的
非同步IO 不僅僅指網路IO 也包括本地IO
非阻塞IO 和 多路複用 解決都是網路IO的阻塞問題
本地IO 可以通過子執行緒 或子程序 來避免阻塞 但是對子執行緒或子程序而言 依然會阻塞
最終的解決方案就是協程 asyncio 該模快實現非同步IO 內部使用協程實現