1. 程式人生 > >Go語言開發(九)、Go語言並發編程

Go語言開發(九)、Go語言並發編程

col 計數器 yield res 兩個 -c time 放心 還在

Go語言開發(九)、Go語言並發編程

一、goroutine簡介

1、並發與並行簡介

並行(parallel):指在同一時刻,有多條指令在多個處理器上同時執行。
並發(concurrency):指在同一時刻只能有一條指令執行,但多個進程指令被快速的輪換執行,使得在宏觀上具有多個進程同時執行的效果,但在微觀上並不是同時執行的,只是把時間分成若幹段,使多個進程快速交替的執行。
並行在多處理器系統中存在,而並發可以在單處理器和多處理器系統中都存在,並發能夠在單處理器系統中存在是因為並發是並行的假象,並行要求程序能夠同時執行多個操作,而並發只是要求程序假裝同時執行多個操作(每個小時間片執行一個操作,多個操作快速切換執行)。?

2、Coroutine簡介

Coroutine(協程)是一種用戶態的輕量級線程,特點如下:
A、輕量級線程
B、非搶占式多任務處理,由協程主動交出控制權。
C、編譯器/解釋器/虛擬機層面的任務
D、多個協程可能在一個或多個線程上運行。
E、子程序是協程的一個特例。
不同語言對協程的支持:
A、C++通過Boost.Coroutine實現對協程的支持
B、Java不支持
C、Python通過yield關鍵字實現協程,Python3.5開始使用async def對原生協程的支持

3、goroutine簡介

在Go語言中,只需要在函數調用前加上關鍵字go即可創建一個並發任務單元,新建的任務會被放入隊列中,等待調度器安排。

進程在啟動的時候,會創建一個主線程,主線程結束時,程序進程將終止,因此,進程至少有一個線程。main函數裏,必須讓主線程等待,確保進程不會被終止。
go語言中並發指的是讓某個函數獨立於其它函數運行的能力,一個goroutine是一個獨立的工作單元,Go的runtime(運行時)會在邏輯處理器上調度goroutine來運行,一個邏輯處理器綁定一個操作系統線程,因此goroutine不是線程,是一個協程。
進程:一個程序對應一個獨立程序空間
線程:一個執行空間,一個進程可以有多個線程
邏輯處理器:執行創建的goroutine,綁定一個線程
調度器:Go運行時中的,分配goroutine給不同的邏輯處理器
全局運行隊列:所有剛創建的goroutine隊列
本地運行隊列:邏輯處理器的goroutine隊列
當創建一個goroutine後,會先存放在全局運行隊列中,等待Go運行時的調度器進行調度,把goroutine分配給其中的一個邏輯處理器,並放到邏輯處理器對應的本地運行隊列中,最終等著被邏輯處理器執行即可。
Go的並發是管理、調度、執行goroutine的方式。
默認情況下,Go默認會給每個可用的物理處理器都分配一個邏輯處理器。
可以在程序開頭使用runtime.GOMAXPROCS(n)設置邏輯處理器的數量。
如果需要設置邏輯處理器的數量,一般采用如下代碼設置:
runtime.GOMAXPROCS(runtime.NumCPU())
對於並發,Go語言本身自己實現的調度,對於並行,與物理處理器的核數有關,多核就可以並行並發,單核只能並發。

4、goroutinue使用示例

在Go語言中,只需要在函數調用前加上關鍵字go即可創建一個並發任務單元,新建的任務會被放入隊列中,等待調度器安排。

package main

import (
   "fmt"
   "sync"
)

func main(){
   var wg sync.WaitGroup
   wg.Add(2)
   go func() {
      defer wg.Done()
      for i := 0; i < 10000; i++ {
         fmt.Printf("Hello,Go.This is %d\n", i)
      }
   }()
   go func() {
      defer wg.Done()
      for i := 0; i < 10000; i++ {
         fmt.Printf("Hello,World.This is %d\n", i)
      }
   }()
   wg.Wait()
}

