一、死鎖現象與遞迴鎖

程序也是有死鎖的

所謂死鎖: 是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,

它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,

如下就是死鎖

複製程式碼
 1 死鎖-------------------
 2 from  threading import Thread,Lock,RLock
 3 import time
 4 mutexA = Lock()
 5 mutexB = Lock()
 6 class MyThread(Thread):
 7     def run(self): 8  self.f1() 9  self.f2() 10 def f1(self): 11  mutexA.acquire() 12 print('\033[33m%s 拿到A鎖 '%self.name) 13  mutexB.acquire() 14 print('\033[45%s 拿到B鎖 '%self.name) 15  mutexB.release() 16  mutexA.release() 17 def f2(self): 18  mutexB.acquire() 19 print('\033[33%s 拿到B鎖 ' % self.name) 20 time.sleep(1) #睡一秒就是為了保證A鎖已經被別人那到了 21  mutexA.acquire() 22 print('\033[45m%s 拿到B鎖 ' % self.name) 23  mutexA.release() 24  mutexB.release() 25 if __name__ == '__main__': 26 for i in range(10): 27 t = MyThread() 28 t.start() #一開啟就會去呼叫run方法
複製程式碼

那麼怎麼解決死鎖現象呢?

解決方法,遞迴鎖:在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。

直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖

?
1
mutexA = mutexB = threading.RLock()  #一個執行緒拿到鎖,counter加1,該執行緒內又碰到加鎖的情況,<br>則counter繼續加1,這期間所有其他執行緒都只能等待,等待該執行緒釋放所有鎖,即counter遞減到0為止
複製程式碼
 1 # 2.解決死鎖的方法--------------遞迴鎖
 2 from  threading import Thread,Lock,RLock
 3 import time
 4 mutexB = mutexA = RLock()
 5 class MyThread(Thread):
 6     def run(self):
 7  self.f1() 8  self.f2() 9 def f1(self): 10  mutexA.acquire() 11 print('\033[33m%s 拿到A鎖 '%self.name) 12  mutexB.acquire() 13 print('\033[45%s 拿到B鎖 '%self.name) 14  mutexB.release() 15  mutexA.release() 16 def f2(self): 17  mutexB.acquire() 18 print('\033[33%s 拿到B鎖 ' % self.name) 19 time.sleep(1) #睡一秒就是為了保證A鎖已經被別人拿到了 20  mutexA.acquire() 21 print('\033[45m%s 拿到B鎖 ' % self.name) 22  mutexA.release() 23  mutexB.release() 24 if __name__ == '__main__': 25 for i in range(10): 26 t = MyThread() 27 t.start() #一開啟就會去呼叫run方法
複製程式碼

二、訊號量Semaphore(其實也是一把鎖)

Semaphore管理一個內建的計數器

Semaphore與程序池看起來類似,但是是完全不同的概念。

程序池:Pool(4),最大隻能產生四個程序,而且從頭到尾都只是這四個程序,不會產生新的。

訊號量:訊號量是產生的一堆程序/執行緒,即產生了多個任務都去搶那一把鎖

複製程式碼
 1 from threading import Thread,Semaphore,currentThread
 2 import time,random
 3 sm = Semaphore(5) #執行的時候有5個人
 4 def task():
 5     sm.acquire()
 6     print('\033[42m %s上廁所'%currentThread().getName())
 7     time.sleep(random.randint(1,3)) 8 print('\033[31m %s上完廁所走了'%currentThread().getName()) 9  sm.release() 10 if __name__ == '__main__': 11 for i in range(20): #開了10個執行緒 ,這20人都要上廁所 12 t = Thread(target=task) 13 t.start()
