Go基礎系列:channel入門
channel基礎
channel用於goroutines之間的通訊,讓它們之間可以進行資料交換。像管道一樣,一個goroutine_A向channel_A中放資料,另一個goroutine_B從channel_A取資料。
channel是指標型別的資料型別,通過make來分配記憶體。例如:
ch := make(chan int)
這表示建立一個channel,這個channel中只能儲存int型別的資料。也就是說一端只能向此channel中放進int型別的值,另一端只能從此channel中讀出int型別的值。
需要注意, chan TYPE
才表示channel的型別。所以其作為引數或返回值時,需指定為 xxx chan int
類似的格式。
向ch這個channel放資料的操作形式為:
ch <- VALUE
從ch這個channel讀資料的操作形式為:
<-ch// 從ch中讀取一個值 val = <-ch val := <-ch// 從ch中讀取一個值並儲存到val變數中 val,ok = <-ch// 從ch讀取一個值,判斷是否讀取成功,如果成功則儲存到val變數中
其實很簡單,當ch出現在 <-
的左邊表示send,當ch出現在 <-
的右邊表示recv。
例如:
package main import ( "fmt" "time" ) func main() { ch := make(chan string) go sender(ch)// sender goroutine go recver(ch)// recver goroutine time.Sleep(1e9) } func sender(ch chan string) { ch <- "malongshuai" ch <- "gaoxiaofang" ch <- "wugui" ch <- "tuner" } func recver(ch chan string) { var recv string for { recv = <-ch fmt.Println(recv) } }
輸出結果:
malongshuai gaoxiaofang wugui tuner
上面激活了一個goroutine用於執行sender()函式,該函式每次向channel ch中傳送一個字串。同時還激活了另一個goroutine用於執行recver()函式,該函式每次從channel ch中讀取一個字串。
注意上面的 recv = <-ch
,當channel中沒有資料可讀時,recver goroutine將會阻塞在此行。由於recver中讀取channel的操作放在了無限for迴圈中,表示recver goroutine將一直阻塞,直到從channel ch中讀取到資料,讀取到資料後進入下一輪迴圈由被阻塞在 recv = <-ch
上。直到main中的time.Sleep()指定的時間到了,main程式終止,所有的goroutine將全部被強制終止。
因為receiver要不斷從channel中讀取可能存在的資料,所以 receiver一般都使用一個無限迴圈來讀取channel ,避免sender傳送的資料被丟棄。
channel的屬性和分類
每個channel都有3種操作:send、receive和close
- send:表示sender端的goroutine向channel中投放資料
- receive:表示receiver端的goroutine從channel中讀取資料
- close:表示關閉channel
- 關閉channel後,send操作將導致painc
- 關閉channel後,recv操作將返回對應型別的0值以及一個狀態碼false
- close並非強制需要使用close(ch)來關閉channel,在某些時候可以自動被關閉
- 如果使用close(),建議條件允許的情況下加上defer
- 只在sender端上顯式使用close()關閉channel。因為關閉通道意味著沒有資料再需要傳送
例如,判斷channel是否被關閉:
val, ok := <-counter if ok { fmt.Println(val) }
channel分為兩種:unbuffered channel和buffered channel
- unbuffered channel:阻塞、同步模式
- sender端向channel中send一個數據,然後阻塞,直到receiver端將此資料receive
- receiver端一直阻塞,直到sender端向channel傳送了一個數據
- buffered channel:非阻塞、非同步模式
- sender端可以向channel中send多個數據(只要channel容量未滿),容量滿之前不會阻塞
- receiver端按照佇列的方式(FIFO,先進先出)從buffered channel中按序receive其中資料
buffered channel有兩個屬性:容量和長度:和slice的capacity和length的概念是一樣的
- capacity:表示bufffered channel最多可以緩衝多少個數據
- length:表示buffered channel當前已緩衝多少個數據
- 建立buffered channel的方式為
make(chan TYPE,CAP)
unbuffered channel可以認為是容量為0的buffered channel,所以每傳送一個數據就被阻塞。注意,不是容量為1的buffered channel,因為容量為1的channel,是在channel中已有一個數據,併發送第二個資料的時候才被阻塞。
換句話說,send被阻塞的時候,其實是沒有傳送成功的,只有被另一端讀走一個數據之後才算是send成功。對於unbuffered channel來說,這是send/recv的同步模式。
實際上,當向一個channel進行send的時候,先關閉了channel,再讀取channel時會發現錯誤在send,而不是recv。它會提示向已經關閉了的channel傳送資料。
func main() { counter := make(chan int) go func() { counter <- 32 }() close(counter) fmt.Println(<-counter) }
輸出報錯:
panic: send on closed channel
所以,在Go的內部行為中,send和recv是一個整體行為,資料未讀就表示未send成功。
死鎖(deadlock)
當channel的某一端(sender/receiver)期待另一端的(receiver/sender)操作,另一端正好在期待本端的操作時,也就是說兩端都因為對方而使得自己當前處於阻塞狀態,這時將會出現死鎖問題。
比如,在main函式中,它有一個預設的goroutine,如果在此goroutine中建立一個unbuffered channel,並在main goroutine中向此channel中傳送資料並直接receive資料,將會出現死鎖:
package main import ( "fmt" ) func main (){ goo(32) } func goo(s int) { counter := make(chan int) counter <- s fmt.Println(<-counter) }
在上面的示例中,向unbuffered channel中send資料的操作 counter <- s
是在main goroutine中進行的,從此channel中recv的操作 <-counter
也是在main goroutine中進行的。send的時候會直接阻塞main goroutine,使得recv操作無法被執行,go將探測到此問題,並報錯:
fatal error: all goroutines are asleep - deadlock! goroutine 1 [chan send]:
要修復此問題,只需將send操作放在另一個goroutine中執行即可:
package main import ( "fmt" ) func main() { goo(32) } func goo(s int) { counter := make(chan int) go func() { counter <- s }() fmt.Println(<-counter) }
或者,將counter設定為一個容量為1的buffered channel:
counter := make(chan int,1)
這樣放完一個數據後send不會阻塞(被recv之前放第二個資料才會阻塞),可以執行到recv操作。
unbuffered channel同步通訊示例
下面通過sync.WaitGroup型別來等待程式的結束,分析多個goroutine之間通訊時狀態的轉換。因為建立的channel是unbuffered型別的,所以send和recv都是阻塞的。
package main import ( "fmt" "sync" ) // wg用於等待程式執行完成 var wg sync.WaitGroup func main() { count := make(chan int) // 增加兩個待等待的goroutines wg.Add(2) fmt.Println("Start Goroutines") // 啟用一個goroutine,label:"Goroutine-1" go printCounts("Goroutine-1", count) // 啟用另一個goroutine,label:"Goroutine-2" go printCounts("Goroutine-2", count) fmt.Println("Communication of channel begins") // 向channel中傳送初始資料 count <- 1 // 等待goroutines都執行完成 fmt.Println("Waiting To Finish") wg.Wait() fmt.Println("\nTerminating the Program") } func printCounts(label string, count chan int) { // goroutine執行完成時,wg的計數器減1 defer wg.Done() for { // 從channel中接收資料 // 如果無資料可recv,則goroutine阻塞在此 val, ok := <-count if !ok { fmt.Println("Channel was closed:",label) return } fmt.Printf("Count: %d received from %s \n", val, label) if val == 10 { fmt.Printf("Channel Closed from %s \n", label) // Close the channel close(count) return } // 輸出接收到的資料後,加1,並重新將其send到channel中 val++ count <- val } }
上面的程式中,激活了兩個goroutine,啟用這兩個goroutine後,向channel中傳送一個初始資料值1,然後main goroutine將因為wg.Wait()等待2個goroutine都執行完成而被阻塞。
再看這兩個goroutine,這兩個goroutine執行完全一樣的函式程式碼,它們都接收count這個channel的資料,但可能是goroutine1先接收到channel中的初始值1,也可能是goroutine2先接收到初始值1。接收到資料後輸出值,並在輸出後對資料加1,然後將加1後的資料再次send到channel,每次send都會將自己這個goroutine阻塞(因為unbuffered channel),此時另一個goroutine因為等待recv而執行。當加1後傳送給channel的資料為10之後,某goroutine將關閉count channel,該goroutine將退出,wg的計數器減1,另一個goroutine因等待recv而阻塞的狀態將因為channel的關閉而失敗,ok狀態碼將讓該goroutine退出,於是wg的計數器減為0,main goroutine因為wg.Wait()而繼續執行後面的程式碼。
使用for range迭代channel
前面都是在for無限迴圈中讀取channel中的資料,但也可以使用range來迭代channel,它會返回每次迭代過程中所讀取的資料,直到channel被關閉。
例如,將上面示例中的printCounts()改為for-range的迴圈形式。
func printCounts(label string, count chan int) { defer wg.Done() for val := range count { fmt.Printf("Count: %d received from %s \n", val, label) if val == 10 { fmt.Printf("Channel Closed from %s \n", label) close(count) return } val++ count <- val } }
多個"管道":輸出作為輸入
channel是goroutine與goroutine之間通訊的基礎,一邊產生資料放進channel,另一邊從channel讀取放進來的資料。可以藉此實現多個goroutine之間的資料交換,例如 goroutine_1->goroutine_2->goroutine_3
,就像bash的管道一樣,上一個命令的輸出可以不斷傳遞給下一個命令的輸入,只不過golang藉助channel可以在多個goroutine(如函式的執行)之間傳,而bash是在命令之間傳。
以下是一個示例,第一個函式getRandNum()用於生成隨機整數,並將生成的整數放進第一個channel ch1中,第二個函式addRandNum()用於接收ch1中的資料(來自第一個函式),將其輸出,然後對接收的值加1後放進第二個channel ch2中,第三個函式printRes接收ch2中的資料並將其輸出。
如果將函式認為是Linux的命令,則類似於下面的命令列:ch1相當於第一個管道,ch2相當於第二個管道
getRandNum | addRandNum | printRes
以下是程式碼部分:
package main import ( "fmt" "math/rand" "sync" ) var wg sync.WaitGroup func main() { wg.Add(3) // 建立兩個channel ch1 := make(chan int) ch2 := make(chan int) // 3個goroutine並行 go getRandNum(ch1) go addRandNum(ch1, ch2) go printRes(ch2) wg.Wait() } func getRandNum(out chan int) { // defer the wg.Done() defer wg.Done() var random int // 總共生成10個隨機數 for i := 0; i < 10; i++ { // 生成[0,30)之間的隨機整數並放進channel out random = rand.Intn(30) out <- random } close(out) } func addRandNum(in,out chan int) { defer wg.Done() for v := range in { // 輸出從第一個channel中讀取到的資料 // 並將值+1後放進第二個channel中 fmt.Println("before +1:",v) out <- (v + 1) } close(out) } func printRes(in chan int){ defer wg.Done() for v := range in { fmt.Println("after +1:",v) } }
指定channel的方向
上面通過兩個channel將3個goroutine連線起來,其中起連線作用的是第二個函式addRandNum()。在這個函式中使用了兩個channel作為引數:一個channel用於接收、一個channel用於傳送。
其實channel類的引數變數可以指定資料流向:
-
in <-chan int
:表示channel in通道只用於接收資料 -
out chan<- int
:表示channel out通道只用於傳送資料
只用於接收資料的通道 <-chan
不可被關閉,因為關閉通道是針對傳送資料而言的,表示無資料再需傳送。對於recv來說,關閉通道是沒有意義的。
所以,上面示例中三個函式可改寫為:
func getRandNum(out chan<- int) { ... } func addRandNum(in <-chan int, out chan<- int) { ... } func printRes(in <-chan int){ ... }
buffered channel非同步佇列請求示例
下面是使用buffered channel實現非同步處理請求的示例。
在此示例中:
- 有(最多)3個worker,每個worker是一個goroutine,它們有worker ID。
- 每個worker都從一個buffered channel中取出待執行的任務,每個任務是一個struct結構,包含了任務id(JobID),當前任務的佇列號(ID)以及任務的狀態(worker是否執行完成該任務)。
- 在main goroutine中將每個任務struct傳送到buffered channel中,這個buffered channel的容量為10,也就是最多隻允許10個任務進行排隊。
- worker每次取出任務後,輸出任務號,然後執行任務(run),最後輸出任務id已完成。
- 每個worker執行任務的方式很簡單:隨機睡眠0-1秒鐘,並將任務標記為完成。
以下是程式碼部分:
package main import ( "fmt" "math/rand" "sync" "time" ) type Task struct { IDint JobIDint Statusstring CreateTime time.Time } func (t *Task) run() { sleep := rand.Intn(1000) time.Sleep(time.Duration(sleep) * time.Millisecond) t.Status = "Completed" } var wg sync.WaitGroup // worker的數量,即使用多少goroutine執行任務 const workerNum = 3 func main() { wg.Add(workerNum) // 建立容量為10的buffered channel taskQueue := make(chan *Task, 10) // 啟用goroutine,執行任務 for workID := 0; workID <= workerNum; workID++ { go worker(taskQueue, workID) } // 將待執行任務放進buffered channel,共15個任務 for i := 1; i <= 15; i++ { taskQueue <- &Task{ ID:i, JobID:100 + i, CreateTime: time.Now(), } } close(taskQueue) wg.Wait() } // 從buffered channel中讀取任務,並執行任務 func worker(in <-chan *Task, workID int) { defer wg.Done() for v := range in { fmt.Printf("Worker%d: recv a request: TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID) v.run() fmt.Printf("Worker%d: Completed for TaskID:%d, JobID:%d\n", workID, v.ID, v.JobID) } }
select多路監聽
很多時候想要同時操作多個channel,比如從ch1、ch2讀資料。Go提供了一個select語句塊,它像switch一樣工作,裡面放一些case語句塊,用來輪詢每個case語句塊的send或recv情況。
select
用法格式示例:
select { // ch1有資料時,讀取到v1變數中 case v1 := <-ch1: ... // ch2有資料時,讀取到v2變數中 case v2 := <-ch2: ... // 所有case都不滿足條件時,執行default default: ... }
defalut語句是可選的,不允許fall through行為,但允許case語句塊為空塊。select會被return、break關鍵字中斷。
select的行為模式主要是對channel是否可讀進行輪詢,但也可以用來向channel傳送資料。它的行為如下:
- 如果所有的case語句塊都被阻塞,則阻塞直到某個語句塊可以被處理
- 如果多個case同時滿足條件,則 隨機選擇 一個進行處理
- 如果存在default且其它case都不滿足條件,則執行default。所以default必須要可執行而不能阻塞
需要注意的是, 如果在select中執行send操作,則可能會永遠被send阻塞。所以,在使用send的時候,應該也使用defalut語句塊,保證send不會被阻塞 。
一般來說,select會放在一個無限迴圈語句中,一直輪詢channel的可讀事件。
下面是一個示例,pump1()和pump2()都用於產生資料(一個產生偶數,一個產生奇數),並將資料分別放進ch1和ch2兩個通道,suck()則從ch1和ch2中讀取資料。然後在無限迴圈中使用select輪詢這兩個通道是否可讀,最後main goroutine在1秒後強制中斷所有goroutine。
package main import ( "fmt" "time" ) func main() { ch1 := make(chan int) ch2 := make(chan int) go pump1(ch1) go pump2(ch2) go suck(ch1, ch2) time.Sleep(1e9) } func pump1(ch chan int) { for i := 0; i <= 30; i++ { if i%2 == 0 { ch <- i } } } func pump2(ch chan int) { for i := 0; i <= 30; i++ { if i%2 == 1 { ch <- i } } } func suck(ch1 chan int, ch2 chan int) { for { select { case v := <-ch1: fmt.Printf("Recv on ch1: %d\n", v) case v := <-ch2: fmt.Printf("Recv on ch2: %d\n", v) } } }