1. 程式人生 > >Go併發模式:管道和取消

Go併發模式:管道和取消

 

WHY?

Go的併發原語可以輕鬆構建流資料流水線,從而有效利用I/O和多個CPU。

WHAT?

管道是一種資料結構,傳送方可以以字元流形式將資料送入該結構,接收方可以從該結構接收資料。

HOW?

Go中沒有正式的管道定義;但它是眾多併發程式中的一種,是通過通道(channel)連線的一系列階段,且每個階段是一組運行同一個函式的goroutine。在每個階段:goroutine們

1.通過輸入通道,從上游的獲取資料

2.在資料上執行一系列資料,通常會產生新值

3.通過輸出通道,將新產生的值送往下游

每個階段都擁有任意數量的輸入通道和輸出通道,除了第一個和最後一個階段,第一個階段只有輸出通道,最後一個階段只有輸入通道,第一個又被稱為來源或生產者,最後一個階段又被稱為槽或消費者。

DO:

在正式程式碼之前,可以先從HOW之中的敘述大致推理出函式的樣子。假設共有三個階段,則三個階段的函式形式類似:

func stage1() <-chan {}

func stage2(in <-chan) <-chan{}

func stage(in <-chan){}

這樣心裡就有數了。如果要求某些數的平方值:該如何運用Go管道的技術呢?

依上述虛擬碼嘗試寫一下。

首先第一階段,要有數字,然後要把這些數以管道的形輸出給下一階段使用:

func genx(nums...int) <-chan int {
	out := make(chan int)
	for n := range nums {
		out <- n
	}

	close(out)

	return out
}

觀察這個函式,符不符合上面的WHAT和HOW?

發現並不符合,HOW中要求每一個階段都是一組goroutine的組合,而這個第一階段並沒有goroutine。所以要用到Goroutine而要用到goroutine且goroutine要滿足從通過輸入通道從上游獲取值,通過輸出通道向下遊傳送值,而且要處理舊值得到新值,意思是所有一切都要在Goroutine中處理,不能直接在函式中處理,要在函式中重開goroutine處理。

改動第一個階段程式碼:

func genx(t *testing.T, nums...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}

		t.Logf("genx Closed")
		close(out)
	}()

	return out
}

為什麼把把close(out)放在goroutine中執行呢?因為向一個關閉的channel傳送資料會引起panic,而goroutine的執行時必 genx的執行順序慢的,所以如果放在外面,在把nums寫入out的時候就會報錯。

繼續第二階段,生成平方數:

func sqx(t *testing.T, in <-chan int) <-chan int{
	out := make(chan int)
	go func() {
		for n := range in {
			//t.Logf("NN=%v", n*n)
			out <- n * n
		}

		t.Logf("sqx Closed")
		close(out)
	}()

	return out
}

第三階段,消費者輸出:

func TestX(t *testing.T) {
	a := genx(4, 5, 6)
	b := sqx(t, a)
	go func() {
		for n := range b {
			t.Logf("^o^_%v", n)
		}
	}()
}

這樣對嗎?執行一下發現,沒有任何列印。這是為何?因為TestX執行在主goroutine中,一旦執行完畢程式立刻就退出了,並沒有給TestX中的goroutine執行的機會。而我們在TestX的goroutine中呼叫的是range而不是 <- b這樣的函式,不會引起函式的goroutine阻塞,直到有資料讀入才停止阻塞。引起阻塞就不會退出主routine,就能看到列印。

所以第三階段把程式碼修改為:

func TestX(t *testing.T) {
	a := genx(4, 5, 6)
	b := sqx(t, a)
	
	for n := range b {
		t.Logf("^o^_%v", n)
	}
}

這下可以了,打印出16, 25,36。

這樣貌似結束了。

但是還有一些問題。

如果輸入的Goroutine不止一個呢?

這裡牽涉到兩個概念。扇入和扇出。

扇出:多個功能可以從同一個Channel讀取,直到該通道關閉為止,提供了一種在一群工作者之間,分配工作給並行化CPU使用和IO的方法。

扇入:從多個輸入讀取,並繼續執行,直到一切都停止下來。當以下情況發生時。通過複用多個輸入的Channel到一個單獨的Channel。當所有的輸入都關閉的時候,這個Channel將會關閉(這個Channel關閉會導致一切【接收輸入,輸出等】都停止下來。)

扇出的程式碼:

func TestX1(t *testing.T) {
	in := genx(4, 5, 6)

	// 一個輸入點(扇形的頂點),有多引用(扇形的扇面),扇出
	c1 := sqx(t, in)
	c2 := sqx(t, in)
	
	for n := range mergex(c1, c2) {
		t.Logf("n=%v", n)
	}
}

