1. 程式人生 > >深度解密Go語言之channel

深度解密Go語言之channel

目錄

  • 併發模型
    • 併發與並行
    • 什麼是 CSP
  • 什麼是 channel
    • channel 實現 CSP
  • 為什麼要 channel
  • channel 實現原理
    • 資料結構
    • 建立
    • 接收
    • 傳送
    • 關閉
  • channel 進階
    • 傳送和接收元素的本質
    • 資源洩漏
    • happened before
    • 如何優雅地關閉 channel
    • 關閉的 channel 仍能讀出資料
  • channel 應用
    • 停止訊號
    • 任務定時
    • 解耦生產方和消費方
    • 控制併發數
  • 總結
  • 參考資料

大家好!“深度解密 Go 語言”系列好久未見,我們今天講 channel,預祝閱讀愉快!在開始正文之前,我們先說些題外話。

上一篇關於 Go 語言的文章講 Go 程式的整個編碼、編譯、執行、退出的全過程。文章發出後,反響強烈,在各大平臺的閱讀量都不錯。例如部落格園登上了 48 小時閱讀排行榜,並且受到了編輯推薦,佔據首頁頭條位置整整一天;在開發者頭條首頁精選的位置霸榜一週時間……

熟悉碼農桃花源的朋友們都知道,這裡每篇文章都很長,要花很長時間才能讀完。但長並不是目的,把每個問題都講深、講透才是最重要的。首先我自己得完全理解才行,所以寫每篇文章時我都會看很多參考資料,看原始碼,請教大牛,自己還要去寫樣例程式碼跑結果……從建立文稿到真正完成寫作需要很長時間。

做這些事情,無非是想力求我寫出來的文字,都是我目前所能理解的最深層次。如果我暫時理解不了,我會說出來,或者不寫進文章裡面去,留到以後有能力的時候再來寫。

我自己平時有這種體會:看微信公眾號的文章都是想快速地看完,快速地拉到最後,目的快點開始看下一篇,新鮮感才能不斷刺激大腦。有時候碰到長文很花時間,可能就沒耐心看下去了,裡面說的東西也覺得很難理解,可能直接就放棄了。但是,如果我知道一篇文章價值很高,就會選一個精力比較充沛的時間段,花整塊時間看完,這時候反倒很容易看進去。這種情況下,潛意識裡就會知道我今天是一定要讀完這篇文章的,並且要把裡面有價值的東西都吸收進來。

所以,對於碼農桃花源的文章,我建議你收藏之後,找個空閒時間再好好看。

上週,我把 GitHub 專案 Go-Question 的內容整合成了開源電子書,閱讀體驗提升 N 倍,建議關注專案,現在已經 400 star 了,年底目標是 1k star。專案地址列在了參考資料裡。

另外,公眾號的文章也可以使用微信讀書看,體驗也非常贊,並且可以放到書架上,每個公眾號就是一本書,簡直酷炫。

閒話最後,一直“吹”了很久的曹大,新書《Go 語言高階程式設計》出版了!書的另一位作者是柴樹杉老師,這是給 Go 語言提交 pull 的人,他在 Go 語言上面的研究不用我多說了吧。我第一時間下了單,並且到曹大工位要了簽名。

這本書的推薦人有很多大佬,像許世偉,郝林,雨痕等,評價非常高。重點給大家看下雨痕老師對這本書的評價(上圖第二排左側圖):

本書闡明瞭官方文件某些語焉不詳的部分,有助於 Gopher 瞭解更多內在實現,以及日常工作中需要用到的 RPC、Web、分散式應用等內容。我認識本書作者之一曹春暉,對他的學習態度和能力頗為欽佩,因此推薦大家閱讀本書。

大家可能不知道,出書一點都不賺錢,但投入的精力卻很大。但是像曹大在給讀者的書籤名時所說的:書籍是時代的生命。多少知識都是通過書本一代代傳承!

搬過幾次家就知道,紙質書太多,過程會比較痛苦。所以,我現在買紙書都會考慮再三。但是,這次我還是在第一時間下單了《Go 語言高階程式設計》。我也強烈推薦你買一本,支援原創者。

柴老師在武漢,我接觸不多。但和曹大卻是經常能見面(在同一個公司工作)。他本人經常活躍在各種微信群,社群,也非常樂於解答各種疑難雜症。上週還和曹大一起吃了個飯,請教了很多問題,我總結了一些對家都有用的東西,放在我的朋友圈:

如果你想圍觀我的朋友圈,想和我交流,可以長按下面的二維碼加我好友,備註下來自公眾號。

好了,下面開始我們的正文。

併發模型

併發與並行

大家都知道著名的摩爾定律。1965 年,時任仙童公司的 Gordon Moore 發表文章,預測在未來十年,半導體晶片上的電晶體和電阻數量將每年增加一倍;1975 年,Moore 再次發表論文,將“每年”修改為“每兩年”。這個預測在 2012 年左右基本是正確的。

但隨著電晶體電路逐漸接近效能極限,摩爾定律終將走到盡頭。靠增加電晶體數量來提高計算機的效能不靈了。於是,人們開始轉換思路,用其他方法來提升計算機的效能,這就是多核計算機產生的原因。

這一招看起來還不錯,但是人們又遇到了一個另一個定律的限制,那就是 Amdahl's Law,它提出了一個模型用來衡量在並行模式下程式執行效率的提升。這個定律是說,一個程式能從並行上獲得性能提升的上限取決於有多少程式碼必須寫成序列的。

舉個例子,對於一個和使用者打交道的介面程式,它必須和使用者打交道。使用者點一個按鈕,然後才能繼續執行下一步,這必須是序列執行的。這種程式的執行效率就取決於和使用者互動的速度,你有多少核都白瞎。使用者就是不按下一步,你怎麼辦?

2000 年左右雲端計算興起,人們可以方便地獲取計算雲上的資源,方便地水平擴充套件自己的服務,可以輕而易舉地就調動多臺機器資源甚至將計算任務分發到分佈在全球範圍的機器。但是也因此帶來了很多問題和挑戰。例如怎樣在機器間進行通訊、聚合結果等。最難的一個挑戰是如何找到一個模型能用來描述 concurrent。

我們都知道,要想一段併發的程式碼沒有任何 bug,是非常困難的。有些併發 bug 是在系統上線數年後才發現的,原因常常是很詭異的,比如使用者數增加到了某個界限。

併發問題一般有下面這幾種:

資料競爭。簡單來說就是兩個或多個執行緒同時讀寫某個變數,造成了預料之外的結果。

原子性。在一個定義好的上下文裡,原子性操作不可分割。上下文的定義非常重要。有些程式碼,你在程式裡看起來是原子的,如最簡單的 i++,但在機器層面看來,這條語句通常需要幾條指令來完成(Load,Incr,Store),不是不可分割的,也就不是原子性的。原子性可以讓我們放心地構造併發安全的程式。

記憶體訪問同步。程式碼中需要控制同時只有一個執行緒訪問的區域稱為臨界區。Go 語言中一般使用 sync 包裡的 Mutex 來完成同步訪問控制。鎖一般會帶來比較大的效能開銷,因此一般要考慮加鎖的區域是否會頻繁進入、鎖的粒度如何控制等問題。

死鎖。在一個死鎖的程式裡,每個執行緒都在等待其他執行緒,形成了一個首尾相連的尷尬局面,程式無法繼續執行下去。

活鎖。想象一下,你走在一條小路上,一個人迎面走來。你往左邊走,想避開他;他做了相反的事情,他往右邊走,結果兩個都過不了。之後,兩個人又都想從原來自己相反的方向走,還是同樣的結果。這就是活鎖,看起來都像在工作,但工作進度就是無法前進。

飢餓。併發的執行緒不能獲取它所需要的資源以進行下一步的工作。通常是有一個非常貪婪的執行緒,長時間佔據資源不釋放,導致其他執行緒無法獲得資源。

關於併發和並行的區別,引用一個經典的描述:

併發是同一時間應對(dealing with)多件事情的能力。
並行是同一時間動手(doing)做多件事情的能力。

雨痕老師《Go 語言學習筆記》上的解釋:

併發是指邏輯上具備同時處理多個任務的能力;並行則是物理上同時執行多個任務。

而根據《Concurrency in Go》這本書,計算機的概念都是抽象的結果,併發和並行也不例外。它這樣描述併發和並行的區別:

Concurrency is a property of the code; parallelism is a property of the running program.

併發是程式碼的特性,並行是正在執行的程式的特性。先忽略我拙劣的翻譯。很新奇,不是嗎?我也是第一次見到這樣的說法,細想一下,還是很有道理的。

我們一直說寫的程式碼是併發的或者是並行的,但是我們能提供什麼保證嗎?如果在只有一個核的機器上跑並行的程式碼,它還能並行嗎?你就是再天才,也無法寫出並行的程式。充其量也就是程式碼上看起來“併發”的,如此而已。

當然,表面上看起來還是並行的,但那不過 CPU 的障眼法,多個執行緒在分時共享 CPU 的資源,在一個粗糙的時間隔裡看起來就是“並行”。

所以,我們實際上只能編寫“併發”的程式碼,而不能編寫“並行”的程式碼,而且只是希望併發的程式碼能夠並行地執行。併發的程式碼能否並行,取決於抽象的層級:程式碼裡的併發原語、runtime,作業系統(虛擬機器、容器)。層級越來越底層,要求也越來越高。因此,我們談併發或並行實際上要指定上下文,也就是抽象的層級。

《Concurrency in Go》書裡舉了一個例子:假如兩個人同時開啟電腦上的計算器程式,這兩個程式肯定不會影響彼此,這就是並行。在這個例子中,上下文就是兩個人的機器,而兩個計算器程序就是並行的元素。

隨著抽象層次的降低,併發模型實際上變得更難也更重要,而越低層次的併發模型對我們也越重要。要想併發程式正確地執行,就要深入研究併發模型。

在 Go 語言釋出前,我們寫併發程式碼時,考慮到的最底層抽象是:系統執行緒。Go 釋出之後,在這條抽象鏈上,又加一個 goroutine。而且 Go 從著名的電腦科學家 Tony Hoare 那借來一個概念:channel。Tony Hoare 就是那篇著名文章《Communicating Sequential Processes》的作者。

看起來事情變得更加複雜,因為 Go 又引入了一個更底層的抽象,但事實並不是這樣。因為 goroutine 並不是看起來的那樣又抽象了一層,它其實是替代了系統執行緒。Gopher 在寫程式碼的時候,並不會去關心繫統執行緒,大部分時候只需要考慮到 goroutine 和 channel。當然有時候會用到一些共享記憶體的概念,一般就是指 sync 包裡的東西,比如 sync.Mutex。

什麼是 CSP

CSP 經常被認為是 Go 在併發程式設計上成功的關鍵因素。CSP 全稱是 “Communicating Sequential Processes”,這也是 Tony Hoare 在 1978 年發表在 ACM 的一篇論文。論文裡指出一門程式語言應該重視 input 和 output 的原語,尤其是併發程式設計的程式碼。

在那篇文章發表的時代,人們正在研究模組化程式設計的思想,該不該用 goto 語句在當時是最激烈的議題。彼時,面向物件程式設計的思想正在崛起,幾乎沒什麼人關心併發程式設計。

在文章中,CSP 也是一門自定義的程式語言,作者定義了輸入輸出語句,用於 processes 間的通訊(communicatiton)。processes 被認為是需要輸入驅動,並且產生輸出,供其他 processes 消費,processes 可以是程序、執行緒、甚至是程式碼塊。輸入命令是:!,用來向 processes 寫入;輸出是:?,用來從 processes 讀出。這篇文章要講的 channel 正是借鑑了這一設計。

Hoare 還提出了一個 -> 命令,如果 -> 左邊的語句返回 false,那它右邊的語句就不會執行。

通過這些輸入輸出命令,Hoare 證明了如果一門程式語言中把 processes 間的通訊看得第一等重要,那麼併發程式設計的問題就會變得簡單。

Go 是第一個將 CSP 的這些思想引入,並且發揚光大的語言。僅管記憶體同步訪問控制(原文是 memory access synchronization)在某些情況下大有用處,Go 裡也有相應的 sync 包支援,但是這在大型程式很容易出錯。

Go 一開始就把 CSP 的思想融入到語言的核心裡,所以併發程式設計成為 Go 的一個獨特的優勢,而且很容易理解。

大多數的程式語言的併發程式設計模型是基於執行緒和記憶體同步訪問控制,Go 的併發程式設計的模型則用 goroutine 和 channel 來替代。Goroutine 和執行緒類似,channel 和 mutex (用於記憶體同步訪問控制)類似。

Goroutine 解放了程式設計師,讓我們更能貼近業務去思考問題。而不用考慮各種像執行緒庫、執行緒開銷、執行緒排程等等這些繁瑣的底層問題,goroutine 天生替你解決好了。

Channel 則天生就可以和其他 channel 組合。我們可以把收集各種子系統結果的 channel 輸入到同一個 channel。Channel 還可以和 select, cancel, timeout 結合起來。而 mutex 就沒有這些功能。

Go 的併發原則非常優秀,目標就是簡單:儘量使用 channel;把 goroutine 當作免費的資源,隨便用。

說明一下,前面這兩部分的內容來自英文開源書《Concurrency In Go》,強烈推薦閱讀。

引入結束,我們正式開始今天的主角:channel。

什麼是 channel

Goroutine 和 channel 是 Go 語言併發程式設計的 兩大基石。Goroutine 用於執行併發任務,channel 用於 goroutine 之間的同步、通訊。

Channel 在 gouroutine 間架起了一條管道,在管道里傳輸資料,實現 gouroutine 間的通訊;由於它是執行緒安全的,所以用起來非常方便;channel 還提供“先進先出”的特性;它還能影響 goroutine 的阻塞和喚醒。

相信大家一定見過一句話:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通過共享記憶體來通訊,而要通過通訊來實現記憶體共享。

這就是 Go 的併發哲學,它依賴 CSP 模型,基於 channel 實現。

簡直是一頭霧水,這兩句話難道不是同一個意思?

通過前面兩節的內容,我個人這樣理解這句話:前面半句說的是通過 sync 包裡的一些元件進行併發程式設計;而後面半句則是說 Go 推薦使用 channel 進行併發程式設計。兩者其實都是必要且有效的。實際上看完本文後面對 channel 的原始碼分析,你會發現,channel 的底層就是通過 mutex 來控制併發的。只是 channel 是更高一層次的併發程式設計原語,封裝了更多的功能。

關於是選擇 sync 包裡的底層併發程式設計原語還是 channel,《Concurrency In Go》這本書的第 2 章 “Go's Philosophy on Concurrency” 裡有一張決策樹和詳細的論述,再次推薦你去閱讀。我把圖貼出來:

channel 實現 CSP

Channel 是 Go 語言中一個非常重要的型別,是 Go 裡的第一物件。通過 channel,Go 實現了通過通訊來實現記憶體共享。Channel 是在多個 goroutine 之間傳遞資料和同步的重要手段。

使用原子函式、讀寫鎖可以保證資源的共享訪問安全,但使用 channel 更優雅。

channel 字面意義是“通道”,類似於 Linux 中的管道。宣告 channel 的語法如下:

chan T // 宣告一個雙向通道
chan<- T // 宣告一個只能用於傳送的通道
<-chan T // 宣告一個只能用於接收的通道

單向通道的宣告,用 <- 來表示,它指明通道的方向。你只要明白,程式碼的書寫順序是從左到右就馬上能掌握通道的方向是怎樣的。

因為 channel 是一個引用型別,所以在它被初始化之前,它的值是 nil,channel 使用 make 函式進行初始化。可以向它傳遞一個 int 值,代表 channel 緩衝區的大小(容量),構造出來的是一個緩衝型的 channel;不傳或傳 0 的,構造的就是一個非緩衝型的 channel。

兩者有一些差別:非緩衝型 channel 無法緩衝元素,對它的操作一定順序是“傳送-> 接收 -> 傳送 -> 接收 -> ……”,如果連續向一個非緩衝 chan 傳送 2 個元素,並且沒有接收的話,第二次一定會被阻塞;對於緩衝型 channel 的操作,則要“寬鬆”一些,畢竟是帶了“緩衝”光環。

為什麼要 channel

Go 通過 channel 實現 CSP 通訊模型,主要用於 goroutine 之間的訊息傳遞和事件通知。

有了 channel 和 goroutine 之後,Go 的併發程式設計變得異常容易和安全,得以讓程式設計師把注意力留到業務上去,實現開發效率的提升。

要知道,技術並不是最重要的,它只是實現業務的工具。一門高效的開發語言讓你把節省下來的時間,留著去做更有意義的事情,比如寫寫文章。

channel 實現原理

對 chan 的傳送和接收操作都會在編譯期間轉換成為底層的傳送接收函式。

Channel 分為兩種:帶緩衝、不帶緩衝。對不帶緩衝的 channel 進行的操作實際上可以看作“同步模式”,帶緩衝的則稱為“非同步模式”。

同步模式下,傳送方和接收方要同步就緒,只有在兩者都 ready 的情況下,資料才能在兩者間傳輸(後面會看到,實際上就是記憶體拷貝)。否則,任意一方先行進行傳送或接收操作,都會被掛起,等待另一方的出現才能被喚醒。

非同步模式下,在緩衝槽可用的情況下(有剩餘容量),傳送和接收操作都可以順利進行。否則,操作的一方(如寫入)同樣會被掛起,直到出現相反操作(如接收)才會被喚醒。

小結一下:同步模式下,必須要使傳送方和接收方配對,操作才會成功,否則會被阻塞;非同步模式下,緩衝槽要有剩餘容量,操作才會成功,否則也會被阻塞。

資料結構

直接上原始碼(版本是 1.9.2):

type hchan struct {
    // chan 裡元素數量
    qcount   uint
    // chan 底層迴圈陣列的長度
    dataqsiz uint
    // 指向底層迴圈陣列的指標
    // 只針對有緩衝的 channel
    buf      unsafe.Pointer
    // chan 中元素大小
    elemsize uint16
    // chan 是否被關閉的標誌
    closed   uint32
    // chan 中元素型別
    elemtype *_type // element type
    // 已傳送元素在迴圈陣列中的索引
    sendx    uint   // send index
    // 已接收元素在迴圈陣列中的索引
    recvx    uint   // receive index
    // 等待接收的 goroutine 佇列
    recvq    waitq  // list of recv waiters
    // 等待發送的 goroutine 佇列
    sendq    waitq  // list of send waiters

    // 保護 hchan 中所有欄位
    lock mutex
}

關於欄位的含義都寫在註釋裡了,再來重點說幾個欄位:

buf 指向底層迴圈陣列,只有緩衝型的 channel 才有。

sendxrecvx 均指向底層迴圈陣列,表示當前可以傳送和接收的元素位置索引值(相對於底層陣列)。

sendqrecvq 分別表示被阻塞的 goroutine,這些 goroutine 由於嘗試讀取 channel 或向 channel 傳送資料而被阻塞。

waitqsudog 的一個雙向連結串列,而 sudog 實際上是對 goroutine 的一個封裝:

type waitq struct {
    first *sudog
    last  *sudog
}

lock 用來保證每個讀 channel 或寫 channel 的操作都是原子的。

例如,建立一個容量為 6 的,元素為 int 型的 channel 資料結構如下 :

建立

我們知道,通道有兩個方向,傳送和接收。理論上來說,我們可以建立一個只發送或只接收的通道,但是這種通道創建出來後,怎麼使用呢?一個只能發的通道,怎麼接收呢?同樣,一個只能收的通道,如何向其傳送資料呢?

一般而言,使用 make 建立一個能收能發的通道:

// 無緩衝通道
ch1 := make(chan int)
// 有緩衝通道
ch2 := make(chan int, 10)

通過彙編分析,我們知道,最終建立 chan 的函式是 makechan

func makechan(t *chantype, size int64) *hchan

從函式原型來看,建立的 chan 是一個指標。所以我們能在函式間直接傳遞 channel,而不用傳遞 channel 的指標。

具體來看下程式碼:

const hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))

func makechan(t *chantype, size int64) *hchan {
    elem := t.elem

    // 省略了檢查 channel size,align 的程式碼
    // ……

    var c *hchan
    // 如果元素型別不含指標 或者 size 大小為 0(無緩衝型別)
    // 只進行一次記憶體分配
    if elem.kind&kindNoPointers != 0 || size == 0 {
        // 如果 hchan 結構體中不含指標,GC 就不會掃描 chan 中的元素
        // 只分配 "hchan 結構體大小 + 元素大小*個數" 的記憶體
        c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true))
        // 如果是緩衝型 channel 且元素大小不等於 0(大小等於 0的元素型別:struct{})
        if size > 0 && elem.size != 0 {
            c.buf = add(unsafe.Pointer(c), hchanSize)
        } else {
            // race detector uses this location for synchronization
            // Also prevents us from pointing beyond the allocation (see issue 9401).
            // 1. 非緩衝型的,buf 沒用,直接指向 chan 起始地址處
            // 2. 緩衝型的,能進入到這裡,說明元素無指標且元素型別為 struct{},也無影響
            // 因為只會用到接收和傳送遊標,不會真正拷貝東西到 c.buf 處(這會覆蓋 chan的內容)
            c.buf = unsafe.Pointer(c)
        }
    } else {
        // 進行兩次記憶體分配操作
        c = new(hchan)
        c.buf = newarray(elem, int(size))
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    // 迴圈陣列長度
    c.dataqsiz = uint(size)

    // 返回 hchan 指標
    return c
}

新建一個 chan 後,記憶體在堆上分配,大概長這樣:

說明一下,這張圖來源於 Gopher Con 上的一份 PPT,地址見參考資料。這份材料非常清晰易懂,推薦你去讀。

接下來,我們用一個來自參考資料【深入 channel 底層】的例子來理解建立、傳送、接收的整個過程。

func goroutineA(a <-chan int) {
    val := <- a
    fmt.Println("G1 received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <- b
    fmt.Println("G2 received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)
}

首先建立了一個無緩衝的 channel,接著啟動兩個 goroutine,並將前面建立的 channel 傳遞進去。然後,向這個 channel 中傳送資料 3,最後 sleep 1 秒後程序退出。

程式第 14 行建立了一個非緩衝型的 channel,我們只看 chan 結構體中的一些重要欄位,來從整體層面看一下 chan 的狀態,一開始什麼都沒有:

接收

在繼續分析前面小節的例子前,我們先來看一下接收相關的原始碼。在清楚了接收的具體過程之後,也就能輕鬆理解具體的例子了。

接收操作有兩種寫法,一種帶 "ok",反應 channel 是否關閉;一種不帶 "ok",這種寫法,當接收到相應型別的零值時無法知道是真實的傳送者傳送過來的值,還是 channel 被關閉後,返回給接收者的預設型別的零值。兩種寫法,都有各自的應用場景。

經過編譯器的處理後,這兩種寫法最後對應原始碼裡的這兩個函式:

// entry points for <- c from compiled code
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

chanrecv1 函式處理不帶 "ok" 的情形,chanrecv2 則通過返回 "received" 這個欄位來反應 channel 是否被關閉。接收值則比較特殊,會“放到”引數 elem 所指向的地址了,這很像 C/C++ 裡的寫法。如果程式碼裡忽略了接收值,這裡的 elem 為 nil。

無論如何,最終轉向了 chanrecv 函式:

// 位於 src/runtime/chan.go

// chanrecv 函式接收 channel c 的元素並將其寫入 ep 所指向的記憶體地址。
// 如果 ep 是 nil,說明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在沒有資料可接收的情況下,返回 (false, false)
// 否則,如果 c 處於關閉狀態,將 ep 指向的地址清零,返回 (true, false)
// 否則,用返回值填充 ep 指向的記憶體地址。返回 (true, true)
// 如果 ep 非空,則應該指向堆或者函式呼叫者的棧

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // 省略 debug 內容 …………

    // 如果是一個 nil 的 channel
    if c == nil {
        // 如果不阻塞,直接返回 (false, false)
        if !block {
            return
        }
        // 否則,接收一個 nil 的 channel,goroutine 掛起
        gopark(nil, nil, "chan receive (nil chan)", traceEvGoStop, 2)
        // 不會執行到這裡
        throw("unreachable")
    }

    // 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回
    // 當我們觀察到 channel 沒準備好接收:
    // 1. 非緩衝型,等待發送列隊 sendq 裡沒有 goroutine 在等待
    // 2. 緩衝型,但 buf 裡沒有元素
    // 之後,又觀察到 closed == 0,即 channel 未關閉。
    // 因為 channel 不可能被重複開啟,所以前一個觀測的時候 channel 也是未關閉的,
    // 因此在這種情況下可以直接宣佈接收失敗,返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 加鎖
    lock(&c.lock)

    // channel 已關閉,並且迴圈陣列 buf 裡沒有元素
    // 這裡可以處理非緩衝型關閉 和 緩衝型關閉但 buf 無元素的情況
    // 也就是說即使是關閉狀態,但在緩衝型的 channel,
    // buf 裡有元素的情況下還能接收到元素
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(unsafe.Pointer(c))
        }
        // 解鎖
        unlock(&c.lock)
        if ep != nil {
            // 從一個已關閉的 channel 執行接收操作,且未忽略返回值
            // 那麼接收的值將是一個該型別的零值
            // typedmemclr 根據型別清理相應地址的記憶體
            typedmemclr(c.elemtype, ep)
        }
        // 從一個已關閉的 channel 接收,selected 會返回true
        return true, false
    }

    // 等待發送佇列裡有 goroutine 存在,說明 buf 是滿的
    // 這有可能是:
    // 1. 非緩衝型的 channel
    // 2. 緩衝型的 channel,但 buf 滿了
    // 針對 1,直接進行記憶體拷貝(從 sender goroutine -> receiver goroutine)
    // 針對 2,接收到迴圈陣列頭部的元素,並將傳送者的元素放到迴圈陣列尾部
    if sg := c.sendq.dequeue(); sg != nil {
        // Found a waiting sender. If buffer is size 0, receive value
        // directly from sender. Otherwise, receive from head of queue
        // and add sender's value to the tail of the queue (both map to
        // the same buffer slot because the queue is full).
        recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true, true
    }

    // 緩衝型,buf 裡有元素,可以正常接收
    if c.qcount > 0 {
        // 直接從迴圈數組裡找到要接收的元素
        qp := chanbuf(c, c.recvx)

        // …………

        // 程式碼裡,沒有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // 清理掉迴圈數組裡相應位置的值
        typedmemclr(c.elemtype, qp)
        // 接收遊標向前移動
        c.recvx++
        // 接收遊標歸零
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // buf 數組裡的元素個數減 1
        c.qcount--
        // 解鎖
        unlock(&c.lock)
        return true, true
    }

    if !block {
        // 非阻塞接收,解鎖。selected 返回 false,因為沒有接收到值
        unlock(&c.lock)
        return false, false
    }

    // 接下來就是要被阻塞的情況了
    // 構造一個 sudog
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    // 待接收資料的地址儲存下來
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.param = nil
    // 進入channel 的等待接收佇列
    c.recvq.enqueue(mysg)
    // 將當前 goroutine 掛起
    goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)

    // 被喚醒了,接著從這裡繼續執行一些掃尾工作
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    closed := gp.param == nil
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, !closed
}

上面的程式碼註釋地比較詳細了,你可以對著原始碼一行行地去看,我們再來詳細看一下。

  • 如果 channel 是一個空值(nil),在非阻塞模式下,會直接返回。在阻塞模式下,會呼叫 gopark 函式掛起 goroutine,這個會一直阻塞下去。因為在 channel 是 nil 的情況下,要想不阻塞,只有關閉它,但關閉一個 nil 的 channel 又會發生 panic,所以沒有機會被喚醒了。更詳細地可以在 closechan 函式的時候再看。

  • 和傳送函式一樣,接下來搞了一個在非阻塞模式下,不用獲取鎖,快速檢測到失敗並且返回的操作。順帶插一句,我們平時在寫程式碼的時候,找到一些邊界條件,快速返回,能讓程式碼邏輯更清晰,因為接下來的正常情況就比較少,更聚焦了,看程式碼的人也更能專注地看核心程式碼邏輯了。

    // 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回 (false, false)
    if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
        c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
        atomic.Load(&c.closed) == 0 {
        return
    }

當我們觀察到 channel 沒準備好接收:

  1. 非緩衝型,等待發送列隊裡沒有 goroutine 在等待
  2. 緩衝型,但 buf 裡沒有元素

之後,又觀察到 closed == 0,即 channel 未關閉。

因為 channel 不可能被重複開啟,所以前一個觀測的時候, channel 也是未關閉的,因此在這種情況下可以直接宣佈接收失敗,快速返回。因為沒被選中,也沒接收到資料,所以返回值為 (false, false)。

  • 接下來的操作,首先會上一把鎖,粒度比較大。如果 channel 已關閉,並且迴圈陣列 buf 裡沒有元素。對應非緩衝型關閉和緩衝型關閉但 buf 無元素的情況,返回對應型別的零值,但 received 標識是 false,告訴呼叫者此 channel 已關閉,你取出來的值並不是正常由傳送者傳送過來的資料。但是如果處於 select 語境下,這種情況是被選中了的。很多將 channel 用作通知訊號的場景就是命中了這裡。

  • 接下來,如果有等待發送的佇列,說明 channel 已經滿了,要麼是非緩衝型的 channel,要麼是緩衝型的 channel,但 buf 滿了。這兩種情況下都可以正常接收資料。

於是,呼叫 recv 函式:

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 如果是非緩衝型的 channel
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        // 未忽略接收的資料
        if ep != nil {
            // 直接拷貝資料,從 sender goroutine -> receiver goroutine
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // 緩衝型的 channel,但 buf 已滿。
        // 將迴圈陣列 buf 隊首的元素拷貝到接收資料的地址
        // 將傳送者的資料入隊。實際上這時 revx 和 sendx 值相等
        // 找到接收遊標
        qp := chanbuf(c, c.recvx)
        // …………
        // 將接收遊標處的資料拷貝給接收者
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }

        // 將傳送者資料拷貝到 buf
        typedmemmove(c.elemtype, qp, sg.elem)
        // 更新遊標值
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx
    }
    sg.elem = nil
    gp := sg.g

    // 解鎖
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }

    // 喚醒傳送的 goroutine。需要等到排程器的光臨
    goready(gp, skip+1)
}

如果是非緩衝型的,就直接從傳送者的棧拷貝到接收者的棧。

func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
    // dst is on our stack or the heap, src is on another stack.
    src := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

否則,就是緩衝型 channel,而 buf 又滿了的情形。說明發送遊標和接收遊標重合了,因此需要先找到接收遊標:

// chanbuf(c, i) is pointer to the i'th slot in the buffer.
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

將該處的元素拷貝到接收地址。然後將傳送者待發送的資料拷貝到接收遊標處。這樣就完成了接收資料和傳送資料的操作。接著,分別將傳送遊標和接收遊標向前進一,如果發生“環繞”,再從 0 開始。

最後,取出 sudog 裡的 goroutine,呼叫 goready 將其狀態改成 “runnable”,待發送者被喚醒,等待排程器的排程。

  • 然後,如果 channel 的 buf 裡還有資料,說明可以比較正常地接收。注意,這裡,即使是在 channel 已經關閉的情況下,也是可以走到這裡的。這一步比較簡單,正常地將 buf 裡接收遊標處的資料拷貝到接收資料的地址。

  • 到了最後一步,走到這裡來的情形是要阻塞的。當然,如果 block 傳進來的值是 false,那就不阻塞,直接返回就好了。

先構造一個 sudog,接著就是儲存各種值了。注意,這裡會將接收資料的地址儲存到了 elem 欄位,當被喚醒時,接收到的資料就會儲存到這個欄位指向的地址。然後將 sudog 新增到 channel 的 recvq 佇列裡。呼叫 goparkunlock 函式將 goroutine 掛起。

接下來的程式碼就是 goroutine 被喚醒後的各種收尾工作了。

我們繼續之前的例子。前面說到第 14 行,建立了一個非緩衝型的 channel,接著,第 15、16 行分別建立了一個 goroutine,各自執行了一個接收操作。通過前面的原始碼分析,我們知道,這兩個 goroutine (後面稱為 G1 和 G2 好了)都會被阻塞在接收操作。G1 和 G2 會掛在 channel 的 recq 佇列中,形成一個雙向迴圈連結串列。

在程式的 17 行之前,chan 的整體資料結構如下:

buf 指向一個長度為 0 的陣列,qcount 為 0,表示 channel 中沒有元素。重點關注 recvqsendq,它們是 waitq 結構體,而 waitq 實際上就是一個雙向連結串列,連結串列的元素是 sudog,裡面包含 g 欄位,g 表示一個 goroutine,所以 sudog 可以看成一個 goroutine。recvq 儲存那些嘗試讀取 channel 但被阻塞的 goroutine,sendq 則儲存那些嘗試寫入 channel,但被阻塞的 goroutine。

此時,我們可以看到,recvq 裡掛了兩個 goroutine,也就是前面啟動的 G1 和 G2。因為沒有 goroutine 接收,而 channel 又是無緩衝型別,所以 G1 和 G2 被阻塞。sendq 沒有被阻塞的 goroutine。

recvq 的資料結構如下。這裡直接引用文章中的一幅圖,用了三維元素,畫得很好:

再從整體上來看一下 chan 此時的狀態:

G1 和 G2 被掛起了,狀態是 WAITING。關於 goroutine 排程器這塊不是今天的重點,當然後面肯定會寫相關的文章。這裡先簡單說下,goroutine 是使用者態的協程,由 Go runtime 進行管理,作為對比,核心執行緒由 OS 進行管理。Goroutine 更輕量,因此我們可以輕鬆建立數萬 goroutine。

一個核心執行緒可以管理多個 goroutine,當其中一個 goroutine 阻塞時,核心執行緒可以排程其他的 goroutine 來執行,核心執行緒本身不會阻塞。這就是通常我們說的 M:N 模型:

M:N 模型通常由三部分構成:M、P、G。M 是核心執行緒,負責執行 goroutine;P 是 context,儲存 goroutine 執行所需要的上下文,它還維護了可執行(runnable)的 goroutine 列表;G 則是待執行的 goroutine。M 和 P 是 G 執行的基礎。

繼續回到例子。假設我們只有一個 M,當 G1(go goroutineA(ch)) 執行到 val := <- a 時,它由本來的 running 狀態變成了 waiting 狀態(呼叫了 gopark 之後的結果):

G1 脫離與 M 的關係,但排程器可不會讓 M 閒著,所以會接著排程另一個 goroutine 來執行:

G2 也是同樣的遭遇。現在 G1 和 G2 都被掛起了,等待著一個 sender 往 channel 裡傳送資料,才能得到解救。

傳送

接著上面的例子,G1 和 G2 現在都在 recvq 佇列裡了。

ch <- 3

第 17 行向 channel 傳送了一個元素 3。

傳送操作最終轉化為 chansend 函式,直接上原始碼,同樣大部分都註釋了,可以看懂主流程:

// 位於 src/runtime/chan.go

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // 如果 channel 是 nil
    if c == nil {
        // 不能阻塞,直接返回 false,表示未傳送成功
        if !block {
            return false
        }
        // 當前 goroutine 被掛起
        gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
        throw("unreachable")
    }

    // 省略 debug 相關……

    // 對於不阻塞的 send,快速檢測失敗場景
    //
    // 如果 channel 未關閉且 channel 沒有多餘的緩衝空間。這可能是:
    // 1. channel 是非緩衝型的,且等待接收佇列裡沒有 goroutine
    // 2. channel 是緩衝型的,但迴圈陣列已經裝滿了元素
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // 鎖住 channel,併發安全
    lock(&c.lock)

    // 如果 channel 關閉了
    if c.closed != 0 {
        // 解鎖
        unlock(&c.lock)
        // 直接 panic
        panic(plainError("send on closed channel"))
    }

    // 如果接收佇列裡有 goroutine,直接將要傳送的資料拷貝到接收 goroutine
    if sg := c.recvq.dequeue(); sg != nil {
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // 對於緩衝型的 channel,如果還有緩衝空間
    if c.qcount < c.dataqsiz {
        // qp 指向 buf 的 sendx 位置
        qp := chanbuf(c, c.sendx)

        // ……

        // 將資料從 ep 處拷貝到 qp
        typedmemmove(c.elemtype, qp, ep)
        // 傳送遊標值加 1
        c.sendx++
        // 如果傳送遊標值等於容量值,遊標值歸 0
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        // 緩衝區的元素數量加一
        c.qcount++

        // 解鎖
        unlock(&c.lock)
        return true
    }

    // 如果不需要阻塞,則直接返回錯誤
    if !block {
        unlock(&c.lock)
        return false
    }

    // channel 滿了,傳送方會被阻塞。接下來會構造一個 sudog

    // 獲取當前 goroutine 的指標
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }

    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.selectdone = nil
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil

    // 當前 goroutine 進入傳送等待佇列
    c.sendq.enqueue(mysg)

    // 當前 goroutine 被掛起
    goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

    // 從這裡開始被喚醒了(channel 有機會可以傳送了)
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    if gp.param == nil {
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        // 被喚醒後,channel 關閉了。坑爹啊,panic
        panic(plainError("send on closed channel"))
    }
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    // 去掉 mysg 上繫結的 channel
    mysg.c = nil
    releaseSudog(mysg)
    return true
}

上面的程式碼註釋地比較詳細了,我們來詳細看看。

  • 如果檢測到 channel 是空的,當前 goroutine 會被掛起。

  • 對於不阻塞的傳送操作,如果 channel 未關閉並且沒有多餘的緩衝空間(說明:a. channel 是非緩衝型的,且等待接收佇列裡沒有 goroutine;b. channel 是緩衝型的,但迴圈陣列已經裝滿了元素)

對於這一點,runtime 原始碼裡註釋了很多。這一條判斷語句是為了在不阻塞傳送的場景下快速檢測到傳送失敗,好快速返回。

if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
    return false
}

註釋裡主要講為什麼這一塊可以不加鎖,我詳細解釋一下。if 條件裡先讀了兩個變數:block 和 c.closed。block 是函式的引數,不會變;c.closed 可能被其他 goroutine 改變,因為沒加鎖嘛,這是“與”條件前面兩個表示式。

最後一項,涉及到三個變數:c.dataqsiz,c.recvq.first,c.qcount。c.dataqsiz == 0 && c.recvq.first == nil 指的是非緩衝型的 channel,並且 recvq 裡沒有等待接收的 goroutine;c.dataqsiz > 0 && c.qcount == c.dataqsiz 指的是緩衝型的 channel,但迴圈陣列已經滿了。這裡 c.dataqsiz 實際上也是不會被修改的,在建立的時候就已經確定了。不加鎖真正影響地是 c.qcountc.recvq.first

這一部分的條件就是兩個 word-sized read,就是讀兩個 word 操作:c.closedc.recvq.first(非緩衝型) 或者 c.qcount(緩衝型)。

當我們發現 c.closed == 0 為真,也就是 channel 未被關閉,再去檢測第三部分的條件時,觀測到 c.recvq.first == nil 或者 c.qcount == c.dataqsiz 時(這裡忽略 c.dataqsiz),就斷定要將這次傳送操作作失敗處理,快速返回 false。

這裡涉及到兩個觀測項:channel 未關閉、channel not ready for sending。這兩項都會因為沒加鎖而出現觀測前後不一致的情況。例如我先觀測到 channel 未被關閉,再觀察到 channel not ready for sending,這時我以為能滿足這個 if 條件了,但是如果這時 c.closed 變成 1,這時其實就不滿足條件了,誰讓你不加鎖呢!

但是,因為一個 closed channel 不能將 channel 狀態從 'ready for sending' 變成 'not ready for sending',所以當我觀測到 'not ready for sending' 時,channel 不是 closed。即使 c.closed == 1,即 channel 是在這兩個觀測中間被關閉的,那也說明在這兩個觀測中間,channel 滿足兩個條件:not closednot ready for sending,這時,我直接返回 false 也是沒有問題的。

這部分解釋地比較繞,其實這樣做的目的就是少獲取一次鎖,提升效能。

  • 如果檢測到 channel 已經關閉,直接 panic。

  • 如果能從等待接收佇列 recvq 裡出隊一個 sudog(代表一個 goroutine),說明此時 channel 是空的,沒有元素,所以才會有等待接收者。這時會呼叫 send 函式將元素直接從傳送者的棧拷貝到接收者的棧,關鍵操作由 sendDirect 函式完成。

// send 函式處理向一個空的 channel 傳送操作

// ep 指向被髮送的元素,會被直接拷貝到接收的 goroutine
// 之後,接收的 goroutine 會被喚醒
// c 必須是空的(因為等待佇列裡有 goroutine,肯定是空的)
// c 必須被上鎖,傳送操作執行完後,會使用 unlockf 函式解鎖
// sg 必須已經從等待佇列裡取出來了
// ep 必須是非空,並且它指向堆或呼叫者的棧

func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    // 省略一些用不到的
    // ……

    // sg.elem 指向接收到的值存放的位置,如 val <- ch,指的就是 &val
    if sg.elem != nil {
        // 直接拷貝記憶體(從傳送者到接收者)
        sendDirect(c.elemtype, sg, ep)
        sg.elem = nil
    }
    // sudog 上繫結的 goroutine
    gp := sg.g
    // 解鎖
    unlockf()
    gp.param = unsafe.Pointer(sg)
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    // 喚醒接收的 goroutine. skip 和列印棧相關,暫時不理會
    goready(gp, skip+1)
}

繼續看 sendDirect 函式:

// 向一個非緩衝型的 channel 傳送資料、從一個無元素的(非緩衝型或緩衝型但空)的 channel
// 接收資料,都會導致一個 goroutine 直接操作另一個 goroutine 的棧
// 由於 GC 假設對棧的寫操作只能發生在 goroutine 正在執行中並且由當前 goroutine 來寫
// 所以這裡實際上違反了這個假設。可能會造成一些問題,所以需要用到寫屏障來規避
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
    // src 在當前 goroutine 的棧上,dst 是另一個 goroutine 的棧

    // 直接進行記憶體"搬遷"
    // 如果目標地址的棧發生了棧收縮,當我們讀出了 sg.elem 後
    // 就不能修改真正的 dst 位置的值了
    // 因此需要在讀和寫之前加上一個屏障
    dst := sg.elem
    typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
    memmove(dst, src, t.size)
}

這裡涉及到一個 goroutine 直接寫另一個 goroutine 棧的操作,一般而言,不同 goroutine 的棧是各自獨有的。而這也違反了 GC 的一些假設。為了不出問題,寫的過程中增加了寫屏障,保證正確地完成寫操作。這樣做的好處是減少了一次記憶體 copy:不用先拷貝到 channel 的 buf,直接由傳送者到接收者,沒有中間商賺差價,效率得以提高,完美。

然後,解鎖、喚醒接收者,等待排程器的光臨,接收者也得以重見天日,可以繼續執行接收操作之後的程式碼了。

  • 如果 c.qcount < c.dataqsiz,說明緩衝區可用(肯定是緩衝型的 channel)。先通過函式取出待發送元素應該去到的位置:
qp := chanbuf(c, c.sendx)

// 返回迴圈佇列裡第 i 個元素的地址處
func chanbuf(c *hchan, i uint) unsafe.Pointer {
    return add(c.buf, uintptr(i)*uintptr(c.elemsize))
}

c.sendx 指向下一個待發送元素在迴圈陣列中的位置,然後呼叫 typedmemmove 函式將其拷貝到迴圈陣列中。之後 c.sendx 加 1,元素總量加 1 :c.qcount++,最後,解鎖並返回。

  • 如果沒有命中以上條件的,說明 channel 已經滿了。不管這個 channel 是緩衝型的還是非緩衝型的,都要將這個 sender “關起來”(goroutine 被阻塞)。如果 block 為 false,直接解鎖,返回 false。

  • 最後就是真的需要被阻塞的情況。先構造一個 sudog,將其入隊(channel 的 sendq 欄位)。然後呼叫 goparkunlock 將當前 goroutine 掛起,並解鎖,等待合適的時機再喚醒。

喚醒之後,從 goparkunlock 下一行程式碼開始繼續往下執行。

這裡有一些繫結操作,sudog 通過 g 欄位繫結 goroutine,而 goroutine 通過 waiting 繫結 sudog,sudog 還通過 elem 欄位繫結待發送元素的地址,以及 c 欄位繫結被“坑”在此處的 channel。

所以,待發送的元素地址其實是儲存在 sudog 結構體裡,也就是當前 goroutine 裡。

好了,看完原始碼。我們接著來分析例子,相信大家已經把例子忘得差不多了,我再貼一下程式碼:

func goroutineA(a <-chan int) {
    val := <- a
    fmt.Println("goroutine A received data: ", val)
    return
}

func goroutineB(b <-chan int) {
    val := <- b
    fmt.Println("goroutine B received data: ", val)
    return
}

func main() {
    ch := make(chan int)
    go goroutineA(ch)
    go goroutineB(ch)
    ch <- 3
    time.Sleep(time.Second)

    ch1 := make(chan struct{})
}

在傳送小節裡我們說到 G1 和 G2 現在被掛起來了,等待 sender 的解救。在第 17 行,主協程向 ch 傳送了一個元素 3,來看下接下來會發生什麼。

根據前面原始碼分析的結果,我們知道,sender 發現 ch 的 recvq 裡有 receiver 在等待著接收,就會出隊一個 sudog,把 recvq 裡 first 指標的 sudo “推舉”出來了,並將其加入到 P 的可執行 goroutine 佇列中。

然後,sender 把傳送元素拷貝到 sudog 的 elem 地址處,最後會呼叫 goready 將 G1 喚醒,狀態變為 runnable。

當排程器光顧 G1 時,將 G1 變成 running 狀態,執行 goroutineA 接下來的程式碼。G 表示其他可能有的 goroutine。

這裡其實涉及到一個協程寫另一個協程棧的操作。有兩個 receiver 在 channel 的一邊虎視眈眈地等著,這時 channel 另一邊來了一個 sender 準備向 channel 傳送資料,為了高效,用不著通過 channel 的 buf “中轉”一次,直接從源地址把資料 copy 到目的地址就可以了,效率高啊!

上圖是一個示意圖,3 會被拷貝到 G1 棧上的某個位置,也就是 val 的地址處,儲存在 elem 欄位。

關閉

關閉某個 channel,會執行函式 closechan

func closechan(c *hchan) {
    // 關閉一個 nil channel,panic
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // 上鎖
    lock(&c.lock)
    // 如果 channel 已經關閉
    if c.closed != 0 {
        unlock(&c.lock)
        // panic
        panic(plainError("close of closed channel"))
    }

    // …………

    // 修改關閉狀態
    c.closed = 1

    var glist *g

    // 將 channel 所有等待接收佇列的裡 sudog 釋放
    for {
        // 從接收佇列裡出隊一個 sudog
        sg := c.recvq.dequeue()
        // 出隊完畢,跳出迴圈
        if sg == nil {
            break
        }

        // 如果 elem 不為空,說明此 receiver 未忽略接收資料
        // 給它賦一個相應型別的零值
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // 取出 goroutine
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 相連,形成連結串列
        gp.schedlink.set(glist)
        glist = gp
    }

    // 將 channel 等待發送佇列裡的 sudog 釋放
    // 如果存在,這些 goroutine 將會 panic
    for {
        // 從傳送佇列裡出隊一個 sudog
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }

        // 傳送者會 panic
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, unsafe.Pointer(c))
        }
        // 形成連結串列
        gp.schedlink.set(glist)
        glist = gp
    }
    // 解鎖
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    // 遍歷連結串列
    for glist != nil {
        // 取最後一個
        gp := glist
        // 向前走一步,下一個喚醒的 g
        glist = glist.schedlink.ptr()
        gp.schedlink = 0
        // 喚醒相應 goroutine
        goready(gp, 3)
    }
}

close 邏輯比較簡單,對於一個 channel,recvq 和 sendq 中分別儲存了阻塞的傳送者和接收者。關閉 channel 後,對於等待接收者而言,會收到一個相應型別的零值。對於等待發送者,會直接 panic。所以,在不瞭解 channel 還有沒有接收者的情況下,不能貿然關閉 channel。

close 函式先上一把大鎖,接著把所有掛在這個 channel 上的 sender 和 receiver 全都連成一個 sudog 連結串列,再解鎖。最後,再將所有的 sudog 全都喚醒。

喚醒之後,該幹嘛幹嘛。sender 會繼續執行 chansend 函式裡 goparkunlock 函式之後的程式碼,很不幸,檢測到 channel 已經關閉了,panic。receiver 則比較幸運,進行一些掃尾工作後,返回。這裡,selected 返回 true,而返回值 received 則要根據 channel 是否關閉,返回不同的值。如果 channel 關閉,received 為 false,否則為 true。這我們分析的這種情況下,received 返回 false。

channel 進階

總結一下操作 channel 的結果:

操作 nil channel closed channel not nil, not closed channel
close panic panic 正常關閉
讀 <- ch 阻塞 讀到對應型別的零值 阻塞或正常讀取資料。緩衝型 channel 為空或非緩衝型 channel 沒有等待發送者時會阻塞
寫 ch <- 阻塞 panic 阻塞或正常寫入資料。非緩衝型 channel 沒有等待接收者或緩衝型 channel buf 滿時會被阻塞

總結一下,發生 panic 的情況有三種:向一個關閉的 channel 進行寫操作;關閉一個 nil 的 channel;重複關閉一個 channel。

讀、寫一個 nil channel 都會被阻塞。

傳送和接收元素的本質

Channel 傳送和接收元素的本質是什麼?參考資料【深入 channel 底層】裡是這樣回答的:

Remember all transfer of value on the go channels happens with the copy of value.

就是說 channel 的傳送和接收操作本質上都是 “值的拷貝”,無論是從 sender goroutine 的棧到 chan buf,還是從 chan buf 到 receiver goroutine,或者是直接從 sender goroutine 到 receiver goroutine。

這裡再引用文中的一個例子,我會加上更加詳細地解釋。順帶說一下,這是一篇英文的部落格,寫得很好,沒有像我們這篇文章那樣大段的原始碼分析,它是將程式碼裡情況拆開來各自描述的,各有利弊吧。推薦去讀下原文,閱讀體驗比較好。

type user struct {
    name string
    age int8
}

var u = user{name: "Ankur", age: 25}
var g = &u

func modifyUser(pu *user) {
    fmt.Println("modifyUser Received Vaule", pu)
    pu.name = "Anand"
}

func printUser(u <-chan *user) {
    time.Sleep(2 * time.Second)
    fmt.Println("printUser goRoutine called", <-u)
}

func main() {
    c := make(chan *user, 5)
    c <- g
    fmt.Println(g)
    // modify g
    g = &user{name: "Ankur Anand", age: 100}
    go printUser(c)
    go modifyUser(g)
    time.Sleep(5 * time.Second)
    fmt.Println(g)
}

執行結果:

&{Ankur 25}
modifyUser Received Value &{Ankur Anand 100}
printUser goRoutine called &{Ankur 25}
&{Anand 100}

這裡就是一個很好的 share memory by communicating 的例子。

一開始構造一個結構體 u,地址是 0x56420,圖中地址上方就是它的內容。接著把 &u 賦值給指標 g,g 的地址是 0x565bb0,它的內容就是一個地址,指向 u。

main 程式裡,先把 g 傳送到 c,根據 copy value 的本質,進入到 chan buf 裡的就是 0x56420,它是指標 g 的值(不是它指向的內容),所以列印從 channel 接收到的元素時,它就是 &{Ankur 25}。因此,這裡並不是將指標 g “傳送” 到了 channel 裡,只是拷貝它的值而已。

再強調一次:

Remember all transfer of value on the go channels happens with the copy of value.

資源洩漏

Channel 可能會引發 goroutine 洩漏。

洩漏的原因是 goroutine 操作 channel 後,處於傳送或接收阻塞狀態,而 channel 處於滿或空的狀態,一直得不到改變。同時,垃圾回收器也不會回收此類資源,進而導致 gouroutine 會一直處於等待佇列中,不見天日。

雨痕老師的《Go 語言學習筆記》第 8 章通道的“資源洩露”一節舉了個例子,大家可以自己去看。

happen