《Go in action》讀後記錄:Go的並發與並行

分類:IT技術 時間:2017-10-02

本文的主要內容是:

  • 了解goroutine,使用它來運行程序
  • 了解Go是如何檢測並修正競爭狀態的(解決資源互斥訪問的方式)
  • 了解並使用通道chan來同步goroutine

一、使用goroutine來運行程序

1.Go的並發與並行

Go的並發能力,是指讓某個函數獨立於其他函數運行的能力。當為一個函數創建goroutine時,該函數將作為一個獨立的工作單元,被 調度器 調度到可用的邏輯處理器上執行。Go的運行時調度器是個復雜的軟件,它做的工作大致是:

  • 管理被創建的所有goroutine,為其分配執行時間
  • 將操作系統線程與語言運行時的邏輯處理器綁定

參考The Go scheduler ,這裏較淺顯地說一下Go的運行時調度器。操作系統會在物理處理器上調度操作系統線程來運行,而Go語言的運行時會在邏輯處理器上調度goroutine來運行,每個邏輯處理器都分別綁定到單個操作系統線程上。這裏涉及到三個角色:

  • M:操作系統線程,這是真正的內核OS線程
  • P:邏輯處理器,代表著調度的上下文,它使goroutine在一個M上跑
  • G:goroutine,擁有自己的棧,指令指針等信息,被P調度

每個P會維護一個全局運行隊列(稱為runqueue),處於ready就緒狀態的goroutine(灰色G)被放在這個隊列中等待被調度。在編寫程序時,每當go func啟動一個goroutine時,runqueue便在尾部加入一個goroutine。在下一個調度點上,P就從runqueue中取出一個goroutine出來執行(藍色G)。

當某個操作系統線程M阻塞的時候(比如goroutine執行了阻塞的系統調用),P可以綁定到另外一個操作系統線程M上,讓運行隊列中的其他goroutine繼續執行:

上圖中G0執行了阻塞操作,M0被阻塞,P將在新的系統線程M1上繼續調度G執行。M1有可能是被新創建的,或者是從線程緩存中取出。Go調度器保證有足夠的線程來運行所有的P,語言運行時默認限制每個程序最多創建10000個線程,這個現在可以通過調用runtime/debug包的SetMaxThreads方法來更改。

Go可以在在一個邏輯處理器P上實現並發,如果需要並行,必須使用多於1個的邏輯處理器。Go調度器會把goroutine平等分配到每個邏輯處理器上,此時goroutine將在不同的線程上運行,不過前提是要求機器擁有多個物理處理器。

2.創建goroutine

使用關鍵字go來創建一個goroutine,並讓所有的goroutine都得到執行:

//example1.go
package main

import (
   "runtime"
   "sync"
   "fmt"
)

var (
   wg sync.WaitGroup
)

func main() {
   //分配一個邏輯處理器P給調度器使用
   runtime.GOMAXPROCS(1)
   //在這裏,wg用於等待程序完成,計數器加2,表示要等待兩個goroutine
   wg.Add(2)
   //聲明1個匿名函數,並創建一個goroutine
   fmt.Printf("Begin Coroutines\n")
   go func() {
      //在函數退出時,wg計數器減1
      defer wg.Done()
      //打印3次小寫字母表
      for count := 0; count < 3; count++ {
         for char := 'a'; char < 'a'+26; char++ {
            fmt.Printf("%c ", char)
         }
      }
   }()
   //聲明1個匿名函數,並創建一個goroutine
   go func() {
      defer wg.Done()
      //打印大寫字母表3次
      for count := 0; count < 3; count++ {
         for char := 'A'; char < 'A'+26; char++ {
            fmt.Printf("%c ", char)
         }
      }
   }()

   fmt.Printf("Waiting To Finish\n")
   //等待2個goroutine執行完畢
   wg.Wait()

}

這個程序使用runtime.GOMAXPROCS(1)來分配一個邏輯處理器給調度器使用,兩個goroutine將被該邏輯處理器調度並發執行。程序輸出:

Begin Coroutines
Waiting To Finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z 

從輸出來看,是先執行完一個goroutine,再接著執行第二個goroutine的,大寫字母全部打印完後,再打印全部的小寫字母。那麽,有沒有辦法讓兩個goroutine並行執行呢?為程序指定兩個邏輯處理器即可:

//修改為2個邏輯處理器
runtime.GOMAXPROCS(2)

此時執行程序,輸出為:

Begin Coroutines
Waiting To Finish
A B C D E a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c F G H I J K L M N O P Q R S T U V W X d e f g h i j k l m n o p q r s Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z t u v w x y z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z 

那如果只有1個邏輯處理器,如何讓兩個goroutine交替被調度?實際上,如果goroutine需要很長的時間才能運行完,調度器的內部算法會將當前運行的goroutine讓出,防止某個goroutine長時間占用邏輯處理器。由於示例程序中兩個goroutine的執行時間都很短,在為引起調度器調度之前已經執行完。不過,程序也可以使用runtime.Gosched()來將當前在邏輯處理器上運行的goruntine讓出,讓另一個goruntine得到執行:

