「Go」- golang原始碼分析 - channel的底層實現
路徑為:./src/runtime/chan.go 檔案中,先看channel結構體:
type hchan struct { qcountuint// total data in the queue 當前佇列中的資料的個數 dataqsiz uint// size of the circular queuechannel環形佇列的大小 bufunsafe.Pointer // points to an array of dataqsiz elements存放資料的環形佇列的指標 elemsize uint16// channel 中存放的資料型別的大小|即每個元素的大小 closeduint32// channel 是否關閉的標示 elemtype *_type // element type channel中存放的元素的型別 sendxuint// send index當前傳送元素指向channel環形佇列的下標指標 recvxuint// receive index 當前接收元素指向channel環形佇列的下標指標 recvqwaitq// list of recv waiters 等待接收元素的goroutine佇列 sendqwaitq// list of send waiters等待發送元素的goroutine佇列 // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. // 保持此鎖定時不要更改另一個G的狀態(特別是,沒有準備好G),因為這可能會因堆疊收縮而死鎖。 lock mutex }
以及waitq的結構體:
//等待發送及接收的等待接收元素的goroutine佇列的結構體 type waitq struct { first *sudog last*sudog }
等待發送或接受goroutine連結串列的結構體sudog:
// sudog表示等待連結串列中的g,例如用於傳送/接收在頻道上。 // 一個G可以出現在許多等待列表中,因此一個G有許多sudog;許多G可能在等待相同的結果,同步物件,因此一個物件可能有多個sudog。 // sudog是從一個特殊的池中分配的。使用AcquireDog和 // 釋放sudog來分配和釋放它們。 type sudog struct { // 以下欄位受hchan.lock的保護 g *g // 繫結的goroutine isSelect bool// isSelect的布林值表示該執行緒是否正在進行操作channel next*sudog // 指向下一個等待執行緒的指標地址 prev*sudog // 指向上一個等待執行緒的指標地址 elemunsafe.Pointer // data element (may point to stack) 資料物件(可能指向棧) // 當進行channel的send操作時,elem代表將要儲存進channel的元素 // 當進行channel的recv操作時, elem代表從channel接受的元素 // G1執行ch<-task4的時候,G1會建立一個sudog然後將elem儲存進入sendq佇列 // 從不同場景訪問以下欄位。 // 對於channel,WaitLink只能由G訪問。 // 對於訊號量,所有欄位(包括上面的欄位)只有在持有semaroot鎖時才能訪問。 acquiretime int64 // 獲取時間 releasetime int64 // 釋放時間 ticketuint32 parent*sudog // semaRoot binary tree waitlink*sudog // g.waiting list or semaRoot waittail*sudog // semaRoot c*hchan // channel // 繫結channel }
從以上三個結構體我們即可看出channel其實就是由一個環形陣列實現的佇列用於在確定大小的連續記憶體塊進行資料元素的儲存,用waitq以及連結串列sudog共同實現goroutine的等待佇列,並在每個連結串列元素中儲存待從channel中取出或拷貝進channel的資料元素,可以理解為每個等待執行緒都是channel的搬運工,負責運送資料.
其中hchan中的lock是 recvq 是讀操作阻塞在 channel 的 goroutine 列表,sendq 是寫操作阻塞在 channel 的 goroutine 列表。
qcount 和 dataqsiz 分別描述了該channel的當前使用量和最大容量。
接下來進行channel的每一個函式方法進行分析:
makechan:
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. // 判斷定義的channel儲存的每個元素大小是否在範圍內 if elem.size >= 1<<16 { throw("makechan: invalid channel element type") } if hchanSize%maxAlign != 0 || elem.align > maxAlign { throw("makechan: bad alignment") } // 計算channel所需要分配的記憶體大小 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) // 判斷記憶體大小是否超過限制 if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { // 當計算channel的記憶體大小為0時建立不帶buffer的channel case mem == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() // elem型別非指標 // 當計算channel的記憶體大小為0時建立帶buffer的channel // 分配連續的記憶體 (連續記憶體有利於提高記憶體使用效率) // 直接從棧中分配記憶體 case elem.kind&kindNoPointers != 0: // 分配記憶體 c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) // 當channel元素型別包含指標時分配離散的記憶體 default: // Elements contain pointers. c = new(hchan) // 分配記憶體 c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) if debugChan { print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n") } return c }
函式接收兩個引數,一個是channel裡面儲存的元素的資料型別,一個是緩衝的容量(如果為0表示是非緩衝buffer),建立流程如下:
根據傳遞的緩衝大小size是否為零,分別建立不帶buffer的channel或則帶size大小的緩衝channel:
對於不帶緩衝channel,申請一個hchan資料結構的記憶體大小;
對於帶緩衝channel,new一個hchan物件,並初始化buffer記憶體;
對於包含指標帶快取的channel同樣申請一個hchan資料結構的記憶體大小;
以及設定channel的屬性。
帶指標以及不帶指標帶記憶體申請區別可以看記憶體管理相關原始碼。
chanbuf:
//chanbuf(c, i) is pointer to the i'th slot in the buffer. func chanbuf(c *hchan, i uint) unsafe.Pointer { return add(c.buf, uintptr(i)*uintptr(c.elemsize)) }
chanbuf的實現很簡單,主要就是根據下標(sendx或recvx)以及每一個元素的大小還有環形佇列的指標計算出該下標槽點記憶體地址並返回
chansend:
// 通用單通道傳送/接收 // 如果阻塞不是nil,則將不會休眠,但如果無法完成則返回。 // 當睡眠中涉及的通道關閉時,睡眠可以通過g.param == nil喚醒。 最簡單的迴圈和重新執行操作; 我們會 // 看到它現在已經關閉了。 func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 當 channel 未初始化或為 nil 時,向其中傳送資料將會永久阻塞 if c == nil { if !block { return false } // gopark 會使當前 goroutine 休眠,並通過 unlockf 喚醒,但是此時傳入的 unlockf 為 nil, 因此,goroutine 會一直休眠 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } if debugChan { print("chansend: chan=", c, "\n") } // 如果開啟了競爭檢測 if raceenabled { racereadpc(c.raceaddr(), callerpc, funcPC(chansend)) } // Fast path: check for failed non-blocking operation without acquiring the lock. // // After observing that the channel is not closed, we observe that the channel is // not ready for sending. Each of these observations is a single word-sized read // (first c.closed and second c.recvq.first or c.qcount depending on kind of channel). // Because a closed channel cannot transition from 'ready for sending' to // 'not ready for sending', even if the channel is closed between the two observations, // they imply a moment between the two when the channel was both not yet closed // and not ready for sending. We behave as if we observed the channel at that moment, // and report that the send cannot proceed. // // It is okay if the reads are reordered here: if we observe that the channel is not // ready for sending and then observe that it is not closed, that implies that the // channel wasn't closed during the first observation. if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } var t0 int64 //計時器 if blockprofilerate > 0 { t0 = cputicks() } // 獲取同步鎖 lock(&c.lock) // 向已經關閉的 channel 傳送訊息會產生 panic if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // CASE1: 當有 goroutine 在 recv 佇列上等待時,跳過快取佇列,將訊息直接發給 reciever goroutine // dequeue 從等待接受的執行緒佇列連結串列獲取一個sudog if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } // CASE2: 快取佇列未滿,則將訊息複製到快取佇列上並移動sendx下標 if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } // CASE3: 快取佇列已滿,將goroutine 加入 send 佇列 // 建立 sudo // Block on the channel. Some receiver will complete our operation for us. //獲取當前執行緒並繫結到sudog gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 講當前sudog放入等待發送的執行緒佇列 c.sendq.enqueue(mysg) // 休眠執行緒(即阻塞) // 通過呼叫goready(gp),goroutine可以再次執行。 goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3) // Ensure the value being sent is kept alive until the // receiver copies it out. The sudog has a pointer to the // stack object, but sudogs aren't considered as roots of the // stack tracer. KeepAlive(ep) // someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if gp.param == nil { if c.closed == 0 { throw("chansend: spurious wakeup") } panic(plainError("send on closed channel")) } gp.param = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } mysg.c = nil //釋放sudog releaseSudog(mysg) return true } // send processes a send operation on an empty channel c. // The value ep sent by the sender is copied to the receiver sg. // The receiver is then woken up to go on its merry way. // Channel c must be empty and locked.send unlocks c with unlockf. // sg must already be dequeued from c. // ep must be non-nil and point to the heap or the caller's stack. func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { if raceenabled { if c.dataqsiz == 0 { racesync(c, sg) } else { // Pretend we go through the buffer, even though // we copy directly. Note that we need to increment // the head/tail locations only when raceenabled. qp := chanbuf(c, c.recvx) raceacquire(qp) racerelease(qp) raceacquireg(sg.g, qp) racereleaseg(sg.g, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } } if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
send 有以下四種情況:【都是對不為nil的chan的情況】
1.向已經close的chan寫資料,拋panic。
2.有 goroutine 阻塞在 channel recv 佇列上,此時快取佇列( hchan.buf)為空(即緩衝區內無元素),直接將訊息傳送給 reciever goroutine,只產生一次複製,從當前 channel 的等待佇列中取出等待的 goroutine,然後呼叫 send。goready 負責喚醒 goroutine。
3.當 channel 快取佇列( hchan.buf )有剩餘空間時,將資料放到佇列裡,等待接收,接收後總共產生兩次複製
4.當 channel 快取佇列( hchan.buf )已滿時,將當前 goroutine 加入 send 佇列並阻塞
receive:
// chanrecv receives on channel c and writes the received data to ep. // ep may be nil, in which case received data is ignored. // If block == false and no elements are available, returns (false, false). // Otherwise, if c is closed, zeros *ep and returns (true, false). // Otherwise, fills in *ep with an element and returns (true, true). // A non-nil ep must point to the heap or the caller's stack. func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // raceenabled: don't need to check ep, as it is always on the stack // or is new memory allocated by reflect. if debugChan { print("chanrecv: chan=", c, "\n") } // 從 nil 的 channel 中接收訊息,永久阻塞 if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // Fast path: check for failed non-blocking operation without acquiring the lock. // // After observing that the channel is not ready for receiving, we observe that the // channel is not closed. Each of these observations is a single word-sized read // (first c.sendq.first or c.qcount, and second c.closed). // Because a channel cannot be reopened, the later observation of the channel // being not closed implies that it was also not closed at the moment of the // first observation. We behave as if we observed the channel at that moment // and report that the receive cannot proceed. // // The order of operations is important here: reversing the operations can lead to // incorrect behavior when racing with a close. if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } var t0 int64 if blockprofilerate > 0 { t0 = cputicks() } lock(&c.lock) // CASE1: 從已經 close 且為空的 channel recv 資料,返回空值 if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } // CASE2: send 佇列不為空,直接從channel佇列中獲取 // sg是sends 執行緒佇列 // 從sends 執行緒佇列獲取一個sudog並喚醒讓其將元素推入channel if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } // CASE3: 快取佇列不為空,此時只有可能是快取佇列已滿,從佇列頭取出元素, if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) //移動channel的recvx下標 c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } // CASE4: 快取佇列為空,將 goroutine 加入 recv 佇列,並阻塞 // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3) // someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed }
從程式碼上可以很明顯的看出
receive和send的四種情況相互配合相互對應實現一存一拿的執行順序
close channel 的工作
整個channel的流程結構:

WechatIMG1513.jpeg

15141548494179_.pic_hd.jpg