sync.WaitGroup是一個計數的信號量,使main函數所在主線程等待兩個goroutine執行完成後再結束,否則兩個goroutine還在運行時,主線程已經結束。
sync.WaitGroup使用非常簡單,使用Add方法設設置計數器為2,每一個goroutine的函數執行完後,調用Done方法減1。Wait方法表示如果計數器大於0,就會阻塞,main函數會一直等待2個goroutine完成再結束。

5、goroutine的本質

goroutine是輕量級的線程,占用的資源非常小(Go將每個goroutine stack的size默認設置為2k)線程的切換由操作系統控制,而goroutine的切換則由用戶控制。
goroutinue本質上是協程。
?goroutinue可以實現並行,即多個goroutinue可以在多個處理器同時運行,而協程同一時刻只能在一個處理器上運行。
goroutine之間的通信是通過channel,而協程的通信是通過yield和resume()操作。

二、goroutine調度機制

1、線程調度模型

高級語言對內核線程的封裝實現通常有三種線程調度模型:
A、N:1模型。N個用戶空間線程在1個內核空間線程上運行,優勢是上下文切換非常快但無法利用多核系統的優點。
B、1:1模型。1個內核空間線程運行一個用戶空間線程,充分利用了多核系統的優勢但上下文切換非常慢,因為每一次調度都會在用戶態和內核態之間切換。
C、M:N模型。每個用戶線程對應多個內核空間線程,同時也可以一個內核空間線程對應多個用戶空間線程,使用任意個內核模型管理任意個goroutine,但缺點是調度的復雜性。

2、Go調度器簡介

Go的最小調度單元為goroutine,但操作系統最小的調度單元依然是線程,所以go調度器(go scheduler)要做的工作是如何將眾多的goroutine放在有限的線程上進行高效而公平的調度。
操作系統的調度不失為高效和公平,比如CFS調度算法。go引入goroutine的核心原因是goroutine輕量級,無論是從進程到線程,還是從線程到goroutine,其核心都是為了使調度單元更加輕量級,可以輕易創建幾萬幾十萬的goroutine而不用擔心內存耗盡等問題。go引入goroutine試圖在語言內核層做到足夠高性能得同時(充分利用多核優勢、使用epoll高效處理網絡/IO、實現垃圾回收等機制)盡量簡化編程。

3、Go調度器實現原理

