1. 程式人生 > >C++併發編成 03 執行緒同步

C++併發編成 03 執行緒同步

這一節主要講講執行緒同步的方式,C++ 11中提供了豐富的執行緒同步元語,如condition_variable,futrue,std::packaged_task<>,std::promise,std::async等,本節後續內容將就這些話題進行闡述。

1. Lambda表示式

lambda表示式是C++ 11提供的新特性,在高階語言當中,該語法特性已經得到了普遍的支援。lambda函式能夠大大簡化程式碼複雜度(語法糖:利於理解具體的功能),避免實現呼叫物件。能為等待函式,例如 std::condition_variable提供很好謂詞函式,其語義可以用來快速的表示可訪問的變數,而非使用類中函式來對成員變數進行捕獲。在開始講執行緒同步的內容之前,我想簡單講講lambda表示式,主要是後面在寫示例程式碼的時候會使用到lambda表示式的知識,就算是做個知識的提前鋪墊吧。

 lambda表達書以方括號開頭,所有操作語義都在一對大括號中{ },並以()結尾,

 
 
[capture list] (params list) mutable exception-> return type { function body }

各項具體含義如下

  1. capture list:捕獲外部變數列表
  2. params list:形參列表
  3. mutable指示符:用來說用是否可以修改捕獲的變數
  4. exception:異常設定
  5. return type:返回型別
  6. function body:函式體

此外,我們還可以省略其中的某些成分來宣告“不完整”的Lambda表示式,常見的有以下幾種:

序號 格式
1 [capture list] (params list) -> return type {function body}
2 [capture list] (params list) {function body}
3 [capture list] {function body}

其中:

  • 格式1聲明瞭const型別的表示式,這種型別的表示式不能修改捕獲列表中的值。
  • 格式2省略了返回值型別,但編譯器可以根據以下規則推斷出Lambda表示式的返回型別: (1):如果function body中存在return語句,則該Lambda表示式的返回型別由return語句的返回型別確定; (2):如果function body中沒有return語句,則返回值為void型別。
  • 格式3中省略了引數列表,類似普通函式中的無參函式。

 

例 1 使用了lambda表示式對以一個verctor容器中的所有元素進行的列印.

std::vector<int> data={1,2,3,4,5,6,7,8,9};
std::for_each(data.begin(),data.end(),[](int i){std::cout<<i<<"\n";});

例2 容器比較函式

#include <iostream>
#include <vector>
#include <algorithm>
using namespace std;

bool cmp(int a, int b)
{
    return  a < b;
}

int main()
{
    vector<int> myvec{ 3, 2, 5, 7, 3, 2 };
    vector<int> lbvec(myvec);

    sort(myvec.begin(), myvec.end(), cmp); // 舊式做法
    cout << "predicate function:" << endl;
    for (int it : myvec)
        cout << it << ' ';
    cout << endl;

    sort(lbvec.begin(), lbvec.end(), [](int a, int b) -> bool { return a < b; });   // Lambda表示式
    cout << "lambda expression:" << endl;
    for (int it : lbvec)
        cout << it << ' ';
}

在C++11之前,我們使用STL的sort函式,需要提供一個謂詞函式。如果使用C++11的Lambda表示式,我們只需要傳入一個匿名函式即可,方便簡潔,而且程式碼的可讀性也比舊式的做法好多了。

 

Lambda表示式通過在最前面的方括號[]來明確指明其內部可以訪問的外部變數,主要有值捕獲、引用捕獲、隱式捕獲三種。

捕獲形式 說明
[] 不捕獲任何外部變數
[變數名, …] 預設以值得形式捕獲指定的多個外部變數(用逗號分隔),如果引用捕獲,需要顯示宣告(使用&說明符)
[this] 以值的形式捕獲this指標
[=] 以值的形式捕獲所有外部變數
[&] 以引用形式捕獲所有外部變數
[=, &x] 變數x以引用形式捕獲,其餘變數以傳值形式捕獲
[&, x] 變數x以值的形式捕獲,其餘變數以引用形式捕獲

舉例:

#include <iostream>
using namespace std;

int main()
{
    int a = 123;
    auto f = [a] { cout << a << endl; };   // 值捕獲外部本地變數a
    f(); // 輸出:123

    //或通過“函式體”後面的‘()’傳入引數
    auto x = [](int a){cout << a << endl; return 0; }(123);  // 顯示傳入外部變數值(值傳遞)給lambda表示式,輸出123


    auto refFunction = [&a] { cout << a << endl; };  // 通過lambda表示式定義函式,引用捕獲外部變數
    a = 321;
    refFunction(); //呼叫lambda函式,輸出:321

    
    auto fvalue = [=] { cout << a << endl; };    // 隱式值捕獲外部所有變數
    fvalue(); // 輸出:321

    auto fref = [&] { cout << a << endl; };    // 隱式引用捕獲外部所有變數
    a = 321;
    fref(); // 輸出:321
}

 2. std::condition_variable

