Go 系列文章 10: sync
原文和後續更新:
ofollow,noindex" target="_blank">https://github.com/cch123/golang-notes/blob/master/sync.md
線性一致性模型
從原理上來講,atomic 操作和非 atomic 操作之間不滿足線性一致性模型。這和現代計算機的 CPU 亂序執行,以及 compiler 為優化而進行的指令重排有關。在 C++ 中針對各種場景和效能需求提供了各種 memory order 選項:
- memory_order_relaxedRelaxed operation: 只保證當前操作的原子性,不保證其它讀寫的順序,也不進行任何多餘的同步。就是說 CPU 和編譯器可以任意重排其它指令。
- memory_order_consume和 load 搭配使用時,相當於執行了一個 consume 操作,當前執行緒依賴 loaded 的值的讀寫都不能被 reorder 到 load 操作之前。其它執行緒中對依賴的變數的寫操作如果 release 了同一個 atomic 變數,在當前執行緒中馬上可見。
- memory_order_acquire只能用在 load 操作中,當前執行緒中在該 load 之後發生的所有讀寫,都不能被 reorder 到 load 之前。其它執行緒中所有寫入操作,如果對該 atomic 變數執行了 release 操作,那麼其之前的所有寫操作在當前執行緒都看得到。
- memory_order_release只能用在 store 操作中,當前執行緒中發生在 store 之前的所有讀寫都不能被 reorder 到 store 操作之後。當前執行緒在 store 之前發生的所有寫操作在其它執行緒執行同一個 atomic 變數的獲取操作之後便都是可見的了。所有對原子變數的寫入都會在 consume 相同原子變數的執行緒中可見。
- memory_order_acq_rel提供給 read-modify-write 操作用。這種操作會既執行 acquire 又執行 release。當前執行緒中的讀寫不能被 reorder 到該操作之前或之後。其它執行緒中對同一 atomic 變數執行 rlease 操作的寫在當前執行緒中執行 rmw 之前都可見,並且 rmw 操作結果對其它 acquire 相同 atomic 變數的執行緒也是可見的。
- memory_order_seq_cst可以在 load、store 和 rmw 操作中使用。load 操作使用時,相當於執行了 acquire,store 相當於執行了 release,rmw 相當於執行了 acquire 和 release。所有執行緒間觀察到的修改順序都是一致的。
這裡面時序最為嚴格的是 memory_order_seq_cst,這就是我們常說的“線性一致性”。Go 語言的 atomic 類似這個最嚴格的時序。簡單說明即:
- 當一個 goroutine 中對某個值進行 atomic.Store,在另一個 goroutine 中對同一個變數進行 atomic.Load,那麼 Load 之後可以看到 Store 的結果,且可以看到 Store 之前的其它記憶體寫入操作(在 C++ 的文件中可能被稱為 side effect)。
- atomic.Store 全域性有序,即你在任何一個 goroutine 中觀察到的全域性 atomic 變數們的變化順序一定是一致的,不會出現有違邏輯順序的出現次序。這個有一些難理解,看一下下面這個 C++ 的例子:
#include <thread> #include <atomic> #include <cassert> std::atomic<bool> x = {false}; std::atomic<bool> y = {false}; std::atomic<int> z = {0}; void write_x() { x.store(true, std::memory_order_seq_cst); } void write_y() { y.store(true, std::memory_order_seq_cst); } void read_x_then_y() { while (!x.load(std::memory_order_seq_cst)) ; if (y.load(std::memory_order_seq_cst)) { ++z; } } void read_y_then_x() { while (!y.load(std::memory_order_seq_cst)) ; if (x.load(std::memory_order_seq_cst)) { ++z; } } int main() { std::thread a(write_x); std::thread b(write_y); std::thread c(read_x_then_y); std::thread d(read_y_then_x); a.join(); b.join(); c.join(); d.join(); assert(z.load() != 0);// will never happen }
在非線性一致的場景下,可能會出現執行緒 c 和執行緒 d 觀察到的 x,y 值分別為 c: true, false; d: false, true。從而導致最終 z 的結果為 0。
而線性一致的場景下,我們可以用全域性事件發生的順序來推斷最終的記憶體狀態。但因為這是最嚴格的時序,所以 compiler 和硬體同步的成本較高。如果我們的 atomic 變數只用來做全域性的簡單計數,比如 counter,那麼在 Go 中就一定會比 C++ 一類提供了 memory order 選項的語言消耗更多的成本。
但如果 atomic.Load 和 atomic.Store 提供像 C++ 一樣的 memory_order 選項,那麼又會帶給程式員一定的心智負擔,所以看起來 Go 官方並不打算提供這樣的選項。
atomic 的實現
TEXT ·AddUint32(SB),NOSPLIT,$0-20 MOVQaddr+0(FP), BP MOVLdelta+8(FP), AX MOVLAX, CX LOCK XADDLAX, 0(BP) ADDLAX, CX MOVLCX, new+16(FP) RET
0x0036 00054 (atomic.go:10)MOVL$10, CX 0x003b 00059 (atomic.go:10)LOCK 0x003c 00060 (atomic.go:10)XADDLCX, (AX)
在 intel 平臺上被翻譯為:
mov ecx, 0xa lock xadd DWORD PTR [rax], ecx
lock 指令字首可以使許多指令操作(ADD, ADC, AND, BTC, BTR, BTS, CMPXCHG, CMPXCH8B, DEC, INC, NEG, NOT, OR, SBB, SUB, XOR, XADD, and XCHG)變成原子操作。CMPXCHG 指令用來實現 CAS 操作。
atomic.CompareAndSwap 即是使用 lock cmpxchg 來實現的。
在使用 lock 指令時,會導致 CPU 鎖匯流排。
waitgroup
// 在主 goroutine 中 Add 和 Wait,在其它 goroutine 中 Done // 在第一次使用之後,不能對 WaitGroup 再進行拷貝 type WaitGroup struct { noCopy noCopy // state1 的高 32 位是計數器,低 32 位是 waiter 計數 // 64 位的 atomic 操作需要按 64 位對齊,但是 32 位編譯器沒法保證這種對齊 // 所以分配 12 個位元組(多分配了 4 個位元組) // 當 state 沒有按 8 對齊時,我們可以偏 4 個位元組來使用 // 按 8 對齊時: // 0000...00000000...00000000...0000 // |- 4 bytes-||- 4 bytes -||- 4 bytes -| //使用使用不使用 // 沒有按 8 對齊時: // |- 4 bytes-||- 4 bytes -||- 4 bytes -| //不使用使用使用 // |-low->---------> ------> -----------> high-| state1 [12]byte semauint32 } func (wg *WaitGroup) state() *uint64 { // 判斷 state 是否按照 8 位元組對齊 if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { // 已對齊時,使用低 8 位元組即可 return (*uint64)(unsafe.Pointer(&wg.state1)) } else { // 未對齊時,使用高 8 位元組 return (*uint64)(unsafe.Pointer(&wg.state1)) return (*uint64)(unsafe.Pointer(&wg.state1[4])) } } // Add 一個 delta,delta 可能是負值,在 WaitGroup 的 counter 上增加該值 // 如果 counter 變成 0,所有阻塞在 Wait 函式上的 goroutine 都會被釋放 // 如果 counter 變成了負數,Add 會直接 panic // 當 counter 是 0 且 Add 的 delta 為正的操作必須發生在 Wait 呼叫之前。 // 而當 counter > 0 且 Add 的 delta 為負的操作則可以發生在任意時刻。 // 一般來講,Add 操作應該在建立 goroutine 或者其它需要等待的事件發生之前呼叫 // 如果 wg 被用來等待幾組獨立的事件集合 // 新的 Add 呼叫應該在所有 Wait 呼叫返回之後再呼叫 // 參見 wg 的 example func (wg *WaitGroup) Add(delta int) { statep := wg.state() state := atomic.AddUint64(statep, uint64(delta)<<32) v := int32(state >> 32) // counter 高位 4 位元組 w := uint32(state) // waiter counter,截斷,取低位 4 個位元組 if v < 0 { panic("sync: negative WaitGroup counter") } if w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } if v > 0 || w == 0 { return } // 當前 goroutine 已經把 counter 設為 0,且 waiter 數 > 0 // 這時候不能有狀態的跳變 // - Add 不能和 Wait 進行併發呼叫 // - Wait 如果發現 counter 已經等於 0,則不應該對 waiter 數加一了 // 這裡是對 wg 誤用的簡單檢測 if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 重置 waiter 計數為 0 *statep = 0 for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false) } } // Done 其實就是 wg 的 counter - 1 // 進入 Add 函式後 // 如果 counter 變為 0 會觸發 runtime_Semrelease 通知所有阻塞在 Wait 上的 g func (wg *WaitGroup) Done() { wg.Add(-1) } // Wait 會阻塞直到 wg 的 counter 變為 0 func (wg *WaitGroup) Wait() { statep := wg.state() for { state := atomic.LoadUint64(statep) v := int32(state >> 32) // counter w := uint32(state) // waiter count if v == 0 { // counter return } // 如果沒成功,可能有併發,迴圈再來一次相同流程 // 成功直接返回 if atomic.CompareAndSwapUint64(statep, state, state+1) { runtime_Semacquire(&wg.sema) // 和上面的 Add 裡的 runtime_Semrelease 是對應的 if *statep != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }
once
// 內含一個鎖和用來做原子操作的變數 type Once struct { mMutex done uint32 } // Do 被用來執行那些只能執行一次的初始化操作 //config.once.Do(func() { config.init(filename) }) // // 因為對 Do 的呼叫直到其中的一個 f 執行之後才會返回,所以 // f 中不能呼叫同一個 once 例項的 Do 函式,否則會死鎖 // 如果 f 內 panic 了,Do 也認為已經返回了,未來對 Do 的呼叫不會再執行 f // once.Do(f) 被呼叫多次時,只有第一次呼叫會真正的執行 f // 對於每一個要執行的 f,都需要一個對應的 once 例項 // 在 done 已經被改成 1 之後 // 所有進入函式呼叫的行為會用 atomic 讀取值之後直接返回 func (o *Once) Do(f func()) { // 輕量級的原子變數 load if atomic.LoadUint32(&o.done) == 1 { // 如果原子 load 後發現已經是 1 了,直接返回 return } // Slow-path. o.m.Lock() defer o.m.Unlock() // 在 atomic load 的時候為 0,不代表進入 lock 之後也是 0 // 所以還需要再判斷一次 // 臨界區內的判斷和修改是比較穩妥的 if o.done == 0 { defer atomic.StoreUint32(&o.done, 1) f() } }
once.Do 實際上是一種優化,只要過程被執行過了,那麼之後所有判斷都走 atomic,不用進入臨界區。
Mutex
// Mutex 是互斥鎖 // 其零值是一個 unlocked 的互斥量 // 在被首次使用之後,Mutex 就不應該發生拷貝動作了 type Mutex struct { state int32 semauint32 }
加鎖過程:
// 對 m 上鎖 // 如果鎖已經在使用中 // 呼叫 Lock 的 goroutine 會陷入阻塞 // 直到 mutex 變為可用 func (m *Mutex) Lock() { // 當前直接就是已解鎖的 mutex // 直接用 atomic cas,更快 if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { return } var waitStartTime int64 starving := false awoke := false iter := 0 old := m.state for { // 這裡 if 有 starvation 的判斷 // 在飢餓模式時不能自旋,因為所有權被移交給等待的 goroutine 了 // 所以我們沒辦法獲得 mutex if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 這裡會做積極的自旋 // 沒有其它飢餓的 goroutine 的話,我們儘量直接就設定 mutexWoken flag // 這樣在 Unlock 的時候就不用喚醒其它被阻塞的 goroutine 了 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && // 設定 mutexWoken flag atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 進 runtime 自旋 runtime_doSpin() iter++ old = m.state continue } new := old // 如果 mutex 處於 starving 狀態,就不應該武斷地搶鎖了 // 新來的 goroutine 應該先去排隊 if old&mutexStarving == 0 { // 說明老狀態裡沒有 starving 那一位 // 即說明原來的 mutex 不是 starvation 狀態 // 給新的 state 標上 locked 這位 new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 當前 goroutine 將 mutex 切換到 starvation 模式 // 如果 mutex 當前已經被 unlock 了,就不要做這個切換了 // Unlock 的時候會認為一個 starving 的 mutex 一定會有等待的 goroutine, // 這種情況下一定為 true if starving && old&mutexLocked != 0 { new |= mutexStarving } if awoke { // 當前 goroutine 是處於 awoke 狀態 // 但是從 mutex 裡拿到的狀態並沒有 mutexWoken 這個 flag // 說明這裡發生了 bug // PS: 這種情況下應該是沒有 waiters 的 // PS: 而是當前的加鎖的新 goroutine 直接進入喚醒流程 if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } // goroutine 被從 sleep 喚醒 // 所以我們需要在兩種情況(starving和非 starving 的)下 // :都 reset 掉這個 flag new &^= mutexWoken } if atomic.CompareAndSwapInt32(&m.state, old, new) { if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 如果之前已經等待過了,那麼直接插到佇列最前面 queueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo) // 如果等待時間超過了閾值,那麼就進入 starving 狀態 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { // 如果當前 goroutine 被喚醒,且 mutex 處於 starvation 狀態 // 這時候控制權被移交到給了我們,但 mutex 不知道怎麼回事處於不一致的狀態: // mutexLocked 標識位還沒有設定,但我們卻仍然認為當前 goroutine 正在等待這個 mutex。說明是個 bug,需要修正 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { // 退出飢餓模式 // 必須要在這裡退出,且考慮等待時間 // 飢餓模式很低效,一旦兩個 goroutine 同時將 mutex 切換到飢餓模式 // 可能會彼此無限地鎖下去 // ?? delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } awoke = true iter = 0 } else { old = m.state } } }
解鎖過程:
// 解鎖 m,對未加鎖的 mutex 解鎖會引起錯誤 // 被加鎖的 mutex 並不是和具體的某個 goroutine 繫結的 // 完全可以在一個 goroutine 中加鎖並在另外的 goroutine 中解鎖 func (m *Mutex) Unlock() { // 幹掉 mutexLocked 的標識位 new := atomic.AddInt32(&m.state, -mutexLocked) if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 如果新的狀態表示 mutex 之前沒有處於飢餓狀態 if new&mutexStarving == 0 { old := new for { // 如果當前沒有處於飢餓模式等待中的 goroutine,或者當前這個 goroutine 已經 // 被喚醒或搶到了鎖,沒有必要再喚醒其它 goroutine 了 // 飢餓模式中,管理權會直接會被直接從 unlocking goroutine 移交給下一個 waiter // 當前 goroutine 並不在這個鏈條中, // 因為我們在 unlock 上面的 mutex 時,沒有觀察到 mutexStarving 的標識位 // 所以直接 return 讓路 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 獲取喚醒其它人的權力 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // 飢餓模式: 將 mutex 的所有權移交給下一個 waiter // 注意: mutexLocked 沒有設定,waiter 被喚醒後會設定這個標識 // 但是 mutex 在 waiter 被喚醒後,如果 mutexStarving 位是 1 的話 // 仍然會被認為是上鎖的,所以新來的 goroutine 是沒法獲取這個鎖的 runtime_Semrelease(&m.sema, true) } }
sync.RWMutex
reader 加解鎖過程:
// RWMutex 是 reader/writer 互斥鎖 // 這種鎖可以被任意數量的 reader 或者單獨的一個 writer 所持有 // 其零值是遇 unlocked mutex // // RWMutex 在首次使用後就不應該被拷貝了 // // 如果一個 goroutine 持有了 RWMutex 用來做讀操作 // 這時候另一個 goroutine 可能會呼叫 Lock // 在這之後,就不會有任何 goroutine 會獲得 read lock 了 // 直到最初的 read lock 被釋放。 // 需要注意,這種鎖是禁止遞迴的 read locking 的。 // 這是為了保證鎖最終一定能夠到達可用狀態; // 一個阻塞的 Lock 的呼叫會排它地阻止其它 readers 獲取到這個鎖 type RWMutex struct { wMutex// held if there are pending writers writerSemuint32 // semaphore for writers to wait for completing readers readerSemuint32 // semaphore for readers to wait for completing writers readerCount int32// number of pending readers readerWaitint32// number of departing readers } const rwmutexMaxReaders = 1 << 30 // RLock 鎖住 rw 來進行讀操作 // // 不能被使用來做遞迴的 read locking; 一個阻塞的 Lock 呼叫會阻止其它新 readers 獲取當前鎖 func (rw *RWMutex) RLock() { if atomic.AddInt32(&rw.readerCount, 1) < 0 { // 有 writer 掛起,等待其操作完畢。 runtime_Semacquire(&rw.readerSem) } } // RUnlock 相當於 RLock 呼叫的逆向操作; // 其不會影響到其它同時持鎖的 reader 們 // 如果當前 rw 不是被鎖住讀的狀態,那麼就是一個 bug func (rw *RWMutex) RUnlock() { if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 { if r+1 == 0 || r+1 == -rwmutexMaxReaders { throw("sync: RUnlock of unlocked RWMutex") } // 有 writer 正在掛起 if atomic.AddInt32(&rw.readerWait, -1) == 0 { // 最後一個 reader 負責 unblock writer runtime_Semrelease(&rw.writerSem, false) } } }
writer 加解鎖過程:
// Lock 對 rw 加寫鎖 // 如果當前鎖已經被鎖住進行讀或者進行寫 // Lock 會阻塞,直到鎖可用 func (rw *RWMutex) Lock() { // First, resolve competition with other writers. // 首先需要解決和其它 writer 進行的競爭,這裡是去搶 RWMutex 中的 Mutex 鎖 rw.w.Lock() // 搶到了上面的鎖之後,通知所有 reader,現在有一個掛起的 writer 等待寫入了 r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders // 等待最後的 reader 將其喚醒 if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 { runtime_Semacquire(&rw.writerSem) } } // Unlock 將 rw 的讀鎖解鎖。如果當前 rw 沒有處於鎖定讀的狀態,那麼就是 bug // // 像 Mutex 一樣,一個上鎖的 RWMutex 並沒有和特定的 goroutine 繫結。 // 可以由一個 goroutine Lock 它,並由其它的 goroutine 解鎖 func (rw *RWMutex) Unlock() { // 告訴所有 reader 現在沒有活躍的 writer 了 r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { throw("sync: Unlock of unlocked RWMutex") } // Unblock 掉所有正在阻塞的 reader for i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false) } // 讓其它的 writer 可以繼續工作 rw.w.Unlock() }
參考資料
http://www.weixianmanbu.com/article/736.html
https://www.cnblogs.com/gaochundong/p/lock_free_programming.html