[My Blockchain Road] - Go source code analysis: the implementation of select
I have been job hunting recently. Although I have used Go for about two years, I have really only mastered it at the level of usage tricks; many of the language's low-level mechanisms are still unclear to me, so interview questions about internals left me stumped. This article is my own study note on Go. (I am still a beginner, so please bear with me.)
select is implemented in Go's runtime package, at ./src/runtime/select.go. First, let's look at how select is used:
select {
case c1 <- 1:
    // TODO: something
case <-c2:
    // TODO: something
default:
    // TODO: something
}
The code above really performs the following steps:
create the select -> register the cases -> execute the select -> release the select
First, the main functions select relies on:
runtime.reflect_rselect()
runtime.newselect()
runtime.selectsend()
runtime.selectrecv()
runtime.selectdefault()
runtime.selectgo()
Let's start with the constants:
const ( // scase.kind
    caseNil     = iota // 0: the case is nil; this can happen when a send or recv is performed on a nil channel
    caseRecv           // 1: a receive case, <-ch
    caseSend           // 2: a send case, ch<-
    caseDefault        // 3: the default block
)
Next, a couple of important structs:
/** the select descriptor */
type hselect struct {
    tcase     uint16  // total count of scase[]
    ncase     uint16  // currently filled scase[]
    pollorder *uint16 // case poll order [important] — the randomized polling order of the cases
    lockorder *uint16 // channel lock order [important] — the order in which the channels are locked
    // The case array. It is declared as [1]scase to save the 8 bytes a pointer would cost;
    // the remaining entries actually live right after it in memory and are reached by
    // pointer arithmetic, using runtime's internal add function.
    scase [1]scase // one per case (in order of appearance) [important] — the chans the cases operate on
}

