1. 程式人生 > >《GO併發程式設計實戰》—— Concurrent Map

《GO併發程式設計實戰》—— Concurrent Map

宣告:本文是《Go併發程式設計實戰》的樣章,感謝圖靈授權併發程式設計網站釋出樣章,

我們在本章前面的部分中對Go語言提供的各種傳統同步工具和方法進行了逐一的介紹。在本節,我們將運用它們來構造一個併發安全的字典(Map)型別。

我們已經知道,Go語言提供的字典型別並不是併發安全的。因此,我們需要使用一些同步方法對它進行擴充套件。這看起來並不困難。我們只要使用讀寫鎖將針對一個字典型別值的讀操作和寫操作保護起來就可以了。確實,讀寫鎖應該是我們首先想到的同步工具。不過,我們還不能確定只使用它是否就足夠了。不管怎樣,讓我們先來編寫併發安全的字典型別的第一個版本。

我們先來確定併發安全的字典型別的行為。還記得嗎?依然,這需要宣告一個介面型別。我們在第4章帶領讀者編寫過OrderedMap介面型別及其實現型別。我們可以借鑑OrderedMap介面型別的宣告並編寫出需要在這裡宣告的介面型別ConcurrentMap。實際上,ConcurrentMap介面型別的方法集合應該是OrderedMap介面型別的方法集合的一個子集。我們只需從OrderedMap中去除那些代表有序Map特有行為的方法宣告即可。既然是這樣,我何不從這兩個自定義的字典介面型別中抽出一個公共介面呢?

這個公共的字典介面型別可以是這樣的:

// 泛化的Map的介面型別

type GenericMap interface {

// 獲取給定鍵值對應的元素值。若沒有對應元素值則返回nil。

Get(key interface{}) interface{}

// 新增鍵值對,並返回與給定鍵值對應的舊的元素值。若沒有舊元素值則返回(nil, true)。

Put(key interface{}, elem interface{}) (interface{}, bool)

// 刪除與給定鍵值對應的鍵值對,並返回舊的元素值。若沒有舊元素值則返回nil。

Remove(key interface{}) interface{}

// 清除所有的鍵值對。

Clear()

// 獲取鍵值對的數量。

Len() int

// 判斷是否包含給定的鍵值。

Contains(key interface{}) bool

// 獲取已排序的鍵值所組成的切片值。

Keys() []interface{}

// 獲取已排序的元素值所組成的切片值。

Elems() []interface{}

// 獲取已包含的鍵值對所組成的字典值。

ToMap() map[interface{}]interface{}

// 獲取鍵的型別。

KeyType() reflect.Type

// 獲取元素的型別。

ElemType() reflect.Type

}

然後,我們把這個名為GenericMap的字典介面型別嵌入到OrderedMap介面型別中,並去掉後者中的已在前者內宣告的那些方法。修改後的OrderedMap介面型別如下:

// 有序的Map的介面型別。

type OrderedMap interface {

GenericMap // 泛化的Map介面

// 獲取第一個鍵值。若無任何鍵值對則返回nil。

FirstKey() interface{}

// 獲取最後一個鍵值。若無任何鍵值對則返回nil。

LastKey() interface{}

// 獲取由小於鍵值toKey的鍵值所對應的鍵值對組成的OrderedMap型別值。

HeadMap(toKey interface{}) OrderedMap

// 獲取由小於鍵值toKey且大於等於鍵值fromKey的鍵值所對應的鍵值對組成的OrderedMap型別值。

SubMap(fromKey interface{}, toKey interface{}) OrderedMap

// 獲取由大於等於鍵值fromKey的鍵值所對應的鍵值對組成的OrderedMap型別值。

TailMap(fromKey interface{}) OrderedMap

}

我們要記得在修改完成後立即使用go test命令重新執行相關的功能測試,並以此確保這樣的重構沒有破壞任何現有的功能。

有了GenericMap介面型別之後,我們的ConcurrentMap介面型別的宣告就相當簡單了。由於後者沒有任何特殊的行為,所以我們只要簡單地將前者嵌入到後者的宣告中即可,就像這樣:

type ConcurrentMap interface {

GenericMap

}

下面我們來編寫該介面型別的實現型別。我們依然使用一個結構體型別來充當,並把它命名為myConcurrentMap。myConcurrentMap型別的基本結構如下:

type myConcurrentMap struct {

m       map[interface{}]interface{}

keyType reflect.Type

elemType reflect.Type

rwmutex sync.RWMutex

}

有了編寫myOrderedMap型別(還記得嗎?它的指標型別是OrderedMap的實現型別)的經驗,寫出myConcurrentMap型別的基本結構也是一件比較容易的事情。可以看到,在基本需要之外,我們只為myConcurrentMap型別加入了一個代表了讀寫鎖的rwmutex欄位。此外,我們需要為myConcurrentMap型別新增的那些指標方法的實現程式碼實際上也可以以myOrderedMap型別中的相應方法為藍本。不過,在實現前者的過程中要注意合理運用同步方法以保證它們的併發安全性。下面,我們就開始編寫它們。