複製程式碼
複製程式碼
 1 hread-1上廁所
 2  Thread-2上廁所
 3  Thread-3上廁所
 4  Thread-4上廁所
 5  Thread-5上廁所
 6  Thread-3上完廁所走了 7 Thread-6上廁所 8 Thread-1上完廁所走了 9 Thread-7上廁所 10 Thread-2上完廁所走了 11 Thread-8上廁所 12 Thread-6上完廁所走了 13 Thread-5上完廁所走了 14 Thread-4上完廁所走了 15 Thread-9上廁所 16 Thread-10上廁所 17 Thread-11上廁所 18 Thread-9上完廁所走了 19 Thread-12上廁所 20 Thread-7上完廁所走了 21 Thread-13上廁所 22 Thread-10上完廁所走了 23 Thread-8上完廁所走了 24 Thread-14上廁所 25 Thread-15上廁所 26 Thread-12上完廁所走了 27 Thread-11上完廁所走了 28 Thread-16上廁所 29 Thread-17上廁所 30 Thread-14上完廁所走了 31 Thread-15上完廁所走了 32 Thread-17上完廁所走了 33 Thread-18上廁所 34 Thread-19上廁所 35 Thread-20上廁所 36 Thread-13上完廁所走了 37 Thread-20上完廁所走了 38 Thread-16上完廁所走了 39 Thread-18上完廁所走了 40 Thread-19上完廁所走了
複製程式碼

三、Event

執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行

?
1
2
3
4
5
from  threading  import  Event
Event.isSet()  #返回event的狀態值
Event.wait()  #如果 event.isSet()==False將阻塞執行緒;
Event. set ()  #設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;
Event.clear()  #恢復

例如1.,有多個工作執行緒嘗試連結MySQL,我們想要在連結前確保MySQL服務正常才讓那些工作執行緒去連線MySQL伺服器,如果連線不成功,都會去嘗試重新連線。那麼我們就可以採用threading.Event機制來協調各個工作執行緒的連線操作

複製程式碼
 1 #首先定義兩個函式,一個是連線資料庫
 2 # 一個是檢測資料庫
 3 from threading import Thread,Event,currentThread
 4 import time
 5 e = Event()
 6 def conn_mysql():
 7     '''連結資料庫'''
 8     count = 1
 9     while not e.is_set():  #當沒有檢測到時候
10         if count >3: #如果嘗試次數大於3,就主動拋異常
11             raise ConnectionError('嘗試連結的次數過多')
12         print('\033[45m%s 第%s次嘗試'%(currentThread(),count)) 13 e.wait(timeout=1) #等待檢測(裡面的引數是超時1秒) 14 count+=1 15 print('\033[44m%s 開始連結...'%(currentThread().getName())) 16 def check_mysql(): 17 '''檢測資料庫''' 18 print('\033[42m%s 檢測mysql...' % (currentThread().getName())) 19 time.sleep(5) 20  e.set() 21 if __name__ == '__main__': 22 for i in range(3): #三個去連結 23 t = Thread(target=conn_mysql) 24  t.start() 25 t = Thread(target=check_mysql) 26 t.start()
複製程式碼

2.例如2,紅綠燈的例子

複製程式碼
 1 from  threading import Thread,Event,currentThread
 2 import time
 3 e = Event()
 4 def traffic_lights():
 5     '''紅綠燈'''
 6     time.sleep(5)
 7  e.set() 8 def car(): 9 '''車''' 10 print('\033[42m %s 等綠燈\033[0m'%currentThread().getName()) 11  e.wait() 12 print('\033[44m %s 車開始通行' % currentThread().getName()) 13 if __name__ == '__main__': 14 for i in range(10): 15 t = Thread(target=car) #10輛車 16  t.start() 17 traffic_thread = Thread(target=traffic_lights) #一個紅綠燈 18 traffic_thread.start()
複製程式碼

四、定時器(Timer)

指定n秒後執行某操作

from threading import Timer
def func(n):
    print('hello,world',n)
t = Timer(3,func,args=(123,))  #等待三秒後執行func函式,因為func函式有引數,那就再傳一個引數進去
t.start()

五、執行緒queue

queue佇列 :使用import queue,用法與程序Queue一樣