/** one case inside a select */
type scase struct {
    elem        unsafe.Pointer // data element
    c           *hchan         // the channel this case refers to
    pc          uintptr        // return pc (for race detector / msan); the program counter, i.e. the address of the next machine instruction to execute
    kind        uint16         // the kind of the case
    receivedp   *bool          // pointer to received bool, if any
    releasetime int64
}
So when exactly do the steps above — create the select -> register the cases -> execute the select -> release the select — take place? We can see it in the code below:
//go:linkname reflect_rselect reflect.rselect
func reflect_rselect(cases []runtimeSelect) (chosen int, recvOK bool) {
// flagNoScan is safe here, because all objects are also referenced from cases.
size := selectsize(uintptr(len(cases)))
sel := (*hselect)(mallocgc(size, nil, true))
newselect(sel, int64(size), int32(len(cases)))
r := new(bool)
for i := range cases {
rc := &cases[i]
switch rc.dir {
case selectDefault:
selectdefault(sel)
case selectSend:
selectsend(sel, rc.ch, rc.val)
case selectRecv:
selectrecv(sel, rc.ch, rc.val, r)
}
}
chosen = selectgo(sel)
recvOK = *r
return
}
As you can see, the real external entry point of this function is the rselect() function in the reflect package (look into how //go:linkname works to see why). The function itself first instantiates a select instance, then registers the channel operation for each incoming case one by one, and finally calls selectgo(sel) to execute a case. Next, let's look at how the select is instantiated:
Instantiating the select:
func newselect(sel *hselect, selsize int64, size int32) {
if selsize != int64(selectsize(uintptr(size))) {
print("runtime: bad select size ", selsize, ", want ", selectsize(uintptr(size)), "\n")
throw("bad select size")
}
sel.tcase = uint16(size)
sel.ncase = 0
// the hselect structure is laid out in memory like this:
// [important] header |scase[0]|scase[1]|scase[2]|lockorder[0]|lockorder[1]|lockorder[2]|pollorder[0]|pollorder[1]|pollorder[2]
// the order in which the channels are locked
sel.lockorder = (*uint16)(add(unsafe.Pointer(&sel.scase), uintptr(size)*unsafe.Sizeof(hselect{}.scase[0])))
// initialize the case polling order
sel.pollorder = (*uint16)(add(unsafe.Pointer(sel.lockorder), uintptr(size)*unsafe.Sizeof(*hselect{}.lockorder)))
if debugSelect {
print("newselect s=", sel, " size=", size, "\n")
}
}
As you can see, newselect just creates the selector in memory — essentially initializing the relevant memory regions. After that, the code iterates over all the cases and, based on each case's type obtained via reflection, calls the matching registration function.
Let's look at hselect more closely. It ends with a [1]scase — room for only a single case — which tells us that hselect is just a header: all the scases are stored contiguously right after it, and their number is tcase. This header + contiguous-memory pattern appears frequently in the Go runtime.
The figure shows the memory model of a selector with six cases. The whole contiguous block again consists of a header plus data: the header is the Select part, corresponding to the hselect above, and the data part is made up of the arrays.
You can see that lockorder, pollorder, and scase (the black parts) are each six-element arrays, and that the first scase unit sits inside the Select header's own memory — that unit is the [1]scase in the hselect struct. The whole block, header included, is allocated by a single malloc-like call (malloc-like because Go uses its own memory-management interface rather than plain malloc), after which the lockorder and pollorder pointers in the header are simply pointed at the right offsets. Of course, before allocating the block in one go, the total size needed is computed up front. Since the header already contains one scase unit, the allocation only needs to add five more scase units, which is exactly what the figure shows. In the end the scase field is treated no differently from lockorder and pollorder; all three regions are filled in during case registration.
So now we know that internally, Go stores the select and the contents of all its cases in one contiguous block of memory.
Registering the cases:
There are three case-registration functions in select — selectsend, selectrecv, and selectdefault — one for each case kind. They all register the same way: bump ncase (the count of cases registered so far), then fill in the scase entry at the current index in the scases area, mainly setting the c and kind fields from the case's channel and the case kind. The code:
/**
 Handles a send case, e.g.:
 select {
 case ch <- 1:
 }
 */
func selectsend(sel *hselect, c *hchan, elem unsafe.Pointer) {
pc := getcallerpc(unsafe.Pointer(&sel))
i := sel.ncase
if i >= sel.tcase {
throw("selectsend: too many cases")
}
// bump the count of registered cases
sel.ncase = i + 1
if c == nil {
return
}
// initialize the corresponding scase
cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
cas.pc = pc
cas.c = c
cas.kind = caseSend
cas.elem = elem
if debugSelect {
print("selectsend s=", sel, " pc=", hex(cas.pc), " chan=", cas.c, "\n")
}
}
/**
 Handles a receive case, e.g.:
 select {
 case <-ch:          // this calls selectrecv
 case v, ok := <-ch: // this form does too
 }
 Once ch has been closed, this case can be selected on every poll.
 */
func selectrecv(sel *hselect, c *hchan, elem unsafe.Pointer, received *bool) {
pc := getcallerpc(unsafe.Pointer(&sel))
i := sel.ncase
if i >= sel.tcase {
throw("selectrecv: too many cases")
}
// bump the count of registered cases
sel.ncase = i + 1
if c == nil {
return
}
// initialize the corresponding scase
cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
cas.pc = pc
cas.c = c
cas.kind = caseRecv
cas.elem = elem
cas.receivedp = received
if debugSelect {
print("selectrecv s=", sel, " pc=", hex(cas.pc), " chan=", cas.c, "\n")
}
}
/**
 Handles default, e.g.:
 select {
 default:
 }
 */
func selectdefault(sel *hselect) {
pc := getcallerpc(unsafe.Pointer(&sel))
i := sel.ncase
if i >= sel.tcase {
throw("selectdefault: too many cases")
}
// bump the count of registered cases
sel.ncase = i + 1
// initialize the corresponding scase
cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
cas.pc = pc
cas.c = nil
cas.kind = caseDefault
if debugSelect {
print("selectdefault s=", sel, " pc=", hex(cas.pc), "\n")
}
}
Executing the select:
pollorder: stores the scase indices; they are shuffled to randomize later execution.
lockorder: stores the cases ordered by channel address — a heap sort over that contiguous region. Sorting the channels is for deduplication: it guarantees that when all the channels are locked later, none is locked twice.
While a select statement executes, it locks every channel involved.
Each execution of a select statement also creates a select object, so a select inside a long-running for loop may allocate memory frequently.
The execution of a select can be summarized as follows:
- Walk the scases in pollorder and look for a case that is already ready; if one exists, perform the ordinary channel read or write. A "ready" case is one whose channel read or write can complete without blocking, or a read from an already-closed channel.
- If no case is ready, try the default case.
- If there is no default either, wrap the current G (goroutine) and hang it on the blocking list of every case's channel — on sendq or recvq according to the operation.
- When one of the channels wakes this G, walk the scases to find the target case, abandon the G's waits on the other channels, and return.
Before walking through the execution itself, let's first look at locking and unlocking:
// lock the channels of all cases,
// following the indices in the lockorder array —
// otherwise circular waiting could deadlock
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != nil && c0 != c {
c = c0
// this really just operates on the mutex inside the corresponding hchan
// hchan is defined in ./src/runtime/chan.go — it is the channel itself
lock(&c.lock)
}
}
}
// unlocking — comparatively simple
func selunlock(scases []scase, lockorder []uint16) {
// We must be very careful here to not touch sel after we have unlocked
// the last lock, because sel can be freed right after the last unlock.
// Consider the following situation.
// First M calls runtime·park() in runtime·selectgo() passing the sel.
// Once runtime·park() has unlocked the last lock, another M makes
// the G that calls select runnable again and schedules it for execution.
// When the G runs on another M, it locks all the locks and frees sel.
// Now if the first M touches sel, it will access freed memory.
for i := len(scases) - 1; i >= 0; i-- {
c := scases[lockorder[i]].c
if c == nil {
break
}
if i > 0 && c == scases[lockorder[i-1]].c {
continue // will unlock it on the next iteration
}
// this really just operates on the mutex inside the corresponding hchan
// hchan is defined in ./src/runtime/chan.go — it is the channel itself
unlock(&c.lock)
}
}
/**
 the park-commit function used with gopark
 */