首先,我們來看Put、Remove和Clear這幾個方法。它們都屬於寫操作,都會改變myConcurrentMap型別的m欄位的值。

方法Put的功能是向myConcurrentMap型別值新增一個鍵值對。那麼,我們在這個操作的前後一定要分別鎖定和解鎖rwmutex的寫鎖。Put方法的實現如下:

func (cmap *myConcurrentMap) Put(key interface{}, elem interface{}) (interface{}, bool) {

if !cmap.isAcceptablePair(key, elem) {

return nil, false

}

cmap.rwmutex.Lock()

defer cmap.rwmutex.Unlock()

oldElem := cmap.m[key]

cmap.m[key] = elem

return oldElem, true

}

該實現中的isAcceptablePair方法的功能是檢查引數值key和elem是否均不為nil且它們的型別是否均與當前值允許的鍵型別和元素型別一致。在通過該檢查之後,我們就需要對rwmutex進行鎖定了。相應的,我們使用defer語句來保證對它的及時解鎖。與此類似,我們在Remove和Clear方法的實現中也應該加入相同的操作。

與這些代表著寫操作的方法相對應的,是代表讀操作的方法。在ConcurrentMap介面型別中,此類方法有Get、Len、Contains、Keys、Elems和ToMap。我們需要分別在這些方法的實現中加入對rwmutex的讀鎖的鎖定和解鎖操作。以Get方法為例,我們應該這樣來實現它:

func (cmap *myConcurrentMap) Get(key interface{}) interface{} {

cmap.rwmutex.RLock()

defer cmap.rwmutex.RUnlock()

return cmap.m[key]

}

這裡有兩點需要特別注意。

  • 我們在使用寫鎖的時候,要注意方法間的呼叫關係。比如,一個代表寫操作的方法中呼叫了另一個代表寫操作的方法。顯然,我們在這兩個方法中都會用到讀寫鎖中的寫鎖。但如果使用不當,我們就會使前者被永遠鎖住。當然,對於代表寫操作的方法呼叫代表讀操作的方法的這種情況來說,也會是這樣。請看下面的示例:
func (cmap *myConcurrentMap) Remove(key interface{}) interface{} { cmap.rwmutex.Lock() defer cmap.rwmutex.Unlock() oldElem := cmap.Get() delete(cmap.m, key) return oldElem }

可以看到,我們在Remove方法中呼叫了Get方法。並且,在這個呼叫之前,我們已經鎖定了rwmutex的寫鎖。然而,由前面的展示可知,我們在Get方法的開始處對rwmutex的讀鎖進行了鎖定。由於這兩個鎖定操作之間的互斥性,所以我們一旦呼叫這個Remove方法就會使當前Goroutine永遠陷入阻塞。更嚴重的是,在這之後,其他Goroutine在呼叫該*myConcurrentMap型別值的一些方法(涉及到其中的rwmutex欄位的讀鎖或寫鎖)的時候也會立即被阻塞住。

我們應該避免這種情況的方式。這裡有兩種解決方案。第一種解決方案是,把Remove方法中的oldElem := cmap.Get()語句與在它前面的那兩條語句的位置互換,即變為:

oldElem := cmap.Get() cmap.rwmutex.Lock() defer cmap.rwmutex.Unlock()

這樣可以保證在解鎖讀鎖之後才會去鎖定寫鎖。相比之下,第二種解決方案更加徹底一些,即:消除掉方法間的呼叫。也就是說,我們需要把oldElem := cmap.Get()語句替換掉。在Get方法中,體現其功能的語句是oldElem := cmap.m[key]。因此,我們把後者作為前者的替代品。若如此,那麼我們必須保證該語句出現在對寫鎖的鎖定操作之後。這樣,我們才能依然確保其在鎖的保護之下。實際上,通過這樣的修改,我們升級了Remove方法中的被用來保護從m欄位中獲取對應元素值的這一操作的鎖(由讀鎖升級至寫鎖)。

  • 對於rwmutex欄位的讀鎖來說,雖然鎖定它的操作之間不是互斥的,但是這些操作與相應的寫鎖的鎖定操作之間卻是互斥的。我們在上一條注意事項中已經說明了這一點。因此,為了最小化對寫操作的效能的影響,我們應該在鎖定讀鎖之後儘快的對其進行解鎖。也就是說,我們要在相關的方法中儘量減少持有讀鎖的時間。這需要我們綜合的考量。