std::condition_variable可以被多個thread都可以訪問的並等待條件達成,如果條件未達成,則訪問執行緒阻塞;反之,如果條件達成,可以在達成的執行緒內呼叫std::condition_variable.notify_one()或std::condition_variable.notify_all來喚醒一個或全部等待執行緒。等待執行緒收到通知,就知道條件滿足,就可以繼續執行了。std::condition_variable需要和std::mutex,std::unique_lock一起配合來使用

 1 #include <mutex>
 2 #include <condition_variable>
 3 #include <thread>
 4 #include <queue>
 5 
 6 bool more_data_to_prepare()
 7 {
 8     return false;
 9 }
10 
11 struct data_chunk
12 {};
13 
14 data_chunk prepare_data()
15 {
16     return data_chunk();
17 }
18 
19 void process(data_chunk&)
20 {}
21 
22 bool is_last_chunk(data_chunk&)
23 {
24     return true;
25 }
26 
27 std::mutex mut;
28 std::queue<data_chunk> data_queue;
29 std::condition_variable data_cond;
30 
31 void data_preparation_thread()
32 {
33     while(more_data_to_prepare())
34     {
35         data_chunk const data=prepare_data();
36         std::lock_guard<std::mutex> lk(mut);
37         data_queue.push(data);
38         data_cond.notify_one();
39     }
40 }
41 
42 void data_processing_thread()
43 {
44     while(true)
45     {
46         std::unique_lock<std::mutex> lk(mut);
47         data_cond.wait(lk,[]{return !data_queue.empty();});
48         data_chunk data=data_queue.front();
49         data_queue.pop();
50         lk.unlock();
51         process(data);
52         if(is_last_chunk(data))
53             break;
54     }
55 }
56 
57 int main()
58 {
59     std::thread t1(data_preparation_thread);
60     std::thread t2(data_processing_thread);
61     
62     t1.join();
63     t2.join();
64 }

示例程式碼演示了一個生產者data_preparation_thread和消費者data_processing_thread兩個執行緒之間的同步。

1. 生產者執行緒data_preparation_thread迴圈進行訊息的準備,在37行往佇列中插入一條記錄後,通過38行的std::condition_variable例項喚醒一個等待執行緒。

2. 消費者執行緒data_processing_thread在47行檢查通過std::condition_variable.wait中的 lambda表示式檢查佇列是否為空,如果佇列為空,wait()函式將解鎖互斥量,並且將這個執行緒(上段提到的處理資料的執行緒)置於阻塞或等待狀態。直到生產者執行緒data_preparation_thread有資料push到佇列中後,通過std::condition_variable.notify_one()在38行進行通知喚醒。一旦被喚醒,消費者執行緒會再次獲取互斥鎖,並且對條件再次檢查,並在條件滿足的情況下,繼續持有鎖,並48行和以下部分的程式碼。

3. std::condition_variable.wait等待條件滿足是直接與等待條件相關的,而與通知到達無關,通知到達了,只是喚醒等待執行緒重新鎖定共享資料,檢查條件滿足沒有。wait的第一個引數為對共享資料進行保護的鎖,只有在鎖定狀態下,wait才會去檢查wait條件是否達成。

 執行緒安全佇列程式碼示例:

 

#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>

template<typename T>
class threadsafe_queue
{
private:
    mutable std::mutex mut;
    std::queue<std::shared_ptr<T> > data_queue;
    std::condition_variable data_cond;
public:
    threadsafe_queue()
    {}

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        value=std::move(*data_queue.front());
        data_queue.pop();
    }

    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return false;
        value=std::move(*data_queue.front());
        data_queue.pop();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_queue.empty();});
        std::shared_ptr<T> res=data_queue.front();
        data_queue.pop();
        return res;
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_queue.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res=data_queue.front();
        data_queue.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_queue.empty();
    }

    void push(T new_value)
    {
        std::shared_ptr<T> data(
            std::make_shared<T>(std::move(new_value)));
        std::lock_guard<std::mutex> lk(mut);
        data_queue.push(data);
        data_cond.notify_one();
    }

};

3. std::future<T>

std::future是一個非同步結果獲得的物件,將任務與一個future物件關聯,任務做完後就可以通過std::future物件得到結果,當我們需要時就可以隨時去從std::future物件取得我們期待的結果.  非同步需要執行的任務是通過provider來設定的,通過在定義provider的時候,將future與某個task關聯在一起,就能通過future非同步取得執行結果(假設fut為future物件,通過ful.get()取得結果)了。future也是一個模板類future<T>,T表示future返回結果的型別。

通過thread在執行的任務是沒有反獲值的,如果需要獲取執行緒執行的返回值,就需要通過future。std::future是一次性等待任務,只是執行一次,然後結果會返回給future,如果還來就需要重新設定。std:condition_variable是可以重複執行的。

3.1 std::async()

std::async啟動一個非同步任務並會返回一個std::future 物件,這個物件持有最終計算出來的結果。當你需要這個值時,你只需要呼叫這個futrue物件的get()成員函式;並且直到“期望”狀態為就緒的情況下,執行緒才會阻塞;之後,返回計算結果。 std::async的引數格式為std::async(parameA,paramB,paramC,paramD),引數說明如下:

a) paramA是執行的方式,std::launch::deferred表示延遲執行,即在future上呼叫get()或者wait()才開始執行; std::launch::async表示立即啟動一個新執行緒執行。   此引數忽略,如果忽略表示由具體實現選擇方式。
b) paramB是需要執行任務的函式名。              此引數為必選引數
c) paramC表示,呼叫需要執行任務的函式的物件。   此引數為可選引數,因為可能不是一個類的成員函式,所以可以直接呼叫
d) paramD表示函式的引數,所以D可以有很多項。

如:
#include <future>
#include <iostream>
#include <string>
using namespace std;

int find_the_answer_to_ltuae()
{
    return 42;
}

void do_something_in_main_thread()
{ 
    std::cout<<"I'm in main thead\n";
}

struct X
{
    void foo(int inumber,std::string const& svalue)
    {
        cout<<"struct X inumber:"<<inumber<<"  svalue:"<<svalue<<endl;
    }
    std::string bar(std::string const& svalue)
    {
        cout<<"struct X svalue:" <<svalue<<endl;
    }
};


X x;
auto f1=std::async(&X::foo,&x,42,"hello");
auto f2=std::async(&X::bar,x,"goodbye");

struct Y
{
    double operator()(double value)
    {
        std::cout<<"struct Y :"<<value<<std::endl;
    }
};
Y y;
auto f3=std::async(Y(),3);
auto f4=std::async(std::ref(y),4);

auto f6=std::async(std::launch::async,Y(),6); // 在新執行緒上執行
auto f7=std::async(std::launch::deferred,Y(),7); // 在wait()或get()呼叫時執行
// f7.wait() //// 呼叫延遲函式

int main()
{
    std::future<int> the_answer=std::async(find_the_answer_to_ltuae);
    do_something_in_main_thread();
    std::cout<<"The answer is "<<the_answer.get()<<std::endl;
}

輸出如下:

[email protected]:~/workspace/open-source$ ./a.out
struct X svalue:goodbyestruct Y :
struct X inumber:42  svalue:hello
3
struct Y :4
struct Y :6
I'm in main thead
The answer is 42

 

3.2 std::packaged_task<>
std::packaged_task<>將一個可呼叫物件(即需要執行的任務)或函式和一個future封裝在一起的可呼叫物件。當構造一個 std::packaged_task<> 例項時,必須傳入一個函式或可呼叫物件,這個函式或可呼叫的物件需要能接收指定的引數和返回可轉換為指定返回型別的值。型別可以不完全匹配;比如使用float f(int )函式,來構建 std::packaged_task<double(double)> 的例項,因為在這裡,型別可以隱式轉換。使用std::packaged_task關聯的std::future物件儲存的資料型別是可調物件的返回結果型別,如示例函式的返回結果型別是int,那麼宣告為std::future<int>,而不是std::future<int(int)。程式碼示例:

#include <iostream>
#include <type_traits>
#include <future>
#include <thread>

using namespace std;
int main()
{
    std::packaged_task<int()> task([]() {
        std::this_thread::sleep_for(std::chrono::seconds(5));// 執行緒睡眠5s
        return 4; });
    std::thread t1(std::ref(task));
    std::future<int> f1 = task.get_future();
    
    auto r = f1.get();// 執行緒外阻塞等待
    std::cout << r << std::endl;

    t1.join();
return 0; }

 

 

3.3 std::promise<>

 std::promise用來包裝一個值將資料和future繫結起來,為獲取執行緒函式中的某個值提供便利,需要顯示設定promise的返回值,取值是間接通過promise內部提供的future來獲取的,也就是說promise的層次比future高。

#include <iostream>
#include <type_traits>
#include <future>
#include <thread>

using namespace std;
int main()
{
    std::promise<int> promiseParam;
    std::thread t([](std::promise<int>& p)
    {
        std::this_thread::sleep_for(std::chrono::seconds(10));// 執行緒睡眠10s
        p.set_value_at_thread_exit(4);//
    }, std::ref(promiseParam));
    std::future<int> futureParam = promiseParam.get_future();

    auto r = futureParam.get();// 執行緒外阻塞等待
    std::cout << r << std::endl;
 
    t.join()
return 0; }