1. 程式人生 > >《GO併發程式設計實戰》—— 條件變數

《GO併發程式設計實戰》—— 條件變數

QQ20141107-1宣告:本文是《Go併發程式設計實戰》的樣章,感謝圖靈授權併發程式設計網站釋出樣章,

我們在第6章講多執行緒程式設計的時候詳細說明過條件變數的概念、原理和適用場景。因此,我們在本小節僅對sync程式碼包中與條件變數相關的API進行簡單的介紹,並使用它們來改造我們之前實現的*myDataFile型別的相關方法。

在Go語言中,sync.Cond型別代表了條件變數。與互斥鎖和讀寫鎖不同,簡單的宣告無法創建出一個可用的條件變數。為了得到這樣一個條件變數,我們需要用到sync.NewCond函式。該函式的宣告如下:

func NewCond(l Locker) *Cond

我們在第6章中說過,條件變數總是要與互斥量組合使用。因此,sync.NewCond函式的唯一引數是sync.Locker型別的,而具體的引數值既可以是一個互斥鎖也可以是一個讀寫鎖。sync.NewCond函式在被呼叫之後會返回一個*sync.Cond型別的結果值。我們可以呼叫該值擁有的幾個方法來操縱對應的條件變數。

型別*sync.Cond的方法集合中有三個方法,即:Wait方法、Signal方法和Broadcast方法。它們分別代表了等待通知、單發通知和廣播通知的操作。

方法Wait會自動的對與該條件變數關聯的那個鎖進行解鎖,並且使呼叫方所在的Goroutine被阻塞。一旦該方法收到通知,就會試圖再次鎖定該鎖。如果鎖定成功,它就會喚醒那個被它阻塞的Goroutine。否則,該方法會等待下一個通知,那個Goroutine也會繼續被阻塞。而方法Signal和Broadcast的作用都是傳送通知以喚醒正在為此而被阻塞的Goroutine。不同的是,前者的目標只有一個,而後者的目標則是所有。

我們在第6章的“執行緒的同步”小節中詳細的描述過這些操作的行為和意義。讀者可以在需要時回顧其中的內容。

在上一小節,我們在*myDataFile型別的Read方法和Write方法的實現中使用到了讀寫鎖fmutex。在Read方法中,我們對一種邊界情況進行了特殊處理,即:如果*os.File型別的f欄位的ReadAt方法在被呼叫後返回了一個非nil且等於io.EOF的錯誤值,那麼Read方法就忽略這個錯誤並再次嘗試讀取相同位置的資料塊,直到讀取成功為止。從這個特殊處理的具體流程上來看,似乎使用條件變數來作為輔助手段會帶來一些好處。下面我們就來動手試驗一下。

我們先在結構體型別myDataFile增加一個型別為*sync.Cond的欄位rcond。為了快速實現想法,我們暫時不考慮怎樣初始化這個欄位,而直接去改造Read方法和Write方法。

在Read方法中,我們使用一個for迴圈來達到重新嘗試獲取資料塊的目的。為此,我們添加了若干條重複的語句、降低了程式的效能,還造成了一個潛在的問題——在某個情況下讀寫鎖fmutex不會被讀解鎖。為了解決這一系列新生的問題,我們使用代表條件變數的欄位rcond。Read方法的第三個版本如下:

func (df *myDataFile) Read() (rsn int64, d Data, err error) {

// 讀取並更新讀偏移量

// 省略若干條語句

 

//讀取一個數據塊

rsn = offset / int64(df.dataLen)

bytes := make([]byte, df.dataLen)

df.fmutex.RLock()

defer df.fmutex.RUnlock()

for {

_, err = df.f.ReadAt(bytes, offset)

if err != nil {

if err == io.EOF {

df.rcond.Wait()

continue

}

return

}

d = bytes

return

}

}

在這裡,我們假設條件變數rcond與讀寫鎖fmutex中的“讀鎖”相關聯。可以看到,我們讓defer df.fmutex.RUnlock()語句迴歸了,並刪除了所有return語句和continue語句前面的針對fmutex的讀解鎖操作。這都得益於新增在continue語句前面的df.rcond.Wait()。新增這條語句的意義在於:當發現由檔案內容讀取造成的EOF錯誤時,要讓當前Goroutine暫時放棄fmutex的“讀鎖”並等待通知的到來。放棄fmutex的“讀鎖”也就意味著Write方法中的資料塊寫操作不會受到它的阻礙了。在寫操作完成之後,我們應該及時向條件變數rcond傳送通知以喚醒為此而等待的Goroutine。請注意,在某個Goroutine被喚醒之後,應該再次檢查需要被滿足的條件。在這裡,這個需要被滿足的條件是在進行檔案內容讀取時不會造成EOF錯誤。如果該條件被滿足,那麼就可以進行後續的操作了。否則,應該再次放棄“讀鎖”並等待通知。這也是我們依然保留for迴圈的原因。