?Go 1.1開始,Go scheduler實現了M:N的G-P-M線程調度模型,即任意數量的用戶態goroutine可以運行在任意數量的內核空間線程線程上,不僅可以使上線文切換更加輕量級,又可以充分利用多核優勢。
技術分享圖片
為了實現M:N線程調度機制,Go引入了3個結構體:
M:操作系統的內核空間線程
G:goroutine對象,G結構體包含調度一個goroutine所需要的堆棧和instruction pointer(IP指令指針),以及其它一些重要的調度信息。每次go調用的時候,都會創建一個G對象。
P:Processor,調度的上下文,實現M:N調度模型的關鍵,M必須拿到P才能對G進行調度,P限定了go調度goroutine的最大並發度。每一個運行的M都必須綁定一個P。
P的個數是GOMAXPROCS(最大256),啟動時固定,一般不修改;?M的個數和P的個數不一定相同(會有休眠的M或者不需要太多的M);每一個P保存著本地G任務隊列,也能使用全局G任務隊列。
技術分享圖片
全局G任務隊列會和各個本地G任務隊列按照一定的策略互相交換。
P是用一個全局數組(255)來保存的,並且維護著一個全局的P空閑鏈表。
每次調用go的時候,都會:
A、創建一個G對象,加入到本地隊列或者全局隊列
B、如果有空閑的P,則創建一個M
C、M會啟動一個底層線程,循環執行能找到的G任務
D、G任務的執行順序是先從本地隊列找,本地沒有則從全局隊列找(一次性轉移(全局G個數/P個數)個,再去其它P中找(一次性轉移一半)。
E、G任務執行是按照隊列順序(即調用go的順序)執行的。
創建一個M過程如下:
A、先找到一個空閑的P,如果沒有則直接返回。
B、調用系統API創建線程,不同的操作系統調用方法不一樣。
C、?在創建的線程裏循環執行G任務
如果一個系統調用或者G任務執行太長,會一直占用內核空間線程,由於本地隊列的G任務是順序執行的,其它G任務就會阻塞。因此,Go程序啟動的時候,會專門創建一個線程sysmon,用來監控和管理,sysmon內部是一個循環:
A、記錄所有P的G任務計數schedtick,schedtick會在每執行一個G任務後遞增。
B、如果檢查到?schedtick一直沒有遞增,說明P一直在執行同一個G任務,如果超過一定的時間(10ms),在G任務的棧信息裏面加一個標記。
C、G任務在執行的時候,如果遇到非內聯函數調用,就會檢查一次標記,然後中斷自己,把自己加到隊列末尾,執行下一個G。
D、如果沒有遇到非內聯函數(有時候正常的小函數會被優化成內聯函數)調用,會一直執行G任務,直到goroutine自己結束;如果goroutine是死循環,並且GOMAXPROCS=1,阻塞。

4、搶占式調度

Go沒有時間片的概念。如果某個G沒有進行system call調用、沒有進行I/O操作、沒有阻塞在一個channel操作上,M通過搶占式調度讓長任務G停下來並調度下一個G。
除非極端的無限循環或死循環,否則只要G調用函數,Go runtime就有搶占G的機會。Go程序啟動時,Go runtime會啟動一個名為sysmon的M(一般稱為監控線程),sysmon無需綁定P即可運行。sysmon是GO程序啟動時創建的一個用於監控管理的線程。
sysmon每20us~10ms啟動一次,sysmon主要完成如下工作:
A、釋放閑置超過5分鐘的span物理內存;
B、如果超過2分鐘沒有垃圾回收,強制執行;
C、將長時間未處理的netpoll結果添加到任務隊列;
D、向長時間運行的G任務發出搶占調度;
E、收回因syscall長時間阻塞的P;
如果一個G任務運行10ms,sysmon就會認為其運行時間太久而發出搶占式調度的請求。一旦G的搶占標誌位被設為true,那麽待G下一次調用函數或方法時,runtime便可以將G搶占,並移出運行狀態,放入P的local runq中,等待下一次被調度。

三、runtime包

1、Gosched

runtime.Gosched()用於讓出CPU時間片,讓出當前goroutine的執行權限,調度器安排其它等待的任務運行,並在下次某個時候從該位置恢復執行。

2、Goexit

調用runtime.Goexit()將立即終止當前goroutine執?,調度器確保所有已註冊defer延遲調用被執行。

3、GOMAXPROCS

調用runtime.GOMAXPROCS()用來設置可以並行計算的CPU核數的最大值,並返回設置前的值。

四、Channel通道

1、Channel簡介

Channel是goroutine之間通信的通道,用於goroutine之間發消息和接收消息。Channel是一種引用類型的數據,可以作為參數,也可以作為返回值。

2、Channel的創建

channel聲明使用chan關鍵字,channel的創建需要指定通道中發送和接收數據的類型。
使用make來建立一個信道:

var channel chan int = make(chan int)
// 或channel := make(chan int)

make有第二個參數,用於指定通道的大小。

3、Channel的操作

//發送數據:寫
channel<- data
//接收數據:讀
data := <- channel

關閉通道:發送方關閉通道,用於通知接收方已經沒有數據
關閉通道後,其它goroutine訪問通道獲取數據時,得到零值和false
有條件結束死循環:

for{
   v ,ok := <- chan
   if ok== false{
      //通道已經關閉。。
      break
   }
}
//循環從通道中獲取數據,直到通道關閉。
for v := range channel{
   //從通道讀取數據
}

Channel使用示例如下:

package main

import (
   "fmt"
   "time"
)

type Person struct {
   name string
   age uint8
   address Address
}

type Address struct {
   city string
   district string
}

func SendMessage(person *Person, channel chan Person){
   go func(person *Person, channel chan Person) {
      fmt.Printf("%s send a message.\n", person.name)
      channel<-*person
      for i := 0; i < 5; i++ {
         channel<- *person
      }
      close(channel)
      fmt.Println("channel is closed.")
   }(person, channel)
}

func main() {
   channel := make(chan Person,1)
   harry := Person{
      "Harry",
      30,
      Address{"London","Oxford"},
   }
   go SendMessage(&harry, channel)
   data := <-channel
   fmt.Printf("main goroutine receive a message from %s.\n", data.name)
   for {
      i, ok := <-channel
      time.Sleep(time.Second)
      if !ok {
         fmt.Println("channel is empty.")
         break
      }else{
         fmt.Printf("receive %s\n",i.name)
      }
   }
}

結果如下:

Harry send a message.
main goroutine receive a message from Harry.
receive Harry
receive Harry
receive Harry
channel is closed.
receive Harry
receive Harry
channel is empty.

Go運行時系統並沒有在通道channel被關閉後立即把false作為相應接收操作的第二個結果,而是等到接收端把已在通道中的所有元素值都接收到後才這樣做,確保在發送端關閉通道的安全性。
被關閉的通道會禁止數據流入, 是只讀的,仍然可以從關閉的通道中取出數據,但不能再寫入數據。
給一個nil的channel發送數據,造成永遠阻塞?;從一個nil的channel接收數據,造成永遠阻塞。給一個已經關閉的channel發送數據,引起panic?;
從一個已經關閉的channel接收數據,返回帶緩存channel中緩存的值,如果通道中無緩存,返回0。

4、無緩沖通道

make創建通道時,默認沒有第二個參數,通道的大小為0,稱為無緩沖通道。
無緩沖的通道是指通道的大小為0,即通道在接收前沒有能力保存任何值,無緩沖通道發送goroutine和接收gouroutine必須是同步的,如果沒有同時準備好,先執行的操作就會阻塞等待,直到另一個相對應的操作準備好為止。無緩沖通道也稱為同步通道。
無緩沖的信道永遠不會存儲數據,只負責數據的流通。從無緩沖信道取數據,必須要有數據流進來才可以,否則當前goroutine會阻塞;數據流入無緩沖信道, 如果沒有其它goroutine來拿取走數據,那麽當前goroutine會阻塞。

package main

import (
   "fmt"
)

func main() {
   ch := make(chan int)
   go func() {
      var sum int = 0
      for i := 0; i < 10; i++ {
         sum += i
      }
      //發送數據到通道
      ch <- sum
   }()
   //從通道接收數據
   fmt.Println(<-ch)
}

在計算sum和的goroutine沒有執行完,將值賦發送到ch通道前,fmt.Println(<-ch)會一直阻塞等待,main函數所在的主goroutine就不會終止,只有當計算和的goroutine完成後,並且發送到ch通道的操作準備好後,main函數的<-ch會接收計算好的值,然後打印出來。
無緩存通道的發送數據和讀取數據的操作不能放在同一個協程中,防止發生死鎖。通常,先創建一個goroutine對通道進行操作,此時新創建goroutine會阻塞,然後再在主goroutine中進行通道的反向操作,實現goroutine解鎖,即必須goroutine在前,解鎖goroutine在後。

5、有緩沖通道

make創建通道時,指定通道的大小時,稱為有緩沖通道。
對於帶緩存通道,只要通道中緩存不滿,可以一直向通道中發送數據,直到緩存已滿;同理只要通道中緩存不為0,可以一直從通道中讀取數據,直到通道的緩存變為0才會阻塞。
相對於不帶緩存通道,帶緩存通道不易造成死鎖,可以同時在一個goroutine中放心使用。
帶緩存通道不僅可以流通數據,還可以緩存數據,當帶緩存通道達到滿的狀態的時候才會阻塞,此時帶緩存通道不能再承載更多的數據。
帶緩存通道是先進先出的。

6、單向通道

對於某些特殊的場景,需要限制一個通道只可以接收,不能發送;限制一個通道只能發送,不能接收。只能單向接收或發送的通道稱為單向通道。
定義單向通道只需要在定義的時候,帶上<-即可。

var send chan<- int //只能發送
var receive <-chan int //只能接收

<-操作符的位置在後面只能發送,對應發送操作;<-操作符的位置在前面只能接收,對應接收操作。
單向通道通常用於函數或者方法的參數。

五、channel應用

1、廣播功能實現

當一個通道關閉時, 所有對此通道的讀取的goroutine都會退出阻塞。

package main

import (
   "fmt"
   "time"
)

func notify(id int, channel chan int){
   <-channel//接收到數據或通道關閉時退出阻塞
   fmt.Printf("%d receive a message.\n", id)
}

func broadcast(channel chan int){
   fmt.Printf("Broadcast:\n")
   close(channel)//關閉通道
}

func main(){
   channel := make(chan int,1)

   for i:=0;i<10 ;i++  {
      go notify(i,channel)
   }
   go broadcast(channel)
   time.Sleep(time.Second)
}

2、select使用

select用於在多個channel上同時進行偵聽並收發消息,當任何一個case滿足條件時即執行,如果沒有可執行的case則會執行default的case,如果沒有指定default case,則會阻塞程序。select的語法如下:

select {
case communication clause :
   statement(s);
case communication clause :
   statement(s);
   /*可以定義任意數量的 case */
default : /*可選 */
   statement(s);
}

Select多路復用中:
A、每個case都必須是一次通信
B、所有channel表達式都會被求值
C、所有被發送的表達式都會被求值
D、如果任意某個通信可以進行,它就執行;其它被忽略。
E、如果有多個case都可以運行,Select會隨機公平地選出一個執行。其它不會執行。
F、否則,如果有default子句,則執行default語句。如果沒有default子句,select將阻塞,直到某個通信可以運行;Go不會重新對channel或值進行求值。

package main

import (
   "fmt"
   "time"
)

func doWork(channels *[10]chan int){
   for {
      select {
      case x1 := <-channels[0]:
         fmt.Println("receive x1: ",x1)
      case x2 := <-channels[1]:
         fmt.Println("receive x2: ",x2)
      case x3 := <-channels[2]:
         fmt.Println("receive x3: ",x3)
      case x4 := <-channels[3]:
         fmt.Println("receive x4: ",x4)
      case x5 := <-channels[4]:
         fmt.Println("receive x5: ",x5)
      case x6 := <-channels[5]:
         fmt.Println("receive x6: ",x6)
      case x7 := <-channels[6]:
         fmt.Println("receive x7: ",x7)
      case x8 := <-channels[7]:
         fmt.Println("receive x8: ",x8)
      case x9 := <-channels[8]:
         fmt.Println("receive x9: ",x9)
      case x10 := <-channels[9]:
         fmt.Println("receive x10: ",x10)
      }
   }
}

func main(){
   var channels [10]chan int
   go doWork(&channels)
   for i := 0; i < 10; i++ {
      channels[i] = make(chan int,1)
      channels[i]<- i
   }
   time.Sleep(time.Second*5)
}

結果如下:

receive x4:  3
receive x10:  9
receive x9:  8
receive x5:  4
receive x2:  1
receive x7:  6
receive x8:  7
receive x1:  0
receive x3:  2
receive x6:  5

六、死鎖

Go程序中死鎖是指所有的goroutine在等待資源的釋放。
通常,死鎖的報錯信息如下:
fatal error: all goroutines are asleep - deadlock!
Goroutine死鎖產生的原因如下:
A、只在單一的goroutine裏操作無緩沖信道,一定死鎖
B、非緩沖信道上如果發生流入無流出,或者流出無流入,會導致死鎖
因此,解決死鎖的方法有:
A、取走無緩沖通道的數據或是發送數據到無緩沖通道
B、使用緩沖通道

Go語言開發(九)、Go語言並發編程