依據前面的示例和注意事項說明,讀者可以試著實現Remove、Clear、Len、Contains、Keys、Elems和ToMap方法。它們實現起來並不困難。注意,我們想讓*myConcurrentMap型別成為ConcurrentMap介面型別的實現型別。因此,這些方法都必須是myConcurrentMap型別的指標方法。這包括馬上要提及的那兩個方法。

方法KeyType和ElemType的實現極其簡單。我們可以直接分別返回myConcurrentMap型別的keyType欄位和elemType欄位的值。這兩個欄位的值應該是在myConcurrentMap型別值的使用方初始化它的時候給出的。

按照慣例,我們理應提供一個可以方便的建立和初始化併發安全的字典值的函式。我們把它命名為NewConcurrentMap,其實現如下:

func NewConcurrentMap(keyType, elemType reflect.Type) ConcurrentMap {

return &myConcurrentMap{

keyType: keyType,

elemType: elemType,

m:       make(map[interface{}]interface{})}

}

這個函式並沒有什麼特別之處。由於myConcurrentMap型別的rwmutex欄位並不需要額外的初始化,所以它並沒有出現在該函式中的那個複合字面量中。此外,為了遵循面向介面程式設計的原則,我們把該函式的結果的型別宣告為了ConcurrentMap,而不是它的實現型別*myConcurrentMap。如果將來我們編寫出了另一個ConcurrentMap介面型別的實現型別,那麼就應該考慮調整該函式的名稱。比如變更為NewDefaultConcurrentMap,或者其他。

待讀者把還未實現的*myConcurrentMap型別的那幾個方法都補全之後(可以利用NewConcurrentMap函式來檢驗這個型別是否是一個合格的ConcurrentMap介面的實現型別),我們就開始一起為該型別編寫功能和效能測試了。

參照我們之前為*myOrderedMap型別編寫的功能測試,我們可以很快的照貓畫虎的創建出*myConcurrentMap型別的功能測試函式。這些函式和本小節前面講到的所有程式碼都被放到了goc2p專案的basic/map1程式碼包中。其中,介面型別ConcurrentMap的宣告和myConcurrentMap型別的基本結構及其所有的指標方法均在庫原始碼檔案cmap.go中。因此,我們應該把對應的測試程式碼放到cmap_test.go檔案中。

既然有了很好的參照,作者並不想再贅述*myConcurrentMap型別的功能測試函數了。我希望讀者能夠先獨立的編寫出來並通過go test命令的檢驗,然後再去與cmap_test.go檔案中的程式碼對照。

另外,在myConcurrentMap型別及其指標方法的實現中,我們多處用到了讀寫鎖和反射API(宣告在reflect程式碼包中的那些公開的程式實體)。它們執行的都是可能會對程式效能造成一定影響的操作。因此,針對*myConcurrentMap型別的效能測試(或稱基準測試)是很有必要的。這樣我們才能知道它的值在效能上到底與官方的字典型別有怎樣的差別。

我們在測試原始碼檔案cmap_test.go檔案中宣告兩個基準測試函式——BenchmarkConcurrentMap和BenchmarkMap。顧名思義,這兩個函式是分別被用來測試*myConcurrentMap型別和Go語言官方的字典型別的值的效能的。

在BenchmarkConcurrentMap函式中,我們執行這樣一個流程。

(1) 初始化一個*myConcurrentMap型別的值,同時設定鍵型別和元素型別均為int32型別。

(2) 執行迭代次數預先給定(即該函式的*testing.B型別的引數b的欄位N的值)的迴圈。在單次迭代中,我們向字典型別值新增一個鍵值對,然後再試圖從該值中獲取與當前鍵值對應的元素值。

(3) 打印出一行提示資訊,包含該值的鍵型別、元素型別以及長度等內容。

下面是該函式的實現:

func BenchmarkConcurrentMap(b *testing.B) {

keyType := reflect.TypeOf(int32(2))

elemType := keyType

cmap := NewConcurrentMap(keyType, elemType)

var key, elem int32

fmt.Printf("N=%d.\n", b.N)

b.ResetTimer()

for i := 0; i < b.N; i++ {

b.StopTimer()

seed := int32(i)

key = seed

elem = seed << 10

b.StartTimer()

cmap.Put(key, elem)

_ = cmap.Get(key)

b.StopTimer()

b.SetBytes(8)

b.StartTimer()

}

ml := cmap.Len()

b.StopTimer()

mapType := fmt.Sprintf("ConcurrentMap<%s, %s>",

keyType.Kind().String(), elemType.Kind().String())

b.Logf("The length of % value is %d.\n", mapType, ml)

b.StartTimer()

}

