1. 程式人生 > >day36 GIL鎖與執行緒池

day36 GIL鎖與執行緒池

多程序與多執行緒效率對比

#
# """ # # 計算密集型 # """  # from threading import Thread # from multiprocessing import Process # import time # # a = 1 # def task(): # global a # for i in range(10000000): # a +=1 # a * 10 / 2 - 3 # # s = time.time() # #多執行緒 # # t1 = Thread(target=task) # # t2 = Thread(target=task) # # t3 = Thread(target=task) # # if __name__ == '__main__': # # # 多程序 # t1 = Process(target=task) # t2 = Process(target=task) # t3 = Process(target=task) # t1.start() # t2.start() # t3.start() # # t1.join() # t2.join() # t3.join() # # print(time.time() - s) # """ IO型任務 """ from threading import Thread from multiprocessing import Process import time def task(): # for i in range(10): with open(r"D:\脫產5期內容\day34\視訊\1.執行緒理論.mp4",mode="rb")as f: while True: data = f.read(1024) if not data: break s = time.time() if __name__ == '__main__': # 多執行緒 t1 = Thread(target=task) t2 = Thread(target=task) t3 = Thread(target=task) # 多程序 # t1 = Process(target=task) # t2 = Process(target=task) # t3 = Process(target=task) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() print(time.time() - s)

GIL鎖

什麼是GIL
    全域性直譯器鎖,是加在直譯器上的互斥鎖
    只存在於cpython直譯器中
    
    python的記憶體回收管理機制,簡稱GC

GIL的加鎖與解鎖時機

加鎖的時機:在呼叫直譯器時立即加鎖
解鎖時機:
    當前程序遇到IO或超時

為什麼需要GIL
    由於cpython的記憶體管理是非執行緒安全的,於是cpython就給直譯器加了個鎖,解決了安全問題,但是降低了效率,雖然有解決方  案,但是由於牽涉太多,一旦被修改,很多以前的基於GIL的程式都需要修改,所以變成了歷史遺留問題


GIL帶來的問題
    即使在多核處理器情況下,也無法真正的並行
    先有多執行緒模組,有這個問題,所以後來有了多程序模組彌補這個問題

總結:
    1.在單核情況下,無論是IO密集型還是計算密集,HIL都不會產生影響
    2.如果是多核下,IO密集型會受到GIl的影響,但是很明顯IO速度比計算速度慢
    3.IO密集型多執行緒, 因為多執行緒開銷小,節省資源,對於計算密集型,應該使用多程序,因為在cpytho中多執行緒是無法並行的

GIL與執行緒鎖的區別

from threading import  Thread,Lock
import time

lock = Lock() a = 0 def task(): global a lock.acquire() temp = a time.sleep(0.01) a = temp + 1 lock.release() ts = [] for i in range(10): t1 = Thread(target=task) t1.start() ts.append(t1) for i in ts: i.join() print(a) GIL使用用於保護直譯器相關的資料,直譯器也是一段程式,肯定有其定義各種資料 GIL並不能保證自己定義的資料的安全,所有一旦 多核cpu中,程序可以並行,執行緒不能並行

多執行緒TCP

客戶端
from threading import Thread
import socket

c = socket.socket()
c.connect(("127.0.0.1",8989))

def send_msg():
    while True:
        msg = input(">>>:") if not msg: continue c.send(msg.encode("utf-8")) send_t = Thread(target=send_msg) send_t.start() while True: try: data = c.recv(1024) print(data.decode("utf-8")) except: c.close() break 伺服器端 from concurrent.futures import ThreadPoolExecutor from threading import Thread import socket  server = socket.socket() server.bind(("127.0.0.1",8989)) server.listen()  pool = ThreadPoolExecutor(3) def task(client): while True: try: data = client.recv(1024) if not data: client.close() break client.send(data.upper()) except Exception: client.close() break while True: client,addr = server.accept() # t = Thread(target=task,args=(client,)) # t.start() pool.submit(task,client)

執行緒池與程序池

池為容器,本質上就是一個儲存程序或執行緒的列表

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
from  threading import active_count,current_thread
import os,time
# 建立執行緒池 指定最大執行緒數為3  如果不指定 預設為CPU核心數 * 5
# pool = ThreadPoolExecutor(3)# 不會立即開啟子執行緒 # # print(active_count()) # # def task(): # print("%s running.." % current_thread().name) # time.sleep(1) # # #提交任務到執行緒池 # for i in range(10): # pool.submit(task) #  # 建立程序池 最大程序數為3 預設為cpu個數 pool = ProcessPoolExecutor(3)# 不會立即開啟子程序  # time.sleep(10) def task(): print("%s running.." % os.getpid()) time.sleep(1) if __name__ == '__main__': # #提交任務到程序池 for i in range(10): pool.submit(task) # 第一次提交任務時會建立程序 ,後續再提交任務,直接交給以及存在的程序來完成,如果沒有空閒程序就等待  # 與訊號量的區別 ,訊號量也是一種鎖 適用於保證同一時間能有多少個程序或執行緒訪問 # 而執行緒/程序池,沒有對資料訪問進行限制僅僅是控制數量

同步與非同步

"""
阻塞 非阻塞
程式遇到了IO操作,無法繼續執行程式碼,叫做阻塞
程式沒有遇到IO操作,正常執行中,就叫非阻塞
它們指的是程式的狀態

    就緒  執行  阻塞

就緒和阻塞給人的感覺就是卡主了


同步 非同步
同步(呼叫/執行/任務/提交),發起任務後必須等待任務結束,拿到一個結果才能繼續執行
非同步                     發起任務後不需要關係任務的執行過程,可以繼續往下執行

非同步效率高於同步
但是並不是所有任務都可以非同步執行,判斷一個任務是否可以非同步的條件是,任務發起方是否立即需要執行結果


同步不等於阻塞  非同步不等於非阻塞
當使用非同步方式發起任務時 任務中可能包含io操作  非同步也可能阻塞
同步提交任務 也會卡主程式 但是不等同阻塞,因為任務中可能在做一對計算任務,CPU沒走
"""

# 使用執行緒池 來執行非同步任務

from concurrent.futures import ThreadPoolExecutor import time pool = ThreadPoolExecutor() def task(i): time.sleep(1) print("sub thread run..") i += 100 return i fs = [] for i in range(10): f = pool.submit(task,i) # submit就是一非同步的方式提交任務 # print(f) # print(f.result()) # result是阻塞的 會等到這任務執行完成才繼續執行 ,會非同步變成同步 fs.append(f) # 是一個阻塞函式,會等到池子中所有任務完成後繼續執行 pool.shutdown(wait=True) # pool.submit(task,1) # 注意 在shutdown之後 就不能提交新任務了 for i in fs: print(i.result()) print("over")

使用執行緒池 來發起非同步任務

阻塞函式
pool.shutdown()

同步不是阻塞,非同步也不完全是非阻塞,也有可能阻塞