扇入的程式碼:

// 扇入,很多輸入(扇面),匯聚到一個Channel(扇形頂點)
func mergex(t *testing.T, cs ... <-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)
	output := func(c <-chan int) {
		for n:= range c {
			out <- n
		}

		wg.Done()
	}

	wg.Add(len(cs))

	for _, c := range cs{
		go output(c)
	}

	go func() {
		wg.Wait()
		// when all the  are closed,
		t.Logf("mergex closed")
		close(out)
	}()

	return out
}

執行後TestX1的列印為:

  main_test.go:230: genx Closed
    main_test.go:245: sqx Closed
    main_test.go:245: sqx Closed
    main_test.go:282: n=16
    main_test.go:282: n=36
    main_test.go:282: n=25
    main_test.go:307: mergex closed

可以看到genx關閉了一次,sqx關閉兩次,最後關閉的是mergex。

當所有的輸入兩個sqx都關閉了之後,接下來mergex就關閉了。mergex關閉之後,一切都停止了。

 

突然停止:

我們的管道功能有一種模式:

  • 所有傳送操作完成後,階段關閉其出站通道。
  • 階段保持從入站通道接收值,直到這些通道關閉。

此模式允許將每個接收階段寫為range迴圈,並確保在所有值成功傳送到下游後所有goroutine都退出。

但在實際管道中,階段並不總是接收所有入站值。有時這是設計的:接收器可能只需要一個值的子集來取得進展。更常見的是,階段會提前退出,因為入站值表示較早階段的錯誤。在任何一種情況下,接收器都不必等待剩餘的值到達,並且我們希望早期階段停止產生後續階段不需要的值。

在我們的示例管道中,如果某個階段無法使用所有入站值,則嘗試傳送這些值的goroutine將無限期地阻塞:


    //從輸出中獲取第一個值。
    out:= merge(c1,c2)
    fmt.Println(< -  out)// 4或9 
    返回
    //由於我們沒有收到out的第二個值,
    //其中一個輸出goroutine掛起試圖傳送它。
}

這是資源洩漏:goroutines消耗記憶體和執行時資源,goroutine堆疊中的堆引用使資料不被垃圾收集。Goroutines不是垃圾收集; 他們必須自己退出。

即使下游階段未能收到所有入站值,我們也需要安排管道的上游階段退出。一種方法是將出站通道更改為具有緩衝區。緩衝區可以包含固定數量的值; 如果緩衝區中有空間,則立即傳送操作:

c:= make(chan int,2)//緩衝區大小為2
c < -  1 //立即成功
c < -  2 //立即成功
c < -  3 //阻塞,直到另一個goroutine <-c並收到1

當在通道建立時知道要傳送的值的數量時,緩衝區可以簡化程式碼。例如,我們可以重寫gen以將整數列表複製到緩衝通道中,並避免建立新的goroutine

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

回到我們管道中阻塞的goroutines,我們可能會考慮為返回的出站通道新增一個緩衝區merge

func merge(cs ... < -  chan int)<-chan int { 
    var wg sync.WaitGroup 
    out:= make(chan int,1)//足夠的空間用於未讀輸入
    // ...其餘部分保持不變。 ..

雖然這修復了此程式中阻塞的goroutine,但這是錯誤的程式碼。此處緩衝區大小為1的選擇取決於知道merge 將接收的值的數量以及下游階段將消耗的值的數量。這很脆弱:如果我們傳遞一個額外的值gen,或者如果下游階段讀取任何更少的值,我們將再次阻止goroutines。

上面這一小段沒有看懂!等會看下緩衝區Channel的用法,回頭再看。

相反,我們需要為下游階段提供一種方式,向發件人表明他們將停止接受輸入。

main決定退出而不接收所有值時 out,它必須告訴上游階段的goroutines放棄他們試圖傳送的值。它通過在名為的通道上傳送值來實現done。它傳送兩個值,因為可能有兩個阻塞的發件人:

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}
}

傳送goroutines用一個select語句替換它們的傳送操作,該語句在傳送out時或從接收到的值時繼續done。值型別done是空結構,因為值無關緊要:它是指示out應該放棄傳送的接收事件。所述output夠程繼續迴圈在其入站通道,c,所以上游階段不被堵塞。(我們將在稍後討論如何讓這個迴圈儘早返回。)

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

這種方法存在一個問題:每個下游接收器需要知道可能被阻塞的上游傳送器的數量,並安排在早期返回時發訊號通知這些傳送器。跟蹤這些計數是乏味且容易出錯的。

我們需要一種方法來告訴未知和無限數量的goroutine停止向下遊傳送它們的值。在Go中,我們可以通過關閉通道來完成此操作,因為 關閉通道上的接收操作總是可以立即進行,從而產生元素型別的零值。

這意味著main只需關閉done頻道即可解鎖所有發件人。這種關閉實際上是傳送者的廣播訊號。我們將每個管道函式擴充套件為接受 done作為引數並通過defer語句安排接近發生 ,以便所有返回路徑main將通知管道階段退出。

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.
}

我們的每個管道階段現在都可以在done關閉後自由返回。在output常規merge可以返回而不消耗其入站通道,因為它知道上游傳送者,sq將停止嘗試時傳送 done關閉。 output確保wg.Done通過defer語句在所有返回路徑上呼叫:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...

同樣,一旦done關閉,sq就可以返回。 通過defer宣告,sq確保其out通道在所有返回路徑上關閉:

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

以下是管道施工的指南:

  • 所有傳送操作完成後,階段關閉其出站通道。
  • 階段保持從入站通道接收值,直到這些通道關閉或發件人被解鎖。

管道通過確保為所有傳送的值提供足夠的緩衝區或通過在接收方放棄通道時顯式地傳送信令來發送訊號,從而解鎖傳送方。

最後以將一個目錄作為引數,並列印該目錄下每個常規檔案的摘要值,按路徑名排序。。

1.獲取某個目錄下的全部檔案路徑,並匯出到Channel paths中。

2.讀取paths中的路徑,並按路徑讀取檔案中的資料,並對資料生成md5簽名,返回路徑+簽名+err的 Channel

3.讀取2返回的Channel並列印其中的結果

第一版先不用併發執行:

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

第二版使用併發執行:

我們分裂MD5All成兩個階段的管道。

第一個階段,sumFiles遍歷樹,在新的goroutine中消化每個檔案,並在值為型別的通道上傳送結果result

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}

第二階段,MD5All從中接收摘要值c。 MD5All錯誤時提前返回,done通過以下方式關閉defer:

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

第二版函式有個很大問題就是對每個檔案啟動一個goroutine求md5和,在具有許多大檔案的目錄中,這可能會分配比計算機上可用記憶體更多的記憶體。

我們可以通過限制並行讀取的檔案數來限制這些分配。

第三版有限的並行性

我們通過建立固定數量的goroutine來讀取檔案。我們的管道現在有三個階段:走樹,讀取和消化檔案,並收集摘要。

第一個階段,walkFiles發出樹中常規檔案的路徑:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

中間階段啟動固定數量的digestergoroutine,用來從paths接收檔名,並在通道上c傳送results:

// digester reads path names from paths and sends digests of the corresponding
// files on c until either paths or done is closed.
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
	for path := range paths { // HLpaths
		data, err := ioutil.ReadFile(path)
		select {
		case c <- result{path, md5.Sum(data), err}:
		case <-done:
			return
		}
	}
}

與前面的示例不同,digester不會關閉其輸出通道,因為多個goroutine正在共享通道上傳送。相反,程式碼MD5All 安排digesters在完成所有操作後關閉頻道:

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.  In that case,
// MD5All does not wait for inflight read operations to complete.
func MD5All(root string) (map[string][md5.Size]byte, error) {
	// MD5All closes the done channel when it returns; it may do so before
	// receiving all the values from c and errc.
	done := make(chan struct{})
	defer close(done)

	paths, errc := walkFiles(done, root)

	// Start a fixed number of goroutines to read and digest files.
	c := make(chan result) // HLc
	var wg sync.WaitGroup
	const numDigesters = 20
	wg.Add(numDigesters)
	for i := 0; i < numDigesters; i++ {
		go func() {
			digester(done, paths, c) // HLc
			wg.Done()
		}()
	}
	go func() {
		wg.Wait()
		close(c) // HLc
	}()
	// End of pipeline. OMIT

	m := make(map[string][md5.Size]byte)
	for r := range c {
		if r.err != nil {
			return nil, r.err
		}
		m[r.path] = r.sum
	}
	// Check whether the Walk failed.
	if err := <-errc; err != nil { // HLerrc
		return nil, err
	}
	return m, nil
}

我們可以讓每個消化器建立並返回自己的輸出通道,但是我們需要額外的goroutine來扇入結果。

 

參考:https://blog.golang.org/pipelines

最後階段接收所有results從那裡c檢查錯誤errc。此檢查不會更早發生,因為在此之前,walkFiles可能阻止向下遊傳送值:

本文介紹了在Go中構建流資料管道的技術。處理此類管道中的故障非常棘手,因為管道中的每個階段都可能阻止嘗試向下遊傳送值,並且下游階段可能不再關心傳入的資料。我們展示了關閉一個通道如何向管道啟動的所有goroutine廣播“完成”訊號,並定義正確構建管道的準則。