1. 程式人生 > >python執行緒同步原語--原始碼閱讀

python執行緒同步原語--原始碼閱讀

前面兩篇文章,寫了python執行緒同步原語的基本應用。下面這篇文章主要是通過閱讀原始碼來了解這幾個類的內部原理和是怎麼協同一起工作來實現python多執行緒的。

相關文章連結:python同步原語--執行緒鎖 

                      python--執行緒同步原語                           

 

 

一、關於Condition

Condition的用法:

用來記錄執行緒的狀態變數

檢視Condition的原始碼,會看到作者給開發者提供的文件說明。‘Class that implemets a condition variable’寫得很明白,這是一個用來記錄執行緒狀態的類。

 

1. Condition物件初始化

從這段程式碼可以看出,Condition使用了threading模組的Rlock類,關於Rlock的用法可以看我之前寫的一篇文章python同步原語--執行緒鎖 。在物件初始化的同時,將Rlock的請求鎖和釋放鎖方法賦給了內部的self.acquire和self.release物件方法。當初始化物件同時初始化這兩個方法,也就是說,每個物件在例項化的時候都會例項一個新的可重入鎖(RLock)。這樣可以避免不同物件(condition例項物件)間對類中共享方法的爭奪,避免出現死鎖的問題。

 

 

這段程式碼非常的重要。如果熟悉python的上下文管理的朋友應該一看就明白,這是上下文管理中的進入和退出操作。當在呼叫with時,程式會自動呼叫_ _enter_ _方法,在程式執行完畢,退出此上下文環境時,自動呼叫_ _exit_ _方法。那麼在這裡的_ _enter_ _和_ _exit_ _方法分別有什麼用呢?通過閱讀原始碼發現,前者是呼叫了Rlock的acquier方法(獲取鎖),而後者呼叫了Rlock的release方法(釋放鎖)。在下面我會繼續講這兩個方法在類中的作用。

 

2. wait()方法

原始碼中對wait()方法的定義是‘Wait  untified  or  until  a  timeout  occurs’。意思是阻塞等待知道有提示(notify)或者超時時間(timeout)的到達。

 

再看看wait()函式的內部邏輯

_is_owned()方法是判斷此Condition物件是否有獲取到鎖,如果沒有獲取到鎖(可能是可重入鎖的獲取次數已經達到預定值,不過這種情況很少發生),就會報出錯誤。接下來是對需要等待的程式進行一些列的處理。先是給這個程式分配鎖,對它的程式空間和內部變數進行封鎖。同時把這個加鎖後的程式放進雙端佇列(deque)‘等待者們’中。

好像wait()方法的功能到此就結束了。但是注意到下面還有try函式塊,旁邊一行註釋寫著‘restore  state  no matter  what’然後又舉了一個KeyboardInterrupt的異常情況。意思是當出現了例如鍵盤輸入ctrl+C這類操作的時候,程式如何退出阻塞。如果在呼叫wait方法的時候沒有傳入timeout引數,那麼,等待者程式就會重新獲取鎖。如果有timeout引數,就會根據引數來確定退出阻塞的時間。這就是為什麼我們有時在輸入ctrl+C強行退出阻塞的時候,程式會等待一會兒才給出退出程式的提示的原因。

 

 3. notify()方法

接下來這個notify()方法在Condition類中也是非常的重要(queue模組內部也呼叫了這個函式)

notify()方法內部實現:

notify直接翻譯過來就是‘提示’的意思。那麼為什麼Condition物件需要‘提示’呢?閱讀原始碼下來,其真正的功能不是提示,而是鎖的釋放,並且在釋放了指定數量的waiters之後,順便將他們從‘等待者們’佇列中刪除。如果直接理解為提示,就會很難理解了。但這是老外在定義函式時的寫法,本人的理解是,有點像給阻塞的程式發出訊號(提示),停止阻塞(釋放鎖),這麼理解應該也算勉強解釋得過去吧。

 

Condition內部另外還有一個notify_all()方法,這個方法對‘等待者們’佇列中的所有的程式都發出‘提示’,釋放鎖,而沒有像notify中那樣有數量n的限制。

原始碼:

 

那麼總結上面的Condititon內部的方法實現,可以看出,Condition類是為了實現一種狀態的‘儲存’,即在多執行緒程式設計的情況下,由於執行緒間共享空間而容易引發錯誤,往往需要讓一些執行緒先執行,而後面的執行緒等待(阻塞)。那麼如果這些程式需要阻塞等待,就會呼叫Condition類例項物件的wait方法,當結束等待的訊號發出時,就會呼叫Condition的notify方法對佇列中的程式進行釋放鎖操作。

 

二、關於SegmaphoreBoundedSegmaphore

如果在主機執行IO密集型任務的時候再執行這種短時間內完成大量任務(多執行緒)的程式時,計算機就有很大可能會宕機。

這時候就可以為這段程式新增一個計數器(counter)功能,來限制一個時間點內的執行緒數量。當每次進行IO操作時,都需要向segmaphore請求資源(鎖),如果沒有請求到,就阻塞等待,請求成功才就像執行任務。

 

那麼segmaphore的內部實現是怎樣的呢?實質上segmaphore也是鎖,其內部也是通過Lock和Condition實現的。Lock是單鎖,而segmaphore是可以自己定義的多鎖。在初始化segmaphore時,需要傳入引數counter。當執行緒向segmaphore請求資源(鎖)時,內部的counter會自動減1。當釋放資源(鎖)的時,counter就會自動加1。

segmaphore主要有兩個方法,acquire()和release()方法。

1. acquire()方法

官方的定義:

def acquire(self, blocking=True, timeout=None):

當內部的counter(原始碼實際上是用value變數儲存)等於0的時候,其他執行緒acquire會阻塞。這個時候,之前向segmaphore發出請求並獲得鎖的執行緒,它們如果同時執行完任務並希望釋放鎖時,那麼鎖的釋放是隨機的。任何一個完成任務的執行緒都會釋放鎖,這個順序跟執行緒向請求的時間和任務完成的時間是沒有任何關係的。

引數的解析:

1)blocking:預設為True,當執行緒請求不到資源的時候,會阻塞等待。如果設定為False,則執行緒請求不到資源時不會阻塞。

2)timeout:如果設定blocking = True,即預設值時,經過timeout時間會退出阻塞。

 

2. release()方法

這個方法與Lock的release方法很像,具體可以看看我之前寫的關於鎖的一篇文章。

連結:python同步原語--執行緒鎖 

原始碼:

解析:

當一個例項請求釋放鎖的時候,segmaphore內部的_value會自動加1,同時呼叫notify方法,將被鎖住的執行緒‘喚醒’。

 

 

三、關於Event類

閱讀原始碼知道,Event是也基於Condition和Lock實現的

 

1. set()方法

在 python--執行緒同步原語 這篇文章我曾經寫過一個案例,在程序中呼叫一次event.set()函式就可以一次性通知(釋放)所有阻塞的等待的鎖。其內部實現的原理在這裡,最關鍵的一個方法是notify_all()。

在呼叫set()方法的時候,方法內部會將_flags設定為True,即等待的事件會退出阻塞。

 

2. clear()方法

clear()方法不同作太多解析了,就是內部的_flags重新設定為False

 

3. wait()方法

wait()方法的定義:

def wait(self, timeout=None):

內部實現:

原理還是很簡單的,實際上Event的wait()方法正是呼叫了Condition中的例項方法wait()。呼叫wait()方法的時候可以傳入引數timeout(超時時間),作為Event事件自動退出阻塞的時間界限。