python全棧開發基礎【第二十五篇】死鎖,遞歸鎖,信號量,Event事件,線程Queue
一、死鎖現象與遞歸鎖
進程也是有死鎖的
所謂死鎖: 是指兩個或兩個以上的進程或線程在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,
它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的進程稱為死鎖進程,
如下就是死鎖
#死鎖現象 死鎖------------------- from threading import Thread,Lock,RLock import time mutexA = Lock() mutexB = Lock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(‘\033[33m%s 拿到A鎖 ‘%self.name) mutexB.acquire() print(‘\033[45%s 拿到B鎖 ‘%self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(‘\033[33%s 拿到B鎖 ‘ % self.name) time.sleep(1) #睡一秒就是為了保證A鎖已經被別人那到了 mutexA.acquire() print(‘\033[45m%s 拿到B鎖 ‘ % self.name) mutexA.release() mutexB.release() if __name__ == ‘__main__‘: for i in range(10): t = MyThread() t.start() #一開啟就會去調用run方法
那麽怎麽解決死鎖現象呢?
解決方法,遞歸鎖:在Python中為了支持在同一線程中多次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護著一個Lock和一個counter變量,counter記錄了acquire的次數,從而使得資源可以被多次require。
直到一個線程所有的acquire都被release,其他的線程才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖
mutexA=mutexB=threading.RLock() #一個線程拿到鎖,counter加1,該線程內又碰到加鎖的情況,<br>則counter繼續加1,這期間所有其他線程都只能等待,等待該線程釋放所有鎖,即counter遞減到0為止
# 解決死鎖的方法--------------遞歸鎖 from threading import Thread,Lock,RLock import time mutexB = mutexA = RLock() class MyThread(Thread): def run(self): self.f1() self.f2() def f1(self): mutexA.acquire() print(‘\033[33m%s 拿到A鎖 ‘%self.name) mutexB.acquire() print(‘\033[45%s 拿到B鎖 ‘%self.name) mutexB.release() mutexA.release() def f2(self): mutexB.acquire() print(‘\033[33%s 拿到B鎖 ‘ % self.name) time.sleep(1) #睡一秒就是為了保證A鎖已經被別人拿到了 mutexA.acquire() print(‘\033[45m%s 拿到B鎖 ‘ % self.name) mutexA.release() mutexB.release() if __name__ == ‘__main__‘: for i in range(10): t = MyThread() t.start() #一開啟就會去調用run方法
二、信號量Semaphore(其實也是一把鎖)
Semaphore管理一個內置的計數器
Semaphore與進程池看起來類似,但是是完全不同的概念。
進程池:Pool(4),最大只能產生四個進程,而且從頭到尾都只是這四個進程,不會產生新的。
信號量:信號量是產生的一堆進程/線程,即產生了多個任務都去搶那一把鎖
# Semaphore舉例 from threading import Thread,Semaphore,currentThread import time,random sm = Semaphore(5) #運行的時候有5個人 def task(): sm.acquire() print(‘\033[42m %s上廁所‘%currentThread().getName()) time.sleep(random.randint(1,3)) print(‘\033[31m %s上完廁所走了‘%currentThread().getName()) sm.release() if __name__ == ‘__main__‘: for i in range(20): #開了10個線程 ,這20人都要上廁所 t = Thread(target=task) t.start()
三、Event
線程的一個關鍵特性是每個線程都是獨立運行且狀態不可預測。如果程序中的其 他線程需要通過判斷某個線程的狀態來確定自己下一步的操作,這時線程同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event對象。 對象包含一個可由線程設置的信號標誌,它允許線程等待某些事件的發生。在 初始情況下,Event對象中的信號標誌被設置為假。如果有線程等待一個Event對象, 而這個Event對象的標誌為假,那麽這個線程將會被一直阻塞直至該標誌為真。一個線程如果將一個Event對象的信號標誌設置為真,它將喚醒所有等待這個Event對象的線程。如果一個線程等待一個已經被設置為真的Event對象,那麽它將忽略這個事件, 繼續執行
rom threading import Event Event.isSet() #返回event的狀態值 Event.wait() #如果 event.isSet()==False將阻塞線程; Event.set() #設置event的狀態值為True,所有阻塞池的線程激活進入就緒狀態, 等待操作系統調度; Event.clear() #恢復
例1.,有多個工作線程嘗試鏈接MySQL,我們想要在鏈接前確保MySQL服務正常才讓那些工作線程去連接MySQL服務器,如果連接不成功,都會去嘗試重新連接。那麽我們就可以采用threading.Event機制來協調各個工作線程的連接操作
#首先定義兩個函數,一個是連接數據庫 # 一個是檢測數據庫 from threading import Thread,Event,currentThread import time e = Event() def conn_mysql(): ‘‘‘鏈接數據庫‘‘‘ count = 1 while not e.is_set(): #當沒有檢測到時候 if count >3: #如果嘗試次數大於3,就主動拋異常 raise ConnectionError(‘嘗試鏈接的次數過多‘) print(‘\033[45m%s 第%s次嘗試‘%(currentThread(),count)) e.wait(timeout=1) #等待檢測(裏面的參數是超時1秒) count+=1 print(‘\033[44m%s 開始鏈接...‘%(currentThread().getName())) def check_mysql(): ‘‘‘檢測數據庫‘‘‘ print(‘\033[42m%s 檢測mysql...‘ % (currentThread().getName())) time.sleep(5) e.set() if __name__ == ‘__main__‘: for i in range(3): #三個去鏈接 t = Thread(target=conn_mysql) t.start() t = Thread(target=check_mysql) t.start()
例2,紅綠燈的例子
from threading import Thread,Event,currentThread import time e = Event() def traffic_lights(): ‘‘‘紅綠燈‘‘‘ time.sleep(5) e.set() def car(): ‘‘‘車‘‘‘ print(‘\033[42m %s 等綠燈\033[0m‘%currentThread().getName()) e.wait() print(‘\033[44m %s 車開始通行‘ % currentThread().getName()) if __name__ == ‘__main__‘: for i in range(10): t = Thread(target=car) #10輛車 t.start() traffic_thread = Thread(target=traffic_lights) #一個紅綠燈 traffic_thread.start()
四、定時器(Timer)
指定n秒後執行某操作
from threading import Timer def func(n): print(‘hello,world‘,n) t = Timer(3,func,args=(123,)) #等待三秒後執行func函數,因為func函數有參數,那就再傳一個參數進去 t.start()
五、線程queue
queue隊列 :使用import queue,用法與進程Queue一樣
queue.
Queue
(maxsize=0) #先進先出
# 1.隊列----------- import queue q = queue.Queue(3) #先進先出 q.put(‘first‘) q.put(‘second‘) q.put(‘third‘) print(q.get()) print(q.get()) print(q.get())
queue.
LifoQueue
(maxsize=0)#先進後出
# 2.堆棧---------- q = queue.LifoQueue() #先進後出(或者後進先出) q.put(‘first‘) q.put(‘second‘) q.put(‘third‘) q.put(‘for‘) print(q.get()) print(q.get()) print(q.get())
queue.
PriorityQueue
(maxsize=0) #存儲數據時可設置優先級的隊列
# 3.put進入一個元組,元組的第一個元素是優先級---------------- (通常也可以是數字,或者也可以是非數字之間的比較) 數字越小,優先級越高‘‘‘ q = queue.PriorityQueue() q.put((20,‘a‘)) q.put((10,‘b‘)) #先出來的是b,數字越小優先級越高嘛 q.put((30,‘c‘)) print(q.get()) print(q.get()) print(q.get())
六、多線程性能測試
1.多核也就是多個CPU
(1)cpu越多,提高的是計算的性能
(2)如果程序是IO操作的時候(多核和單核是一樣的),再多的cpu也沒有意義。
2.實現並發
第一種:一個進程下,開多個線程
第二種:開多個進程
3.多進程:
優點:可以利用多核
缺點:開銷大
4.多線程
優點:開銷小
缺點:不可以利用多核
5多進程和多進程的應用場景
1.計算密集型:也就是計算多,IO少
如果是計算密集型,就用多進程(如金融分析等)
2.IO密集型:也就是IO多,計算少
如果是IO密集型的,就用多線程(一般遇到的都是IO密集型的)
下例子練習:
# 計算密集型的要開啟多進程 from multiprocessing import Process from threading import Thread import time def work(): res = 0 for i in range(10000000): res+=i if __name__ == ‘__main__‘: l = [] start = time.time() for i in range(4): p = Process(target=work) #1.9371106624603271 #可以利用多核(也就是多個cpu) # p = Thread(target=work) #3.0401737689971924 l.append(p) p.start() for p in l: p.join() stop = time.time() print(‘%s‘%(stop-start))
# I/O密集型要開啟多線程 from multiprocessing import Process from threading import Thread import time def work(): time.sleep(3) if __name__ == ‘__main__‘: l = [] start = time.time() for i in range(400): # p = Process(target=work) #34.9549994468689 #因為開了好多進程,它的開銷大,花費的時間也就長了 p = Thread(target=work) #2.2151265144348145 #當開了多個線程的時候,它的開銷小,花費的時間也小了 l.append(p) p.start() for i in l : i.join() stop = time.time() print(‘%s‘%(stop-start))
python全棧開發基礎【第二十五篇】死鎖,遞歸鎖,信號量,Event事件,線程Queue