Python3入門之執行緒threading常用方法
Python3入門之執行緒threading常用方法
Python3 執行緒中常用的兩個模組為:
- _thread
- threading(推薦使用)
thread 模組已被廢棄。使用者可以使用 threading 模組代替。所以,在 Python3 中不能再使用"thread" 模組。為了相容性,Python3 將 thread 重新命名為 "_thread"。
下面將介紹threading模組常用方法:
1. threading.Lock()
如果多個執行緒共同對某個資料修改,則可能出現不可預料的結果,為了保證資料的正確性,需要對多個執行緒進行同步。
使用 Thread 物件的 Lock 和 Rlock 可以實現簡單的執行緒同步,這兩個物件都有 acquire 方法和 release 方法,對於那些需要每次只允許一個執行緒操作的資料,可以將其操作放到 acquire 和 release 方法之間。
來看看多個執行緒同時操作一個變數怎麼把內容給改亂了(在windows下不會出現內容混亂情況,可能python在Windows下自動加上鎖了;不過在Linux 下可以測試出內容會被改亂):
#!/usr/bin/env python3 import time, threading # 假定這是你的銀行存款: balance = 0 def change_it(n): # 先存後取,結果應該為0: global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(100000): change_it(n) t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance) 執行結果: [[email protected] ~]# python3 thread_lock.py 5 [[email protected] ~]# python3 thread_lock.py 5 [[email protected] ~]# python3 thread_lock.py 0 [[email protected] ~]# python3 thread_lock.py 8 [[email protected] ~]# python3 thread_lock.py -8 [[email protected] ~]# python3 thread_lock.py 5 [[email protected] ~]# python3 thread_lock.py -8 [[email protected] ~]# python3 thread_lock.py 3 [[email protected] ~]# python3 thread_lock.py 5
我們定義了一個共享變數balance
,初始值為0
,並且啟動兩個執行緒,先存後取,理論上結果應該為0
,但是,由於執行緒的排程是由作業系統決定的,當t1、t2交替執行時,只要迴圈次數足夠多,balance
的結果就不一定是0
了。
如果我們要確保balance
計算正確,就要給change_it()
上一把鎖,當某個執行緒開始執行change_it()
時,我們說,該執行緒因為獲得了鎖,因此其他執行緒不能同時執行change_it()
,只能等待,直到鎖被釋放後,獲得該鎖以後才能改。由於鎖只有一個,無論多少執行緒,同一時刻最多隻有一個執行緒持有該鎖,所以,不會造成修改的衝突。建立一個鎖就是通過threading.Lock()
來實現:
#!/usr/bin/env python3 import time, threading balance = 0 lock = threading.Lock() def change_it(n): # 先存後取,結果應該為0: global balance balance = balance + n balance = balance - n def run_thread(n): for i in range(100000): # 先要獲取鎖: lock.acquire() try: # 放心地改吧: change_it(n) finally: # 改完了一定要釋放鎖: lock.release() t1 = threading.Thread(target=run_thread, args=(5,)) t2 = threading.Thread(target=run_thread, args=(8,)) t1.start() t2.start() t1.join() t2.join() print(balance) 執行結果: [[email protected] ~]# python3 thread_lock.py 0 [[email protected] ~]# python3 thread_lock.py 0 [[email protected] ~]# python3 thread_lock.py 0 [[email protected] ~]# python3 thread_lock.py 0 [[email protected] ~]# python3 thread_lock.py 0 [[email protected] ~]# python3 thread_lock.py 0
當多個執行緒同時執行lock.acquire()
時,只有一個執行緒能成功地獲取鎖,然後繼續執行程式碼,其他執行緒就繼續等待直到獲得鎖為止。
獲得鎖的執行緒用完後一定要釋放鎖,否則那些苦苦等待鎖的執行緒將永遠等待下去,成為死執行緒。所以我們用try...finally
來確保鎖一定會被釋放。
鎖的好處就是確保了某段關鍵程式碼只能由一個執行緒從頭到尾完整地執行,壞處當然也很多,首先是阻止了多執行緒併發執行,包含鎖的某段程式碼實際上只能以單執行緒模式執行,效率就大大地下降了。其次,由於可以存在多個鎖,不同的執行緒持有不同的鎖,並試圖獲取對方持有的鎖時,可能會造成死鎖,導致多個執行緒全部掛起,既不能執行,也無法結束,只能靠作業系統強制終止。
2. threading.Rlock()
RLock允許在同一執行緒中被多次acquire。而Lock卻不允許這種情況。注意:如果使用RLock,那麼acquire和release必須成對出現,即呼叫了n次acquire,必須呼叫n次的release才能真正釋放所佔用的瑣。
import threading lock = threading.Lock() #Lock物件 lock.acquire() lock.acquire() #產生了死瑣。 lock.release() lock.release() import threading rLock = threading.RLock() #RLock物件 rLock.acquire() rLock.acquire() #在同一執行緒內,程式不會堵塞。 rLock.release() rLock.release()
3. threading.Condition()
可以把Condiftion理解為一把高階的瑣,它提供了比Lock, RLock更高階的功能,允許我們能夠控制複雜的執行緒同步問題。threadiong.Condition在內部維護一個瑣物件(預設是RLock),可以在建立Condigtion物件的時候把瑣物件作為引數傳入。Condition也提供了acquire, release方法,其含義與瑣的acquire, release方法一致,其實它只是簡單的呼叫內部瑣物件的對應的方法而已。Condition還提供wait方法、notify方法、notifyAll方法(特別要注意:這些方法只有在佔用瑣(acquire)之後才能呼叫,否則將會報RuntimeError異常。):
acquire()/release():獲得/釋放 Lock
wait([timeout]):執行緒掛起,直到收到一個notify通知或者超時(可選的,浮點數,單位是秒s)才會被喚醒繼續執行。wait()必須在已獲得Lock前提下才能呼叫,否則會觸發RuntimeError。呼叫wait()會釋放Lock,直至該執行緒被Notify()、NotifyAll()或者超時執行緒又重新獲得Lock.
notify(n=1):通知其他執行緒,那些掛起的執行緒接到這個通知之後會開始執行,預設是通知一個正等待該condition的執行緒,最多則喚醒n個等待的執行緒。notify()必須在已獲得Lock前提下才能呼叫,否則會觸發RuntimeError。notify()不會主動釋放Lock。
notifyAll(): 如果wait狀態執行緒比較多,notifyAll的作用就是通知所有執行緒(這個一般用得少)
現在寫個捉迷藏的遊戲來具體介紹threading.Condition的基本使用。假設這個遊戲由兩個人來玩,一個藏(Hider),一個找(Seeker)。遊戲的規則如下:1. 遊戲開始之後,Seeker先把自己眼睛蒙上,蒙上眼睛後,就通知Hider;2. Hider接收通知後開始找地方將自己藏起來,藏好之後,再通知Seeker可以找了; 3. Seeker接收到通知之後,就開始找Hider。Hider和Seeker都是獨立的個體,在程式中用兩個獨立的執行緒來表示,在遊戲過程中,兩者之間的行為有一定的時序關係,我們通過Condition來控制這種時序關係。
#!/usr/bin/python3.4 # -*- coding: utf-8 -*- import threading, time def Seeker(cond, name): time.sleep(2) cond.acquire() print('%s :我已經把眼睛蒙上了!'% name) cond.notify() cond.wait() for i in range(3): print('%s is finding!!!'% name) time.sleep(2) cond.notify() cond.release() print('%s :我贏了!'% name) def Hider(cond, name): cond.acquire() cond.wait() for i in range(2): print('%s is hiding!!!'% name) time.sleep(3) print('%s :我已經藏好了,你快來找我吧!'% name) cond.notify() cond.wait() cond.release() print('%s :被你找到了,唉~^~!'% name) if __name__ == '__main__': cond = threading.Condition() seeker = threading.Thread(target=Seeker, args=(cond, 'seeker')) hider = threading.Thread(target=Hider, args=(cond, 'hider')) seeker.start() hider.start() 執行結果: seeker :我已經把眼睛蒙上了! hider is hiding!!! hider is hiding!!! hider :我已經藏好了,你快來找我吧! seeker is finding!!! seeker is finding!!! seeker is finding!!! seeker :我贏了! hider :被你找到了,唉~^~!
上面不同顏色的notify和wait一一對應關係,通知--->等待;等待--->通知
4. threading.Semaphore和BoundedSemaphore
Semaphore:Semaphore 在內部管理著一個計數器。呼叫 acquire() 會使這個計數器 -1,release() 則是+1(可以多次release(),所以計數器的值理論上可以無限).計數器的值永遠不會小於 0,當計數器到 0 時,再呼叫 acquire() 就會阻塞,直到其他執行緒來呼叫release()
import threading, time def run(n): # 獲得訊號量,訊號量減一 semaphore.acquire() time.sleep(1) print("run the thread: %s" % n) # 釋放訊號量,訊號量加一 semaphore.release() #semaphore.release() # 可以多次釋放訊號量,每次釋放計數器+1 #semaphore.release() # 可以多次釋放訊號量,每次釋放計數器+1 if __name__ == '__main__': num = 0 semaphore = threading.Semaphore(2) # 最多允許2個執行緒同時執行(即計數器值);在多次釋放訊號量後,計數器值增加後每次可以執行的執行緒數也會增加 for i in range(20): t = threading.Thread(target=run, args=(i,)) t.start() while threading.active_count() != 1: pass # print threading.active_count() else: print('----all threads done---') print(num)
BoundedSemaphore:類似於Semaphore;不同在於BoundedSemaphore 會檢查內部計數器的值,並保證它不會大於初始值,如果超了,就引發一個 ValueError。多數情況下,semaphore 用於守護限制訪問(但不限於 1)的資源,如果 semaphore 被 release() 過多次,這意味著存在 bug
import threading, time def run(n): semaphore.acquire() time.sleep(1) print("run the thread: %s" % n) semaphore.release() # 如果再次釋放訊號量,訊號量加一,這是超過限定的訊號量數目,這時會報錯ValueError: Semaphore released too many times #semaphore.release() if __name__ == '__main__': num = 0 semaphore = threading.BoundedSemaphore(2) # 最多允許2個執行緒同時執行 for i in range(20): t = threading.Thread(target=run, args=(i,)) t.start() while threading.active_count() != 1: pass # print threading.active_count() else: print('----all threads done---') print(num)
5. threading.Event
事件處理的機制:全域性定義了一個“Flag”,如果“Flag”值為 False,那麼當程式執行 event.wait 方法時就會阻塞;如果“Flag”值為True,那麼執行event.wait 方法時便不再阻塞。
clear:將“Flag”設定為False
set:將“Flag”設定為True
用 threading.Event 實現執行緒間通訊,使用threading.Event可以使一個執行緒等待其他執行緒的通知,我們把這個Event傳遞到執行緒物件中,
Event預設內建了一個標誌,初始值為False。一旦該執行緒通過wait()方法進入等待狀態,直到另一個執行緒呼叫該Event的set()方法將內建標誌設定為True時,該Event會通知所有等待狀態的執行緒恢復執行。
通過Event來實現兩個或多個執行緒間的互動,下面是一個紅綠燈的例子,即起動一個執行緒做交通指揮燈,生成幾個執行緒做車輛,車輛行駛按紅燈停,綠燈行的規則。
import threading, time import random def light(): if not event.isSet(): #初始化evet的flag為真 event.set() #wait就不阻塞 #綠燈狀態 count = 0 while True: if count < 10: print('\033[42;1m---green light on---\033[0m') elif count < 13: print('\033[43;1m---yellow light on---\033[0m') elif count < 20: if event.isSet(): event.clear() print('\033[41;1m---red light on---\033[0m') else: count = 0 event.set() #開啟綠燈 time.sleep(1) count += 1 def car(n): while 1: time.sleep(random.randrange(3, 10)) #print(event.isSet()) if event.isSet(): print("car [%s] is running..." % n) else: print('car [%s] is waiting for the red light...' % n) event.wait() #紅燈狀態下呼叫wait方法阻塞,汽車等待狀態 if __name__ == '__main__': car_list = ['BMW', 'AUDI', 'SANTANA'] event = threading.Event() Light = threading.Thread(target=light) Light.start() for i in car_list: t = threading.Thread(target=car, args=(i,)) t.start()
6. threading.active_count()
返回當前存活的執行緒物件的數量;通過計算len(threading.enumerate())長度而來
The returned count is equal to the length of the list returned by enumerate().
import threading, time def run(): thread = threading.current_thread() print('%s is running...'% thread.getName()) #返回執行緒名稱 time.sleep(10) #休眠10S方便統計存活執行緒數量 if __name__ == '__main__': #print('The current number of threads is: %s' % threading.active_count()) for i in range(10): print('The current number of threads is: %s' % threading.active_count()) #返回當前存活執行緒數量 thread_alive = threading.Thread(target=run, name='Thread-***%s***' % i) thread_alive.start() thread_alive.join() print('\n%s thread is done...'% threading.current_thread().getName())
7. threading.current_thread()
Return the current Thread object, corresponding to the caller's thread of control.
返回當前執行緒物件
>>> threading.current_thread <function current_thread at 0x00000000029F6C80> >>> threading.current_thread() <_MainThread(MainThread, started 4912)> >>> type(threading.current_thread()) <class 'threading._MainThread'>
繼承執行緒threading方法;通過help(threading.current_thread())檢視。
import threading, time def run(n): thread = threading.current_thread() thread.setName('Thread-***%s***' % n) #自定義執行緒名稱 print('-'*30) print("Pid is :%s" % thread.ident) # 返回執行緒pid #print('ThreadName is :%s' % thread.name) # 返回執行緒名稱 print('ThreadName is :%s'% thread.getName()) #返回執行緒名稱 time.sleep(2) if __name__ == '__main__': #print('The current number of threads is: %s' % threading.active_count()) for i in range(3): #print('The current number of threads is: %s' % threading.active_count()) #返回當前存活執行緒數量 thread_alive = threading.Thread(target=run, args=(i,)) thread_alive.start() thread_alive.join() print('\n%s thread is done...'% threading.current_thread().getName()) 執行結果: Pid is :11792 ThreadName is :Thread-***0*** ------------------------------ Pid is :12124 ThreadName is :Thread-***1*** ------------------------------ Pid is :11060 ThreadName is :Thread-***2*** MainThread thread is done...
8. threading.enumerate()
Return a list of all Thread objects currently alive
返回當前存在的所有執行緒物件的列表
import threading, time def run(n): thread = threading.current_thread() thread.setName('Thread-***%s***' % n) print('-'*30) print("Pid is :%s" % thread.ident) # 返回執行緒pid #print('ThreadName is :%s' % thread.name) # 返回執行緒名稱 print('ThreadName is :%s'% threading.enumerate()) #返回所有執行緒物件列表 time.sleep(2) if __name__ == '__main__': #print('The current number of threads is: %s' % threading.active_count()) threading.main_thread().setName('Chengd---python') for i in range(3): #print('The current number of threads is: %s' % threading.active_count()) #返回當前存活執行緒數量 thread_alive = threading.Thread(target=run, args=(i,)) thread_alive.start() thread_alive.join() print('\n%s thread is done...'% threading.current_thread().getName()) 執行結果: Pid is :12096 ThreadName is :[<_MainThread(Chengd---python, started 12228)>, <Thread(Thread-***0***, started 12096)>, <Thread(Thread-2, initial)>] ------------------------------ Pid is :10328 ThreadName is :[<_MainThread(Chengd---python, started 12228)>, <Thread(Thread-***0***, started 12096)>, <Thread(Thread-***1***, started 10328)>, <Thread(Thread-3, initial)>] ------------------------------ Pid is :6032 ThreadName is :[<_MainThread(Chengd---python, started 12228)>, <Thread(Thread-***0***, started 12096)>, <Thread(Thread-***1***, started 10328)>, <Thread(Thread-***2***, started 6032)>] Chengd---python thread is done...
9.threading.get_ident()
返回執行緒pid
import threading, time def run(n): print('-'*30) print("Pid is :%s" % threading.get_ident()) # 返回執行緒pid if __name__ == '__main__': threading.main_thread().setName('Chengd---python') #自定義執行緒名 for i in range(3): thread_alive = threading.Thread(target=run, args=(i,)) thread_alive.start() thread_alive.join() print('\n%s thread is done...'% threading.current_thread().getName()) #獲取執行緒名
10. threading.main_thread()
返回主執行緒物件,類似 threading.current_thread();只不過一個是返回當前執行緒物件,一個是返回主執行緒物件
import threading, time def run(n): print('-'*30) print("Now Pid is :%s" % threading.current_thread().ident) # 返回當前執行緒pid print("Main Pid is :%s" % threading.main_thread().ident) # 返回主執行緒pid print('Now thread is %s...' % threading.current_thread().getName()) # 獲取當前執行緒名 print('Main thread is %s...' % threading.main_thread().getName()) # 獲取主執行緒執行緒名 if __name__ == '__main__': threading.main_thread().setName('Chengd---python') #自定義執行緒名 for i in range(3): thread_alive = threading.Thread(target=run, args=(i,)) thread_alive.start() time.sleep(2) thread_alive.join() 執行結果: ------------------------------ Now Pid is :8984 Main Pid is :3992 Now thread is Thread-1... Main thread is Chengd---python... ------------------------------ Now Pid is :4828 Main Pid is :3992 Now thread is Thread-2... Main thread is Chengd---python... ------------------------------ Now Pid is :12080 Main Pid is :3992 Now thread is Thread-3... Main thread is Chengd---python...