Golang 中 Mutex 的原始碼實現
本文基於 go1.11 版本。
Mutex 使用
在深入原始碼之前,要先搞清楚一點,對 Golang 中互斥鎖sync.Mutex
的操作是程式員的主動行為,可以看作是是一種協議,而不是強制在操作前必須先獲取鎖。
這樣說可能有點抽象,看下面這段程式碼:
package main import ( "fmt" "sync" "time" ) type People struct { muxsync.Mutex Name string Ageuint8 } func (p *People) IncAge() { p.mux.Lock() time.Sleep(3 * time.Second) p.Age++ p.mux.Unlock() } func main() { leo := &People{Name: "leo", Age: 18} innerIncTime := time.Now().Second() fmt.Println("with mutexinc time:", innerIncTime) go leo.IncAge() time.Sleep(time.Second) outerIncTime := time.Now().Second() fmt.Println("without mutex inc time:", outerIncTime) leo.Age++ fmt.Println("without mutex inc result:", leo.Age) fmt.Println("mutex status:", leo.mux) time.Sleep(2 * time.Second) fmt.Println("with mutex inc result:", leo.Age) fmt.Println("Two seconds later mutex status:", leo.mux) }
在執行leo.Age++
之前已經加鎖了,如果是需要強制獲取鎖的話,這裡會等待 3 秒直到鎖釋放後才能執行,而這裡沒有獲取鎖就可以直接對 Age 欄位進行操作,輸出結果:
with mutexinc time: 19 without mutex inc time: 20 without mutex inc result: 19 mutex status: {1 0} with mutex inc result: 20 Two seconds later mutex status: {0 0}
所以,如果在一個 goroutine 中對鎖執行了 Lock(),在另一個 goroutine 可以不用理會這個鎖,直接進行操作(當然不建議這麼做)。
還有一點需要注意的是,鎖只和具體變數關聯,與特定 goroutine 無關。雖然可以在一個 goroutine 中加鎖,在另一個 goroutine 中解鎖(如通過指標傳遞變數,或全域性變數都可以),但還是建議在同一個程式碼塊中進行成對的加鎖解鎖操作。
原始碼分析
這是 Mutex 的ofollow,noindex">原始碼連結 。
Mutex 結構
Mutex 表示一個互斥鎖,其零值就是未加鎖狀態的 mutex,無需初始化。在首次使用後不要做值拷貝,這樣可能會使鎖失效。
type Mutex struct { state int32// 表示鎖當前的狀態 semauint32 // 訊號量,用於向處於 Gwaitting 的 G 傳送訊號 }
幾個常量
這裡用位操作來表示鎖的不同狀態。
const ( mutexLocked = 1 << iota // mutex is locked mutexWoken mutexStarving mutexWaiterShift = iota starvationThresholdNs = 1e6 )
mutexLocked
值為 1,第一位為 1,表示 mutex 已經被加鎖。根據mutex.state & mutexLocked
的結果來判斷 mutex 的狀態:該位為 1 表示已加鎖,0 表示未加鎖。
mutexWoken
值為 2,第二位為 1,表示 mutex 是否被喚醒。根據mutex.state & mutexWoken
的結果判斷 mutex 是否被喚醒:該位為 1 表示已被喚醒,0 表示未被喚醒。
mutexStarving
值為 4,第三位為 1,表示 mutex 是否處於飢餓模式。根據mutex.state & mutexWoken
的結果判斷 mutex 是否處於飢餓模式:該位為 1 表示處於飢餓模式,0 表示正常模式。
mutexWaiterShift
值為 3,表示mutex.state
右移 3 位後即為等待的goroutine
的數量。
starvationThresholdNs
值為 1000000 納秒,即 1ms,表示將 mutex 切換到飢餓模式的等待時間閾值。這個常量在原始碼中有大篇幅的註釋,理解這段註釋對理解程式邏輯至關重要,翻譯整理如下:
引入這個常量是為了保證出現 mutex 競爭情況時的公平性。mutex 有兩種操作模式:正常模式和飢餓模式 。
正常模式下,等待者以 FIFO 的順序排隊來獲取鎖,但被喚醒的等待者發現並沒有獲取到 mutex,並且還要與新到達的 goroutine 們競爭 mutex 的所有權。新到達的 goroutine 們有一個優勢 —— 它們已經執行在 CPU 上且可能數量很多,所以一個醒來的等待者有很大可能會獲取不到鎖。在這種情況下它處在等待佇列的前面。如果一個 goroutine 等待 mutex 釋放的時間超過 1ms,它就會將 mutex 切換到飢餓模式。
在飢餓模式下,mutex 的所有權直接從對 mutex 執行解鎖的 goroutine 傳遞給等待佇列前面的等待者。新到達的 goroutine 們不要嘗試去獲取 mutex,即使它看起來是在解鎖狀態,也不要試圖自旋(等也白等,在飢餓模式下是不會給你的),而是自己乖乖到等待佇列的尾部排隊去。
如果一個等待者獲得 mutex 的所有權,並且看到以下兩種情況中的任一種:1) 它是等待佇列中的最後一個,或者 2) 它等待的時間少於 1ms,它便將 mutex 切換回正常操作模式。
正常模式有更好地效能,因為一個 goroutine 可以連續獲得好幾次 mutex,即使有阻塞的等待者。而飢餓模式可以有效防止出現位於等待佇列尾部的等待者一直無法獲取到 mutex 的情況。
自旋鎖操作
在開始看 Mutex 原始碼前要先介紹幾個與自旋鎖相關的函式,原始碼中通過這幾個函式實現了對自旋鎖的操作。這幾個函式實際執行的程式碼都是在 runtime 包中實現的。
runtime_canSpin
程式碼具體位置 。
由於 Mutex 的特性,自旋需要比較保守的進行,原因參考上面starvationThresholdNs
常量的註釋。
限制條件是:只能自旋少於 4 次,而且僅當執行在多核機器上並且 GOMAXPROCS>1;最少有一個其它正在執行的 P,並且本地的執行佇列 runq 裡沒有 G 在等待。與 runtime mutex 相反,不做被動自旋,因為可以在全域性 runq 上或其它 P 上工作。
// Active spinning for sync.Mutex. //go:linkname sync_runtime_canSpin sync.runtime_canSpin //go:nosplit func sync_runtime_canSpin(i int) bool { // sync.Mutex is cooperative, so we are conservative with spinning. // Spin only few times and only if running on a multicore machine and // GOMAXPROCS>1 and there is at least one other running P and local runq is empty. // As opposed to runtime mutex we don't do passive spinning here, // because there can be work on global runq or on other Ps. if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }
runtime_doSpin
程式碼具體位置同上。
執行自旋操作,這個函式是用匯編實現的,函式內部迴圈呼叫 PAUSE 指令。PAUSE 指令什麼都不做,但是會消耗 CPU 時間。
/go:linkname sync_runtime_doSpin sync.runtime_doSpin //go:nosplit func sync_runtime_doSpin() { procyield(active_spin_cnt) }
runtime_SemacquireMutex
程式碼具體位置 。傳送獲取到 Mutex 的訊號。
//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) { semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile) }
runtime_Semrelease
程式碼具體位置同上。傳送釋放 Mutex 的訊號。
//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease func sync_runtime_Semrelease(addr *uint32, handoff bool) { semrelease1(addr, handoff) }
Lock
這是 Lock 方法的全部原始碼,先看一下整體邏輯,下面會分段解釋:
首先是直接呼叫 CAS 嘗試獲取鎖,如果獲取到則將鎖的狀態從 0 切換為 1 並返回。獲取不到就進入 for 迴圈,通過自旋來等待鎖被其它 goroutine 釋放,只有兩個地方 break 退出 for 迴圈而獲取到鎖。
原始碼實現分析
剛進入函式,會嘗試獲取鎖:
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) { if race.Enabled { race.Acquire(unsafe.Pointer(m)) } return }
直接通過呼叫CompareAndSwapInt32
這個方法來檢查鎖的狀態是否是 0,如果是則表示可以加鎖,將其狀態轉換為 1,當前 goroutine 加鎖成功,函式返回。
CompareAndSwapInt32
這個方法是彙編實現的,在單核CPU上執行是可以保證原子性的,但在多核 CPU 上執行時,需要加上 LOCK 字首來對匯流排加鎖,從而保證了該指令的原子性。
至於race.Enabled
,這裡是判斷是否啟用了競爭檢測,即程式編譯或執行時是否加上了-race
子命令。關於競爭檢測如想深入瞭解可以看官方部落格,見參考目錄 1。在這裡可以不用理會。
如果 mutex 已經被其它 goroutine 持有,則進入下面的邏輯。先定義了幾個變數:
var waitStartTime int64 // 當前 goroutine 開始等待的時間 starving := false// mutex 當前的所處的模式 awoke := false// 當前 goroutine 是否被喚醒 iter := 0// 自旋迭代的次數 old := m.state// 儲存 mutex 當前狀態
進入 for 迴圈後,先檢查是否可以進行自旋:
如上所述,不要在飢餓模式下進行自旋 ,因為在飢餓模式下只有等待者們可以獲得 mutex 的所有權,這時自旋是不可能獲取到鎖的。
能進入執行自旋邏輯部分的條件:當前不是飢餓模式,而且當前還可以進行自旋(見上面的 runtime_canSpin 函式)。
然後是判斷能否喚醒當前 goroutine 的四個條件:根據 1)!awoke
和 2)old&mutexWoken == 0
來判斷當前 goroutine 還沒有被喚醒;3)old>>mutexWaiterShift != 0
表示還有其它在等待的 goroutine;4)如果當前 goroutine 狀態還沒有變,就將其狀態切換為old|mutexWoken
, 即喚醒狀態 。
// old&(mutexLocked|mutexStarving) == mutexLocked 表示 mutex 當前不處於飢餓模式。 // 即 old & 0101 == 0001,old 的第一位必定為 1,第三位必定為 0,即未處於飢餓模式。 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // 這時自旋是有意義的,通過把 mutexWoken 標識為 true,以通知 Unlock 方法就先不要叫醒其它 // 阻塞著的 goroutine 了。 if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { awoke = true } // 將當前 goroutine 標識為喚醒狀態後,執行自旋操作,計數器加一,將當前狀態記錄到 old,繼續迴圈等待 runtime_doSpin() iter++ old = m.state continue }
如果不能進行自旋操作,進入下面的邏輯:
如果 mutex 當前處於正常模式,將 new 的第一位即鎖位設定為 1;如果 mutex 當前已經被加鎖或處於飢餓模式,則當前 goroutine 進入等待佇列;如果 mutex 當前處於飢餓模式,而且 mutex 已被加鎖,則將 new 的第三位即飢餓模式位設定為 1。
new := old // 不要嘗試獲取處於飢餓模式的鎖,新到達的 goroutine 們必須排隊。 if old&mutexStarving == 0 { new |= mutexLocked } // 沒有獲取到鎖,當前 goroutine 進入等待佇列。 // old & 0101 != 0,那麼 old 的第一位和第三位至少有一個為 1,即 mutex 已加鎖或處於飢餓模式。 if old&(mutexLocked|mutexStarving) != 0 { new += 1 << mutexWaiterShift } // 當前 goroutine 將 mutex 切換到飢餓模式。但如果當前 mutex 是解鎖狀態,不要切換。 // Unlock 期望處於飢餓模式的 mutex 有等待者,在這種情況下不會這樣。 if starving && old&mutexLocked != 0 { new |= mutexStarving }
設定好 new 後,繼續下面的邏輯:
當 goroutine 被喚醒時,如果 new 還沒有被喚醒,則發生了不一致的 mutex 狀態,丟擲錯誤;否則就重置 new 的第二位即喚醒位為 0。
if awoke { if new&mutexWoken == 0 { throw("sync: inconsistent mutex state") } new &^= mutexWoken }
接下來會呼叫 CAS 來將 mutex 當前狀態由 old 更新為 new:
如更新成功,old&(mutexLocked|mutexStarving) == 0
表示 mutex 未鎖定且未處於飢餓模式,則 break 跳出迴圈,當前 goroutine 獲取到鎖。
如果當前的 goroutine 之前已經在排隊了,就排到佇列的前面。runtime_SemacquireMutex(&m.sema, queueLifo)
這個函式就是做插隊操作的,如果 queueLifo == true,就把當前 goroutine 插入到等待佇列的前面。
繼續往下,如果 mutex 當前是處於飢餓模式,則修改等待的 goroutine 數量和第三位即飢餓模式位,break 跳出迴圈,當前 goroutine 獲取到鎖;如果是正常模式,繼續迴圈。
if atomic.CompareAndSwapInt32(&m.state, old, new) { // old & 0101 == 0,old 的第一位和第三位必定不是 1,即 mutex 未鎖定且未處於飢餓模式。 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // If we were already waiting before, queue at the front of the queue. queueLifo := waitStartTime != 0 if waitStartTime == 0 { // 之前沒排過隊,開始計時。 waitStartTime = runtime_nanotime() } runtime_SemacquireMutex(&m.sema, queueLifo) // 確定 mutex 當前所處模式 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { // If this goroutine was woken and mutex is in starvation mode, // ownership was handed off to us but mutex is in somewhat // inconsistent state: mutexLocked is not set and we are still // accounted as waiter. Fix that. if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw("sync: inconsistent mutex state") } delta := int32(mutexLocked - 1<<mutexWaiterShift) // 如果不是飢餓模式或只剩一個等待者了,退出飢餓模式 if !starving || old>>mutexWaiterShift == 1 { // Exit starvation mode. // Critical to do it here and consider wait time. // Starvation mode is so inefficient, that two goroutines // can go lock-step infinitely once they switch mutex // to starvation mode. delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break } // 未處於飢餓模式,讓新到來的 goroutine 先獲取鎖,繼續迴圈 awoke = true iter = 0 } else { // 上面的 CAS 沒有成功更新為 new,記錄當前狀態,繼續迴圈 old = m.state }
Unlock
Unlock 的程式碼比較少,直接在程式碼中註釋:
func (m *Mutex) Unlock() { if race.Enabled { _ = m.state race.Release(unsafe.Pointer(m)) } // Fast path: drop lock bit. // 將 mutex 當前狀態第一位即鎖位置為 0 儲存到 new new := atomic.AddInt32(&m.state, -mutexLocked) // 當 new 狀態鎖位為 1 時會滿足此條件,即對未加鎖狀態的 mutex 進行解鎖,丟擲錯誤 if (new+mutexLocked)&mutexLocked == 0 { throw("sync: unlock of unlocked mutex") } // 如果未處於飢餓模式 if new&mutexStarving == 0 { old := new for { // 如果沒有等待者,或者一個 goroutine 已經被喚醒或獲取到鎖,或處於飢餓模式, // 無需喚醒任何其它被掛起的 goroutine。 // 在飢餓模式中,所有權直接從執行解鎖的 goroutine 傳遞給下一個等待者。 if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 等待者數量減 1,並將喚醒位改成 1 new = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { // 喚醒一個阻塞的 goroutine,但不是喚醒第一個等待者 runtime_Semrelease(&m.sema, false) return } old = m.state } } else { // 飢餓模式:將 mutex 所有權傳遞給下個等待者。 // 注意:mutexLocked 沒有設定,等待者將在被喚醒後設置它。 // 但是如果設定了 mutexStarving,仍然認為 mutex 是鎖定的,所以新來的 goroutine 不會獲取到它。 runtime_Semrelease(&m.sema, true) } }