1. 程式人生 > >【我的區塊鏈之路】- golang原始碼分析之select的實現

【我的區塊鏈之路】- golang原始碼分析之select的實現

最近本人再找工作,恩,雖然本人使用go有2年左右了,但是其實還只是停留在語言使用的技巧位面,語言的很多底層實現機制還不是很清楚的,所以面試被問到很多底層,就很懵逼。這篇文章主要是自己對go學習的筆記。(本人還是一隻菜雞,各位海涵)

文章參考:

那麼select的實現在go的原始碼包runtime中,路徑為:./src/runtime/select.go。下面,我們先來看select的使用方式:

select {
  case c1 <-1: 
    // TODO somethings
  case <-c2: 
    // TODO somethings
  default: 
    // TODO somethings
}

其實上述程式碼真正做了下面幾件事:

建立select  –>  註冊case  –>  執行select  –>  釋放select

然後,我們先說一說,select主要是用到的函式:

runtime.reflect_rselect()
runtime.newselect()
runtime.selectsend()
runtime.selectrecv()
runtime.selectdefault()
runtime.selectgo()

先來看定義的幾個常量:

const (
	// scase.kind
	caseNil = iota  // 0 :表示case 為nil;在send 或者 recv 發生在一個 nil channel 上,就有可能出現這種情況
	caseRecv        // 1 : 表示case 為接收通道 <- ch
	caseSend        // 2 :表示case 為傳送通道 ch <-
	caseDefault     // 3 :表示 default 語句塊
)

然後接著來看幾個重要的結構體:

/**
定義select 結構
*/
type hselect struct {
	tcase     uint16   // total count of scase[]      總的case數目
	ncase     uint16   // currently filled scase[]    目前已經註冊的case數目     
	pollorder *uint16  // case poll order             【超重要】 輪詢的case序號
	lockorder *uint16  // channel lock order          【超重要】chan的鎖定順序

    // case 陣列,為了節省一個指標的 8 個位元組搞成這樣的結構
    // 實際上要訪問後面的值,還是需要進行指標移動
    // 指標移動使用 runtime 內部的 add 函式
	scase     [1]scase // one per case (in order of appearance)  【超重要】儲存當前case操作的chan (按照輪詢順序)
}



/**
select 中每一個case的定義
*/
type scase struct {
	elem        unsafe.Pointer // data element                           資料指標
	c           *hchan         // chan                                   當前case所對應的chan引用
	pc          uintptr        // return pc (for race detector / msan)   和彙編中的pc同義,表示 程式計數器,用於指示當前將要執行的下一條機器指令的記憶體地址
	kind        uint16         // 通道的型別
	receivedp   *bool // pointer to received bool, if any
	releasetime int64
}

那麼具體是什麼時候去做上面我們所說的:建立select  –>  註冊case  –>  執行select  –>  釋放select 我們會看到下面

//go:linkname reflect_rselect reflect.rselect
func reflect_rselect(cases []runtimeSelect) (chosen int, recvOK bool) {
	// flagNoScan is safe here, because all objects are also referenced from cases.
	size := selectsize(uintptr(len(cases)))
	sel := (*hselect)(mallocgc(size, nil, true))
	newselect(sel, int64(size), int32(len(cases)))
	r := new(bool)
	for i := range cases {
		rc := &cases[i]
		switch rc.dir {
		case selectDefault:
			selectdefault(sel)
		case selectSend:
			selectsend(sel, rc.ch, rc.val)
		case selectRecv:
			selectrecv(sel, rc.ch, rc.val, r)
		}
	}

	chosen = selectgo(sel)
	recvOK = *r
	return
}

可以看得出來,其實該函式的真正外部觸發是在 reflect包中的rselct() 函式【具體為什麼請自行研究下 //go:linkname  的使用】。而當前這個函式呢其實是先例項化一個 select 例項,然後根據 入參的 cases 逐個的去註冊對應的 chan操作,最後呼叫 selectgo(sel) 去執行case。下面我們再來看看例項化select:

例項化select:

