互斥鎖、死鎖、遞迴鎖、訊號量、Event
互斥鎖
互斥鎖也叫使用者鎖、同步鎖。
在多程序/多執行緒程式中,當多個執行緒處理一個公共資料時,會有資料安全問題,唯一能保證資料安全的,就是通過加鎖的方式,同一時間只能有一個修改資料的操作,將處理資料變為序列。雖然犧牲了速度,但是保證了資料安全。
來看一個不加鎖的栗子:
import time
def sub():
global num
temp = num
time.sleep(0.001)
num = temp -1
num = 100
t_l = []
for i in range(100):
t = threading.Thread(target=sub,args=())
t.start()
t_l.append(t)
for t in t_l:
t.join()
print(num)
在上面這個程式中,我們開一百個執行緒,每個執行緒都對全域性變數num實現-1的操作,如果順利,最終num的值應該為0.
實際執行過程是這樣的:
100個執行緒開始搶GIL,搶到的將被CPU執行:
step1: 執行global num
step2: temp = num 賦值操作
step3: 發生I/O阻塞,掛起,GIL釋放 (下一步的num=temp-1 還未被執行,因此全域性變數num的值仍然為100)
剩餘的99個執行緒搶GIL鎖,重複上面的步驟。
剩餘的98個執行緒搶GIL鎖,重複上面的步驟。
。。。
如果阻塞時間夠長(比如大於0.1秒),在阻塞期間,100個執行緒都被被切換一遍的話,那麼最終num的值是99;
如果阻塞時間短一點,在某個時刻,前面阻塞的執行緒恢復並搶到了GIL被CPU繼續執行,那麼執行num=temp-1賦值操作 ,全域性變數num的值被改變,執行緒結束,下一個被執行的執行緒拿到的num值就是99……依次類推,最終num的值經過多次賦值操作後將變得不確定,這取決於有多多少執行緒從阻塞中恢復過來。
如果不阻塞的話,每個執行緒都會執行對num 賦值操作,下一個執行緒拿到的num就是上一個執行緒減一的結果,最終num的值歸零。
下面我們進行加鎖操作:
lock = threading.Lock() # 獲取鎖物件
lock.acquire() # 加鎖
資料操作部分
lock.release() # 釋放鎖
import threading
import time
def sub(lock):
global num
lock.acquire() #獲得鎖
temp = num
time.sleep(0.01)
num = temp -1
lock.release() # 執行完資料修改,釋放鎖
num = 100
lock = threading.Lock()
# 例項化一個使用者鎖/互斥鎖,這個鎖是全域性變數,
# 每個執行緒獲取到鎖才能執行,執行完了釋放,下一個執行緒才能獲取鎖
t_l = []
for i in range(100):
t = threading.Thread(target=sub,args=(lock,))
t.start()
t_l.append(t)
for t in t_l:
t.join()
print(num)
上面加鎖和解鎖的操作也可以通過上下文管理來實現:
with lock:
資料修改部分
死鎖和遞迴鎖
兩個或兩個以上的程序或執行緒在執行過程中,因爭奪資源而造成的一種互相等待的現象,若無外力作用,它們都將無法推進下去。此時稱系統處於死鎖狀態或系統產生了死鎖,這些永遠在互相等待的程序稱為死鎖程序。看一個栗子:
import threading
import time
class MyThread(threading.Thread):
def run(self):
self.foo()
self.bar()
def foo(self):
LockA.acquire()
print('%s got LockA '% self.name)
LockB.acquire()
print('%s got LockB'%self.name)
LockB.release()
LockA.release()
# Thread-1在執行完foo函式後,釋放鎖。然後繼續執行bar函式,重新獲取鎖。
def bar(self):
LockB.acquire()
print('%s got LockB ' % self.name)
time.sleep(1)
# 讓Thread-1獲得了LockB後阻塞,OS切換執行緒Thread-2,其先執行foo,獲取到LockA,然後需要獲取LockB,才能執行下去,才能釋放鎖。而LockB在Thread-1手中,Thread-1從阻塞中恢復,需要獲得LockA才能繼續執行下去,才能釋放鎖。於是兩個執行緒互相等待,發生死鎖。
LockA.acquire() # 因為LockA已經被其它執行緒搶走了,所以這裡卡死了。
print('%s got LockA' % self.name)
LockA.release()
LockB.release()
LockA = threading.Lock()
LockB = threading.Lock()
for i in range(10):
t = MyThread()
t.start()
'''死鎖了
Thread-1 got LockA
Thread-1 got LockB
Thread-1 got LockB
Thread-2 got LockA
'''
解決方案就是使用遞迴鎖:
在Python中為了支援在同一執行緒中多次請求同一資源,python提供了可重入鎖RLock。
這個RLock內部維護著一個Lock和一個counter變數,counter記錄了acquire的次數,從而使得資源可以被多次require。直到一個執行緒所有的acquire都被release,其他的執行緒才能獲得資源。上面的例子如果使用RLock代替Lock,則不會發生死鎖:
rlock = threading.RLock() # 拿到一個可重入鎖物件,將上面的所有鎖都更換為rlock。
Semaphore訊號量
互斥鎖同時只允許一個執行緒更改資料,而Semaphore是同時允許一定數量的執行緒更改資料。
Semaphore管理一個內建的計數器,
每當呼叫acquire()時內建計數器-1;
呼叫release() 時內建計數器+1;
計數器不能小於0;當計數器為0時,acquire()將阻塞執行緒直到其他執行緒呼叫release()。
鎖訊號量與程序池的概念很像,但是要區分開,訊號量涉及到加鎖的概念。
看一個栗子:
import threading
import multiprocessing
import time, os, random
def go_wc(sem,name):
with sem:
print('員工%s 搶到一個茅坑,開始蹲...'% name)
time.sleep(random.uniform(1,3))
print('員工%s 完事了,感到身心愉悅...'% name)
if __name__ == '__main__': # 如果是程序的話,在windows系統下,程序必須寫到if __name__ == '__main__':內,否則報錯
print('大家開始上廁所》》》')
sem = threading.Semaphore(3) # 設定最大為3
# sem = multiprocessing.Semaphore(3)
t_l = []
for i in range(10):
t = threading.Thread(target=go_wc, args=(sem,i))
# t = multiprocessing.Process(target=go_wc,args=(sem,i))
t.start()
t_l.append(t)
for t in t_l:
t.join()
print('大家都完事了《《《')
'''
大家開始上廁所》》》
員工1 搶到一個茅坑,開始蹲...
員工0 搶到一個茅坑,開始蹲...
員工5 搶到一個茅坑,開始蹲...
員工0 完事了,感到身心愉悅... 同一時刻,只有3個坑,其中一個完事了,下一個才能開始
員工4 搶到一個茅坑,開始蹲...
'''
Event事件
在多執行緒環境中,每個執行緒的執行一般是獨立的,如果一個執行緒的執行依賴於另一個執行緒的狀態,那麼就有必要引入某種標誌位來進行判斷,event就相當於一個全域性的標誌位。event常用於主執行緒控制其他執行緒的執行。
建立一個event物件:
event = threading.Event()
event物件的方法:
1. event.isSet() 或 event.is_set(), 返回event物件的bool值,event物件的初始bool值是False.
2. event.wait() 如果上面是True, 啥也不做,往下執行,如果上面是False, 則阻塞執行緒. wait(num)為超時設定,超過num秒,繼續往下執行。
3. event.set() 設定event物件True
4. event.clear() 恢復為False
5. 圖示:
上栗子:
import threading
import time
def request():
print('waitting for server...')
event.wait() #阻塞,等待主執行緒開啟伺服器
print('connecting to server....')
if __name__ == '__main__':
event = threading.Event()
for i in range(5):
t = threading.Thread(target=request)
t.start()
print('attemp to start server')
time.sleep(3)
event.set() # 開啟伺服器後,更改event狀態