1. 程式人生 > >圖解Go裡面的互斥鎖mutex瞭解程式語言核心實現原始碼

圖解Go裡面的互斥鎖mutex瞭解程式語言核心實現原始碼

1. 鎖的基礎概念

1.1 CAS與輪詢

1.1.1 cas實現鎖


在鎖的實現中現在越來越多的採用CAS來進行,通過利用處理器的CAS指令來實現對給定變數的值交換來進行鎖的獲取

1.1.2 輪詢鎖


在多執行緒併發的情況下很有可能會有執行緒CAS失敗,通常就會配合for迴圈採用輪詢的方式去嘗試重新獲取鎖

1.2 鎖的公平性


鎖從公平性上通常會分為公平鎖和非公平鎖,主要取決於在鎖獲取的過程中,先進行鎖獲取的執行緒是否比後續的執行緒更先獲得鎖,如果是則就是公平鎖:多個執行緒按照獲取鎖的順序依次獲得鎖,否則就是非公平性

1.3 飢餓與排隊

1.3.1 鎖飢餓

鎖飢餓是指因為大量執行緒都同時進行獲取鎖,某些執行緒可能在鎖的CAS過程中一直失敗,從而長時間獲取不到鎖

1.3.2 排隊機制


上面提到了CAS和輪詢鎖進行鎖獲取的方式,可以發現如果已經有執行緒獲取了鎖,但是在當前執行緒在多次輪詢獲取鎖失敗的時候,就沒有必要再繼續進行反覆嘗試浪費系統資源,通常就會採用一種排隊機制,來進行排隊等待

1.4 位計數

在大多數程式語言中針對實現基於CAS的鎖的時候,通常都會採用一個32位的整數來進行鎖狀態的儲存

2. mutex實現

2.1 成員變數與模式

2.1.1 成員變數

在go的mutex中核心成員變數只有兩個state和sema,其通過state來進行鎖的計數,而通過sema來實現排隊

type Mutex struct {
    state int32
    sema  uint32
}

2.1.2 鎖模式

鎖模式主要分為兩種

描述 公平性
正常模式 正常模式下所有的goroutine按照FIFO的順序進行鎖獲取,被喚醒的goroutine和新請求鎖的goroutine同時進行鎖獲取,通常新請求鎖的goroutine更容易獲取鎖
飢餓模式 飢餓模式所有嘗試獲取鎖的goroutine進行等待排隊,新請求鎖的goroutine不會進行鎖獲取,而是加入佇列尾部等待獲取鎖

上面可以看到其實在正常模式下,其實鎖的效能是最高的如果多個goroutine進行鎖獲取後立馬進行釋放則可以避免多個執行緒的排隊消耗
同理在切換到飢餓模式後,在進行鎖獲取的時候,如果滿足一定的條件也會切換回正常模式,從而保證鎖的高效能

2.2 鎖計數

2.2.1 鎖狀態


在mutex中鎖有三個標誌位,其中其二進位制位分別位001(mutexLocked)、010(mutexWoken)、100(mutexStarving), 注意這三者並不是互斥的關係,比如一個鎖的狀態可能是鎖定的飢餓模式並且已經被喚醒

    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving

2.2.2 等待計數

mutex中通過低3位儲存了當前mutex的三種狀態,剩下的29位全部用來儲存嘗試正在等待獲取鎖的goroutine的數量

    mutexWaiterShift = iota // 3

2.3喚醒機制

2.3.1 喚醒標誌


喚醒標誌其實就是上面說的第二位,喚醒標誌主要用於標識當前嘗試獲取goroutine是否有正在處於喚醒狀態的,記得上面公平模式下,當前正在cpu上執行的goroutine可能會先獲取到鎖

2.3.2 喚醒流程


當釋放鎖的時候,如果當前有goroutine正在喚醒狀態,則只需要修改鎖狀態為釋放鎖,則處於woken狀態的goroutine就可以直接獲取鎖,否則則需要喚醒一個goroutine, 並且等待這個goroutine修改state狀態為mutexWoken,才退出

2.4 加鎖流程

2.3.1 快速模式

如果當前沒有goroutine加鎖,則並且直接進行CAS成功,則直接獲取鎖成功

        // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
        if race.Enabled {
            race.Acquire(unsafe.Pointer(m))
        }
        return
    }

2.3.2 自旋與喚醒

    // 注意這裡其實包含兩個資訊一個是如果當前已經是鎖定狀態,然後允許自旋iter主要是計數次數實際上只允許自旋4次
    // 其實就是在自旋然後等待別人釋放鎖,如果有人釋放鎖,則會立刻進行下面的嘗試獲取鎖的邏輯   
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
            // !awoke 如果當前執行緒不處於喚醒狀態
            // old&mutexWoken == 0如果當前沒有其他正在喚醒的節點,就將當前節點處於喚醒的狀態
            // old>>mutexWaiterShift != 0 :右移3位,如果不位0,則表明當前有正在等待的goroutine
            // atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken)設定當前狀態為喚醒狀態
            if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                awoke = true
            }
            // 嘗試自旋,
            runtime_doSpin()
            // 自旋計數
            iter++
        // 從新獲取狀態
            old = m.state
            continue
        }