queue.Queue(maxsize=0) #先進先出

複製程式碼
1 # 1.佇列-----------
2 import queue
3 q = queue.Queue(3) #先進先出
4 q.put('first')
5 q.put('second')
6 q.put('third')
7 print(q.get())
8 print(q.get()) 9 print(q.get())
複製程式碼

queue.LifoQueue(maxsize=0)#先進後出

複製程式碼
1 # 2.堆疊----------
2 q = queue.LifoQueue() #先進後出(或者後進先出)
3 q.put('first')
4 q.put('second')
5 q.put('third')
6 q.put('for')
7 print(q.get())
8 print(q.get()) 9 print(q.get())
複製程式碼

queue.PriorityQueue(maxsize=0) #儲存資料時可設定優先順序的佇列

複製程式碼
 1 # ----------------
 2 '''3.put進入一個元組,元組的第一個元素是優先順序
 3 (通常也可以是數字,或者也可以是非數字之間的比較)
 4 數字越小,優先順序越高'''
 5 q = queue.PriorityQueue()
 6 q.put((20,'a'))
 7 q.put((10,'b'))  #先出來的是b,數字越小優先順序越高嘛
 8 q.put((30,'c'))
 9 print(q.get())
10 print(q.get())
11 print(q.get())
複製程式碼

六、多執行緒效能測試

1.多核也就是多個CPU
(1)cpu越多,提高的是計算的效能
(2)如果程式是IO操作的時候(多核和單核是一樣的),再多的cpu也沒有意義。
2.實現併發
第一種:一個程序下,開多個執行緒
第二種:開多個程序
3.多程序:
優點:可以利用多核
缺點:開銷大
4.多執行緒
優點:開銷小
缺點:不可以利用多核
5多程序和多程序的應用場景
1.計算密集型:也就是計算多,IO少
如果是計算密集型,就用多程序(如金融分析等)
2.IO密集型:也就是IO多,計算少
如果是IO密集型的,就用多執行緒(一般遇到的都是IO密集型的)
下例子練習:
複製程式碼
 1 # 計算密集型的要開啟多程序
 2 from  multiprocessing import Process
 3 from threading import Thread
 4 import time
 5 def work():
 6     res = 0
 7     for i in range(10000000): 8 res+=i 9 if __name__ == '__main__': 10 l = [] 11 start = time.time() 12 for i in range(4): 13 p = Process(target=work) #1.9371106624603271 #可以利用多核(也就是多個cpu) 14 # p = Thread(target=work) #3.0401737689971924 15  l.append(p) 16  p.start() 17 for p in l: 18  p.join() 19 stop = time.time() 20 print('%s'%(stop-start))
複製程式碼
複製程式碼
 1 # I/O密集型要開啟多執行緒
 2 from multiprocessing import Process
 3 from threading import Thread
 4 import time
 5 def work():
 6     time.sleep(3)
 7 if __name__ == '__main__': 8 l = [] 9 start = time.time() 10 for i in range(400): 11 # p = Process(target=work) #34.9549994468689 #因為開了好多程序,它的開銷大,花費的時間也就長了 12 p = Thread(target=work) #2.2151265144348145 #當開了多個執行緒的時候,它的開銷小,花費的時間也小了 13  l.append(p) 14  p.start() 15 for i in l : 16  i.join() 17 stop = time.time() 18 print('%s'%(stop-start))
複製程式碼
 

七、python標準模組----concurrent.futures

一、死鎖現象與遞迴鎖

程序也是有死鎖的

所謂死鎖: 是指兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,

它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序,

如下就是死鎖

