1. 程式人生 > >15分鐘讓你瞭解如何實現併發中的Barrier

15分鐘讓你瞭解如何實現併發中的Barrier

說到Barrier,很多語言中已經是標準庫中自帶的概念,一般情況下,只需要直接使用就行了。而最近一些機緣巧合的機會,我需要在c++中使用這麼個玩意兒。但是c++標準庫裡還沒有這個概念,只有boost裡面有這樣現成的東西,而我又不想為了這麼一個小東西引入個boost。所以,我藉著這個機會研究了下,發現其實這些多執行緒/併發中的東西還是蠻有意思的。

 

閱讀本文你可能需要如下的一些知識:

  1. 多執行緒程式設計的概念。

  2. c++的基本語法和有關多執行緒的語法。

 

第二條可能也沒有那麼重要,因為如果理解了多執行緒的這些東西,什麼語言都可以實現其核心概念。好了,廢話少扯,進入正題。

 

一、什麼是Barrier?

 

首先,得介紹下Barrier的概念,Barrier從字面理解是屏障的意思,主要是用作集合執行緒,然後再一起往下執行。再具體一點,在Barrier之前,若干個thread各自執行,然後到了Barrier的時候停下,等待規定數目的所有的其他執行緒到達這個Barrier,之後再一起通過這個Barrier各自幹自己的事情。

 

這個概念特別像小時候集體活動的過程,大家從各自的家裡到學校集合,待人數都到齊之後,之後再一起坐車出去,到達指定地點後一起行動或者各自行動。

 

而在計算機的世界裡,Barrier可以解決的問題很多,比如,一個程式有若干個執行緒併發的從網站上下載一個大型xml檔案,這個過程可以相互獨立,因為一個檔案的各個部分並不相關。而在處理這個檔案的時候,可能需要一個完整的檔案,所以,需要有一條虛擬的線讓這些併發的部分集合一下從而可以拼接成為一個完整的檔案,可能是為了後續處理也可能是為了計算hash值來驗證檔案的完整性。而後,再交由下一步處理。

 

二、如何實現一個Barrier?

 

併發的很多東西都擁有一個壞處就是你很難證明某種實現不是錯誤的,因為很多時候確實情況太多了,無論是死鎖,飢餓對於人腦都是太大的負擔。而反過來,對於我扯這篇文章,也是一個好處,正因為很難證明不是錯誤的,所以我的扯淡可以更放心一點。

 

在研究Barrier的實現中,我查閱了蠻多的資料的。說實話,其實現方式挺多的。在剔除了一些我能明確證明其有可能是錯誤的,我選擇了我自己覺得最容易理解的一種。

 

第一節說過,barrier很像是以前的班級集合,站在一個老師的角度,你需要知道的東西至少有這兩個:

  1. 班級有多少人。

  2. 目前已經到了多少人。

     

只有當目前已經到了的人等於班級人數之後才能出發。

 

所以如果按照這個類比,實現一個barrier至少需要以下的幾個變數:

  1. 需要同時在barrier等待的執行緒的個數。

  2. 當前到達barrier的執行緒的個數。

 

而按照barrier的邏輯,主要應該有這些操作:

  1. 當一個執行緒到達barrier的時候,增加計數。

  2. 如果個數不等於當前需要等待的執行緒個數,等待。

  3. 如果個數達到了需要等待的執行緒個數,通知/喚醒所有等待的程序,讓所有程序通過barrier。

 

在不考慮加鎖的情況下,按照上面的邏輯,虛擬碼大概應該像這樣:

thread_count = n; <-- n是需要一起等待的執行緒的個數
arrived_count = 0; <-- 到達執行緒的個數
-------------------------------------------------------------
 以上是全域性變數,只會初始化一次,以下是barrier開始的程式碼
-------------------------------------------------------------
arrived_count += 1;
if(arrived_count == thread_count)
    notify_all_threads_and_unblok();
else
    block_and_wait();

而在多執行緒環境下,很明顯arrived_count這種全域性變數更新需要加鎖。所以,對於這個程式碼,綜合稍微再改動一下,虛擬碼可以更新下成為這樣:

thread_count = n; <-- n是需要一起等待的執行緒的個數
arrived_count = 0; <-- 到達執行緒的個數
-------------------------------------------------------------
 以上是全域性變數,只會初始化一次,以下是barrier開始的程式碼
-------------------------------------------------------------
lock();
    arrived_count += 1;
unlock();
if(arrived_count == thread_count)
    notify_all_threads_and_unblok();
else
    block_and_wait();

這裡,在有的語言中,鎖的粒度可能小了點,取決於notify_all_threads和wait在這個語言中的定義,但是作為虛擬碼,為了可能展示起來比較方便。

 