2.3.3 更改鎖狀態

流程走到這裡會有兩種可能:
1.鎖狀態當前已經不是鎖定狀態
2.自旋超過指定的次數,不再允許自旋了

        new := old
        if old&mutexStarving == 0 {
            // 如果當前不是飢餓模式,則這裡其實就可以嘗試進行鎖的獲取了|=其實就是將鎖的那個bit位設為1表示鎖定狀態
            new |= mutexLocked
        }
        if old&(mutexLocked|mutexStarving) != 0 {
            // 如果當前被鎖定或者處於飢餓模式,則增等待一個等待計數
            new += 1 << mutexWaiterShift
        }
        if starving && old&mutexLocked != 0 {
            // 如果當前已經處於飢餓狀態,並且當前鎖還是被佔用,則嘗試進行飢餓模式的切換
            new |= mutexStarving
        }
        if awoke {
            if new&mutexWoken == 0 {
                throw("sync: inconsistent mutex state")
            }
            // awoke為true則表明當前執行緒在上面自旋的時候,修改mutexWoken狀態成功
            // 清除喚醒標誌位
            // 為什麼要清除標誌位呢?
            // 實際上是因為後續流程很有可能當前執行緒會被掛起,就需要等待其他釋放鎖的goroutine來喚醒
            // 但如果unlock的時候發現mutexWoken的位置不是0,則就不會去喚醒,則該執行緒就無法再醒來加鎖
            new &^= mutexWoken
        }

2.3.3 加鎖排隊與狀態轉換

再加鎖的時候實際上只會有一個goroutine加鎖CAS成功,而其他執行緒則需要重新獲取狀態,進行上面的自旋與喚醒狀態的重新計算,從而再次CAS

        if atomic.CompareAndSwapInt32(&m.state, old, new) {
            if old&(mutexLocked|mutexStarving) == 0 {
                // 如果原來的狀態等於0則表明當前已經釋放了鎖並且也不處於飢餓模式下
                // 實際的二進位制位可能是這樣的 1111000, 後面三位全是0,只有記錄等待goroutine的計數器可能會不為0
                // 那就表明其實
                break // locked the mutex with CAS
            }
            // 排隊邏輯,如果發現waitStatrTime不為0,則表明當前執行緒之前已經再排隊來,後面可能因為
            // unlock被喚醒,但是本次依舊沒獲取到鎖,所以就將它移動到等待佇列的頭部
            queueLifo := waitStartTime != 0
            if waitStartTime == 0 {
                waitStartTime = runtime_nanotime()
            }
            // 這裡就會進行排隊等待其他節點進行喚醒
            runtime_SemacquireMutex(&m.sema, queueLifo)
            // 如果等待超過指定時間,則切換為飢餓模式 starving=true
            // 如果一個執行緒之前不是飢餓狀態,並且也沒超過starvationThresholdNs,則starving為false
            // 就會觸發下面的狀態切換
            starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
            // 重新獲取狀態
            old = m.state
            if old&mutexStarving != 0 { 
                // 如果發現當前已經是飢餓模式,注意飢餓模式喚醒的是第一個goroutine
                // 當前所有的goroutine都在排隊等待
            // 一致性檢查,
                if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {
                    throw("sync: inconsistent mutex state")
                }
                // 獲取當前的模式
                delta := int32(mutexLocked - 1<<mutexWaiterShift)
                if !starving || old>>mutexWaiterShift == 1 {
                    // 如果當前goroutine不是飢餓狀態,就從飢餓模式切換會正常模式
                    // 就從mutexStarving狀態切換出去
                    delta -= mutexStarving
                }
                // 最後進行cas操作
                atomic.AddInt32(&m.state, delta)
                break
            }
            // 重置計數
            awoke = true
            iter = 0
        } else {
            old = m.state
        }

2.5 釋放鎖邏輯

2.5.1 釋放鎖程式碼

func (m *Mutex) Unlock() {
    if race.Enabled {
        _ = m.state
        race.Release(unsafe.Pointer(m))
    }

    // 直接進行cas操作
    new := atomic.AddInt32(&m.state, -mutexLocked)
    if (new+mutexLocked)&mutexLocked == 0 {
        throw("sync: unlock of unlocked mutex")
    }
    if new&mutexStarving == 0 {
        // 如果釋放鎖並且不是飢餓模式
        old := new
        for {

            if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
                // 如果已經有等待者並且已經被喚醒,就直接返回
                return
            }
            // 減去一個等待計數,然後將當前模式切換成mutexWoken
            new = (old - 1<<mutexWaiterShift) | mutexWoken
            if atomic.CompareAndSwapInt32(&m.state, old, new) {
                // 喚醒一個goroutine
                runtime_Semrelease(&m.sema, false)
                return
            }
            old = m.state
        }
    } else {
        // 喚醒等待的執行緒
        runtime_Semrelease(&m.sema, true)
    }
}

本文由部落格一文多發平臺 OpenWrite 釋出!