複製程式碼
 1 死鎖-------------------
 2 from  threading import Thread,Lock,RLock
 3 import time
 4 mutexA = Lock()
 5 mutexB = Lock()
 6 class MyThread(Thread):
 7     def run(self): 8  self.f1() 9  self.f2() 10 def f1(self): 11  mutexA.acquire() 12 print('\033[33m%s 拿到A鎖 '%self.name) 13  mutexB.acquire() 14 print('\033[45%s 拿到B鎖 '%self.name) 15  mutexB.release() 16  mutexA.release() 17 def f2(self): 18  mutexB.acquire() 19 print('\033[33%s 拿到B鎖 ' % self.name) 20 time.sleep(1) #睡一秒就是為了保證A鎖已經被別人那到了 21  mutexA.acquire() 22 print('\033[45m%s 拿到B鎖 ' % self.name) 23  mutexA.release() 24  mutexB.release() 25 if __name__ == '__main__': 26 for i in range(10): 27 t = MyThread() 28 t.start() #一開啟就會去呼叫run方法
複製程式碼

那麼怎麼解決死鎖現象呢?

解決方法,遞迴鎖:在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。

這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。

直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖

?
1
mutexA = mutexB = threading.RLock()  #一個執行緒拿到鎖,counter加1,該執行緒內又碰到加鎖的情況,<br>則counter繼續加1,這期間所有其他執行緒都只能等待,等待該執行緒釋放所有鎖,即counter遞減到0為止
複製程式碼
 1 # 2.解決死鎖的方法--------------遞迴鎖
 2 from  threading import Thread,Lock,RLock
 3 import time
 4 mutexB = mutexA = RLock()
 5 class MyThread(Thread):
 6     def run(self):
 7  self.f1() 8  self.f2() 9 def f1(self): 10  mutexA.acquire() 11 print('\033[33m%s 拿到A鎖 '%self.name) 12  mutexB.acquire() 13 print('\033[45%s 拿到B鎖 '%self.name) 14  mutexB.release() 15  mutexA.release() 16 def f2(self): 17  mutexB.acquire() 18 print('\033[33%s 拿到B鎖 ' % self.name) 19 time.sleep(1) #睡一秒就是為了保證A鎖已經被別人拿到了 20  mutexA.acquire() 21 print('\033[45m%s 拿到B鎖 ' % self.name) 22  mutexA.release() 23  mutexB.release() 24 if __name__ == '__main__': 25 for i in range(10): 26 t = MyThread() 27 t.start() #一開啟就會去呼叫run方法
複製程式碼

二、訊號量Semaphore(其實也是一把鎖)

Semaphore管理一個內建的計數器

Semaphore與程序池看起來類似,但是是完全不同的概念。

程序池:Pool(4),最大隻能產生四個程序,而且從頭到尾都只是這四個程序,不會產生新的。

訊號量:訊號量是產生的一堆程序/執行緒,即產生了多個任務都去搶那一把鎖

複製程式碼
 1 from threading import Thread,Semaphore,currentThread
 2 import time,random
 3 sm = Semaphore(5) #執行的時候有5個人
 4 def task():
 5     sm.acquire()
 6     print('\033[42m %s上廁所'%currentThread().getName())
 7     time.sleep(random.randint(1,3)) 8 print('\033[31m %s上完廁所走了'%currentThread().getName()) 9  sm.release() 10 if __name__ == '__main__': 11 for i in range(20): #開了10個執行緒 ,這20人都要上廁所 12 t = Thread(target=task) 13 t.start()
複製程式碼
複製程式碼
 1 hread-1上廁所
 2  Thread-2上廁所
 3  Thread-3上廁所
 4  Thread-4上廁所
 5  Thread-5上廁所
 6  Thread-3上完廁所走了 7 Thread-6上廁所 8 Thread-1上完廁所走了 9 Thread-7上廁所 10 Thread-2上完廁所走了 11 Thread-8上廁所 12 Thread-6上完廁所走了 13 Thread-5上完廁所走了 14 Thread-4上完廁所走了 15 Thread-9上廁所 16 Thread-10上廁所 17 Thread-11上廁所 18 Thread-9上完廁所走了 19 Thread-12上廁所 20 Thread-7上完廁所走了 21 Thread-13上廁所 22 Thread-10上完廁所走了 23 Thread-8上完廁所走了 24 Thread-14上廁所 25 Thread-15上廁所 26 Thread-12上完廁所走了 27 Thread-11上完廁所走了 28 Thread-16上廁所 29 Thread-17上廁所 30 Thread-14上完廁所走了 31 Thread-15上完廁所走了 32 Thread-17上完廁所走了 33 Thread-18上廁所 34 Thread-19上廁所 35 Thread-20上廁所 36 Thread-13上完廁所走了 37 Thread-20上完廁所走了 38 Thread-16上完廁所走了 39 Thread-18上完廁所走了 40 Thread-19上完廁所走了
