1. 程式人生 > >多圖詳解Go中的Channel原始碼

多圖詳解Go中的Channel原始碼

> 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 本文使用的go的原始碼時14.4 ## chan介紹 ```go package main import "fmt" func main() { c := make(chan int) go func() { c <- 1 // send to channel }() x := <-c // recv from channel fmt.Println(x) } ``` 我們可以這樣檢視彙編結果: ``` go tool compile -N -l -S hello.go -N表示禁用優化 -l禁用內聯 -S列印結果 ``` 通過上面這樣的方式,我們可以直到chan是呼叫的哪些函式: ![ch](https://img.luozhiyun.com/20210110110750.svg) ## 原始碼分析 ### 結構體與建立 ```go type hchan struct { qcount uint // 迴圈列表元素個數 dataqsiz uint // 迴圈佇列的大小 buf unsafe.Pointer // 迴圈佇列的指標 elemsize uint16 // chan中元素的大小 closed uint32 // 是否已close elemtype *_type // chan中元素型別 sendx uint // send在buffer中的索引 recvx uint // recv在buffer中的索引 recvq waitq // receiver的等待佇列 sendq waitq // sender的等待佇列 // 互拆鎖 lock mutex } ``` qcount代表chan 中已經接收但還沒被取走的元素的個數,函式 len 可以返回這個欄位的值; dataqsiz和buf分別代表隊列buffer的大小,cap函式可以返回這個欄位的值以及佇列buffer的指標,是一個定長的環形陣列; elemtype 和 elemsiz表示chan 中元素的型別和 元素的大小; sendx:傳送資料的指標在 buffer中的位置; recvx:接收請求時的指標在 buffer 中的位置; recvq和sendq分別表示等待接收資料的 goroutine 與等待發送資料的 goroutine; sendq和recvq的型別是waitq的結構體: ```go type waitq struct { first *sudog last *sudog } ``` waitq裡面連線的是一個sudog雙向連結串列,儲存的是等待的goroutine 。整個chan的圖例大概是這樣: ![Group 40](https://img.luozhiyun.com/20210110110753.png) 下面看一下建立chan,我們通過彙編結果也可以檢視到`make(chan int)`這句程式碼會呼叫到runtime的makechan函式中: ```go const ( maxAlign = 8 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) ) func makechan(t *chantype, size int) *hchan { elem := t.elem // 略去檢查程式碼 ... //計算需要分配的buf空間 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // chan的size或者元素的size是0,不必建立buf c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector c.buf = c.raceaddr() case elem.ptrdata == 0: // 元素不是指標,分配一塊連續的記憶體給hchan資料結構和buf c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) // 表示hchan後面在記憶體裡緊跟著就是buf c.buf = add(unsafe.Pointer(c), hchanSize) default: // 元素包含指標,那麼單獨分配buf c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) return c } ``` 首先我們可以看到計算hchanSize: ```go maxAlign = 8 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) ``` maxAlign是8,那麼maxAlign-1的二進位制就是111,然後和int(unsafe.Sizeof(hchan{}))取與就是取它的低三位,hchanSize就得到的是8的整數倍,做對齊使用。 這裡switch有三種情況,第一種情況是緩衝區所需大小為 0,那麼在為 hchan 分配記憶體時,只需要分配 sizeof(hchan) 大小的記憶體; 第二種情況是緩衝區所需大小不為 0,而且資料型別不包含指標,那麼就分配連續的記憶體。注意的是,我們在建立channel的時候可以指定型別為指標型別: ```go //chan裡存入的是int的指標 c := make(chan *int) //chan裡存入的是int的值 c := make(chan int) ``` 第三種情況是緩衝區所需大小不為 0,而且資料型別包含指標,那麼就不使用add的方式讓hchan和buf放在一起了,而是單獨的為buf申請一塊記憶體。 ### 傳送資料 #### channel的阻塞非阻塞 在看傳送資料的程式碼之前,我們先看一下什麼是channel的阻塞和非阻塞。 一般情況下,傳入的引數都是 `block=true`,即阻塞呼叫,一個往 channel 中插入資料的 goroutine 會阻塞到插入成功為止。 非阻塞是隻這種情況: ```go select { case c <- v: ... foo default: ... bar } ``` 編譯器會將其改為: ```go if selectnbsend(c, v) { ... foo } else { ... bar } ``` selectnbsend方法傳入的block就是false: ```go func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) } ``` #### chansend方法 向通道傳送資料我們通過彙編結果可以發現是在runtime 中通過 **chansend** 實現的,方法比較長下面我們分段來進行理解: ```go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { // 對於非阻塞的傳送,直接返回 if !block { return false } // 對於阻塞的通道,將 goroutine 掛起 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } ... } ``` 這裡會對chan做一個判斷,如果它是空的,那麼對於非阻塞的傳送,直接返回 false;對於阻塞的通道,將 goroutine 掛起,並且永遠不會返回。 ```go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 非阻塞的情況下,如果通道沒有關閉,滿足以下一條: // 1.沒有緩衝區並且當前沒有接收者 // 2.緩衝區不為0,並且已滿 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } ... } ``` 需要注意的是這裡是沒有加鎖的,go雖然在使用指標讀取單個值的時候原子性的,但是讀取多個值並不能保證,所以在判斷完closed雖然是沒有關閉的,那麼在讀取完之後依然可能在這一瞬間從未關閉狀態轉變成關閉狀態。那麼就有兩種可能: * 通道沒有關閉,而且已經滿了,那麼需要返回false,沒有問題; * 通道關閉,而且已經滿了,但是在非阻塞的傳送中返回false,也沒有問題; 有關go的一致性原語,可以看這篇:[The Go Memory Model](https://golang.org/ref/mem)。 上面的這些判斷被稱為 fast path,因為加鎖的操作是一個很重的操作,所以能夠在加鎖之前返回的判斷就在加鎖之前做好是最好的。 下面接著看看加鎖部分的程式碼: ```go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... //加鎖 lock(&c.lock) // 是否關閉的判斷 if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 從 recvq 中取出一個接收者 if sg := c.recvq.dequeue(); sg != nil { // 如果接收者存在,直接向該接收者傳送資料,繞過buffer send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ... } ``` 進入了lock區域之後還需要再判斷以下close的狀態,然後從recvq 中取出一個接收者,如果已經有接收者,那麼就向第一個接收者傳送當前enqueue的訊息。這裡需要注意的是如果有接收者在佇列中等待,則說明此時的緩衝區是空的。 既然是一行行分析程式碼,那麼我們再進入到send看一下實現: ```go func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ... if sg.elem != nil { // 直接把要傳送的資料copy到reciever的棧空間 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 喚醒對應的 goroutine goready(gp, skip+1) } ``` 在send方法裡,sg就是goroutine打包好的物件,ep是對應要傳送資料的指標,sendDirect方法會呼叫memmove進行資料的記憶體拷貝。然後goready函式會喚醒對應的 goroutine進行排程。 回到chansend方法,繼續往下看: ```go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 如果緩衝區沒有滿,直接將要傳送的資料複製到緩衝區 if c.qcount < c.dataqsiz { // 找到buf要填充資料的索引位置 qp := chanbuf(c, c.sendx) ... // 將資料拷貝到 buffer 中 typedmemmove(c.elemtype, qp, ep) // 資料索引前移,如果到了末尾,又從0開始 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } // 元素個數加1,釋放鎖並返回 c.qcount++ unlock(&c.lock) return true } ... } ``` 這裡會判斷buf緩衝區有沒有滿,如果沒有滿,那麼就找到buf要填充資料的索引位置,呼叫typedmemmove方法將資料拷貝到buf中,然後重新設值sendx偏移量。 ```go func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 緩衝區沒有空間了,所以對於非阻塞呼叫直接返回 if !block { unlock(&c.lock) return false } // 建立 sudog 物件 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 將sudog 物件入隊 c.sendq.enqueue(mysg) // 進入等待狀態 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) ... } ``` 這裡會做兩部分的操作,對於非阻塞的呼叫會直接返回;對於阻塞的呼叫會建立sudog 物件,然後將sudog物件入隊之後gopark將 goroutine 轉入 waiting 狀態,並解鎖。呼叫gopark之後,在使用者看來該向 channel 傳送資料的程式碼語句會進行阻塞。 這裡也需要注意一下,如果緩衝區為0,那麼也會進入到這裡,會呼叫到gopark立馬阻塞,所以在使用的時候需要記得接收資料,防止向chan傳送資料的那一端永遠阻塞,如: ```go func process(timeout time.Duration) bool { ch := make(chan bool) go func() { // 模擬處理耗時的業務 time.Sleep((timeout + time.Second)) ch <- true // block fmt.Println("exit goroutine") }() select { case result := <-ch: return result case <-time.After(timeout): return false } } ``` 如果這裡在select的時候直接timeout返回了,而沒有呼叫` result := <-ch`,那麼goroutine 就會永遠阻塞。 到這裡傳送的程式碼就講解完了,整個流程大致如下: 比如我要執行:`ch<-10`
1. 檢查 recvq 是否為空,如果不為空,則從 recvq 頭部取一個 goroutine,將資料傳送過去; 2. 如果 recvq 為空,,並且buf沒有滿,則將資料放入到 buf中; 3. 如果 buf已滿,則將要傳送的資料和當前 goroutine 打包成sudog,然後入隊到sendq佇列中,並將當前 goroutine 置為 waiting 狀態進行阻塞。 ### 接收資料 從chan獲取資料實現函式為 chanrecv。下面我們看一下程式碼實現: ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... if c == nil { // 如果 c 為空且是非阻塞呼叫,那麼直接返回 (false,false) if !block { return } // 阻塞呼叫直接等待 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // 對於非阻塞的情況,並且沒有關閉的情況 // 如果是無緩衝chan或者是chan中沒有資料,那麼直接返回 (false,false) if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz >
0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } // 上鎖 lock(&c.lock) // 如果已經關閉,並且chan中沒有資料,返回 (true,false) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } ... } ``` chanrecv方法和chansend方法是一樣的,首先也是做非空判斷,如果chan沒有初始化,那麼如果是非阻塞呼叫,那麼直接返回 (false,false),阻塞呼叫會直接等待; 下面的兩個if判斷我放在一起來進行講解,因為這裡和chansend是不一樣的,chanrecv要根據不同條件需要返回不同的結果。 在上鎖之前的判斷是邊界條件的判斷:如果是非阻塞呼叫會判斷chan沒有傳送方(dataqsiz為空且傳送佇列為空),或chan的緩衝為空(dataqsiz>
0 並且qcount==0)並且chan是沒有close,那麼需要返回 (false,false);而chan已經關閉了,並且buf中沒有資料,需要返回 (true,false); 為了實現這個需求,所以在chanrecv方法裡面邊界條件的判斷都使用atomic方法進行了獲取。 因為需要正確的得到chan已關閉,並且 buf 空會返回 (true, false),而不是 (false,false),所以在lock上鎖之前需要使用atomic來獲取引數防止重排序(Happens Before),因此必須使此處的 qcount 和 closed 的讀取操作的順序通過原子操作得到**順序保障**。 ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 從傳送者佇列獲取資料 if sg := c.sendq.dequeue(); sg != nil { // 傳送者佇列不為空,直接從傳送者那裡提取資料 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } ... } func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 如果是無緩衝區chan if c.dataqsiz == 0 { ... if ep != nil { // 直接從傳送者拷貝資料 recvDirect(c.elemtype, sg, ep) } // 有緩衝區chan } else { // 獲取buf的存放資料指標 qp := chanbuf(c, c.recvx) ... // 直接從緩衝區拷貝資料給接收者 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 從傳送者拷貝資料到緩衝區 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 將傳送者喚醒 goready(gp, skip+1) } ``` 在這裡如果有傳送者在佇列等待,那麼直接從傳送者那裡提取資料,並且喚醒這個傳送者。需要注意的是由於有傳送者在等待,**所以如果有緩衝區,那麼緩衝區一定是滿的**。 在喚醒傳送者之前需要對緩衝區做判斷,如果是無緩衝區,那麼直接從傳送者那裡提取資料;如果有緩衝區首先會獲取recvx的指標,然後將從緩衝區拷貝資料給接收者,再將傳送者資料拷貝到緩衝區。 然後將recvx加1,相當於將新的資料移到了隊尾,再將recvx的值賦值給sendx,最後呼叫goready將傳送者喚醒,這裡有些繞,我們通過圖片來展示: 這裡展示的是在chansend中將資料拷貝到緩衝區中,當資料滿的時候會將sendx的指標置為0,所以當buf環形佇列是滿的時候sendx等於recvx。 然後再來看看chanrecv中傳送者佇列有資料的時候移交緩衝區的資料是怎麼做的: 這裡會將recvx為0處的資料直接從快取區拷貝資料給接收者,然後將傳送者拷貝資料到緩衝區recvx指標處,然後將recvx指標加1並將recvx賦值給sendx,由於是滿的所以用recvx加1的效果實現了將新加入的資料入庫到隊尾的操作。 接著往下看: ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 如果緩衝區中有資料 if c.qcount > 0 { qp := chanbuf(c, c.recvx) ... // 從緩衝區複製資料到 ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 接收資料的指標前移 c.recvx++ // 環形佇列,如果到了末尾,再從0開始 if c.recvx == c.dataqsiz { c.recvx = 0 } // 緩衝區中現存資料減一 c.qcount-- unlock(&c.lock) return true, true } ... } ``` 到了這裡,說明緩衝區中有資料,但是傳送者佇列沒有資料,那麼將資料拷貝到接收資料的協程,然後將接收資料的指標前移,如果已經到了隊尾,那麼就從0開始,最後將緩衝區中現存資料減一併解鎖。 下面就是緩衝區中沒有資料的情況: ```go func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 非阻塞,直接返回 if !block { unlock(&c.lock) return false, false } // 建立sudog gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil // 將sudog新增到接收佇列中 c.recvq.enqueue(mysg) // 阻塞住goroutine,等待被喚醒 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) ... } ``` 如果是非阻塞呼叫,直接返回;阻塞呼叫會將當前goroutine 封裝成sudog,然後將sudog新增到接收佇列中,呼叫gopark阻塞住goroutine,等待被喚醒。 ### 關閉通道 關閉通道會呼叫到closechan方法: ```go func closechan(c *hchan) { // 1. 校驗chan是否已初始化 if c == nil { panic(plainError("close of nil channel")) } // 加鎖 lock(&c.lock) // 如果已關閉了,那麼不能被再次關閉 if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } ... // 設定chan已關閉 c.closed = 1 // 申明一個存放g的list,用於存放在等待佇列中的groutine var glist gList // 2. 獲取所有接收者 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } // 加入佇列中 glist.push(gp) } // 獲取所有傳送者 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } // 加入佇列中 glist.push(gp) } unlock(&c.lock) // 3.喚醒所有的glist中的goroutine for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } } ``` 1. 這個方法首先會校驗chan是否已被初始化,然後加鎖之後再校驗是否已被關閉過,如果校驗都通過了,那麼將closed欄位設值為1; 2. 遍歷所有的接收者和傳送者,並將其goroutine 加入到glist中; 3. 將所有glist中的goroutine加入排程佇列,等待被喚醒,這裡需要注意的是傳送者在被喚醒之後會panic; ## 總結 chan在go中是一個非常強大的工具,使用它可以實現很多功能,但是為了能夠高效的使用它我們也應該去了解裡面是如何實現的。這篇文章通過一步步分析從零開始瞭解go的chan是如何實現的,以及在使用過程中有什麼需要注意的事項,chan的buf環形佇列是怎樣維護的,希望能對你有所幫助~ ## Reference https://speakerdeck.com/kavya719/understanding-channels https://golang.org/ref/mem https://github.com/talkgo/night/issues/450 https://codeburst.io/diving-deep-into-the-golang-channels-549f