func selparkcommit(gp *g, _ unsafe.Pointer) bool {
// This must not access gp's stack (see gopark). In
// particular, it must not access the *hselect. That's okay,
// because by the time this is called, gp.waiting has all
// channels in lock order.
var lastc *hchan
for sg := gp.waiting; sg != nil; sg = sg.waitlink {
if sg.c != lastc && lastc != nil {
// As soon as we unlock the channel, fields in
// any sudog with that channel may change,
// including c and waitlink. Since multiple
// sudogs may have the same channel, we unlock
// only after we've passed the last instance
// of a channel.
unlock(&lastc.lock)
}
lastc = sg.c
}
if lastc != nil {
unlock(&lastc.lock)
}
return true
}
Now let's talk about how the select actually executes. The code:
// selectgo runs the select logic after initialization has completed
// its return value is the index of the scase to execute
func selectgo(sel *hselect) int {
if debugSelect {
print("select: sel=", sel, "\n")
}
if sel.ncase != sel.tcase {
throw("selectgo: case count mismatch")
}
// len and cap are both the total number of scases
// (ncase and tcase are equal by this point)
scaseslice := slice{unsafe.Pointer(&sel.scase), int(sel.ncase), int(sel.ncase)}
scases := *(*[]scase)(unsafe.Pointer(&scaseslice))
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
for i := 0; i < int(sel.ncase); i++ {
scases[i].releasetime = -1
}
}
// The compiler rewrites selects that statically have
// only 0 or 1 cases plus default into simpler constructs.
// The only way we can end up with such small sel.ncase
// values here is for a larger select in which most channels
// have been nilled out. The general code handles those
// cases correctly, and they are rare enough not to bother
// optimizing (and needing to test).
// generate permuted order
pollslice := slice{unsafe.Pointer(sel.pollorder), int(sel.ncase), int(sel.ncase)}
pollorder := *(*[]uint16)(unsafe.Pointer(&pollslice))
// shuffle the poll order
for i := 1; i < int(sel.ncase); i++ {
j := fastrandn(uint32(i + 1))
pollorder[i] = pollorder[j]
pollorder[j] = uint16(i)
}
// sort the cases by Hchan address to get the locking order.
// simple heap sort, to guarantee n log n time and constant stack footprint.
lockslice := slice{unsafe.Pointer(sel.lockorder), int(sel.ncase), int(sel.ncase)}
lockorder := *(*[]uint16)(unsafe.Pointer(&lockslice))
for i := 0; i < int(sel.ncase); i++ {
// build the lockorder array
j := i
// Start with the pollorder to permute cases on the same channel.
c := scases[pollorder[i]].c
for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
k := (j - 1) / 2
lockorder[j] = lockorder[k]
j = k
}
lockorder[j] = pollorder[i]
}
for i := int(sel.ncase) - 1; i >= 0; i-- {
o := lockorder[i]
c := scases[o].c
lockorder[i] = lockorder[0]
j := 0
for {
k := j*2 + 1
if k >= i {
break
}
if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
k++
}
if c.sortkey() < scases[lockorder[k]].c.sortkey() {
lockorder[j] = lockorder[k]
j = k
continue
}
break
}
lockorder[j] = o
}
/*
for i := 0; i+1 < int(sel.ncase); i++ {
if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
throw("select: broken sort")
}
}
*/
// lock all the channels involved in the select
sellock(scases, lockorder)
var (
gp *g
done uint32
sg *sudog
c *hchan
k *scase
sglist *sudog
sgnext *sudog
qp unsafe.Pointer
nextp **sudog
)
/**
 reaching this point means all the preparation is done — the real work begins
 */