//example2.go
package main

import (
   "runtime"
   "sync"
   "fmt"
)

var (
   wg sync.WaitGroup
)

func main() {
   //分配一個邏輯處理器P給調度器使用
   runtime.GOMAXPROCS(1)
   //在這裏,wg用於等待程序完成,計數器加2,表示要等待兩個goroutine
   wg.Add(2)
   //聲明1個匿名函數,並創建一個goroutine
   fmt.Printf("Begin Coroutines\n")
   go func() {
      //在函數退出時,wg計數器減1
      defer wg.Done()
      //打印3次小寫字母表
      for count := 0; count < 3; count++ {
         for char := 'a'; char < 'a'+26; char++ {
            if char=='k'{
               runtime.Gosched()
            }
            fmt.Printf("%c ", char)
         }
      }
   }()
   //聲明1個匿名函數,並創建一個goroutine
   go func() {
      defer wg.Done()
      //打印大寫字母表3次
      for count := 0; count < 3; count++ {
         for char := 'A'; char < 'A'+26; char++ {
            if char == 'K'{
               runtime.Gosched()
            }
            fmt.Printf("%c ", char)
         }
      }
   }()

   fmt.Printf("Waiting To Finish\n")
   //等待2個goroutine執行完畢
   wg.Wait()

}

兩個goroutine在循環的字符為k/K的時候會讓出邏輯處理器,程序的輸出結果為:

Begin Coroutines
Waiting To Finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J a b c d e f g h i j K L M N O P Q R S T U V W X Y Z A B C D E F G H I J k l m n o p q r s t u v w x y z a b c d e f g h i j K L M N O P Q R S T U V W X Y Z k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z 

這裏大小寫字母果然是交替著輸出了。不過從輸出可以看到,第一次輸出大寫字母時遇到K沒有讓出邏輯處理器,這是什麽原因還不是很清楚,調度器的調度機制?

二、處理競爭狀態

並發程序避免不了的一個問題是對資源的同步訪問。如果多個goroutine在沒有互相同步的情況下去訪問同一個資源,並進行讀寫操作,這時goroutine就處於競爭狀態下:

//example3.go
package main

import (
   "sync"
   "runtime"
   "fmt"
)

var (
   //counter為訪問的資源
   counter int64
   wg      sync.WaitGroup
)

func addCount() {
   defer wg.Done()
   for count := 0; count < 2; count++ {
      value := counter
      //當前goroutine從線程退出
      runtime.Gosched()
      value++
      counter=value
   }
}

func main() {
   wg.Add(2)
   go addCount()
   go addCount()
   wg.Wait()
   fmt.Printf("counter: %d\n",counter)
}
//output:
counter: 4 或者counter: 2

這段程序中,goroutinecounter的讀寫操作沒有進行同步,goroutine 1對counter的寫結果可能被goroutine 2所覆蓋。Go可通過如下方式來解決這個問題:

  • 使用原子函數操作
  • 使用互斥鎖鎖住臨界區
  • 使用通道chan

1. 檢測競爭狀態

有時候競爭狀態並不能一眼就看出來。Go 提供了一個非常有用的工具,用於檢測競爭狀態。使用方式是:

go build -race example4.go//用競爭檢測器標誌來編譯程序
./example4 //運行程序

工具檢測出了程序存在一處競爭狀態,並指出發生競爭狀態的幾行代碼是:

22        counter=value
18        value := counter
28        go addCount()
29        go addCount()

2. 使用原子函數

對整形變量或指針的同步訪問,可以使用原子函數來進行。這裏使用原子函數來修復example4.go中的競爭狀態問題:

//example5.go
package main

import (
    "sync"
    "runtime"
    "fmt"
    "sync/atomic"
)

var (
    //counter為訪問的資源
    counter int64
    wg      sync.WaitGroup
)

func addCount() {
    defer wg.Done()
    for count := 0; count < 2; count++ {
        //使用原子操作來進行
        atomic.AddInt64(&counter,1)
        //當前goroutine從線程退出
        runtime.Gosched()
    }
}

func main() {
    wg.Add(2)
    go addCount()
    go addCount()
    wg.Wait()
    fmt.Printf("counter: %d\n",counter)
}
//output:
counter: 4

這裏使用atomic.AddInt64函數來對一個整形數據進行加操作,另外一些有用的原子操作還有:

atomic.StoreInt64() //寫
atomic.LoadInt64()  //讀

更多的原子操作函數請看atomic包中的聲明。

3. 使用互斥鎖

對臨界區的訪問,可以使用互斥鎖來進行。對於example4.go的競爭狀態,可以使用互斥鎖來解決:

//example5.go
package main

import (
   "sync"
   "runtime"
   "fmt"
)

var (
   //counter為訪問的資源
   counter int
   wg      sync.WaitGroup
   mutex   sync.Mutex
)