而如果你有併發程式設計的知識,你應該敏感的認識到notify_all_threads_and_unblock,block_and_wait這種在這裡雖然是簡單的幾個單詞,但是其包含的操作步驟明顯不止一個,更別說背後的機器指令了。所以作為一個併發概念下執行的程式,不可以簡單的就放這樣一個操作在這裡,如果都是任何函式,指令,程式碼都是自帶原子性的,那麼寫多執行緒/併發程式也沒有啥好研究的了。所以對於這兩個操作,我們必須具體的擴充套件下。

 

對於notify_all_threads_and_unblock和block_and_wait包含相當多的操作,所以下面,得把這兩個操作具體的展開。

 1 thread_count = n; <-- n是需要一起等待的執行緒的個數
 2 arrived_count = 0; <-- 到達執行緒的個數
 3 could_release = false; 
 4 -------------------------------------------------------------
 5  以上是全域性變數,只會初始化一次,以下是barrier開始的程式碼
 6 -------------------------------------------------------------
 7 lock();
 8     if(arrived_count == 0)
 9        could_release = false; 
10     
11     arrived_count += 1;
12 unlock();
13 if(arrived_count == thread_count)
14     could_realse = true;    
15     arrived_count = 0; 
16 else
17     while(could_release == false)
18         spin()

這裡多了一個變數could_release完成上面說的兩個操作。原理也很簡單,如果等待的個數沒有到達指定數目,這個值始終是false,在程式碼中使用迴圈讓執行緒阻塞在spin處(當然,假設spin是原子性的)。如果到達了thread_count,改變could_release的值,這樣迴圈條件不滿足,程式碼可以繼續執行。而在13行的if裡面把arrived_count重新設定為0是因為如果不這樣做,那麼這個barrier就只能用一次,因為沒有地方再把這個表示到達執行緒數目變數的初始值重新設定了。

 

我覺得這裡需要停一下,來思一下上面的程式碼,首先,這個程式碼有很多看起來很像有問題的地方。比如對於could_release和arrived_count的重置處,這都是賦值,而在併發程式中,任何寫操作都需要仔細思考是否需要加鎖,在這裡,加鎖當然沒問題。但是盲目的加鎖會導致效能損失。

 

多執行緒程式最可怕的就是陷入細節,所以,我一般都是整體的思考下是不是有問題。對於一個barrier,錯誤就是指沒有等所有的執行緒都到達了就停止了等待,人沒來齊就發車了。而怎麼會導致這樣的情況呢?只有當arrived_count值在兩個執行緒不同步才會導致錯誤。秉承這個原則,看看上面的程式碼,arrived_count的更新是加鎖的,所以在到達if之前其值是可以信賴的。而if這段判斷本身是讀操作,其判斷就是可以信賴的,因為arrived_count的值更新是可靠的,所以進來的執行緒要麼進入if,要麼進入else。不存線上程1更新了arrived_count的值而執行緒2讀到了arrived_count的值而導致沒有到thread_count就更新了could_release的情況。

 

沒辦法,這類的程式就是很繞,所以我一般都不陷入細節。

 

現在看起來,一切都很完美,但是多執行緒程式最噁心的地方就是可能的死鎖,飢餓等等。而這些又很難證明,而上面這段程式碼,在某些情況下就是會導致死鎖。考慮thread_count等於2,也就是這個barrier需要等待兩個執行緒一起通過。

 

現在有兩個執行緒,t1和t2,t1先執行直到17行,卡住,這時候t2獲得寶貴的cpu機會。很明顯,這時會進入14行,更新could_release的值。如果這個時候t1獲得執行機會,萬事大吉,t1會離開while區域,繼續執行。直到下次再次到達這個barrier。

 

但是如果這個時候t1並沒有獲得執行機會,t2一直執行,雖然喚醒了could_relase,但是t1會一直停留在18行。要知道,這個含有barrier的程式碼可能是在一個迴圈之中,如果t2再次到達barrier的區域,這時候arrived_count等於0(因為arrived_count在上一次t2進入13行之後重置了),這個時候could_relase會變成false。現在t1,t2都在18行了,沒有人有機會去更新could_relase的值,執行緒死鎖了。

 

怎麼辦?仔細思考下,是喚醒機制有問題,很明顯,如果能夠在喚醒的時候原子式的喚醒所有的執行緒,那麼上面所說的問題就不存在了。在很多語言裡都有這樣的方法可以完成上面說的原子性的喚醒所有執行緒,比如c++裡面的notify_all。但是,如果沒有這個函式,該如何實現呢?

 

