1. 程式人生 > >Go語言常用的併發模式(上)

Go語言常用的併發模式(上)

Confinement

該模式用於處理資料限制問題,類似於生產者和消費者模式。使用channel的方式通過共享資訊的方式進行。有一個協程專門負責生產,另外一個協程負責接收資料。程式碼中使用隨機的時間模擬實際情況中耗時部分。

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    chanOwner := func(n int) <-chan int {
        results := make(chan int, 5)
        go func() {
            defer
close(results) for i := 0; i < n; i++ { results <- i fmt.Printf("produce %d\n", i) t := rand.Intn(2000) time.Sleep(time.Duration(t) * time.Millisecond) // 隨機睡眠0-2秒 } }() return results } consumer := func
(results <-chan int) { for result := range results { t := rand.Intn(2000) time.Sleep(time.Duration(t) * time.Millisecond) // 隨機睡眠0-2秒 fmt.Printf("Received %d\n", result) } fmt.Println("Done receiving") } results := chanOwner(5) consumer(results) } /* 程式碼輸出: produce 0 produce 1 Received 0 produce 2 Received 1 produce 3 produce 4 Received 2 Received 3 Received 4 Done receiving */

這種模式可以根據實際情況定義生產和消費的方式,不用擔心出現數據競爭的問題。

for-select迴圈

最常規的模式:

for { // 死迴圈
    select {
    // 在這裡採取有關操作
    }
}

通過迭代的方式把資料寫入channel

for _, s := range []string{"a", "b", "c"} {
    select {
    case <-done:  // 這裡是完成條件的標記
        return
    case stringStream <- s:  // 在這裡寫入資料
    }
}

死迴圈等待結束標記。
這種方式會盡可能早的結束工作,只要done訊號到達,立刻終止:

for {
    select {
    case <-done:
        return
    default:
    }
    // do something here
}

另一個等價方式

for {
    select {
    case <-done:
        return
    default:
        // do something here
    }
}

防止goroutine洩露

儘管goroutine是一種輕量級的程序,而且一般不必擔心使用太多的協程導致記憶體的問題,Go語言的有自動回收機制;但是,在某些情況下確實需要考慮出現某些協程一直無法回收的問題,這可還會引發一些其它的不良後果,給出下面的例子:

doWork := func(strings <-chan string) <-chan interface{} {
    completed := make(chan interface{})
    go func() {
        defer fmt.Println("doWork exited")
        defer close(completed)
        for s := range strings {
            // Do something
            fmt.Println(s)
        }
    }()
    return completed
}
doWork(nil)
fmt.Println("Done")

在上述的程式碼中,doWork會永遠阻塞,因為空的string channel不會有任何內容輸出。本例子中的開銷可能很小,但是在實際的工程中,這可能會引發大的問題。解決方法是通過父協程結束子協程。通過父協程給子協程發射終止的訊號,使得子協程自動終止。

給出一般的操作方式:

   doWork := func(
        done <-chan interface{},  // 終止的訊號
        strings <-chan string,    // 等待讀取的字串
    ) <-chan interface{} {        // 子協程返回自己終止的訊號
        terminated := make(chan interface{})
        go func() {
            defer fmt.Println("doWork exited")
            defer close(terminated)
            for {
                select {
                case s := <-strings:
                    // do something
                    fmt.Println(s)
                case <-done:  // 如果關閉,則直接執行return
                    return
                }
            }
        }()
        return terminated
    }

    done := make(chan interface{})
    terminated := doWork(done, nil)  // 接收子協程的終止訊號
    go func() {
        // cancel the operation after 1 second
        time.Sleep(time.Second)
        fmt.Println("Canceling doWork goroutine...")
        close(done)  // 關閉後相當於不存在阻塞的情況了。。。
    }()
    <-terminated  // 在這裡等待子協程的終止
    fmt.Println("Done.")

上述程式碼是讀資料的例子,下面給出寫資料時發生協程洩露的例子:

package main

import (
    "fmt"
    "math/rand"
)

func main() {
    newRandStream := func() <-chan int {
        randStream := make(chan int)
        go func() {
            defer fmt.Println("newRandStream closure exited.")
            defer close(randStream)
            for {
                randStream <- rand.Int()
            }
        }()
        return randStream
    }

    randStream := newRandStream()
    n := 3
    fmt.Printf("%d random ints:\n", n)
    for i := 0; i < n; i++ {
        fmt.Printf("%d: %d\n", i, <-randStream)  // 注意這種使用方式,也是合法的
    }
}
/*
輸出結果:
3 random ints:
0: 5577006791947779410
1: 8674665223082153551
2: 6129484611666145821
*/

上述程式碼中,randStream始終沒有結束,出現了協程洩露。。

改進方案:和寫資料的方式類似,通過父協程給子協程發射結束訊號即可。程式碼方案:

package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    newRandStream := func(done <-chan interface{}) <-chan int {
        randStream := make(chan int)
        go func() {
            defer fmt.Println("newRandStream closure exited...")
            defer close(randStream)
            for {
                select {
                case randStream <- rand.Int():
                case <-done:
                    return
                }
            }
        }()
        return randStream
    }

    done := make(chan interface{})
    randStream := newRandStream(done)
    n := 3
    fmt.Printf("%d random ints\n", n)
    for i := 0; i < 3; i++ {
        fmt.Printf("%d:%d\n", n, <-randStream)
    }
    close(done)
    // 等待同步看效果,不用等待也可以正常結束的,這裡僅僅是為了顯式說明一下
    time.Sleep(time.Second)
}
/*
輸出結果:
3 random ints
0:5577006791947779410
1:8674665223082153551
2:6129484611666145821
newRandStream closure exited...
*/

or-channel方式

這種模式的方式是:把多個channeldone連線到一個done上,如果這些channel中任何至少一個關閉,則關閉這個done。 程式碼中,如果出現一個任意一個協程結束,那麼就出現終止訊號。終止訊號出現後,如果有協程沒有結束,他們會繼續執行,程式碼只是檢測是否有協程終止,而不主動結束協程。
給出例項程式碼:

package main

import (
    "fmt"
    "time"
)

func main() {
    // 從這裡傳入各個channel的done
    var or func(channels ...<-chan interface{}) <-chan interface{}
    or = func(channels ...<-chan interface{}) <-chan interface{} {
        switch (len(channels)) {
        case 0: // 遞迴結束的條件
            return nil
        case 1: // 只有一個直接返回
            return channels[0]
        }
        orDone := make(chan interface{}) // 這是自己的標記
        // 可以理解成一棵協程樹,父節點需要孩子節點結束才能銷燬。。。
        // 在這裡進行協程孩子節點的拓展,WTF好難理解。。。。。
        // 在匿名函式中,如果一個channel的任何一個子channel結束,那麼匿名函式的阻塞就會立刻結束,
        // 之後會執行內部的defer操作,然後return一個關閉了的channel,相當於解除阻塞
        go func() {
            defer close(orDone) // 結束的時候釋放本身的done訊號
            switch len(channels) {
            case 2:
                select {
                case <-channels[0]:
                case <-channels[1]:
                }
            default:
                select {
                // 如果case失敗,則進行default,需要再判斷一下,防止此次突然有結束的訊號了
                case <-channels[0]:
                case <-channels[1]:
                case <-channels[2]:
                // 在這裡追加父節點的協程終止訊號,因為這是一棵或的樹,只要有一個節點成功就可以釋放掉
                // 因此把父節點一起傳入,只要有一個釋放掉,父節點的channel就立刻進行釋放......好機智的操作
                // 這裡追加自己的orDone,是為了`
                case <-or(append(channels[3:], orDone)...): // 注意使用...符號
                }
            }
        }()
        return orDone
    }

    sig := func(after time.Duration) <-chan interface{} {
        c := make(chan interface{})  
        go func() {
            defer close(c) // 所在的goroutine結束後close,使用時間模擬工作時間
            time.Sleep(after)
        }()
        return c
    }

    start := time.Now()
    <-or(
        sig(2*time.Hour),
        sig(5*time.Minute),
        sig(1*time.Second),
        sig(1*time.Hour),
        sig(1*time.Minute),
    )
    fmt.Printf("done after %v\n", time.Since(start))
}
/*
輸出結果:
done after 1.000182106s
*/

從上述看出,僅僅執行到結果最短的那個,相當於一個“或”操作。程式碼採用了尾遞迴的方式,因為select方式無法預判channel的數量,而迴圈的方式需要處理大量的阻塞問題,不如尾遞迴的方式簡潔。

上述程式碼最後的遞迴中,有一個地方不太理解:程式碼遞迴的過程中為什麼要加入orDone?希望有明白的同學可以解釋一下!

相關推薦

區塊鏈技術語言二十九Go語言常用工具包

原文連結:區塊鏈技術語言(二十九)—Go語言常用工具包(上) 常用工具包分為兩節內容。本節介紹格式化輸入輸出和對字串處理的常用工具包和函式;下節介紹JSON處理和對文字的幾種操作。   一、格式化輸入輸出 fmt包提供了格式化的輸入和輸出的操作。 1.1

Go語言常用併發模式

Confinement 該模式用於處理資料限制問題,類似於生產者和消費者模式。使用channel的方式通過共享資訊的方式進行。有一個協程專門負責生產,另外一個協程負責接收資料。程式碼中使用隨機的時間模擬實際情況中耗時部分。 package main i

區塊鏈技術基礎語言三十Go語言常用工具包

原文連結:區塊鏈技術基礎語言(三十):Go語言常用工具包(下) 一、JSON處理 JSON(JavaScript Object Notation)是一種輕量級的資料交換格式,方便人們閱讀和編寫,也方便程式地解析和生成。雖然JSON是JavaScript的子集,但其格式完全獨立於程式語言,表現

go語言實現設計模式:策略模式

策略模式定義了演算法家族,在呼叫演算法家族的時候不感知演算法的變化,客戶也不會受到影響。 下面用《大話設計模式》中的一個例項進行改寫。 例:超市中經常進行促銷活動,促銷活動的促銷方法就是一個個

go語言實現設計模式:簡易工廠

簡易工廠主要是用來解決物件“建立”的問題。以下的例子取自《大話設計模式》中第一章,實現一個可擴充套件的“計算器”。當增加新的功能時,並不需改動原來已經實現的演算法。由於是簡易工廠,所以我們還是需要對工廠類進行相應修改。 1.首先,我們定義一個計算的介面

【鏈塊技術35期】區塊鏈技術語言——Go語言併發程式設計

併發程式設計分為上、下兩節。這一節包括了併發程式設計的概述、goroutine和channel的部分內容。 一、概述 1.1 並行和併發 並行(parallel):在多個處理器上同時執行多條指令,如圖1所示。 併發(concurrency):同一時刻只有一條指令在

Windows Phone開發10常用控件

androi chm att size near grid txt idt inf Windows Phone的控件有幾個來源,和傳統的桌面應用程序開發或Web開發一樣,有默認提供的控件和第三方開者發布的控件。一般而言,如果不是過於復雜的界面布局,使用默認控件就足矣。相比之

跟我學設計模式視頻教程——管擦者模式,責任鏈模式

tar eight color font content 設計模式 name -m ack 課程視頻 觀察者模式(下) 責任鏈模式(上) 課程筆記 課程筆記 課程代碼 課程代碼 新課程火熱報名中 課程介紹

go 語言 基礎 類型1

必須 const 表達式 基本 基本類 image 關鍵字 字符串 函數 變量 使用關鍵字 var定義變量,自動初始化為0值。如果提供初始化值,可省略變量類型,由編譯器自動推斷。 在函數內部可以使用 := 方式定義變量 func main() { x := 123

查漏補缺:socket編程:TCP粘包問題和常用解決方案

原因 image 延遲確認 大小 style bsp 緩沖 ket 導致   1、TCP粘包問題的產生(發送端)   由於TCP協議是基於字節流並且無邊界的傳輸協議,因此很容易產生粘包問題。TCP的粘包可能發生在發送端,也可能發生在接收端。發送端的粘包是TCP協議本身引起的

java之SpringAOP前奏-動態代理設計模式

對象 .cn 分享圖片 賦值 alt his 編程習慣 輸出 style 我們常常會遇到這樣的事,項目經理讓你為一個功能類再加一個功能A,然後你加班為這個類加上了功能A; 過了兩天又來了新需求,再在A功能後面加上一個新功能B,你加班寫好了這個功能B,加在了A後面;又過

Go語言內存管理內存分配

特定 offset 閾值 簡化 思路 三種 私有ca 空間不足 ces Go語言內存管理(一)內存分配 golang作為一種“高級語言”,也提供了自己的內存管理機制。這樣一方面可以簡化編碼的流程,降低因內存使用導致出現問題的頻率(C語言使用者尤其是初學者應該深有體會),對程

C語言的printf()語句

C語言 編程入門  在前面幾篇博文中,大家會發現我使用了多次printf()這個語句,那麽今天我們就講一講如何使用printf()語句 。  首先,我們來看一下printf的基本格式: printf("輸出的文字"); printf("輸出格式",需要輸出的變量); &

python3+django2 開發易語言網絡驗證

程序 技術 paths client 默認值 格式 width django https 創作背景:   在某論壇中下載到一套php開發易語言網絡驗證的教程,照著看下來,花了兩天的時間,結果發現教程裏開發的網絡驗證,以及隨著教程一起給學員的源碼,都存在著根本用不了的bug!

觀察者模式和發布訂閱模式

nts 針對 處理 nds script 分享圖片 .data cto 這樣的 觀察者模式 定義:觀察者模式(Observer Pattern):定義對象間的一種一對多依賴關系,使得每當一個對象狀態發生改變時,其相關依賴對象皆得到通知並被自動更新。 其中有兩個定義需要明確,

C/S模式,發布/訂閱模式和PUSH/PULL模式

沒有 入庫 即使 分時 流程 https 分享圖片 內容 怎麽 CS模式(客戶端/服務器模式) 最場景的信息傳遞模式,也稱為Request/Response模式,或者調用模式。http/https協議即此模式。因為最常用所以大家一般都比較熟悉,這裏不重點講了,大家請看圖下圖

Java設計模式簡介:行為型模式

本章講到第三種設計模式——行為型模式,共11種:策略模式、模板方法模式、觀察者模式、迭代子模式、責任鏈模式、命令模式、備忘錄模式、狀態模式、訪問者模式、中介者模式、直譯器模式。 先來張圖,看看這11中模式的關係: 第一類:通過父類與子類的關係進行實現。第二類:兩個類之間。第三類:類的狀態。第

Go語言字串高效拼接

在我們程式設計的時候,和字串打交道是必不可少的,我們對資料庫裡文字的處理,Web文字的顯示,文字資料的儲存等都需要和字串打交道,那麼對於字串來說,查詢、拼接這些都是常用的操作,尤其是以拼接使用的比較多,比如把一個人的姓名和年齡拼接在一起顯示。 在Go語言(golang)中,對於字串的拼接處理有很多種方法,那

Go語言字串高效拼接

在上一篇關於字串拼接的文章 Go語言字串高效拼接(一) 中,我們演示的多種字串拼接的方式,並且使用一個例子來測試了他們的效能,通過對比發現,我們覺得效能高的Builder並未發揮出其應該的效能,反而+號拼接,甚至strings.Join方法的效能更優越,那麼這到底是什麼原因呢?今天我們開始解開他們神祕的面紗,

Go語言字串高效拼接

在上一篇關於字串拼接的文章Go語言字串高效拼接(二) 中,我們終於為Builder拼接正名了,果真不負眾望,尤其是拼接的字串越來越多時,其效能的優越性更加明顯。 在上一篇的結尾中,我留下懸念說其實還有優化的空間,這就是今天這篇文章,字串拼接系列的第三篇,也是字串拼接的最後一篇產生的原因,今天我們就看下如何再