1. 程式人生 > >Golang的sync.WaitGroup 實現邏輯和源碼解析

Golang的sync.WaitGroup 實現邏輯和源碼解析

信號 啟動 blocks done 多少 set spa eas golang

在Golang中,WaitGroup主要用來做go Routine的等待,當啟動多個go程序,通過waitgroup可以等待所有go程序結束後再執行後面的代碼邏輯,比如:

func Main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            time.Sleep(10 * time.Second)
        }()

    }
    wg.Wait() // 等待在此,等所有go func裏都執行了Done()才會退出
}

WaitGroup主要是三個方法,Add(int),Done()和Wait(), 其中Done()是調用了Add(-1),推薦使用方法是,先統一Add,在goroutine裏並發的Done,然後Wait

WaitGroup主要維護了2個計數器,一個是請求計數器 v,一個是等待計數器 w,二者組成一個64bit的值,請求計數器占高32bit,等待計數器占低32bit。

簡單來說,當Add(n)執行時,請求計數器 v 就會加n,當Done()執行時,v 就會減1,可以想到,v 為0時就是結束,可以觸發Wait()執行了,所謂的觸發Wait()是通過信號量實現的。

那麽等待計數器拿來幹嘛?是因為Wait()方法支持並發,每一次Wait()方法執行,等待計數器 w 就會加1,而等待v為0觸發Wait()時,要根據w的數量發送w份的信號量,正確的觸發所有的Wait()。

同時,WaitGroup裏還有對使用邏輯進行了嚴格的檢查,比如Wait()一旦開始不能Add().

下面是帶註釋的代碼:

func (wg *WaitGroup) Add(delta int) {
    statep := wg.state()
    // 更新statep,statep將在wait和add中通過原子操作一起使用
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    v := int32(state >> 32)
    w := uint32(state)
        if v < 0
{ panic("sync: negative WaitGroup counter") } if w != 0 && delta > 0 && v == int32(delta) { // wait不等於0說明已經執行了Wait,此時不容許Add panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // 正常情況,Add會讓v增加,Done會讓v減少,如果沒有全部Done掉,此處v總是會大於0的,直到v為0才往下走 // 而w代表是有多少個goruntine在等待done的信號,wait中通過compareAndSwap對這個w進行加1 if v > 0 || w == 0 { return } // This goroutine has set counter to 0 when waiters > 0. // Now there can‘t be concurrent mutations of state: // - Adds must not happen concurrently with Wait, // - Wait does not increment waiters if it sees counter == 0. // Still do a cheap sanity check to detect WaitGroup misuse. // 當v為0(Done掉了所有)或者w不為0(已經開始等待)才會到這裏,但是在這個過程中又有一次Add,導致statep變化,panic if *statep != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } // Reset waiters count to 0. // 將statep清0,在Wait中通過這個值來保護信號量發出後還對這個Waitgroup進行操作 *statep = 0 // 將信號量發出,觸發wait結束 for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false) } } // Done decrements the WaitGroup counter by one. func (wg *WaitGroup) Done() { wg.Add(-1) } // Wait blocks until the WaitGroup counter is zero. func (wg *WaitGroup) Wait() { statep := wg.state() for { state := atomic.LoadUint64(statep) v := int32(state >> 32) w := uint32(state) if v == 0 { // Counter is 0, no need to wait. if race.Enabled { race.Enable() race.Acquire(unsafe.Pointer(wg)) } return } // Increment waiters count. // 如果statep和state相等,則增加等待計數,同時進入if等待信號量 // 此處做CAS,主要是防止多個goroutine裏進行Wait()操作,每有一個goroutine進行了wait,等待計數就加1 // 如果這裏不相等,說明statep,在 從讀出來 到 CAS比較 的這個時間區間內,被別的goroutine改寫了,那麽不進入if,回去再讀一次,這樣寫避免用鎖,更高效些 if atomic.CompareAndSwapUint64(statep, state, state+1) { if race.Enabled && w == 0 { // Wait must be synchronized with the first Add. // Need to model this is as a write to race with the read in Add. // As a consequence, can do the write only for the first waiter, // otherwise concurrent Waits will race with each other. race.Write(unsafe.Pointer(&wg.sema)) } // 等待信號量 runtime_Semacquire(&wg.sema) // 信號量來了,代表所有Add都已經Done if *statep != 0 { // 走到這裏,說明在所有Add都已經Done後,觸發信號量後,又被執行了Add panic("sync: WaitGroup is reused before previous Wait has returned") } return } } }

Golang的sync.WaitGroup 實現邏輯和源碼解析