loop:
// pass 1 - look for something already waiting
var dfli int
var dfl *scase
var casi int
var cas *scase
// this looks like a plain for loop,
// but as soon as a case is ready
// we jump straight out with a goto
for i := 0; i < int(sel.ncase); i++ {
// iterate in pollorder
casi = int(pollorder[i])
cas = &scases[casi]
c = cas.c
switch cas.kind {
case caseNil:
continue
case caseRecv:
// the <-ch case
// the action depends on whether a goroutine is already waiting
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
case caseSend:
// the ch <- 1 case — again just basic channel operations
if raceenabled {
racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)
}
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
case caseDefault:
dfli = casi
dfl = cas
}
}
// we only get here if caseDefault was hit above
if dfl != nil {
selunlock(scases, lockorder)
casi = dfli
cas = dfl
goto retc
}
// pass 2 - enqueue on all chans
// no case was ready and there is no default,
// so the current goroutine must be enqueued on every channel's wait queue
gp = getg()
done = 0
if gp.waiting != nil {
throw("gp.waiting != nil")
}
nextp = &gp.waiting
// enqueue the goroutine on each channel's wait queue, following the lock order
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
if cas.kind == caseNil {
continue
}
c = cas.c
sg := acquireSudog()
sg.g = gp
// Note: selectdone is adjusted for stack copies in stack1.go:adjustsudogs
sg.selectdone = (*uint32)(noescape(unsafe.Pointer(&done)))
// No stack splits between assigning elem and enqueuing
// sg on gp.waiting where copystack can find it.
sg.elem = cas.elem
sg.releasetime = 0
if t0 != 0 {
sg.releasetime = -1
}
sg.c = c
// Construct waiting list in lock order.
*nextp = sg
nextp = &sg.waitlink
switch cas.kind {
case caseRecv:
// recv cases go into recvq
c.recvq.enqueue(sg)
case caseSend:
// send cases go into sendq
c.sendq.enqueue(sg)
}
}
// wait for someone to wake us up
gp.param = nil
// the current goroutine goes to sleep and the channels are unlocked (implemented in selparkcommit), waiting to be woken
gopark(selparkcommit, nil, "select", traceEvGoBlockSelect, 1)
// While we were asleep, some goroutine came along and completed
// one of the cases in the select and woke us up (called ready).
// As part of that process, the goroutine did a cas on done above
// (aka *sg.selectdone for all queued sg) to win the right to
// complete the select. Now done = 1.
//
// If we copy (grow) our own stack, we will update the
// selectdone pointers inside the gp.waiting sudog list to point
// at the new stack. Another goroutine attempting to
// complete one of our (still linked in) select cases might
// see the new selectdone pointer (pointing at the new stack)
// before the new stack has real data; if the new stack has done = 0
// (before the old values are copied over), the goroutine might
// do a cas via sg.selectdone and incorrectly believe that it has
// won the right to complete the select, executing a second
// communication and attempting to wake us (call ready) again.
//
// Then things break.
//
// The best break is that the goroutine doing ready sees the
// _Gcopystack status and throws, as in #17007.
// A worse break would be for us to continue on, start running real code,
// block in a semaphore acquisition (sema.go), and have the other
// goroutine wake us up without having really acquired the semaphore.
// That would result in the goroutine spuriously running and then
// queue up another spurious wakeup when the semaphore really is ready.
// In general the situation can cascade until something notices the
// problem and causes a crash.
//
// A stack shrink does not have this problem, because it locks
// all the channels that are involved first, blocking out the
// possibility of a cas on selectdone.
//
// A stack growth before gopark above does not have this
// problem, because we hold those channel locks (released by
// selparkcommit).
//
// A stack growth after sellock below does not have this
// problem, because again we hold those channel locks.
//
// The only problem is a stack growth during sellock.
// To keep that from happening, run sellock on the system stack.
//
// It might be that we could avoid this if copystack copied the
// stack before calling adjustsudogs. In that case,
// syncadjustsudogs would need to recopy the tiny part that
// it copies today, resulting in a little bit of extra copying.
//
// An even better fix, not for the week before a release candidate,
// would be to put space in every sudog and make selectdone
// point at (say) the space in the first sudog.
// something happened and we have been woken: lock all of this select's channels again
systemstack(func() {
sellock(scases, lockorder)
})
sg = (*sudog)(gp.param)
gp.param = nil
// pass 3 - dequeue from unsuccessful chans
// otherwise they stack up on quiet channels
// record the successful case, if any.
// We singly-linked up the SudoGs in lock order.
// the code below runs after we have been woken up
casi = -1
cas = nil
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.selectdone = nil
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil
for _, casei := range lockorder {
k = &scases[casei]
if k.kind == caseNil {
continue
}
if sglist.releasetime > 0 {
k.releasetime = sglist.releasetime
}
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
} else {
c = k.c
if k.kind == caseSend {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}
// if we still found nothing, go through the loop logic again
if cas == nil {
// We can wake up with gp.param == nil (so cas == nil)
// when a channel involved in the select has been closed.
// It is easiest to loop and re-run the operation;
// we'll see that it's now closed.
// Maybe some day we can signal the close explicitly,
// but we'd have to distinguish close-on-reader from close-on-writer.
// It's easiest not to duplicate the code and just recheck above.
// We know that something closed, and things never un-close,
// so we won't block again.
goto loop
}
c = cas.c
if debugSelect {
print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
}
if cas.kind == caseRecv {
if cas.receivedp != nil {
*cas.receivedp = true
}
}
if raceenabled {
if cas.kind == caseRecv && cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
} else if cas.kind == caseSend {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
}
if msanenabled {
if cas.kind == caseRecv && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
} else if cas.kind == caseSend {
msanread(cas.elem, c.elemtype.size)
}
}
selunlock(scases, lockorder)
goto retc
bufrecv:
// can receive from buffer
if raceenabled {
if cas.elem != nil {
raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
}
raceacquire(chanbuf(c, c.recvx))
racerelease(chanbuf(c, c.recvx))
}
if msanenabled && cas.elem != nil {
msanwrite(cas.elem, c.elemtype.size)
}
if cas.receivedp != nil {
*cas.receivedp = true
}
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc
bufsend:
// can send to buffer
if raceenabled {
raceacquire(chanbuf(c, c.sendx))
racerelease(chanbuf(c, c.sendx))
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc
recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncrecv: sel=", sel, " c=", c, "\n")
}
if cas.receivedp != nil {
*cas.receivedp = true
}
goto retc
rclose:
// read at end of closed channel
selunlock(scases, lockorder)
if cas.receivedp != nil {
*cas.receivedp = false
}
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
if raceenabled {
raceacquire(unsafe.Pointer(c))
}
goto retc
send:
// can send to a sleeping receiver (sg)
if raceenabled {
raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
}
if msanenabled {
msanread(cas.elem, c.elemtype.size)
}
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
if debugSelect {
print("syncsend: sel=", sel, " c=", c, "\n")
}
goto retc
retc:
if cas.releasetime > 0 {
blockevent(cas.releasetime-t0, 1)
}
return casi
sclose:
// send on closed channel
// sending to a closed channel panics directly
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))
}
So what exactly does the code above do? Let's break it down.
shuffle the cases -> heap-sort to dedupe channels -> lock all channels -> walk the (shuffled) cases for the first one that can run -> if none, run default -> if there is no default either, park the current G on every case's channel (unlocking all channels) until woken -> after waking, lock all channels again -> one final pass to find the runnable case, dequeuing and discarding the G everywhere else -> if still nothing, go through the loop logic again
To wrap up:
- select locks every channel it involves.
- Before locking them all, the channels are heap-sorted once; the purpose is to deduplicate them.
- select does not pick a case at random on the fly; it shuffles all the cases up front and then scans from start to end, choosing the first case that can run.
- A select inside a for {} goes through all four phases above on every iteration — create -> register -> execute -> release — so executing a select carries a real and non-trivial cost.