在這段程式碼中,我們用到了引數b的幾個方法。我們在第5章講基準測試的時候說明過它們的功用。這裡再簡單回顧一下。b.ResetTimer方法的功能是將針對該函式的本次執行的計時器歸零。而b.StartTimer方法和b.StopTimer方法的功能則分別是啟動和停止這個計時器。在該函式體中,我們使用這三個方法忽略掉一些無關緊要的語句的執行時間。更具體的講,我們只對for語句的for子句及其程式碼塊中的cmap.Put(key, elem)語句和_ = cmap.Get(key)語句,以及ml := cmap.Len()語句的執行時間進行計時。注意,只要它們的耗時不超過1秒或由go test命令的benchtime標記給定的時間,那麼測試執行程式就會嘗試著多次執行該函式並在每次執行前增加b.N的值。所以,我們去掉無關語句的執行耗時也意味著會讓BenchmarkConcurrentMap函式被執行更多次。

除此之外,我們還用到了b.SetBytes方法。它的作用是記錄在單次操作中被處理的位元組的數量。在這裡,我們每次記錄一個鍵值對所用的位元組數量。由於鍵和元素的型別都是int32型別的,所以它們共會用掉8個位元組。

在編寫完成BenchmarkConcurrentMap函式之後,我們便可以如法炮製針對Go官方的字典型別的基準測試函式BenchmarkMap了。請注意,為了公平起見,我們在初始化這個字典型別值的時候也要把它的鍵型別和元素型別都設定為interface{},就像這樣:

imap := make(map[interface{}]interface{})

但是,在為其新增鍵值對的時候要讓鍵和元素值的型別均為int32型別。

在一切準備妥當之後,我們在相應目錄下使用命令

go test -bench=”.” -run=”^$” -benchtime=1s -v

執行goc2p專案的basic/map1程式碼包中的基準測試。

稍等片刻,標準輸出上會出現如下內容:

PASS

BenchmarkConcurrentMap N=1.

N=100.

N=10000.

N=1000000.

1000000             1612 ns/op           4.96 MB/s

--- BENCH: BenchmarkConcurrentMap

cmap_test.go:240: The length of ConcurrentMap<int32, int32>alue is 1.

cmap_test.go:240: The length of ConcurrentMap<int32, int32>alue is 100.

cmap_test.go:240: The length of ConcurrentMap<int32, int32>alue is 10000.

cmap_test.go:240: The length of ConcurrentMap<int32, int32>alue is 1000000.

BenchmarkMap   N=1.

N=100.

N=10000.

N=1000000.

N=2000000.

2000000               856 ns/op           9.35 MB/s

--- BENCH: BenchmarkMap

cmap_test.go:268: The length of Map<int32, int32> value is 1.

cmap_test.go:268: The length of Map<int32, int32> value is 100.

cmap_test.go:268: The length of Map<int32, int32> value is 10000.

cmap_test.go:268: The length of Map<int32, int32> value is 1000000.

cmap_test.go:268: The length of Map<int32, int32> value is 2000000.

ok     basic/map1     258.327s

我們看到,測試執行程式執行BenchmarkConcurrentMap函式的次數是4,而執行BenchmarkMap函式的次數是5。這從以“N=”為起始的輸出內容和測試日誌的行數上都可以看得出來。由我們前面提到的測試執行程式多次執行基準測試函式的前提條件已經可知,Go語言提供的字典型別的值的效能要比我們自行擴充套件的併發安全的*myConcurrentMap型別的值的效能好。具體的效能差距可以參看測試輸出中的那兩行代表了測試細節的內容,即:

1000000             1612 ns/op           4.96 MB/s

2000000               856 ns/op           9.35 MB/s

前者代表針對*myConcurrentMap型別值的測試細節。測試執行程式在1秒鐘之內最多可以執行相關操作(包括新增鍵值對、根據鍵值獲取元素值和獲取字典型別值的長度)的次數為一百萬,平均每次執行的耗時為1612納秒。並且,根據我們在BenchmarkConcurrentMap函式中的設定,它每秒可以處理4.86兆位元組的資料。

另一方面,Go語言方法的字典型別的值的測試細節是這樣的:測試執行程式在1秒鐘之內最多可以執行相關操作的次數為兩百萬,平均每次執行的耗時為856納秒,根據BenchmarkMap函式中的設定,它每秒可以處理9.35兆位元組的資料。

從上述測試細節可以看出,前者在效能上要比後者差,且差距將近一倍。這樣的差距幾乎都是由*myConcurrentMap型別及其方法中使用的讀寫鎖造成的。

由此,我們也印證了,同步工具在為程式的併發安全提供支援的同時也會對其效能造成了不可忽視的損耗。這也使我們認識到:在使用同步工具的時候應該仔細斟酌並儘量平衡各個方面的指標,以使其無論是在功能上還是在效能上都能達到我們的要求。

順便提一句,Go語言未對自定義泛型提供支援,以至於我們在編寫此類擴充套件的時候並不是那麼方便。有時候,我們不得不使用反射API。但是,眾所周知,它們對程式效能的負面影響也是不可小覷的。因此,我們應該儘量減少對它們的使用。