Concurrent map --- an analysis of sync map
[TOC]
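This article is based on the Go 1.10 source code.
As an earlier article showed, Go's built-in map does not support concurrent access. Go recommends that users protect a map directly with a read-write lock, and some third-party libraries use segmented locks. In version 1.9, Go added a map that supports concurrent operations, built on top of the original map, called sync map.
Below we first introduce how to use it, then explain the design, and finally walk through the code in detail.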
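Usage
The basic API consists of:
- Store: writes a key and its value
- Load: reads a key; it returns two values, the value itself and a bool that reports whether the key exists
- Delete: removes a key
- LoadOrStore: returns the existing value if the key is present, otherwise stores the given value
- Range: iterates over the map; note that the iteration is not guaranteed to see a consistent snapshot of the contents
Under the hood, sync map stores its data in a map[interface{}]*entry, so both keys and values can be of many different types.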
A simple example:

    package main

    import (
        "fmt"
        "sync"
    )

    type MySyncMap struct {
        sync.Map
    }

    // Print uses a pointer receiver: a sync.Map must not be copied after first use.
    func (m *MySyncMap) Print(k interface{}) {
        value, ok := m.Load(k)
        fmt.Println(value, ok)
    }

    func main() {
        var syncMap MySyncMap

        syncMap.Print("Key1")

        syncMap.Store("Key1", "Value1")
        syncMap.Print("Key1")

        syncMap.Store("Key2", "Value2")

        syncMap.Store("Key3", 2)
        syncMap.Print("Key3")

        syncMap.Store(4, 4)
        syncMap.Print(4)

        syncMap.Delete("Key1")
        syncMap.Print("Key1")
    }
Output:

    <nil> false
    Value1 true
    2 true
    4 true
    <nil> false
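The example above does not exercise LoadOrStore or Range, so here is a small sketch of both. The helpers `fill` and `countKeys` are hypothetical names introduced only for this illustration; everything else is the standard `sync.Map` API.

```go
package main

import (
	"fmt"
	"sync"
)

// fill is a hypothetical helper that builds a small map for the demo.
func fill() *sync.Map {
	m := &sync.Map{}
	m.Store("a", 1)
	m.Store("b", 3)
	return m
}

// countKeys walks the map with Range and returns how many entries it visited.
func countKeys(m *sync.Map) int {
	n := 0
	m.Range(func(key, value interface{}) bool {
		n++
		return true // returning false would stop the iteration early
	})
	return n
}

func main() {
	m := fill()

	// "a" already exists, so the stored value is returned and not overwritten.
	v, loaded := m.LoadOrStore("a", 2)
	fmt.Println(v, loaded) // 1 true

	// "c" is absent, so the given value is stored and returned.
	v, loaded = m.LoadOrStore("c", 5)
	fmt.Println(v, loaded) // 5 false

	fmt.Println(countKeys(m)) // 3
}
```

If other goroutines were writing during the Range call, the count could include or miss keys added in the meantime.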
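Design principles
Comparison of common approaches
There are many ways to build a concurrent hashmap. A few are outlined below, followed by the considerations behind Go's implementation.
The first is the simplest: put a single read-write lock around a hashmap that does not support concurrency. This was the usual approach before sync map appeared. Its drawback is that writes block reads.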
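A minimal sketch of this first approach, assuming string keys and int values for brevity. `RWMap` and `NewRWMap` are hypothetical names for this illustration, not part of any library.

```go
package main

import (
	"fmt"
	"sync"
)

// RWMap is a hypothetical wrapper: a plain map guarded by one sync.RWMutex.
type RWMap struct {
	mu sync.RWMutex
	m  map[string]int
}

func NewRWMap() *RWMap {
	return &RWMap{m: make(map[string]int)}
}

// Load takes the read lock, so many readers can proceed in parallel.
func (r *RWMap) Load(k string) (int, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	v, ok := r.m[k]
	return v, ok
}

// Store takes the write lock, which blocks every reader until it is released.
func (r *RWMap) Store(k string, v int) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.m[k] = v
}

func main() {
	r := NewRWMap()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r.Store(fmt.Sprintf("k%d", i), i)
		}(i)
	}
	wg.Wait()
	fmt.Println(r.Load("k2")) // 2 true
}
```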
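The second is segmented locking, a technique common in databases: each read-write lock protects one range of keys. Some third-party Go libraries are implemented this way, as was Java's ConcurrentHashMap before Java 8. Average performance is quite good, but in extreme cases, if one segment receives hot writes, reads on that segment suffer as well.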
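The segmented-lock idea can be sketched roughly as follows. `ShardedMap`, `shard`, `pick`, and the shard count of 16 are all assumptions made for this illustration; real libraries differ in hashing and sizing.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sync"
)

const shardCount = 16 // arbitrary choice for the sketch

type shard struct {
	mu sync.RWMutex
	m  map[string]int
}

// ShardedMap is a hypothetical segmented-lock map: each shard has its own lock.
type ShardedMap struct {
	shards [shardCount]*shard
}

func NewShardedMap() *ShardedMap {
	s := &ShardedMap{}
	for i := range s.shards {
		s.shards[i] = &shard{m: make(map[string]int)}
	}
	return s
}

// pick hashes the key to choose the shard that owns it.
func (s *ShardedMap) pick(key string) *shard {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return s.shards[h.Sum32()%shardCount]
}

func (s *ShardedMap) Store(key string, v int) {
	sh := s.pick(key)
	sh.mu.Lock()
	defer sh.mu.Unlock()
	sh.m[key] = v
}

func (s *ShardedMap) Load(key string) (int, bool) {
	sh := s.pick(key)
	sh.mu.RLock()
	defer sh.mu.RUnlock()
	v, ok := sh.m[key]
	return v, ok
}

func main() {
	s := NewShardedMap()
	s.Store("hot", 1)
	s.Store("cold", 2)
	fmt.Println(s.Load("hot")) // 1 true
}
```

A writer on one shard only blocks readers whose keys hash to the same shard, which is exactly why a hot shard still hurts its own readers.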
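The third is what we often use when rolling our own in C++: resolve hash collisions by chaining, and resolve concurrent conflicts on the linked lists with CAS, so that both reads and writes are lock-free. I think this approach works well and performs very well; I don't know why other languages don't implement it this way.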
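To make the CAS idea concrete, here is a sketch of one hash bucket whose chain is extended lock-free. It only covers insertion at the head and lookup; lock-free deletion is considerably harder and is left out. `bucket`, `node`, `insert`, `find`, and `count` are hypothetical names for this illustration.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"unsafe"
)

type node struct {
	key  string
	val  int
	next unsafe.Pointer // *node; written once, before the node is published
}

// bucket is a hypothetical single hash bucket using chaining.
type bucket struct {
	head unsafe.Pointer // *node
}

// insert pushes a new node at the head, retrying the CAS until it wins.
func (b *bucket) insert(key string, val int) {
	n := &node{key: key, val: val}
	for {
		old := atomic.LoadPointer(&b.head)
		n.next = old
		if atomic.CompareAndSwapPointer(&b.head, old, unsafe.Pointer(n)) {
			return
		}
	}
}

// find walks the chain; the newest node for a key shadows older ones.
func (b *bucket) find(key string) (int, bool) {
	for p := (*node)(atomic.LoadPointer(&b.head)); p != nil; p = (*node)(p.next) {
		if p.key == key {
			return p.val, true
		}
	}
	return 0, false
}

// count returns the number of nodes in the chain.
func (b *bucket) count() int {
	n := 0
	for p := (*node)(atomic.LoadPointer(&b.head)); p != nil; p = (*node)(p.next) {
		n++
	}
	return n
}

func main() {
	var bk bucket
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			bk.insert(fmt.Sprintf("k%d", i), i)
		}(i)
	}
	wg.Wait()
	fmt.Println(bk.count()) // 100: no insert was lost despite the races
}
```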
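However, as "An overview of sync.Map" points out, on machines with many CPU cores, reflect.New, sync.RWMutex, and atomic.AddUint32 all become slow because of cache contention. To suit many-core machines, the Go team adopted none of the common approaches above.
The goal of sync map is to suit read-mostly workloads with very stable performance, without reads being blocked frequently the way they can be with segmented locks. sync map is a layer wrapped around map; in most cases reads need no lock, but write performance is relatively poor. The implementation is described in detail below.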
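Implementation idea
To keep the impact on reads as small as possible, the most natural idea is read/write separation. sync map was (in my own reading) inspired by this idea. It uses two maps, one called read and one called dirty. Both store pointers that lead to the value data itself, so the two maps share the values, and an update to a value is visible through both maps at once.
dirty supports insert, delete, and lookup, but every operation on it must hold the mutex.
A key that exists in read can be read without locking, and updated or deleted lock-free using CAS, but new keys cannot be added to read. read acts as a cache of dirty, and because the values are shared, an existing value can be updated through read.
If read cannot take new keys, where does its data come from? sync map counts cache misses. Once the miss count is greater than or equal to the number of elements in dirty, dirty becomes the new read and the old dirty is cleared (set to nil).
For dirty to become read directly, everything present in read must also be present in dirty. So when dirty is empty and a new key is about to be added, the elements of read are first copied into dirty, and then the new key is written.
Deletion is also interesting: it is lazy. The read map is checked first; if the key is there, the p field of the corresponding entry is set to nil as a marker. An entry marked nil in read is physically dropped only later: it is skipped when read is next copied into dirty, and it disappears when that dirty is promoted to read.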
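The miss counting and promotion described above can be modelled with a deliberately simplified, single-goroutine toy. It has no locks, atomics, or deletion, so it is not how sync map is implemented; it only illustrates when dirty is created and when it is promoted. `toyMap` and its methods are hypothetical names.

```go
package main

import "fmt"

type entry struct{ v int }

// toyMap is a hypothetical single-goroutine model of the read/dirty pair.
type toyMap struct {
	read   map[string]*entry
	dirty  map[string]*entry
	misses int
}

func (t *toyMap) store(k string, v int) {
	if e, ok := t.read[k]; ok {
		e.v = v // the entry is shared, so dirty sees the update too
		return
	}
	if e, ok := t.dirty[k]; ok {
		e.v = v
		return
	}
	if t.dirty == nil {
		// First new key since the last promotion: copy read into dirty.
		t.dirty = make(map[string]*entry, len(t.read))
		for key, e := range t.read {
			t.dirty[key] = e
		}
	}
	t.dirty[k] = &entry{v}
}

func (t *toyMap) load(k string) (int, bool) {
	if e, ok := t.read[k]; ok {
		return e.v, true
	}
	if t.dirty == nil {
		return 0, false
	}
	e, ok := t.dirty[k]
	t.misses++
	if t.misses >= len(t.dirty) {
		// Enough misses: promote dirty to read and clear dirty.
		t.read, t.dirty, t.misses = t.dirty, nil, 0
	}
	if !ok {
		return 0, false
	}
	return e.v, true
}

func main() {
	tm := &toyMap{}
	tm.store("a", 1)
	tm.store("b", 2)
	tm.load("a")                             // miss 1 of 2, no promotion yet
	fmt.Println(len(tm.read), tm.dirty != nil) // 0 true
	tm.load("a")                             // miss 2 of 2, dirty is promoted
	fmt.Println(len(tm.read), tm.dirty != nil) // 2 false
}
```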
Source code
Structure
    // The zero Map is empty and ready for use. A Map must not be copied after first use.
    type Map struct {
        mu Mutex

        // read contains the portion of the map's contents that are safe for
        // concurrent access (with or without mu held).
        //
        // The read field itself is always safe to load, but must only be stored with
        // mu held.
        //
        // Entries stored in read may be updated concurrently without mu, but updating
        // a previously-expunged entry requires that the entry be copied to the dirty
        // map and unexpunged with mu held.
        read atomic.Value // readOnly

        // dirty contains the portion of the map's contents that require mu to be
        // held. To ensure that the dirty map can be promoted to the read map quickly,
        // it also includes all of the non-expunged entries in the read map.
        //
        // Expunged entries are not stored in the dirty map. An expunged entry in the
        // clean map must be unexpunged and added to the dirty map before a new value
        // can be stored to it.
        //
        // If the dirty map is nil, the next write to the map will initialize it by
        // making a shallow copy of the clean map, omitting stale entries.
        dirty map[interface{}]*entry

        // misses counts the number of loads since the read map was last updated that
        // needed to lock mu to determine whether the key was present.
        //
        // Once enough misses have occurred to cover the cost of copying the dirty
        // map, the dirty map will be promoted to the read map (in the unamended
        // state) and the next store to the map will make a new dirty copy.
        misses int
    }

    // The actual struct stored in read.
    // readOnly is an immutable struct stored atomically in the Map.read field.
    type readOnly struct {
        m       map[interface{}]*entry
        amended bool // true if the dirty map contains some key not in m.
    }

    // expunged is an arbitrary pointer that marks entries which have been deleted
    // from the dirty map.
    var expunged = unsafe.Pointer(new(interface{}))

    // An entry is a slot in the map corresponding to a particular key.
    type entry struct {
        // p points to the interface{} value stored for the entry.
        //
        // If p == nil, the entry has been deleted and m.dirty == nil.
        //
        // If p == expunged, the entry has been deleted, m.dirty != nil, and the entry
        // is missing from m.dirty.
        //
        // Otherwise, the entry is valid and recorded in m.read.m[key] and, if m.dirty
        // != nil, in m.dirty[key].
        //
        // An entry can be deleted by atomic replacement with nil: when m.dirty is
        // next created, it will atomically replace nil with expunged and leave
        // m.dirty[key] unset.
        //
        // An entry's associated value can be updated by atomic replacement, provided
        // p != expunged. If p == expunged, an entry's associated value can be updated
        // only after first setting m.dirty[key] = e so that lookups using the dirty
        // map find the entry.
        p unsafe.Pointer // *interface{}
    }

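Structure of sync map
mu is the mutex that protects dirty.
misses records how many lookups have missed read.
Note that entry.p has two special values, nil and expunged. nil means the entry has been deleted through read but, if dirty is non-nil, is still present in dirty, so a new value can be stored into it directly (in the special case dirty == nil, the next write of a new key rebuilds dirty by copying read). expunged means the entry has already been removed from dirty, so before its value can be updated the entry must first be added back to dirty.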
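The three states of entry.p and the transitions between them can be traced with a small standalone sketch. `expunged`, `entry`, and `newEntry` mirror the source above; `state`, `markDeleted`, `tryExpunge`, and `unexpunge` are hypothetical helpers for this illustration, simplified from delete, tryExpungeLocked, and unexpungeLocked.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"unsafe"
)

// expunged is a sentinel pointer, as in the source above.
var expunged = unsafe.Pointer(new(interface{}))

type entry struct {
	p unsafe.Pointer // *interface{}
}

func newEntry(v interface{}) *entry {
	return &entry{p: unsafe.Pointer(&v)}
}

// state names the three cases of entry.p.
func state(e *entry) string {
	switch p := atomic.LoadPointer(&e.p); p {
	case nil:
		return "deleted"
	case expunged:
		return "expunged"
	default:
		return "live"
	}
}

// markDeleted sets p to nil, which is the effect of a successful delete.
func (e *entry) markDeleted() { atomic.StorePointer(&e.p, nil) }

// tryExpunge turns nil into expunged, as happens when dirty is rebuilt.
func (e *entry) tryExpunge() bool {
	return atomic.CompareAndSwapPointer(&e.p, nil, expunged)
}

// unexpunge turns expunged back into nil, as Store does before reusing the entry.
func (e *entry) unexpunge() bool {
	return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
}

func main() {
	e := newEntry("hello")
	fmt.Println(state(e)) // live
	e.markDeleted()
	fmt.Println(state(e)) // deleted
	e.tryExpunge()
	fmt.Println(state(e)) // expunged
	e.unexpunge()
	fmt.Println(state(e)) // deleted
}
```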
Load
    // Load returns the value stored in the map for a key, or nil if no
    // value is present.
    // The ok result indicates whether value was found in the map.
    func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        if !ok && read.amended {
            m.mu.Lock()
            // Avoid reporting a spurious miss if m.dirty got promoted while we were
            // blocked on m.mu. (If further loads of the same key will not miss, it's
            // not worth copying the dirty map for this key.)
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            if !ok && read.amended {
                e, ok = m.dirty[key]
                // Regardless of whether the entry was present, record a miss: this key
                // will take the slow path until the dirty map is promoted to the read
                // map.
                m.missLocked()
            }
            m.mu.Unlock()
        }
        if !ok {
            return nil, false
        }
        return e.load()
    }

    func (e *entry) load() (value interface{}, ok bool) {
        p := atomic.LoadPointer(&e.p)
        if p == nil || p == expunged {
            return nil, false
        }
        return *(*interface{})(p), true
    }

    func (m *Map) missLocked() {
        m.misses++
        if m.misses < len(m.dirty) {
            return
        }
        m.read.Store(readOnly{m: m.dirty})
        m.dirty = nil
        m.misses = 0
    }
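A read looks in read first. If the key is not there and read.amended says dirty holds extra keys, it takes the lock, looks in dirty, calls missLocked(), and then unlocks. missLocked increments misses; if misses >= len(dirty), dirty is promoted to read and the old dirty is cleared.
The code contains a double check: look in read, take the lock, then look in read again. This is needed because dirty may have been promoted to read in the gap between the first check and acquiring the lock. Without the double check, a key that exists could be reported to the caller as missing. The same double check appears repeatedly in the operations below.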
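The double check is an instance of the general double-checked locking pattern: a lock-free fast path, then a second look under the lock before doing the slow work. The sketch below shows the pattern on its own with a lazily built string; `lazy`, `get`, and `run` are hypothetical names and are unrelated to the sync map source.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// lazy is a hypothetical lazily built value with a lock-free fast path.
type lazy struct {
	mu     sync.Mutex
	v      atomic.Value // holds a string once built
	builds int          // how many times build ran; guarded by mu
}

func (l *lazy) get(build func() string) string {
	// First check, without the lock.
	if s, ok := l.v.Load().(string); ok {
		return s
	}
	l.mu.Lock()
	defer l.mu.Unlock()
	// Second check: another goroutine may have built the value
	// while this one was waiting for the lock.
	if s, ok := l.v.Load().(string); ok {
		return s
	}
	l.builds++
	s := build()
	l.v.Store(s)
	return s
}

// run starts n goroutines that all call get and reports how often build ran.
func run(n int) int {
	var l lazy
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.get(func() string { return "built" })
		}()
	}
	wg.Wait()
	return l.builds
}

func main() {
	fmt.Println(run(8)) // 1: the second check stops repeated builds
}
```

Dropping the second check would let every goroutine that lost the race to the lock rebuild the value, much as Load without its double check would consult a dirty map that has already been promoted.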
Store
    // Store sets the value for a key.
    func (m *Map) Store(key, value interface{}) {
        read, _ := m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok && e.tryStore(&value) {
            return
        }

        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok {
            if e.unexpungeLocked() {
                // The entry was previously expunged, which implies that there is a
                // non-nil dirty map and this entry is not in it.
                m.dirty[key] = e
            }
            e.storeLocked(&value)
        } else if e, ok := m.dirty[key]; ok {
            e.storeLocked(&value)
        } else {
            if !read.amended {
                // We're adding the first new key to the dirty map.
                // Make sure it is allocated and mark the read-only map as incomplete.
                m.dirtyLocked()
                m.read.Store(readOnly{m: read.m, amended: true})
            }
            m.dirty[key] = newEntry(value)
        }
        m.mu.Unlock()
    }

    // tryStore stores a value if the entry has not been expunged.
    //
    // If the entry is expunged, tryStore returns false and leaves the entry
    // unchanged.
    func (e *entry) tryStore(i *interface{}) bool {
        p := atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
        for {
            if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
                return true
            }
            p = atomic.LoadPointer(&e.p)
            if p == expunged {
                return false
            }
        }
    }

    func (m *Map) dirtyLocked() {
        if m.dirty != nil {
            return
        }

        read, _ := m.read.Load().(readOnly)
        m.dirty = make(map[interface{}]*entry, len(read.m))
        for k, e := range read.m {
            if !e.tryExpungeLocked() {
                m.dirty[k] = e
            }
        }
    }

    func (e *entry) tryExpungeLocked() (isExpunged bool) {
        p := atomic.LoadPointer(&e.p)
        for p == nil {
            if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
                return true
            }
            p = atomic.LoadPointer(&e.p)
        }
        return p == expunged
    }

    // unexpungeLocked ensures that the entry is not marked as expunged.
    //
    // If the entry was previously expunged, it must be added to the dirty map
    // before m.mu is unlocked.
    func (e *entry) unexpungeLocked() (wasExpunged bool) {
        return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
    }
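A write first checks whether the key can be found in read. If it is there and the entry is not expunged, the value is updated directly through the entry in read. Otherwise the lock is taken and the check is repeated. There are several cases to keep in mind:
- The double check finds the key in read. If the entry is expunged, unexpungeLocked first swaps expunged for nil and the entry is added back to dirty, then the value is written. Otherwise the value is written directly with no swap.
- The key exists in dirty: update it directly.
- The key is not in dirty. If dirty is empty at this point, read is first copied into dirty, and then the new value is written to dirty. The copy is done by dirtyLocked(); while copying, entries that are nil in read are changed to expunged and are not copied into dirty.
As we can see, updating data in read goes through tryStore, which uses CAS to resolve conflicts. If, after a CAS conflict, the entry turns out to have been set to expunged, tryStore does not write the value and returns false. In the Store flow, execution then continues and the write goes to dirty.
Now let's look at why case 1 is handled that way. Finding the key in read at the double check means that either a concurrent goroutine wrote the key before we took the lock and a Load then promoted dirty to read, or the key was in read all along but tryStore refused because the entry was expunged. dirty may or may not be nil at this point, but either way entry.p can be updated directly. If the entry is expunged, it must first be swapped to nil and then added to dirty.
Open question: I don't fully understand why updating directly through read uses CAS, while the locked path below it uses a plain atomic store. Even with the lock held, the key exists in read, so lock-free writers can still run concurrently; why is CAS unnecessary there? One plausible reading of the code: tryStore needs CAS only so that it never overwrites expunged, which dirtyLocked may set concurrently under mu. On the locked path nobody else can expunge the entry, and it has just been unexpunged if necessary, so a concurrent tryStore or delete is simply a competing write and a plain atomic store is enough.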
Delete
    // Delete deletes the value for a key.
    func (m *Map) Delete(key interface{}) {
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        if !ok && read.amended {
            m.mu.Lock()
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            if !ok && read.amended {
                delete(m.dirty, key)
            }
            m.mu.Unlock()
        }
        if ok {
            e.delete()
        }
    }

    func (e *entry) delete() (hadValue bool) {
        for {
            p := atomic.LoadPointer(&e.p)
            if p == nil || p == expunged {
                return false
            }
            if atomic.CompareAndSwapPointer(&e.p, p, nil) {
                return true
            }
        }
    }
Deletion is simple: if the key exists in read, the entry.p in read is set to nil; if it exists only in dirty, the corresponding entry is removed from dirty directly.
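From the caller's side the lazy deletion is invisible, as this short sketch using only the public API suggests. `has` and `demo` are hypothetical helpers for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// has reports whether key is currently present in m.
func has(m *sync.Map, key interface{}) bool {
	_, ok := m.Load(key)
	return ok
}

// demo deletes and re-stores keys, then returns the resulting map.
func demo() *sync.Map {
	m := &sync.Map{}
	m.Store("a", 1)
	m.Delete("a")
	m.Delete("missing") // deleting an absent key is a no-op
	m.Store("a", 2)     // a deleted key can be stored again
	m.Store("b", 3)
	m.Delete("b")
	return m
}

func main() {
	m := demo()
	fmt.Println(has(m, "a"), has(m, "b")) // true false
}
```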