這裡有兩點需要特別注意。

  • 一定要在呼叫rcond的Wait方法之前鎖定與之關聯的那個“讀鎖”,否則就會造成對Wait方法的呼叫永遠無法返回。這種情況會導致流程執行的停滯,甚至整個程式的死鎖!導致這種結果的原因與條件變數和讀寫鎖的內部實現方式有關(結果也許並不應該是這樣,作者已經向Go語言官方提交了一個issue;Go語言官方已經接受了這個issue,並承諾將會在Go 1.4版本中改進它)。假設,與條件變數rcond關聯的是某個讀寫鎖的“寫鎖”或普通的互斥鎖,那麼對rcond.Wait方法的呼叫將會引發一個執行時恐慌。原因是,該方法會先對與之關聯的鎖進行解鎖,而試圖解鎖未被鎖定的鎖就會引發一個執行時恐慌。
  • 一定不要忘記在讀操作完成之前解鎖與條件變數rcond關聯的那個“讀鎖”,否則對讀寫鎖的寫鎖定操作將會阻塞相關的Goroutine。其根本原因是,條件變數rcond的Wait方法在返回之前會重新鎖定與之關聯的那個“讀鎖”。因此,在結束這個從檔案中讀取一個數據塊的流程之前,我們應該呼叫fmutex欄位的RLock方法。那條defer語句就起到了這個作用。

我們對Read方法的這次改進使得它的實現變得更加簡潔和清晰了。不過,要想使其中的條件變數rcond真正發揮作用,還需要Write方法的配合。換句話說,為了讓rcond.Wait方法可以適時的返回,我們要在向檔案寫入一個數據塊之後及時的向rcond傳送通知。添加了這一操作的Write方法如下:

func (df *myDataFile) Write(d Data) (wsn int64, err error) {

// 省略若干條語句

var bytes []byte

// 省略若干條語句

df.fmutex.Lock()

defer df.fmutex.Unlock()

_, err = df.f.Write(bytes)

df.rcond.Signal()

return

}

由於一個數據塊只能由某一個讀操作讀取,所以我們只是使用條件變數的Signal方法去通知某一個為此等待的Wait方法,並以此喚醒某一個相關的Goroutine。這可以免去其它相關的Goroutine中的一些無謂操作。

與Wait方法不同,我們在呼叫條件變數的Signal方法和Broadcast方法之前無需鎖定與之關聯的鎖。隨之,相應的解鎖操作也是不需要的。在這個Write方法中的鎖定操作和解鎖的操作針對的並不是df.rcond.Signal()語句。

我們一直在說,條件變數rcond是與讀寫鎖fmutex的“讀鎖”關聯的。這是怎樣做到的呢?讀者還記得我們在上一節提到讀寫鎖的RLocker方法嗎?它會返回當前讀寫鎖中的“讀鎖”。這個結果值同時也是sync.Locker介面的實現。因此,我們可以把它作為引數值傳給sync.NewCond函式。所以,我們在NewDataFile函式中的宣告df變數的語句的後面加入了這樣一條語句:

df.rcond = sync.NewCond(df.fmutex.RLocker())

在這之後,我們就可以像前面那樣使用這個條件變量了。

隨著對*myDataFile型別和NewDataFile函式的改造的完成,我們也將結束本節。Go語言提供的互斥鎖、讀寫鎖和條件變數都基本遵循了POSIX標準中描述的對應的同步工具的行為規範。它們簡單且高效。我們可以使用它們為複雜的型別提供併發安全的保證。在一些情況下,它們比通道更加靈活。在只需對一個或多個臨界區進行保護的時候,使用鎖往往會對程式的效能損耗更小。

好了,現在簡單預告一下後面的內容。在下一節中,我們將會介紹對程式效能損耗更小的同步工具——原子操作。同樣的,我們會使用這一工具進一步改造*myDataFile型別及其方法。