本文的主要內容是:
- 了解goroutine,使用它來運行程序
- 了解Go是如何檢測並修正競爭狀態的(解決資源互斥訪問的方式)
- 了解並使用通道chan來同步goroutine
一、使用goroutine來運行程序
1.Go的並發與並行
Go的並發能力,是指讓某個函數獨立於其他函數運行的能力。當為一個函數創建goroutine
時,該函數將作為一個獨立的工作單元,被 調度器 調度到可用的邏輯處理器上執行。Go的運行時調度器是個復雜的軟件,它做的工作大致是:
- 管理被創建的所有goroutine,為其分配執行時間
- 將操作系統線程與語言運行時的邏輯處理器綁定
參考The Go scheduler ,這裏較淺顯地說一下Go的運行時調度器。操作系統會在物理處理器上調度操作系統線程
來運行,而Go語言的運行時會在邏輯處理器
上調度goroutine
來運行,每個邏輯處理器都分別綁定到單個操作系統線程上。這裏涉及到三個角色:
- M:操作系統線程,這是真正的內核OS線程
- P:邏輯處理器,代表著調度的上下文,它使goroutine在一個M上跑
- G:goroutine,擁有自己的棧,指令指針等信息,被P調度
每個P會維護一個全局運行隊列(稱為runqueue),處於ready就緒狀態的goroutine
(灰色G)被放在這個隊列中等待被調度。在編寫程序時,每當go func
啟動一個goroutine
時,runqueue
便在尾部加入一個goroutine
。在下一個調度點上,P就從runqueue
中取出一個goroutine
出來執行(藍色G)。
當某個操作系統線程M阻塞的時候(比如goroutine
執行了阻塞的系統調用),P可以綁定到另外一個操作系統線程M上,讓運行隊列中的其他goroutine
繼續執行:
上圖中G0執行了阻塞操作,M0被阻塞,P將在新的系統線程M1上繼續調度G執行。M1有可能是被新創建的,或者是從線程緩存中取出。Go調度器保證有足夠的線程來運行所有的P,語言運行時默認限制每個程序最多創建10000個線程,這個現在可以通過調用runtime/debug包的SetMaxThreads
方法來更改。
Go可以在在一個邏輯處理器P上實現並發,如果需要並行,必須使用多於1個的邏輯處理器。Go調度器會把goroutine
平等分配到每個邏輯處理器上,此時goroutine
將在不同的線程上運行,不過前提是要求機器擁有多個物理處理器。
2.創建goroutine
使用關鍵字go
來創建一個goroutine
,並讓所有的goroutine
都得到執行:
//example1.go
package main
import (
"runtime"
"sync"
"fmt"
)
var (
wg sync.WaitGroup
)
func main() {
//分配一個邏輯處理器P給調度器使用
runtime.GOMAXPROCS(1)
//在這裏,wg用於等待程序完成,計數器加2,表示要等待兩個goroutine
wg.Add(2)
//聲明1個匿名函數,並創建一個goroutine
fmt.Printf("Begin Coroutines\n")
go func() {
//在函數退出時,wg計數器減1
defer wg.Done()
//打印3次小寫字母表
for count := 0; count < 3; count++ {
for char := 'a'; char < 'a'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
//聲明1個匿名函數,並創建一個goroutine
go func() {
defer wg.Done()
//打印大寫字母表3次
for count := 0; count < 3; count++ {
for char := 'A'; char < 'A'+26; char++ {
fmt.Printf("%c ", char)
}
}
}()
fmt.Printf("Waiting To Finish\n")
//等待2個goroutine執行完畢
wg.Wait()
}
這個程序使用runtime.GOMAXPROCS(1)
來分配一個邏輯處理器給調度器使用,兩個goroutine
將被該邏輯處理器調度並發執行。程序輸出:
Begin Coroutines
Waiting To Finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
從輸出來看,是先執行完一個goroutine
,再接著執行第二個goroutine
的,大寫字母全部打印完後,再打印全部的小寫字母。那麽,有沒有辦法讓兩個goroutine
並行執行呢?為程序指定兩個邏輯處理器即可:
//修改為2個邏輯處理器
runtime.GOMAXPROCS(2)
此時執行程序,輸出為:
Begin Coroutines
Waiting To Finish
A B C D E a b c d e f g h i j k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z a b c F G H I J K L M N O P Q R S T U V W X d e f g h i j k l m n o p q r s Y Z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z t u v w x y z A B C D E F G H I J K L M N O P Q R S T U V W X Y Z
那如果只有1個邏輯處理器,如何讓兩個goroutine交替被調度?實際上,如果goroutine
需要很長的時間才能運行完,調度器的內部算法會將當前運行的goroutine
讓出,防止某個goroutine
長時間占用邏輯處理器。由於示例程序中兩個goroutine
的執行時間都很短,在為引起調度器調度之前已經執行完。不過,程序也可以使用runtime.Gosched()
來將當前在邏輯處理器上運行的goruntine
讓出,讓另一個goruntine
得到執行:
//example2.go
package main
import (
"runtime"
"sync"
"fmt"
)
var (
wg sync.WaitGroup
)
func main() {
//分配一個邏輯處理器P給調度器使用
runtime.GOMAXPROCS(1)
//在這裏,wg用於等待程序完成,計數器加2,表示要等待兩個goroutine
wg.Add(2)
//聲明1個匿名函數,並創建一個goroutine
fmt.Printf("Begin Coroutines\n")
go func() {
//在函數退出時,wg計數器減1
defer wg.Done()
//打印3次小寫字母表
for count := 0; count < 3; count++ {
for char := 'a'; char < 'a'+26; char++ {
if char=='k'{
runtime.Gosched()
}
fmt.Printf("%c ", char)
}
}
}()
//聲明1個匿名函數,並創建一個goroutine
go func() {
defer wg.Done()
//打印大寫字母表3次
for count := 0; count < 3; count++ {
for char := 'A'; char < 'A'+26; char++ {
if char == 'K'{
runtime.Gosched()
}
fmt.Printf("%c ", char)
}
}
}()
fmt.Printf("Waiting To Finish\n")
//等待2個goroutine執行完畢
wg.Wait()
}
兩個goroutine
在循環的字符為k/K的時候會讓出邏輯處理器,程序的輸出結果為:
Begin Coroutines
Waiting To Finish
A B C D E F G H I J K L M N O P Q R S T U V W X Y Z A B C D E F G H I J a b c d e f g h i j K L M N O P Q R S T U V W X Y Z A B C D E F G H I J k l m n o p q r s t u v w x y z a b c d e f g h i j K L M N O P Q R S T U V W X Y Z k l m n o p q r s t u v w x y z a b c d e f g h i j k l m n o p q r s t u v w x y z
這裏大小寫字母果然是交替著輸出了。不過從輸出可以看到,第一次輸出大寫字母時遇到K沒有讓出邏輯處理器,這是什麽原因還不是很清楚,調度器的調度機制?
二、處理競爭狀態
並發程序避免不了的一個問題是對資源的同步訪問。如果多個goroutine
在沒有互相同步的情況下去訪問同一個資源,並進行讀寫操作,這時goroutine
就處於競爭狀態下:
//example3.go
package main
import (
"sync"
"runtime"
"fmt"
)
var (
//counter為訪問的資源
counter int64
wg sync.WaitGroup
)
func addCount() {
defer wg.Done()
for count := 0; count < 2; count++ {
value := counter
//當前goroutine從線程退出
runtime.Gosched()
value++
counter=value
}
}
func main() {
wg.Add(2)
go addCount()
go addCount()
wg.Wait()
fmt.Printf("counter: %d\n",counter)
}
//output:
counter: 4 或者counter: 2
這段程序中,goroutine
對counter
的讀寫操作沒有進行同步,goroutine 1對counter的寫結果可能被goroutine 2所覆蓋。Go可通過如下方式來解決這個問題:
- 使用原子函數操作
- 使用互斥鎖鎖住臨界區
- 使用通道
chan
1. 檢測競爭狀態
有時候競爭狀態並不能一眼就看出來。Go 提供了一個非常有用的工具,用於檢測競爭狀態。使用方式是:
go build -race example4.go//用競爭檢測器標誌來編譯程序
./example4 //運行程序
工具檢測出了程序存在一處競爭狀態,並指出發生競爭狀態的幾行代碼是:
22 counter=value
18 value := counter
28 go addCount()
29 go addCount()
2. 使用原子函數
對整形變量或指針的同步訪問,可以使用原子函數來進行。這裏使用原子函數來修復example4.go中的競爭狀態問題:
//example5.go
package main
import (
"sync"
"runtime"
"fmt"
"sync/atomic"
)
var (
//counter為訪問的資源
counter int64
wg sync.WaitGroup
)
func addCount() {
defer wg.Done()
for count := 0; count < 2; count++ {
//使用原子操作來進行
atomic.AddInt64(&counter,1)
//當前goroutine從線程退出
runtime.Gosched()
}
}
func main() {
wg.Add(2)
go addCount()
go addCount()
wg.Wait()
fmt.Printf("counter: %d\n",counter)
}
//output:
counter: 4
這裏使用atomic.AddInt64
函數來對一個整形數據進行加操作,另外一些有用的原子操作還有:
atomic.StoreInt64() //寫
atomic.LoadInt64() //讀
更多的原子操作函數請看atomic
包中的聲明。
3. 使用互斥鎖
對臨界區的訪問,可以使用互斥鎖來進行。對於example4.go的競爭狀態,可以使用互斥鎖來解決:
//example5.go
package main
import (
"sync"
"runtime"
"fmt"
)
var (
//counter為訪問的資源
counter int
wg sync.WaitGroup
mutex sync.Mutex
)
func addCount() {
defer wg.Done()
for count := 0; count < 2; count++ {
//加上鎖,進入臨界區域
mutex.Lock()
{
value := counter
//當前goroutine從線程退出
runtime.Gosched()
value++
counter = value
}
//離開臨界區,釋放互斥鎖
mutex.Unlock()
}
}
func main() {
wg.Add(2)
go addCount()
go addCount()
wg.Wait()
fmt.Printf("counter: %d\n", counter)
}
//output:
counter: 4
使用Lock()
與Unlock()
函數調用來定義臨界區,在同一個時刻內,只有一個goroutine能夠進入臨界區,直到調用Unlock()
函數後,其他的goroutine才能夠進入臨界區。
在Go中解決共享資源安全訪問,更常用的使用通道chan。
三、利用通道共享數據
Go語言采用CSP消息傳遞模型。通過在goroutine
之間傳遞數據來傳遞消息,而不是對數據進行加鎖來實現同步訪問。這裏就需要用到通道chan
這種特殊的數據類型。當一個資源需要在goroutine
中共享時,chan在goroutine
中間架起了一個通道。通道使用make
來創建:
unbuffered := make(char int) //創建無緩存通道,用於int類型數據共享
buffered := make(chan string,10)//創建有緩存通道,用於string類型數據共享
buffered<- "hello world" //向通道中寫入數據
value:= <-buffered //從通道buffered中接受數據
通道用於放置某一種類型的數據。創建通道時指定通道的大小,將創建有緩存的通道。無緩存通道是一種同步通信機制,它要求發送goroutine
和接收goroutine
都應該準備好,否則會進入阻塞。
1. 無緩存的通道
無緩存通道是同步的——一個goroutine
向channel寫入消息的操作會一直阻塞,直到另一個goroutine
從通道中讀取消息。反過來也是,一個goroutine
從channel讀取消息的操作會一直阻塞,直到另一個goroutine
向通道中寫入消息。《Go in action》中關於無緩存通道的解釋有一個非常棒的例子:網球比賽。在網球比賽中,兩位選手總是處在以下兩種狀態之一:要麽在等待接球,要麽在把球打向對方。球的傳遞可看為通道中數據傳遞。下面這段代碼使用通道模擬了這個過程:
//example6.go
package main
import (
"sync"
"fmt"
"math/rand"
"time"
)
var wg sync.WaitGroup
func player(name string, court chan int) {
defer wg.Done()
for {
//如果通道關閉,那麽選手勝利
ball, ok := <-court
if !ok {
fmt.Printf("Player %s Won\n", name)
return
}
n := rand.Intn(100)
//隨機概率使某個選手Miss
if n%13 == 0 {
fmt.Printf("Player %s Missed\n", name)
//關閉通道
close(court)
return
}
fmt.Printf("Player %s Hit %d\n", name, ball)
ball++
//否則選手進行擊球
court <- ball
}
}
func main() {
rand.Seed(time.Now().Unix())
court := make(chan int)
//等待兩個goroutine都執行完
wg.Add(2)
//選手1等待接球
go player("candy", court)
//選手2等待接球
go player("luffic", court)
//球進入球場(可以開始比賽了)
court <- 1
wg.Wait()
}
//output:
Player luffic Hit 1
Player candy Hit 2
Player luffic Hit 3
Player candy Hit 4
Player luffic Hit 5
Player candy Missed
Player luffic Won
2. 有緩存的通道
有緩存的通道是一種在被接收前能存儲一個或者多個值的通道,它與無緩存通道的區別在於:無緩存的通道保證進行發送和接收的goroutine會在同一時間進行數據交換,有緩存的通道沒有這種保證。有緩存通道讓goroutine
阻塞的條件為:通道中沒有數據可讀的時候,接收動作會被阻塞;通道中沒有區域容納更多數據時,發送動作阻塞。向已經關閉的通道中發送數據,會引發panic,但是goroutine
依舊能從通道中接收數據,但是不能再向通道裏發送數據。所以,發送端應該負責把通道關閉,而不是由接收端來關閉通道。
小結
- goroutine被邏輯處理器執行,邏輯處理器擁有獨立的系統線程與運行隊列
- 多個goroutine在一個邏輯處理器上可以並發執行,當機器有多個物理核心時,可通過多個邏輯處理器來並行執行。
- 使用關鍵字 go 來創建goroutine。
- 在Go中,競爭狀態出現在多個goroutine試圖同時去訪問一個資源時。
- 可以使用互斥鎖或者原子函數,去防止競爭狀態的出現。
- 在go中,更好的解決競爭狀態的方法是使用通道來共享數據。
- 無緩沖通道是同步的,而有緩沖通道不是。
(完)
Tags: goroutine 調度 運行 線程 一個 操作系統
文章來源: