1. 程式人生 > >併發並行與Go併發程式設計

併發並行與Go併發程式設計

併發與並行

  • 併發(concurrency) 併發的關注點在於任務切分。舉例來說,你是一個創業公司的CEO,開始只有你一個人,你一人分飾多角,一會做產品規劃,一會寫程式碼,一會見客戶,雖然你不能見客戶的同時寫程式碼,但由於你切分了任務,分配了時間片,表現出來好像是多個任務一起在執行。

  • 並行(parallelism) 並行的關注點在於同時執行。還是上面的例子,你發現你自己太忙了,時間分配不過來,於是請了工程師,產品經理,市場總監,各司一職,這時候多個任務可以同時執行了。

GreenThread

  • 使用者空間 首先是在使用者空間,避免核心態和使用者態的切換導致的成本。

  • 由語言或者框架層排程

  • 更小的棧空間允許建立大量例項(百萬級別)

幾個概念

  • Continuation 這個概念不熟悉 FP 程式設計的人可能不太熟悉,不過這裡可以簡單的顧名思義,可以理解為讓我們的程式可以暫停,然後下次呼叫繼續(contine)從上次暫停的地方開始的一種機制。相當於程式呼叫多了一種入口。

  • Coroutine 是 Continuation 的一種實現,一般表現為語言層面的元件或者類庫。主要提供 yield,resume 機制。

  • Fiber 和 Coroutine 其實是一體兩面的,主要是從系統層面描述,可以理解成 Coroutine 執行之後的東西就是 Fiber。

Goroutine

Goroutine 其實就是前面 GreenThread 系列解決方案的一種演進和實現。

  • 首先,它內建了 Coroutine 機制。因為要使用者態的排程,必須有可以讓程式碼片段可以暫停/繼續的機制。

  • 其次,它內建了一個排程器,實現了 Coroutine 的多執行緒並行排程,同時通過對網路等庫的封裝,對使用者遮蔽了排程細節。

  • 最後,提供了 Channel 機制,用於 Goroutine 之間通訊,實現 CSP 併發模型(Communicating Sequential Processes)。因為 Go 的 Channel 是通過語法關鍵詞提供的,對使用者遮蔽了許多細節。其實 Go 的 Channel 和 Java 中的 SynchronousQueue 是一樣的機制,如果有 buffer 其實就是 ArrayBlockQueue。

Goroutine 排程器

這個圖一般講 Goroutine 排程器的地方都會引用,想要仔細瞭解的可以看看原部落格(小編:點選閱讀原文獲取)。這裡只說明幾點:

  1. M 代表系統執行緒,P 代表處理器(核),G 代表 Goroutine。Go 實現了 M : N 的排程,也就是說執行緒和 Goroutine 之間是多對多的關係。這點在許多GreenThread / Coroutine 的排程器並沒有實現。比如 Java 1.1 版本之前的執行緒其實是 GreenThread(這個詞就來源於 Java),但由於沒實現多對多的排程,也就是沒有真正實現並行,發揮不了多核的優勢,所以後來改成基於系統核心的 Thread 實現了。

  2. 某個系統執行緒如果被阻塞,排列在該執行緒上的 Goroutine 會被遷移。當然還有其他機制,比如 M 空閒了,如果全域性佇列沒有任務,可能會從其他 M 偷任務執行,相當於一種 rebalance 機制。這裡不再細說,有需要看專門的分析文章。

  3. 具體的實現策略和我們前面分析的機制類似。系統啟動時,會啟動一個獨立的後臺執行緒(不在 Goroutine 的排程執行緒池裡),啟動 netpoll 的輪詢。當有 Goroutine 發起網路請求時,網路庫會將 fd(檔案描述符)和 pollDesc(用於描述 netpoll 的結構體,包含因為讀 / 寫這個 fd 而阻塞的 Goroutine)關聯起來,然後呼叫 runtime.gopark 方法,掛起當前的 Goroutine。當後臺的 netpoll 輪詢獲取到 epoll(Linux 環境下)的 event,會將 event 中的 pollDesc 取出來,找到關聯的阻塞 Goroutine,並進行恢復。

Goroutine 是銀彈麼?

Goroutine 很大程度上降低了併發的開發成本,是不是我們所有需要併發的地方直接 go func 就搞定了呢?

Go 通過 Goroutine 的排程解決了 CPU 利用率的問題。但遇到其他的瓶頸資源如何處理?比如帶鎖的共享資源,比如資料庫連線等。網際網路線上應用場景下,如果每個請求都扔到一個 Goroutine 裡,當資源出現瓶頸的時候,會導致大量的 Goroutine 阻塞,最後使用者請求超時。這時候就需要用 Goroutine 池來進行控流,同時問題又來了:池子裡設定多少個 Goroutine 合適?

所以這個問題還是沒有從更本上解決。

go沒有嚴格的內建的logical processor數量限制,但是go的runtime預設限制了每個program最多使用10,000個執行緒,可以通過SetMaxThreads修改.下圖展示了Concurrency和Parallelism的區別 

goroutine使用

go塊

go的用法很簡單,如下. 如果沒有最外面的括號{}(),會顯示go塊必須是一個函式呼叫.沒有()只是一個函式的宣告,有了()是一個呼叫(沒有引數的)

go func() {
  for _,n := range nums {
    out <- n
  }
  close(out)
}()

channel

channel預設上是阻塞的,也就是說,如果Channel滿了,就阻塞寫,如果Channel空了,就阻塞讀。於是,我們就可以使用這種特性來同步我們的傳送和接收端。

channel <-,傳送一個新的值到通道中 <-channel,從通道中接收一個值,這個更像有兩層含義,一個是會返回一個結果,當做賦值來用:msg := <-channel;另外一個含義是等待這個channel傳送訊息,所以還有一個等的含義在.所以如果你直接寫fmt.Print(<-channel)本意只是想輸出下這個chan傳來的值,但是其實他還會阻塞住等著channel來發.

預設傳送和接收操作是阻塞的,直到傳送方和接收方都準備完畢。

funcmain() {
    messages := make(chan string)
    go func() { messages <- "ping" }()
    msg := <-messages
    fmt.Println(msg)
}

所以你要是這麼寫:是一輩子都不會執行到print的(會死鎖)

func main() {
    messages := make(chan string)
    messages <- "ping"
    msg := <-messages
    fmt.Println(msg)
}

所以在一個go程中,傳送messages <- "msg"channel的時候,要格外小心,不然一不留神就死鎖了.(解決方法:1. 用帶快取的chan; 2. 使用帶有default的select傳送)

select {
case messages <- "msg":
    fmt.Println("sent message")
default:
    fmt.Println("no message sent")
}

range

用於channel的range是阻塞的.下面程式會顯示deadloc,去掉註釋就好了.

queue := make(chan string, 2)
//queue <- "one"
//queue <- "two"
//close(queue)
for elem := range queue {
  fmt.Println(elem)
}

通道緩衝

加了快取之後,就像你向channel傳送訊息的時候(message <- "ping"),"ping"就已經發送出去了(到快取).就像一個非同步的佇列?到時候,<-message直接從快取中取值就好了(非同步...)

但是你要這麼寫,利用通道緩衝,就可以.無緩衝的意味著只有在對應的接收(<-chan)通道準備好接收時,才允許傳送(chan <-),可快取通道允許在沒有對應接收方的情況下,快取限定數量的值。

func main() {
  message := make(chan string,1)
  message <- "ping"
  msg := <-message
  fmt.Print(msg)
}

要是多發一個messages <- "channel",fatal error: all goroutines are asleep - deadlock!,要是多接受一個fmt.Println(<-messages),會打印出buffered channel,然後報同樣的error

func main() {
    messages := make(chan string, 2)
    messages <- "buffered"
    messages <- "channel"
    fmt.Println(<-messages)
    fmt.Println(<-messages)
}

通道同步

使用通道同步,如果你把 <- done 這行程式碼從程式中移除,程式甚至會在 worker還沒開始執行時就結束了。

funcworker(done chanbool) {
    fmt.Print("working...")
    time.Sleep(time.Second) // working
    fmt.Println("done")
    done <- true
}
funcmain() {
    done := make(chan bool, 1)
    go worker(done)
    <-done //blocking 阻塞在這裡,知道worker執行完畢
}

傳送方向

可以指定這個通道是不是隻用來發送或者接收值。這個特性提升了程式的型別安全性。pong 函式允許通道(pings)來接收資料,另一通道(pongs)來發送資料。

funcping(pings chan<- string, msg string) {
    pings <- msg
}

funcpong(pings <-chanstring, pongs chan<- string) {
    msg := <-pings
    pongs <- msg
}

funcmain() {
    pings := make(chan string, 1)
    pongs := make(chan string, 1)
    ping(pings, "passed message")
    pong(pings, pongs)
    fmt.Println(<-pongs)
}

select

Go 的select 讓你可以同時等待多個通道操作。(poll/epoll?) 注意select 要麼寫個死迴圈用超時,要不就定好次數.或者加上default讓select變成非阻塞的

go func() {
    time.Sleep(time.Second * 1)
    c1 <- "one"
}()

go func() {
    time.Sleep(time.Second * 2)
    c2 <- "two"
}()

for i := 0; i < 2; i++ {
    select {
    case msg1 := <-c1:
        fmt.Println("received", msg1)
    case msg2 := <-c2:
        fmt.Println("received", msg2)
    }
}

超時處理

其中time.After返回<-chan Time,直接向select傳送訊息

select {
case res := <-c1:
    fmt.Println(res)
case <-time.After(time.Second * 1):
    fmt.Println("timeout 1")
}

非阻塞通道操作

default,當監聽的channel都沒有準備好的時候,預設執行的.

select {
case msg := <-messages:
    fmt.Println("received message", msg)
default:
    fmt.Println("no message received")
}

可以使用 select 語句來檢測 chan 是否已經滿了

ch := make (chan int, 1)
ch <- 1
select {
case ch <- 2:
default:
    fmt.Println("channel is full !")
}

通道關閉

一個非空的通道也是可以關閉的,但是通道中剩下的值仍然可以被接收到

queue := make(chan string, 2)
queue <- "one"
queue <- "two"
close(queue)
for elem := range queue {
    fmt.Println(elem)
}

定時器

在未來某一刻執行一次時使用的

定時器表示在未來某一時刻的獨立事件。你告訴定時器需要等待的時間,然後它將提供一個用於通知的通道。可以顯示的關閉

timer1 := time.NewTimer(time.Second * 2)
<-timer1.C

<-timer1.C 直到這個定時器的通道 C 明確的傳送了定時器失效的值(2s)之前,將一直阻塞。如果你只是要單純的等待用time.Sleep,定時器是可以在它失效之前把它給取消的stop2 := timer2.Stop()

打點器

當你想要在固定的時間間隔重複執行,定時的執行,直到我們將它停止

funcmain() {
    //打點器和定時器的機制有點相似:一個通道用來發送資料。這裡我們在這個通道上使用內建的 range 來迭代值每隔500ms 傳送一次的值。
    ticker := time.NewTicker(time.Millisecond * 500)
    go func() {
        for t := range ticker.C {
            fmt.Println("Tick at", t)
        }
    }()

    //打點器可以和定時器一樣被停止。一旦一個打點停止了,將不能再從它的通道中接收到值。我們將在執行後 1600ms停止這個打點器。
    time.Sleep(time.Millisecond * 1600)
    ticker.Stop()
    fmt.Println("Ticker stopped")
}

生成器

類似於提供了一個服務,不過只是適用於呼叫不是很頻繁

funcrand_generator_2()chanint {
    out := make(chan int)
    go func() {
        for {
            out <- rand.Int()
        }
    }()
    return out
}

funcmain() {
    // 生成隨機數作為一個服務
    rand_service_handler := rand_generator_2()
    fmt.Printf("%dn", <-rand_service_handler)
}

多路複用

Apache使用處理每個連線都需要一個程序,所以其併發效能不是很好。而Nighx使用多路複用的技術,讓一個程序處理多個連線,所以併發效能比較好。

多路複用技術可以用來整合多個通道。提升效能和操作的便捷。

其實就是整合了多個上面的生成器

funcrand_generator_3()chanint {
    rand_generator_1 := rand_generator_2()
    rand_generator_2 := rand_generator_2()
    out := make(chan int)

    go func() {
        for {
            //讀取生成器1中的資料,整合
            out <- <-rand_generator_1
        }
    }()
    go func() {
        for {
            //讀取生成器2中的資料,整合
            out <- <-rand_generator_2
        }
    }()
    return out
}

Furture技術

可以在不準備好引數的情況下呼叫函式。函式呼叫和函式引數準備這兩個過程可以完全解耦。可以在呼叫的時候不關心資料是否準備好,返回值是否計算好的問題。讓程式中的元件在準備好資料的時候自動跑起來。 這個最後取得<-q.result也是可以放到execQuery上面的把

Furture技術可以和各個其他技術組合起來用。可以通過多路複用技術,監聽多個結果Channel,當有結果後,自動返回。也可以和生成器組合使用,生成器不斷生產資料,Furture技術逐個處理資料。Furture技術自身還可以首尾相連,形成一個併發的pipe filter。這個pipe filter可以用於讀寫資料流,操作資料流。

type query struct {
    sql chan string
    result chan string
}

funcexecQuery(q query) {
    go func() {
        sql := <-q.sql
        q.result <- "get " + sql
    }()

}

funcmain() {
    q := query{make(chan string, 1), make(chan string, 1)}
    execQuery(q)

    //準備引數
    q.sql <- "select * from table"
    fmt.Println(<-q.result)
}

Chain Filter技術

程式建立了10個Filter,每個分別過濾一個素數,所以可以輸出前10個素數。

funcGenerate(ch chan<- int) {
    for i := 2; ; i++ {
        ch <- i 
    }
}

funcFilter(in <-chanint, out chan<- int, prime int) {
    for {
        i := <-in // Receive value from 'in'.
        if i%prime != 0 {
            out <- i // Send 'i' to 'out'.
        }
    }
}

// The prime sieve: Daisy-chain Filter processes.
funcmain() {
    ch := make(chan int) // Create a new channel.
    go Generate(ch)      // Launch Generate goroutine.
    for i := 0; i < 10; i++ {
        prime := <-ch
        print(prime, "n")
        ch1 := make(chan int)
        go Filter(ch, ch1, prime)
        ch = ch1
    }
}

共享變數

有些時候使用共享變數可以讓程式碼更加簡潔

type sharded_var struct {
    reader chan int
    writer chan int
}

funcsharded_var_whachdog(v sharded_var) {//共享變數維護協程
    go func() {
        var value int = 0
        for { //監聽讀寫通道,完成服務
            select {
            case value = <-v.writer:
            case v.reader <- value:
            }
        }
    }()
}

funcmain() {
    v := sharded_var{make(chan int), make(chan int)} //初始化,並開始維護協程
    sharded_var_whachdog(v)

    fmt.Println(<-v.reader)
    v.writer <- 1
    fmt.Println(<-v.reader)
}

Concurrency patterns

下面介紹了一些常用的併發模式.

Runner

當你的程式會執行在後臺,可以是cron job或者是Iron.io這樣的worker-based雲環境.這個程式就可以監控和中斷你的程式,如果你的程式執行的太久了.

定義了三個channel來通知任務狀態.

  • interrupt:接收系統的終止訊號(比如ctrl-c),接收到之後系統就優雅的退出
  • complete:指示任務完成狀態或者返回錯誤
  • timeout:當超時了之後,系統就優雅的退出

tasks是一個函式型別的slice,你可以往裡面存放簽名為func funcName(id int){}的函式,作為你的任務.task(id)就是在執行任務了(當然只是用來模擬任務,可以定義一個任務介面來存放任務,此處是為了簡便). 注意tasks裡面的任務是序列執行的,這些任務的執行發生在一個單獨的goroutine中.

New方法裡的interrupt channel buffer設定為1,也就是說當用戶重複ctrl+c的時候,程式也只會收到一個訊號,其他的訊號會被丟棄.

在run()方法中,在開始執行任務前(task(id)),會前檢查執行流程有沒有被中斷(if r.gotInterrupt() {}),這裡用了一個帶default語句的select.一旦收到中斷的事件,程式就不再接受任何其他事件了(signal.Stop(r.interrupt)).

在Start()方法中,在go塊中執行run()方法,任何當前的goroutine會阻塞在select這邊,直到收到run()返回的complete channel或者超時返回.

// Runner runs a seto