func newselect(sel *hselect, selsize int64, size int32) {
	if selsize != int64(selectsize(uintptr(size))) {
		print("runtime: bad select size ", selsize, ", want ", selectsize(uintptr(size)), "\n")
		throw("bad select size")
	}
	sel.tcase = uint16(size)
	sel.ncase = 0

    // hselect 這個結構體的是長下面這樣:
    // 【超重要】 header |scase[0]|scase[1]|scase[2]|lockorder[0]|lockorder[1]|lockorder[2]|pollorder[0]|pollorder[1]|pollorder[2]
    // 各個 channel 加鎖的順序

	sel.lockorder = (*uint16)(add(unsafe.Pointer(&sel.scase), uintptr(size)*unsafe.Sizeof(hselect{}.scase[0])))

    // case 輪詢的順序初始化
	sel.pollorder = (*uint16)(add(unsafe.Pointer(sel.lockorder), uintptr(size)*unsafe.Sizeof(*hselect{}.lockorder)))

	if debugSelect {
		print("newselect s=", sel, " size=", size, "\n")
	}
}

可以看出來,newselect就是在記憶體上建立一個選擇器。其實就是初始化了某些記憶體空間。然後緊接著就是遍歷所有的case通過判斷反射得到的case中的型別去註冊相應的註冊函式。

具體的我們再來看看hselect,hselect的最後是一個[1]scase表示select中只儲存了一個case的空間,說明hselect在記憶體只是個頭部,select後面儲存了所有的scase,這段Scases的大小就是tcase。在 go runtime實現中經常看到這種 頭部+連續記憶體 的方式

此圖就是整個具備 6 個case 的選擇器的記憶體模型了,這一整塊記憶體結構其實也是由 頭部結構資料結構  組成,頭部就是 Select那一部分,對應上面提到的 hselect資料結構就是 由陣列組成

可以看到 lockorder、pollorder以及scase 【黑色部分】都是6個單元的陣列,黑色部分的scase的第一個單元位於Select頭部結構記憶體空間中,這個單元就是 hselect結構體中的 [1]scase的內容了。在開闢上圖的這塊記憶體時,從頭部開始這一整塊記憶體是由一次類malloc(為什麼是類malloc,因為Go有自己的記憶體管理介面,不是採用的普通malloc)呼叫分配的,然後再將Select頭部結構中的lockorder和pollorder兩個指標分別指向正確的位置即可。當然,在一口氣分配這塊記憶體前,是事先算好了所有需要的記憶體的大小的。在malloc分配這塊記憶體的時候,scase就只需要分配一個單元就可以了,所以上圖中可以看出只是多加了5個scase的 儲存單元。這樣一來,scase欄位和lockorder、pollorder就沒有什麼兩樣了,殊途同歸。然後這三者的內容的填充都是在註冊case中完成。

好了,現在我們知道其實在go內部是用了連續記憶體的方式來儲存select及case中的內容

註冊case:

select中註冊case channel有三種,分別是:selectsendselectrecvselectdefault 分別對應著不同的case。他們的註冊方式一致,都是 ncase+1 (記錄目前已經註冊的case數目),然後按照當前的 index 填充scases域的scase陣列的相關欄位,主要是用case中的chan和case型別填充c和kind欄位。程式碼如下:

/**
主要針對 case send 通道
如:
select {
  case ch<-1: 
}
*/
func selectsend(sel *hselect, c *hchan, elem unsafe.Pointer) {
	pc := getcallerpc(unsafe.Pointer(&sel))
	i := sel.ncase
	if i >= sel.tcase {
		throw("selectsend: too many cases")
	}
    
    // 註冊數量加一
	sel.ncase = i + 1
	if c == nil {
		return
	}

     // 初始化對應的 scase 結構
	cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
	cas.pc = pc
	cas.c = c
	cas.kind = caseSend
	cas.elem = elem

	if debugSelect {
		print("selectsend s=", sel, " pc=", hex(cas.pc), " chan=", cas.c, "\n")
	}
}




/**
主要針對 case recv 通道
如:
select {
    case <-ch: ==> 這時候就會呼叫 selectrecv; case ,ok <- ch: 也可以這樣寫
}

在 ch 被關閉時,這個 case 每次都可能被輪詢到
*/
func selectrecv(sel *hselect, c *hchan, elem unsafe.Pointer, received *bool) {
	pc := getcallerpc(unsafe.Pointer(&sel))
	i := sel.ncase
	if i >= sel.tcase {
		throw("selectrecv: too many cases")
	}
    
    // 註冊數量加一
	sel.ncase = i + 1
	if c == nil {
		return
	}

    // 初始化對應的 scase 結構
	cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
	cas.pc = pc
	cas.c = c
	cas.kind = caseRecv
	cas.elem = elem
	cas.receivedp = received

	if debugSelect {
		print("selectrecv s=", sel, " pc=", hex(cas.pc), " chan=", cas.c, "\n")
	}
}