複製程式碼

三、Event

執行緒的一個關鍵特性是每個執行緒都是獨立執行且狀態不可預測。如果程式中的其 他執行緒需要通過判斷某個執行緒的狀態來確定自己下一步的操作,這時執行緒同步問題就會變得非常棘手。為了解決這些問題,我們需要使用threading庫中的Event物件。 物件包含一個可由執行緒設定的訊號標誌,它允許執行緒等待某些事件的發生。在 初始情況下,Event物件中的訊號標誌被設定為假。如果有執行緒等待一個Event物件, 而這個Event物件的標誌為假,那麼這個執行緒將會被一直阻塞直至該標誌為真。一個執行緒如果將一個Event物件的訊號標誌設定為真,它將喚醒所有等待這個Event物件的執行緒。如果一個執行緒等待一個已經被設定為真的Event物件,那麼它將忽略這個事件, 繼續執行

?
1
2
3
4
5
from  threading  import  Event
Event.isSet()  #返回event的狀態值
Event.wait()  #如果 event.isSet()==False將阻塞執行緒;
Event. set ()  #設定event的狀態值為True,所有阻塞池的執行緒啟用進入就緒狀態, 等待作業系統排程;
Event.clear()  #恢復

例如1.,有多個工作執行緒嘗試連結MySQL,我們想要在連結前確保MySQL服務正常才讓那些工作執行緒去連線MySQL伺服器,如果連線不成功,都會去嘗試重新連線。那麼我們就可以採用threading.Event機制來協調各個工作執行緒的連線操作

複製程式碼
 1 #首先定義兩個函式,一個是連線資料庫
 2 # 一個是檢測資料庫
 3 from threading import Thread,Event,currentThread
 4 import time
 5 e = Event()
 6 def conn_mysql():
 7     '''連結資料庫'''
 8     count = 1
 9     while not e.is_set():  #當沒有檢測到時候
10         if count >3: #如果嘗試次數大於3,就主動拋異常
11             raise ConnectionError('嘗試連結的次數過多')
12         print('\033[45m%s 第%s次嘗試'%(currentThread(),count)) 13 e.wait(timeout=1) #等待檢測(裡面的引數是超時1秒) 14 count+=1 15 print('\033[44m%s 開始連結...'%(currentThread().getName())) 16 def check_mysql(): 17 '''檢測資料庫''' 18 print('\033[42m%s 檢測mysql...' % (currentThread().getName())) 19 time.sleep(5) 20  e.set() 21 if __name__ == '__main__': 22 for i in range(3): #三個去連結 23 t = Thread(target=conn_mysql) 24  t.start() 25 t = Thread(target=check_mysql) 26 t.start()
複製程式碼

2.例如2,紅綠燈的例子

複製程式碼
 1 from  threading import Thread,Event,currentThread
 2 import time
 3 e = Event()
 4 def traffic_lights():
 5     '''紅綠燈'''
 6     time.sleep(5)
 7  e.set() 8 def car(): 9 '''車''' 10 print('\033[42m %s 等綠燈\033[0m'%currentThread().getName()) 11  e.wait() 12 print('\033[44m %s 車開始通行' % currentThread().getName()) 13 if __name__ == '__main__': 14 for i in range(10): 15 t = Thread(target=car) #10輛車 16  t.start() 17 traffic_thread = Thread(target=traffic_lights) #一個紅綠燈 18 traffic_thread.start()