上面死鎖問題的誕生在於一個執行緒不恰當的更新了全域性的could_relase,導致全部的判斷條件跟著錯誤的改變。解決這樣的問題,需要的是一個只有每個執行緒各自能看到,可以獨立更新,互相不干擾而又能被使用的變數。幸好,在設計多執行緒概念時,有一個概念叫做thread local,剛好能夠滿足這個要求。而運用這樣的變數,上述的概念可以表述成為:

 1 thread_count = n; <-- n是需要一起等待的執行緒的個數
 2 arrived_count = 0; <-- 到達執行緒的個數
 3 could_release = false;
 4 thread_local_flag = could_release; <-- 執行緒區域性變數,每個執行緒獨立更新 
 5 -------------------------------------------------------------
 6  以上是全域性變數,只會初始化一次,以下是barrier開始的程式碼
 7 -------------------------------------------------------------
 8 thread_local_flag = !thread_local_flag
 9 lock();
10     arrived_count += 1;
11 unlock();
12 if(arrived_count == thread_count)
13     could_realse = thread_local_flag;    
14     arrived_count = 0; 
15 else
16     while(could_release != thread_local_flag)
17         spin()

這裡要著重解釋下,為什麼不會死鎖,由於thread_local_flag是每個執行緒獨立更新的,所以很明顯,其是不用加鎖的。其餘程式碼和上面的虛擬碼類似,不同的是,如果發生上面一樣的情況,t2更新thread_local_flag的時候,只有其區域性的變數會被置反而不會影響其餘的執行緒的變數,而因為could_realse是全域性變數,在t2第一次執行到13行的時候已經設定成thread_local_flag一樣的值了。這個時候, 哪怕t2再次執行到16行也會因為其內部變數已經被置反而阻塞在這個while迴圈之中。而t1只要獲得執行機會,就可以通過這個barrier。

 

有點繞,但是仔細想想還是蠻有意思的。

 

三、如何運用c++實現Barrier?

 

雖然上面說了那麼多,但是c++中實現Barrier不需要這麼複雜,這要感謝c++ 11中已經自帶了很多原子性的操作,比如上面說的notify_all。所以,程式碼就沒有那麼複雜了,當然,c++也有thread_local,如果不畏勞苦,可以真的從最基礎的寫起。

#include <iostream>
#include <condition_variable>
#include <thread>
#include <chrono>
​
using namespace std;
​
class TestBarrier{
public:
    TestBarrier(int nThreadCount):
        m_threadCount(nThreadCount),
        m_count(0),
        m_release(0)
    {}
​
    void wait1(){
        unique_lock<mutex> lk(m_lock);
        if(m_count == 0){
            m_release = 0;
        }
        m_count++;
        if(m_count == m_threadCount){
            m_count = 0;
            m_release = 1;
            m_cv.notify_all();
        }
        else{
            m_cv.wait(lk, [&]{return m_release == 1;});
        } 
    }
​
private:
    mutex m_lock;
    condition_variable m_cv;
    unsigned int m_threadCount;
    unsigned int m_count; 
    unsigned int m_release;
};

這裡多虧了c++標準庫中引進的condition_variable,使得上面的概念可以簡單高效而又放心的實現,你也不需要操心什麼執行緒區域性量。而關於c++併發相關的種種知識可能需要專門的若干篇幅才能說清楚,如果你並不熟悉c++,可以跳過這些不知所云的部分。驗證上述程式碼可以使用如下程式碼:

unsigned int threadWaiting = 5;
TestBarrier barrier(5);
​
void func1(){
    this_thread::sleep_for(chrono::seconds(3));
    cout<<"func1"<<endl;
    barrier.wait1();
    cout<<"func1 has awakended!"<<endl;
}
​
void func2(){
    cout<<"func2"<<endl;
    barrier.wait1();
    cout<<"func2 has awakended!"<<endl;
}
​
void func3(){
    this_thread::sleep_for(chrono::seconds(1));
    cout<<"func3"<<endl;
    barrier.wait1();
    cout<<"func3 has awakended!"<<endl;
}
​
int main(){
    for(int i = 0; i < 5; i++){
        thread t1(func1);
        thread t2(func3);
        thread t3(func2);
        thread t4(func3);
        thread t5(func2);
        t1.join();
        t2.join();
        t3.join();
        t4.join();
        t5.join();
    }
}

好了,在我機器上的執行結果是這樣的,由於輸出沒有同步,所以輸出可能並沒有想象的那麼整潔。但是不影響整體結果,可以看到,所有執行緒到齊之後才各自執行各自後面的程式碼:


這篇文章也在我的公眾號同步發表,我的這個公眾號嘛,佛系更新,當然,本質上是想到一個話題不容易(懶的好藉口),歡迎關注哦: