1. 程式人生 > >golang Mutex 實現上的幾個巧妙的點

golang Mutex 實現上的幾個巧妙的點

    golang 的metux 的實現有幾個點做法是非常有意思的,一個是底層資料結構上,用了平時很少用的位運算,第二個,用到了自旋,並做了自旋策略控制,最後是用了訊號量控制協程。

    首先是golang mutex 中用了很多位運算。位運算不做細介紹,對記憶體利用比較高的演算法都有涉及,比如redies 的壓縮列表,比如golang 的Protobuffer。

    有幾個關鍵點,iota 在定義的時候,用的多,做自增運算:    

const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexWaiterShift = iota
)

// 這裡 第一個變數為1 ,第二個變數為10, 第三個為10 

    然後,位運算的求或和求與用的很多,一個是與1 求或將某位置1,一個是與0 求與將某位置0,這些都是用於改變某些標誌位的方式,不要看懵逼了:


        new := old | mutexLocked // 將old 的最後一位置1,表示new 鎖一定是被持有狀態
        if old&mutexLocked != 0 { // 將最後一位保留後,其他位全部置0, 判斷最後一位的狀態是不是0,判斷是不是被持有
            if runtime_canSpin(iter) {
                // Active spinning makes sense.
                // Try to set mutexWoken flag to inform Unlock
                // to not wake other blocked goroutines.
                if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
                    atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
                    awoke = true
                }
                runtime_doSpin()
                iter++
                continue
            }
            new = old + 1<<mutexWaiterShift
        }

    第二個是golang 的加鎖會自旋,4次自旋沒拿到鎖後再將協程休眠,這樣可以減少切換成本。這裡關判斷條件是自旋次數,cpu核數,p 的數量:

func sync_runtime_canSpin(i int) bool {
	if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched.npidle+sched.nmspinning)+1 {
		return false
	}
	if p := getg().m.p.ptr(); !runqempty(p) {
		return false
	}
	return true
}

    最後是利用訊號量掛起和喚醒協程,核心函式是

runtime_SemacquireMutex(&m.sema)
runtime_Semrelease(&m.sema)

    獲取訊號時,當s > 0 ,將s--,如果s 為負數,會將當前g 放入阻塞佇列,掛起直到s>0。

func (m *Mutex) Lock() {
	if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
		if race.Enabled {
			race.Acquire(unsafe.Pointer(m))
		}
		return // cas 獲取到鎖,直接返回
	}

	awoke := false  //迴圈標記
	iter := 0       //迴圈計數器
	for {
		old := m.state            //儲存當前鎖狀態
		new := old | mutexLocked  //將狀態位最後一位指定1
		if old&mutexLocked != 0 { //鎖被佔用
			if runtime_canSpin(iter) { //檢查是否可以進入自旋鎖,4次
				if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
					atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) { 
                                        //awoke標記為true
					awoke = true
				}
                                
				runtime_doSpin()//進入自旋
				iter++
				continue
			}
                       
			new = old + 1<<mutexWaiterShift //鎖被佔用,且自旋次數超過4次,掛起協程數+1,下面步驟將g 掛起並等待
		}
		if awoke {
			if new&mutexWoken == 0 {
				throw("sync: inconsistent mutex state")
			}
			new &^= mutexWoken //清除標誌
		}
               
		if atomic.CompareAndSwapInt32(&m.state, old, new) { //更新協程計數
			if old&mutexLocked == 0 {
				break
			}
                         
                        // 鎖請求失敗,進入休眠狀態,等待訊號喚醒後重新開始迴圈,一直阻塞在這裡
			runtime_SemacquireMutex(&m.sema)
			awoke = true
			iter = 0
		}
	}

	if race.Enabled {
		race.Acquire(unsafe.Pointer(m))
	}
}

    解鎖的過程就和加鎖反過來即可:

func (m *Mutex) Unlock() {
	if race.Enabled {
		_ = m.state
		race.Release(unsafe.Pointer(m))
	}

	new := atomic.AddInt32(&m.state, -mutexLocked)// 移除加鎖位
	if (new+mutexLocked)&mutexLocked == 0 {
		throw("sync: unlock of unlocked mutex")
	}

	old := new
	for {
		//當休眠佇列內的等待計數為0或者自旋狀態計數器為0,退出
		if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken) != 0 {
			return
		}
		// 等待協程數-1,更改清除標記位
		new = (old - 1<<mutexWaiterShift) | mutexWoken
		if atomic.CompareAndSwapInt32(&m.state, old, new) {         
			runtime_Semrelease(&m.sema)// 釋放鎖,傳送釋放訊號,對應之前的acquire
			return
		}
		old = m.state
	}
}