func addCount() {
   defer wg.Done()

   for count := 0; count < 2; count++ {
      //加上鎖,進入臨界區域
      mutex.Lock()
      {
         value := counter
         //當前goroutine從線程退出
         runtime.Gosched()
         value++
         counter = value
      }
      //離開臨界區,釋放互斥鎖
      mutex.Unlock()
   }
}
func main() {
   wg.Add(2)
   go addCount()
   go addCount()
   wg.Wait()
   fmt.Printf("counter: %d\n", counter)
}

//output:
counter: 4

使用Lock()Unlock()函數調用來定義臨界區,在同一個時刻內,只有一個goroutine能夠進入臨界區,直到調用Unlock()函數後,其他的goroutine才能夠進入臨界區。

在Go中解決共享資源安全訪問,更常用的使用通道chan。

三、利用通道共享數據

Go語言采用CSP消息傳遞模型。通過在goroutine之間傳遞數據來傳遞消息,而不是對數據進行加鎖來實現同步訪問。這裏就需要用到通道chan這種特殊的數據類型。當一個資源需要在goroutine中共享時,chan在goroutine中間架起了一個通道。通道使用make來創建:

unbuffered := make(char int) //創建無緩存通道,用於int類型數據共享
buffered := make(chan string,10)//創建有緩存通道,用於string類型數據共享
buffered<- "hello world" //向通道中寫入數據
value:= <-buffered //從通道buffered中接受數據

通道用於放置某一種類型的數據。創建通道時指定通道的大小,將創建有緩存的通道。無緩存通道是一種同步通信機制,它要求發送goroutine和接收goroutine都應該準備好,否則會進入阻塞。

1. 無緩存的通道

無緩存通道是同步的——一個goroutine向channel寫入消息的操作會一直阻塞,直到另一個goroutine從通道中讀取消息。反過來也是,一個goroutine從channel讀取消息的操作會一直阻塞,直到另一個goroutine向通道中寫入消息。《Go in action》中關於無緩存通道的解釋有一個非常棒的例子:網球比賽。在網球比賽中,兩位選手總是處在以下兩種狀態之一:要麽在等待接球,要麽在把球打向對方。球的傳遞可看為通道中數據傳遞。下面這段代碼使用通道模擬了這個過程:

//example6.go
package main

import (
    "sync"
    "fmt"
    "math/rand"
    "time"
)

var wg sync.WaitGroup

func player(name string, court chan int) {
    defer wg.Done()
    for {
        //如果通道關閉,那麽選手勝利
        ball, ok := <-court
        if !ok {
            fmt.Printf("Player %s Won\n", name)
            return
        }
        n := rand.Intn(100)

        //隨機概率使某個選手Miss
        if n%13 == 0 {
            fmt.Printf("Player %s Missed\n", name)
            //關閉通道
            close(court)
            return
        }
        fmt.Printf("Player %s Hit %d\n", name, ball)
        ball++
        //否則選手進行擊球
        court <- ball
    }
}


func main() {
    rand.Seed(time.Now().Unix())
    court := make(chan int)
    //等待兩個goroutine都執行完
    wg.Add(2)
    //選手1等待接球
    go player("candy", court)
    //選手2等待接球
    go player("luffic", court)
    //球進入球場(可以開始比賽了)
    court <- 1
    wg.Wait()
}
//output:
Player luffic Hit 1
Player candy Hit 2
Player luffic Hit 3
Player candy Hit 4
Player luffic Hit 5
Player candy Missed
Player luffic Won

2. 有緩存的通道

有緩存的通道是一種在被接收前能存儲一個或者多個值的通道,它與無緩存通道的區別在於:無緩存的通道保證進行發送和接收的goroutine會在同一時間進行數據交換,有緩存的通道沒有這種保證。有緩存通道讓goroutine阻塞的條件為:通道中沒有數據可讀的時候,接收動作會被阻塞;通道中沒有區域容納更多數據時,發送動作阻塞。向已經關閉的通道中發送數據,會引發panic,但是goroutine依舊能從通道中接收數據,但是不能再向通道裏發送數據。所以,發送端應該負責把通道關閉,而不是由接收端來關閉通道。

小結

  • goroutine被邏輯處理器執行,邏輯處理器擁有獨立的系統線程與運行隊列
  • 多個goroutine在一個邏輯處理器上可以並發執行,當機器有多個物理核心時,可通過多個邏輯處理器來並行執行。
  • 使用關鍵字 go 來創建goroutine。
  • 在Go中,競爭狀態出現在多個goroutine試圖同時去訪問一個資源時。
  • 可以使用互斥鎖或者原子函數,去防止競爭狀態的出現。
  • 在go中,更好的解決競爭狀態的方法是使用通道來共享數據。
  • 無緩沖通道是同步的,而有緩沖通道不是。

(完)


Tags: goroutine 調度 運行 線程 一個 操作系統

文章來源:


ads
ads

相關文章
ads

相關文章

ad