/**
主要針對 default
如:
select {
    default:
}
*/
func selectdefault(sel *hselect) {
	pc := getcallerpc(unsafe.Pointer(&sel))
	i := sel.ncase
	if i >= sel.tcase {
		throw("selectdefault: too many cases")
	}
    
    // 註冊數量加一
	sel.ncase = i + 1
    
    // 初始化對應的 scase 結構
	cas := (*scase)(add(unsafe.Pointer(&sel.scase), uintptr(i)*unsafe.Sizeof(sel.scase[0])))
	cas.pc = pc
	cas.c = nil
	cas.kind = caseDefault

	if debugSelect {
		print("selectdefault s=", sel, " pc=", hex(cas.pc), "\n")
	}
}

select的執行:

pollorder:儲存的是scase的序號,亂序是為了之後執行時的隨機性。

lockorder:儲存了所有case中channel的地址,這裡按照地址大小堆排了一下lockorder對應的這片連續記憶體。對chan排序是為了去重,保證之後對所有channel上鎖時不會重複上鎖

select 語句執行時會對整個chanel加鎖

select 語句會建立select物件 如果放在for迴圈中長期執行可能會頻繁的分配記憶體

select執行過程總結如下:

  • 通過pollorder的序號,遍歷scase找出已經準備好的case。如果有就執行普通的chan讀寫操作。其中準備好的case是指 可以不阻塞完成讀寫chan的case,或者讀已經關閉的chan的case
  • 如果沒有準備好的case,則嘗試defualt case。
  • 如果以上都沒有,則把當前的 G (協程)封裝好掛到scase所有chan的阻塞連結串列中,按照chan的操作型別掛到sendq或recvq中。
  • 這個 G 被某個chan喚醒,遍歷scase找到目標case,放棄當前G在其他chan中的等待,返回。

在說執行之前我們先來看一下加鎖解鎖:


// 對所有 case 對應的 channel 加鎖
// 需要按照 lockorder 陣列中的元素索引來搞
// 否則可能有迴圈等待的死鎖
func sellock(scases []scase, lockorder []uint16) {
	var c *hchan
	for _, o := range lockorder {
		c0 := scases[o].c
		if c0 != nil && c0 != c {
			c = c0
            
            // 其實也是用對應的hchan中的mutex去對當前hchan操作
            // hchan 被定義在 ./src/runtime/chan.go 中,就是channel的定義
			lock(&c.lock)
		}
	}
}


// 解鎖,比較簡單
func selunlock(scases []scase, lockorder []uint16) {
	// We must be very careful here to not touch sel after we have unlocked
	// the last lock, because sel can be freed right after the last unlock.
	// Consider the following situation.
	// First M calls runtime·park() in runtime·selectgo() passing the sel.
	// Once runtime·park() has unlocked the last lock, another M makes
	// the G that calls select runnable again and schedules it for execution.
	// When the G runs on another M, it locks all the locks and frees sel.
	// Now if the first M touches sel, it will access freed memory.

    /**
        我們必須非常小心,在我們解鎖最後一個鎖之後不要觸控sel,因為sel可以在最後一次解鎖後立即釋放。
        考慮以下情況:
        第一個M在執行時呼叫runtime·park()·selectgo()傳遞sel。
        一旦runtime·park()解鎖了最後一個鎖,另一個M使得再次呼叫select runnable的G並安排它執行。
        當G在另一個M上執行時,它會鎖定所有鎖並釋放sel。
        現在,如果第一個M接觸sel,它將訪問釋放的記憶體。
    */
	for i := len(scases) - 1; i >= 0; i-- {
		c := scases[lockorder[i]].c
		if c == nil {
			break
		}
		if i > 0 && c == scases[lockorder[i-1]].c {
			continue // will unlock it on the next iteration
		}
            // 其實也是用對應的hchan中的mutex去對當前hchan操作
            // hchan 被定義在 ./src/runtime/chan.go 中,就是channel的定義
		unlock(&c.lock)
	}
}



/**
和 gopark 相關的
*/
func selparkcommit(gp *g, _ unsafe.Pointer) bool {
	// This must not access gp's stack (see gopark). In
	// particular, it must not access the *hselect. That's okay,
	// because by the time this is called, gp.waiting has all
	// channels in lock order.

        /**
            這不能訪問gp的堆疊(參見gopark)。 特別是,它不能訪問* hselect。 沒關係,因為在呼叫它的時候,gp.waiting的所有通道都處於鎖定狀態。
        */
	var lastc *hchan
	for sg := gp.waiting; sg != nil; sg = sg.waitlink {
		if sg.c != lastc && lastc != nil {
			// As soon as we unlock the channel, fields in
			// any sudog with that channel may change,
			// including c and waitlink. Since multiple
			// sudogs may have the same channel, we unlock
			// only after we've passed the last instance
			// of a channel.

            /**
                一旦我們解鎖頻道,任何具有該頻道的sudog中的欄位都可能發生變化,包括c和     waitlink。 由於多個sudog可能具有相同的通道,因此只有在我們通過了通道的最後一個例項後才會解鎖。
            */
			unlock(&lastc.lock)
		}
		lastc = sg.c
	}
	if lastc != nil {
		unlock(&lastc.lock)
	}
	return true
}

下面我們來說一說,如何執行select吧,具體程式碼如下:


// selectgo 是在初始化完成之後執行 select 邏輯的函式
// 返回值是要執行的 scase 的 index
func selectgo(sel *hselect) int {
	if debugSelect {
		print("select: sel=", sel, "\n")
	}
	if sel.ncase != sel.tcase {
		throw("selectgo: case count mismatch")
	}
    

         // len 和 cap 就是 scase 的總數
        // 這時候 ncase 和 tcase 已經是相等的了
	scaseslice := slice{unsafe.Pointer(&sel.scase), int(sel.ncase), int(sel.ncase)}
	scases := *(*[]scase)(unsafe.Pointer(&scaseslice))

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
		for i := 0; i < int(sel.ncase); i++ {
			scases[i].releasetime = -1
		}
	}

	// The compiler rewrites selects that statically have
	// only 0 or 1 cases plus default into simpler constructs.
	// The only way we can end up with such small sel.ncase
	// values here is for a larger select in which most channels
	// have been nilled out. The general code handles those
	// cases correctly, and they are rare enough not to bother
	// optimizing (and needing to test).
    
    /**
        編譯器將靜態只有0或1個案例的選擇重寫為更簡單的構造。 我們在這裡得到如此小的sel.ncase值的唯一方法是選擇大多數通道已被填充的更大選擇。 通用程式碼正確處理這些情況,並且它們非常罕見,不會打擾優化(並且需要測試)。
    */

	// generate permuted order
    // 生成置換順序
	pollslice := slice{unsafe.Pointer(sel.pollorder), int(sel.ncase), int(sel.ncase)}
	pollorder := *(*[]uint16)(unsafe.Pointer(&pollslice))

    /**
    洗牌
    */
	for i := 1; i < int(sel.ncase); i++ {
		j := fastrandn(uint32(i + 1))
		pollorder[i] = pollorder[j]
		pollorder[j] = uint16(i)
	}

	// sort the cases by Hchan address to get the locking order.
	// simple heap sort, to guarantee n log n time and constant stack footprint.
    
    /**
        通過Hchan地址對案例進行排序以獲得鎖定順序。
簡單的堆排序,以保證n log n時間和不斷的堆疊佔用。
    */
    // 按 hchan 的地址來進行排序,以生成加鎖順序
    // 用堆排序來保證 nLog(n) 的時間複雜度
	lockslice := slice{unsafe.Pointer(sel.lockorder), int(sel.ncase), int(sel.ncase)}
	lockorder := *(*[]uint16)(unsafe.Pointer(&lockslice))
	for i := 0; i < int(sel.ncase); i++ {
        // 初始化 lockorder 陣列
		j := i
		// Start with the pollorder to permute cases on the same channel.
        // 從pollorder開始,在同一頻道上置換案例。
		c := scases[pollorder[i]].c
		for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
			k := (j - 1) / 2
			lockorder[j] = lockorder[k]
			j = k
		}
		lockorder[j] = pollorder[i]
	}
	for i := int(sel.ncase) - 1; i >= 0; i-- {
		o := lockorder[i]
		c := scases[o].c
		lockorder[i] = lockorder[0]
		j := 0
		for {
			k := j*2 + 1
			if k >= i {
				break
			}
			if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
				k++
			}
			if c.sortkey() < scases[lockorder[k]].c.sortkey() {
				lockorder[j] = lockorder[k]
				j = k
				continue
			}
			break
		}
		lockorder[j] = o
	}
	/*
		for i := 0; i+1 < int(sel.ncase); i++ {
			if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
				print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
				throw("select: broken sort")
			}
		}
	*/

	// lock all the channels involved in the select
    // 鎖定選擇中涉及的所有通道
    // 對涉及到的所有 channel 都加鎖
	sellock(scases, lockorder)

	var (
		gp     *g
		done   uint32
		sg     *sudog
		c      *hchan
		k      *scase
		sglist *sudog
		sgnext *sudog
		qp     unsafe.Pointer
		nextp  **sudog
	)

    /**
    走到這裡,說明各種準備工作已經完成,要開始真正幹活了
    */    

loop:
	// pass 1 - look for something already waiting
    // 第一部分: 尋找已經等待的東西
	var dfli int
	var dfl *scase
	var casi int
	var cas *scase

    // 雖然看著是一個 for 迴圈
    // 但實際上有 case ready 的時候
    // 直接就用 goto 跳出了
	for i := 0; i < int(sel.ncase); i++ {
         // 按 pollorder 的順序進行遍歷
		casi = int(pollorder[i])
		cas = &scases[casi]
		c = cas.c

		switch cas.kind {
		case caseNil:
			continue

		case caseRecv:
             // <- ch 的情況
             // 根據有沒有等待的 goroutine 佇列執行不同的操作
			sg = c.sendq.dequeue()
			if sg != nil {
				goto recv
			}
			if c.qcount > 0 {
				goto bufrecv
			}
			if c.closed != 0 {
				goto rclose
			}

		case caseSend:
            
            // ch <- 1 的情況,也是一些基本的 channel 操作
			if raceenabled {
				racereadpc(unsafe.Pointer(c), cas.pc, chansendpc)
			}
			if c.closed != 0 {
				goto sclose
			}
			sg = c.recvq.dequeue()
			if sg != nil {
				goto send
			}
			if c.qcount < c.dataqsiz {
				goto bufsend
			}

		case caseDefault:
			dfli = casi
			dfl = cas
		}
	}
    
    // 這裡是在前面進了 caseDefault 才會走到
	if dfl != nil {
		selunlock(scases, lockorder)
		casi = dfli
		cas = dfl
		goto retc
	}

	// pass 2 - enqueue on all chans
    // 第二部分:在所有的chan上排隊
     // 沒有任何一個 case 滿足,且沒有 default
     // 這種情況下需要把當前的 goroutine 入隊所有 channel 的等待佇列裡
	gp = getg()
	done = 0
	if gp.waiting != nil {
		throw("gp.waiting != nil")
	}
	nextp = &gp.waiting
    // 按照加鎖的順序把 gorutine 入每一個 channel 的等待佇列
	for _, casei := range lockorder {
		casi = int(casei)
		cas = &scases[casi]
		if cas.kind == caseNil {
			continue
		}
		c = cas.c
		sg := acquireSudog()
		sg.g = gp
		// Note: selectdone is adjusted for stack copies in stack1.go:adjustsudogs
        // selectdone在stack1.go:adjustsudogs中針對堆疊副本進行了調整
		sg.selectdone = (*uint32)(noescape(unsafe.Pointer(&done)))
		// No stack splits between assigning elem and enqueuing
		// sg on gp.waiting where copystack can find it.
        // 在gp.waiting上分配elem和排隊sg之間沒有堆疊分割,copystack可以在其中找到它。

		sg.elem = cas.elem
		sg.releasetime = 0
		if t0 != 0 {
			sg.releasetime = -1
		}
		sg.c = c
		// Construct waiting list in lock order.
        // 按鎖定順序構造等待列表。
		*nextp = sg
		nextp = &sg.waitlink

		switch cas.kind {
		case caseRecv:
            // recv 的情況進 recvq
			c.recvq.enqueue(sg)

		case caseSend:
             // send 的情況進 sendq
			c.sendq.enqueue(sg)
		}
	}

	// wait for someone to wake us up
    // 等待有人叫醒我們
	gp.param = nil
    // 當前 goroutine 進入休眠,同時解鎖channel,等待被喚醒(selparkcommit這裡實現的)
	gopark(selparkcommit, nil, "select", traceEvGoBlockSelect, 1)

	// While we were asleep, some goroutine came along and completed
	// one of the cases in the select and woke us up (called ready).
	// As part of that process, the goroutine did a cas on done above
	// (aka *sg.selectdone for all queued sg) to win the right to
	// complete the select. Now done = 1.
	//
	// If we copy (grow) our own stack, we will update the
	// selectdone pointers inside the gp.waiting sudog list to point
	// at the new stack. Another goroutine attempting to
	// complete one of our (still linked in) select cases might
	// see the new selectdone pointer (pointing at the new stack)
	// before the new stack has real data; if the new stack has done = 0
	// (before the old values are copied over), the goroutine might
	// do a cas via sg.selectdone and incorrectly believe that it has
	// won the right to complete the select, executing a second
	// communication and attempting to wake us (call ready) again.
	//
	// Then things break.
	//
	// The best break is that the goroutine doing ready sees the
	// _Gcopystack status and throws, as in #17007.
	// A worse break would be for us to continue on, start running real code,
	// block in a semaphore acquisition (sema.go), and have the other
	// goroutine wake us up without having really acquired the semaphore.
	// That would result in the goroutine spuriously running and then
	// queue up another spurious wakeup when the semaphore really is ready.
	// In general the situation can cascade until something notices the
	// problem and causes a crash.
	//
	// A stack shrink does not have this problem, because it locks
	// all the channels that are involved first, blocking out the
	// possibility of a cas on selectdone.
	//
	// A stack growth before gopark above does not have this
	// problem, because we hold those channel locks (released by
	// selparkcommit).
	//
	// A stack growth after sellock below does not have this
	// problem, because again we hold those channel locks.
	//
	// The only problem is a stack growth during sellock.
	// To keep that from happening, run sellock on the system stack.
	//
	// It might be that we could avoid this if copystack copied the
	// stack before calling adjustsudogs. In that case,
	// syncadjustsudogs would need to recopy the tiny part that
	// it copies today, resulting in a little bit of extra copying.
	//
	// An even better fix, not for the week before a release candidate,
	// would be to put space in every sudog and make selectdone
	// point at (say) the space in the first sudog.
    
    // 有故事發生,被喚醒,再次該select下全部channel加鎖
	systemstack(func() {
		sellock(scases, lockorder)
	})

	sg = (*sudog)(gp.param)
	gp.param = nil

	// pass 3 - dequeue from unsuccessful chans
	// otherwise they stack up on quiet channels
	// record the successful case, if any.
	// We singly-linked up the SudoGs in lock order.
    // 被喚醒後執行下面的程式碼
    
    /**
    從不成功的chans出發,否則他們在安靜的頻道上疊加記錄成功案例,如果有的話。 我們以鎖定順序單獨連結SudoGs。
    */
	casi = -1
	cas = nil
	sglist = gp.waiting
	// Clear all elem before unlinking from gp.waiting.
    // 在從gp.waiting取消連結之前清除所有元素。
	for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
		sg1.selectdone = nil
		sg1.elem = nil
		sg1.c = nil
	}
	gp.waiting = nil

	for _, casei := range lockorder {
		k = &scases[casei]
		if k.kind == caseNil {
			continue
		}
		if sglist.releasetime > 0 {
			k.releasetime = sglist.releasetime
		}
		if sg == sglist {
			// sg has already been dequeued by the G that woke us up.
			casi = int(casei)
			cas = k
		} else {
			c = k.c
			if k.kind == caseSend {
				c.sendq.dequeueSudoG(sglist)
			} else {
				c.recvq.dequeueSudoG(sglist)
			}
		}
		sgnext = sglist.waitlink
		sglist.waitlink = nil
		releaseSudog(sglist)
		sglist = sgnext
	}
        

    // 如果還是沒有的話再次走 loop 邏輯
	if cas == nil {
		// We can wake up with gp.param == nil (so cas == nil)
		// when a channel involved in the select has been closed.
		// It is easiest to loop and re-run the operation;
		// we'll see that it's now closed.
		// Maybe some day we can signal the close explicitly,
		// but we'd have to distinguish close-on-reader from close-on-writer.
		// It's easiest not to duplicate the code and just recheck above.
		// We know that something closed, and things never un-close,
		// so we won't block again.
		goto loop
	}

	c = cas.c

	if debugSelect {
		print("wait-return: sel=", sel, " c=", c, " cas=", cas, " kind=", cas.kind, "\n")
	}

	if cas.kind == caseRecv {
		if cas.receivedp != nil {
			*cas.receivedp = true
		}
	}

	if raceenabled {
		if cas.kind == caseRecv && cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
		} else if cas.kind == caseSend {
			raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
		}
	}
	if msanenabled {
		if cas.kind == caseRecv && cas.elem != nil {
			msanwrite(cas.elem, c.elemtype.size)
		} else if cas.kind == caseSend {
			msanread(cas.elem, c.elemtype.size)
		}
	}

	selunlock(scases, lockorder)
	goto retc

bufrecv:
	// can receive from buffer
	if raceenabled {
		if cas.elem != nil {
			raceWriteObjectPC(c.elemtype, cas.elem, cas.pc, chanrecvpc)
		}
		raceacquire(chanbuf(c, c.recvx))
		racerelease(chanbuf(c, c.recvx))
	}
	if msanenabled && cas.elem != nil {
		msanwrite(cas.elem, c.elemtype.size)
	}
	if cas.receivedp != nil {
		*cas.receivedp = true
	}
	qp = chanbuf(c, c.recvx)
	if cas.elem != nil {
		typedmemmove(c.elemtype, cas.elem, qp)
	}
	typedmemclr(c.elemtype, qp)
	c.recvx++
	if c.recvx == c.dataqsiz {
		c.recvx = 0
	}
	c.qcount--
	selunlock(scases, lockorder)
	goto retc

bufsend:
	// can send to buffer
	if raceenabled {
		raceacquire(chanbuf(c, c.sendx))
		racerelease(chanbuf(c, c.sendx))
		raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
	c.sendx++
	if c.sendx == c.dataqsiz {
		c.sendx = 0
	}
	c.qcount++
	selunlock(scases, lockorder)
	goto retc

recv:
	// can receive from sleeping sender (sg)
	recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncrecv: sel=", sel, " c=", c, "\n")
	}
	if cas.receivedp != nil {
		*cas.receivedp = true
	}
	goto retc

rclose:
	// read at end of closed channel
	selunlock(scases, lockorder)
	if cas.receivedp != nil {
		*cas.receivedp = false
	}
	if cas.elem != nil {
		typedmemclr(c.elemtype, cas.elem)
	}
	if raceenabled {
		raceacquire(unsafe.Pointer(c))
	}
	goto retc

send:
	// can send to a sleeping receiver (sg)
	if raceenabled {
		raceReadObjectPC(c.elemtype, cas.elem, cas.pc, chansendpc)
	}
	if msanenabled {
		msanread(cas.elem, c.elemtype.size)
	}
	send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
	if debugSelect {
		print("syncsend: sel=", sel, " c=", c, "\n")
	}
	goto retc

retc:
	if cas.releasetime > 0 {
		blockevent(cas.releasetime-t0, 1)
	}
	return casi

sclose:
	// send on closed channel
    // 向關閉 channel 傳送資料
    // 直接 panic
	selunlock(scases, lockorder)
	panic(plainError("send on closed channel"))
}

在上述程式碼中具體做了些什麼事呢?下面我們來分析分析。

case洗牌 -> case堆排去重 -> 所有channel 上鎖 ->  遍歷(洗牌後的)case找出第一個可以執行的case -> 如果沒有就執行default -> 如果都沒有就把當前G掛起到所有case 所對應的chan等待喚醒,同時解鎖所有channel ->  被喚醒後再次對所有channel上鎖 -> 最後一次迴圈操作,獲取可執行case,其餘全部出佇列丟棄 -> 沒有的話再次走 loop 邏輯

最後:

  1. select對所有涉及到的channel都加鎖。
  2. 在對所有的channel枷鎖之前,有一個對所有的channel做了一次堆排的動作,目的為了去重channel
  3. select並不是隨機選擇case去執行,而是事先將所有的case進行洗牌,然後再從頭到尾去遍歷,選擇出第一個可以執行的case。
  4. 在for {} 結構中的 select 每一次for 都會經歷上述的 4各階段,建立 -> 註冊 -> 執行 -> 釋放;所以select的執行是有代價的而且代價不低。