複製程式碼

四、定時器(Timer)

指定n秒後執行某操作

from threading import Timer
def func(n):
    print('hello,world',n)
t = Timer(3,func,args=(123,))  #等待三秒後執行func函式,因為func函式有引數,那就再傳一個引數進去
t.start()

五、執行緒queue

queue佇列 :使用import queue,用法與程序Queue一樣

queue.Queue(maxsize=0) #先進先出

複製程式碼
1 # 1.佇列-----------
2 import queue
3 q = queue.Queue(3) #先進先出
4 q.put('first')
5 q.put('second')
6 q.put('third')
7 print(q.get())
8 print(q.get()) 9 print(q.get())
複製程式碼

queue.LifoQueue(maxsize=0)#先進後出

複製程式碼
1 # 2.堆疊----------
2 q = queue.LifoQueue() #先進後出(或者後進先出)
3 q.put('first')
4 q.put('second')
5 q.put('third')
6 q.put('for')
7 print(q.get())
8 print(q.get()) 9 print(q.get())
複製程式碼

queue.PriorityQueue(maxsize=0) #儲存資料時可設定優先順序的佇列

複製程式碼
 1 # ----------------
 2 '''3.put進入一個元組,元組的第一個元素是優先順序
 3 (通常也可以是數字,或者也可以是非數字之間的比較)
 4 數字越小,優先順序越高'''
 5 q = queue.PriorityQueue()
 6 q.put((20,'a'))
 7 q.put((10,'b'))  #先出來的是b,數字越小優先順序越高嘛
 8 q.put((30,'c'))
 9 print(q.get())
10 print(q.get())
11 print(q.get())
複製程式碼

六、多執行緒效能測試

1.多核也就是多個CPU
(1)cpu越多,提高的是計算的效能
(2)如果程式是IO操作的時候(多核和單核是一樣的),再多的cpu也沒有意義。
2.實現併發
第一種:一個程序下,開多個執行緒
第二種:開多個程序
3.多程序:
優點:可以利用多核
缺點:開銷大
4.多執行緒
優點:開銷小
缺點:不可以利用多核
5多程序和多程序的應用場景
1.計算密集型:也就是計算多,IO少
如果是計算密集型,就用多程序(如金融分析等)
2.IO密集型:也就是IO多,計算少
如果是IO密集型的,就用多執行緒(一般遇到的都是IO密集型的)
下例子練習:
複製程式碼
 1 # 計算密集型的要開啟多程序
 2 from  multiprocessing import Process
 3 from threading import Thread
 4 import time
 5 def work():
 6     res = 0
 7     for i in range(10000000): 8 res+=i 9 if __name__ == '__main__': 10 l = [] 11 start = time.time() 12 for i in range(4): 13 p = Process(target=work) #1.9371106624603271 #可以利用多核(也就是多個cpu) 14 # p = Thread(target=work) #3.0401737689971924 15  l.append(p) 16  p.start() 17 for p in l: 18  p.join() 19 stop = time.time() 20 print('%s'%(stop-start))
複製程式碼
複製程式碼
 1 # I/O密集型要開啟多執行緒
 2 from multiprocessing import Process
 3 from threading import Thread
 4 import time
 5 def work():
 6     time.sleep(3)
 7 if __name__ == '__main__': 8 l = [] 9 start = time.time() 10 for i in range(400): 11 # p = Process(target=work) #34.9549994468689 #因為開了好多程序,它的開銷大,花費的時間也就長了 12 p = Thread(target=work) #2.2151265144348145 #當開了多個執行緒的時候,它的開銷小,花費的時間也小了 13  l.append(p) 14  p.start() 15 for i in l : 16  i.join() 17 stop = time.time() 18 print('%s'%(stop-start))
複製程式碼
 

七、python標準模組----concurrent.futures