1. 程式人生 > >c++11簡單的線程

c++11簡單的線程

join() ref 交互 variables nds 函數返回 競爭條件 多個 輸入數據

線程的管理

啟動線程

為了讓編譯器識別 std::thread 類,這個簡單的例子也要包含 <thread> 頭文件。

如同大多數C++標準庫一樣

線程在std::thread對象創建(為線程指定任務)啟動

無參任務

最簡單的任務,通常是無參數無返回(void-returning)的函數,這種函數在其所屬線程上運行,直到函數執行完畢,線程也就結束了。

例如:

#include<iostream>

#include<thread>

using namespace std;

void go()

{

cout << "Welcome to Thread!";

}

void main()

{

thread t1(go);

cin.get();

}

運行結果

有參任務

std::thread 可以用可調用(callable)類型構造,將帶有函數調用符類型

的實例傳入 std::thread 類中,替換默認的構造函數。

#include <iostream>

#include <thread>

#include <string>

using namespace std;

void run(int num)

{

cout << "線程" << num

<< endl;

}

void main()

{

thread p(run,1);

cin.get();

}

執行結果!

等待線程

啟動了線程,你需要明確是要等待線程結束(加入式joined),還是讓其自主運行(分

離式——detached)。如果 std::thread 對象銷毀之前還沒有做出決定,程序就會終止

( std::thread 的析構函數會調用 std::terminate() )。因此,即便是有異常存在,也需要確保線程能夠正確的加入(joined)或分離(detached)

例如

#include <iostream>

#include <thread>

#include <string>

#include <chrono> //c++時間庫

using namespace std;

void run(int num)

{

chrono::seconds(3); //c++標準庫休眠3秒鐘

std::cout << "線程" << num << endl;

}

void main()

{

thread t(run, 1);

t.join(); //阻塞主函數等待等待線程結束

cin.get();

}

joinable()查看當前線程是否被join true沒有flase成功

#include <iostream>

#include <thread>

#include <string>

#include <chrono> //c++時間庫

using namespace std;

void run(int num)

{

chrono::seconds(3); //c++標準庫休眠

std::cout << "線程" << num << endl;

}

void main()

{

thread t(run, 1);

if(t.joinable())

t.join();

cin.get();

}

分離線程

使用detach()會讓線程在後臺運行,這就意味著主線程不能與之產生直接交互。也就是說,不會等待這個線程結束;

如果線程分離,那麽就不可能有 std::thread 對象能引用它,分離線程

的確在後臺運行,所以分離線程不能被加入。不過C++運行庫保證,當線程退出時,相關資源的能夠正確回收,後臺線程的歸屬和控制C++運行庫都會處理。

例如

#include <iostream>

#include <thread>

#include <string>

#include <chrono> //c++時間庫

using namespace std;

void run(int num)

{

chrono::seconds(3); //c++標準庫休眠

std::cout << "線程" << num << endl;

}

void main()

{

thread t(run, 1);

t.detach(); //脫離當前主線程自由執行

cin.get();

}

轉移線程所有權

假設要寫一個在後臺啟動線程的函數,想通過新線程返回的所有權去調用這個函數,而不是

等待線程結束再去調用;或完全與之相反的想法:創建一個線程,並在函數中轉移所有

都必須要等待線程結束。總之,新線程的所有權都需要轉移。

程的所有可以在 std::thread 實例中移動,下面將展示一個例子。

例如:

#include<iostream>

#include<thread>

using namespace std;

void run1()

{

cout << "run1" << endl;

}

void run2()

{

cout << "run2" << endl;

}

void main()

{

std::thread t1(run1); // 1

std::thread t2 = std::move(t1); // 2當顯式使用 std::move() 創建t2後,t1的所有權就轉移給了t2

t1 = std::thread(run2);

cin.get();

}

std::thread 支持移,就意味著線程的所有權可以在函數外進行轉移,就如下面程序一樣。

#include<iostream>

#include<thread>

using namespace std;

std::thread run1()

{

void some_function();

return std::thread(some_function);

}

void main()

{

void some_function();

thread t1(std::thread(run1));

}

運行時決定線程數量

std::thread::hardware_concurrency()

這個函數將返回能同時並發在一個程序中的線程數量。

例如,多核系統中,返回值就可以能是CPU核芯的數量。

返回值也僅僅是一個提示,當系統信息無法獲取時,函數也會返回0。但是,這也無

法掩蓋這個函數對啟動線程數量的幫助。

使用線程組來分割任。

//如下

//將100個任務分片,分成4片

#include <iostream>

#include <thread>

#include <vector>

#include <string>

#include <iterator>

#include <numeric>

#include <algorithm>

using namespace std;

template<typename Iterator, typename T>

struct accumulate_block

{

void operator()(Iterator first, Iterator last, T& result) //叠代器頭,叠代器尾,線程的數量 (重載)

{

result = std::accumulate(first, last, result);

//累加 開始 結束 累加的初值

}

};

template<typename Iterator, typename T>

T parallel_accumulate(Iterator first, Iterator last, T init)

{

unsigned long const length = std::distance(first, last);

// 若輸入數據為空,則返回初始值

if (!length)

return init;

// 計算所需要的最大線程數量,每個線程至少計算25個數據

unsigned long const min_per_thread = 25;

unsigned long const max_threads =

(length + min_per_thread - 1) / min_per_thread;

// 獲取硬件可並發線程數量

unsigned long const hardware_threads =

std::thread::hardware_concurrency();

// 計算實際要創建的線程數量

unsigned long const num_threads =

std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);

// 根據線程數量,拆分數據

unsigned long const block_size = length / num_threads;

// 創建用於存放每個線程計算結果的容器和線程

std::vector<T> results(num_threads);

std::vector<std::thread> threads(num_threads - 1);

Iterator block_start = first;

for (unsigned long i = 0; i<(num_threads - 1); ++i)

{

Iterator block_end = block_start;

// 移動叠代器

std::advance(block_end, block_size);

// 啟動新線程,對一塊數據進行處理

threads[i] = std::thread(

accumulate_block<Iterator, T>(),

block_start, block_end, std::ref(results[i]));

// 為下一個線程準備數據

block_start = block_end;

}

// 當啟動了所有的子線程對數據進行計算,本線程就對數據的最後一塊進行計算

accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);

// 使用fore_each對所有的線程執行join操作,等待它們執行結束

std::for_each(threads.begin(), threads.end(),

std::mem_fn(&std::thread::join));

// 最後對所有的計算結果求和

return std::accumulate(results.begin(), results.end(), init);

}

int main()

{

std::cout << "threads: " << std::thread::hardware_concurrency() << std::endl;

std::vector<int> vi;

for (int i = 0; i<100; ++i)

{

vi.push_back(1);

}

int sum = parallel_accumulate(vi.begin(), vi.end(), 5);

std::cout << "sum=" << sum << std::endl;

cin.get();

return 0;

}

識別線程

線程標識類型是 std::thread::id ,可以通過兩種方式進行檢索。

第一種,可以通過調用 std::thread 對象的成員函數 get_id() 來直接獲取。

如果 std::thread 對象沒有與任何執行線程相關聯, get_id() 將返回 std::thread::type 默認構造值,這個值表示“沒有線程”。

二種,當前線程中調用 std::this_thread::get_id() (這個函數定義在 <thread> 頭文件中)也可

以獲得線程標識

std::thread::id 實例常用作檢測線程是否需要進行一些操作,比如:當用線程來分割一項工

主線程可能要做一些與其他線程不同的工作。這種情況下,啟動其他線程

前,它可以將自己的線程ID通過 std::this_thread::get_id() 得到,並進行存儲。

就是算法核心部分(所有線程都一樣的),每個線程都要檢查一下,其擁有的線程ID是否與初始線程的ID相同。

std::thread::id master_thread;

void some_core_part_of_algorithm()

{

if(std::this_thread::get_id()==master_thread)

{

do_master_thread_work();

}

do_common_work();

}

總結

討論了C++標準庫中基本的線程管理方式:啟動線程,等待結束和不等待結束(因為需要它們運行在後臺)。

並了解應該如何在線程啟動前,向線程函數中傳遞參數,如何轉移線程的

所有權,如何使用來分割任務。

最後使用線程標識來確定數據,以及特殊線程的特殊解決方案

線程間共享數據

當線程在訪問共享數據的時候,必須定一些規矩,用來限定線程可訪問的數據。

還有,一個線程更新了共享數據,需要對其他線程進行通知。

從易用性的角度,同一進程中的多個線程進行數據共享,有利有弊。

錯誤的共享數據使用是產生並發bug的一個主要原因。

共享數據帶來的問題

當涉及到共享數據時,問題很可能是因為共享數據修改所導致。

如果共享數據是只讀的,那麽操作不會影響到數據,更不會涉及對數據的修改,所以所有線程都會獲得同樣的數據。

但是,當一個或多個修改共享數據時,就會產生很多麻煩。這種情況下,就必須

小心,才能確保一切所有線程都工作正常

例如破壞一個鏈表

如圖:

條件競爭

良心競爭:

爭條件的形成,取決於一個以上線程的相對執行順序,每個線程都搶著完成自己的任務。大多數情況下,即使改變執行順序,也是良性競爭,其結果可以接受。

惡心競爭:

例如,有兩個線程同時向一個處理隊列中添加任務,因為系統提供的保持不變,所以都不會有什麽影響。量遭到破壞,才會產生條件競爭。

並發中對數據的條件競爭通常表示為“”(problematic)條件競爭,們對問題的良性條件不感興趣。

C++標準中也定義了數據競爭(data race)這個術語,一種特殊的條件競爭:並發的

去修改一個獨立對象,數據競爭是(可怕的)定義行為(undefine behavior)的起

因。

避免惡性條件競爭

這裏提供一些方法來解決惡性條件競爭,最簡單的辦法就是對數據結構采用某種機制,確保只有進行修改的線程才能看到不變量被破壞時的中間狀態。

從其他訪問線程的角度來看,修改不是已經完成了,就是還沒開始。

另一個選擇是對數據結構和不變量的設計進行修改,修改完的結構必須能完成一系列不可分

割的變化,也就是保證每個不變量保持穩定的狀,這就是無鎖編程

另一種處理條件競爭的方式是,使用事務(transacting)的方式去處理數據結構的更新,這裏的"處理"就如同對數據庫進行更新一樣。

所需的一些數據和讀取都存儲在事務日誌中,然後將之前的操作合為一步,再進行提交。

當數據結構被另一個線程修改後,或處理已經重啟的情況下,提交就會無法進行,這稱作為“軟件事務內存”(software transactional memory

(STM))。理論研究中,這是一個很熱門的研究領域。這個概念將不會在本書中再進行介紹,

因為在C++中沒有對STM進行直接支持。

保護共享數據結構的最基本的方式,是使用C++標準庫提供的互斥量(mutex)

使用互斥量保護共享數據

當程序中有共享數據,肯定不想讓其陷入條件競爭,或是不變量被破壞。

那麽,將所有訪問共享數據結構的代碼都標記為互斥豈不是更好?這樣任何一個線程在執行這些代碼時,其他任何線程試圖訪問共享數據結構,就必須等到那一段代碼執行結束。

於是,一個線程就不可能會看到被破壞的不變量,除非它本身就是修改共享數據的線程。

當訪問共享數據前,使用互斥量將相關數據鎖住,再當訪問結束後,再將數據解鎖。線程庫需要保證,當一個線程使用特定互斥量鎖住共享數據時,其他的線程想要訪問鎖住的數據,

都必須等到之前那個線程對數據進行解鎖後,才能進行訪問。這就保證了所有線程能看到共享數據,而不破壞不變量。

互斥量是C++中一種最通用的數據保護機制,但它不是“銀蛋”;精心組織代碼來正確的數據,並在接口內部避免競爭條件是非常重要的。但互斥量自身也有問

題,也會造成,或是對數據保護的太多(或太少)。

C++中使用互斥量

C++中通過實例化 srd::mutex 創建互斥量,通過調用成員函數lock()進行上鎖,unlock()進行解鎖。

不推薦中直接去調用成員函數,因為調用成員函數就意味著,必須記住在每個函數出口都要去調unlock(),也包括異常的情況。

C++標準庫為互斥量提供了一個RAII法的模板 std::lack_guard ,其會在構造的時候提供的互斥量,並在行解,從而保證了一個已鎖的互斥量總是會被正確的解

std::mutex 和 std::lock_guard 都在 <mutex> 頭文件中聲明。

實踐調用成員函數

//進程的鎖定

#include <iostream>

#include <thread>

#include <string>

#include<windows.h>

#include<mutex>

using namespace std;

//兩個線程並行訪問一個變量

int g_num = 20;//找到或者找不到的標識

mutex g_mutex;

void goA(int num)

{

g_mutex.lock();//你訪問的變量,在你訪問期間,別人訪問不了

for (int i = 0; i < 15; i++)

{

g_num = 10;

std::cout << "線程" << num << " " << g_num << endl;

}

g_mutex.unlock();

}

void goB(int num)

{

for (int i = 0; i < 15; i++)

{

g_num = 11;

std::cout << "線程" << num << " " << g_num << endl;

}

}

void main()

{

thread t1(goA, 1);

thread t2(goB, 2);

t1.join();

t2.join();

std::cin.get();

}

運行結果

RAII語法的模板類lack_guard()

RAII語法實現自動解鎖

//進程的鎖定

#include <iostream>

#include <thread>

#include<mutex>

using namespace std;

//兩個線程並行訪問一個變量

int g_num = 20;//找到或者找不到的標識

mutex g_mutex;

void goA(int num)

{

lock_guard<std::mutex>guard(g_mutex);//自動解鎖

for (int i = 0; i < 15; i++)

{

g_num = 10;

std::cout << "線程" << num << " " << g_num << endl;

}

}

void goB(int num)

{

for (int i = 0; i < 15; i++)

{

g_num = 11;

std::cout << "線程" << num << " " << g_num << endl;

}

}

void main()

{

thread t1(goA, 1);

thread t2(goB, 2);

t1.join();

t2.join();

std::cin.get();

}

精心組織代碼來保護共享數據

用互斥量來保護數據,並不是僅僅在每一個成員函數中都加入一個 std::lock_guard 對象那麽簡單。

一個迷失的指針或引用,將會讓這種保護形同虛設。

函數可能沒在互斥量保護的區域內,存儲著指針或者引用,這樣就很危險。

更危險的是:將保護數據作為一個運行時參數.

如同下面:

#include <iostream>

#include <thread>

#include<mutex>

class some_data

{

public :

int a;

std::string b;

public:

void do_something()

{

std::cout << a;

}

};

class data_wrapper

{

private:

some_data data;

std::mutex m;

public:

template<typename Function>

void process_data(Function func) //通過傳遞的函數將,保護的數據傳遞出去,跳過保護

{

std::lock_guard<std::mutex> l(m);

data.a = 10;

func(data); // 1 傳遞“保護”數據給用戶函數

}

};

some_data* unprotected;

void malicious_function(some_data& protected_data)

{

unprotected = &protected_data;

}

data_wrapper x;

void foo()

{

x.process_data(malicious_function); // 2 傳遞一個惡意函數

unprotected->do_something(); // 3 在無保護的情況下訪問保護數據

}

void main()

{

foo();

std::cin.get();

}

例子中process_data看起來沒有任何問題, std::lock_guard 對數據做了很好的保護,但調用

用戶提供的函數func①,就意味著foo能夠繞過保護機制將函數 malicious_function 傳遞進去

在沒有鎖定互斥量的情況下調用 do_something() 。

這段代碼的問題在於,它根本沒有做到保護:只是將所有可訪問的數據結構代碼標記為互斥。

發現接口內在的條件競爭

因為使用了互斥量或其他機制保護了共享數據,就不必再為條件競爭所擔憂嗎?並不是這樣,你依舊需要確定特定的數據受到了保護。

回想之前雙鏈表的例子,為了能讓線程安全地刪除一個節點,需要確保防止對這三個節點(待刪除的節點及其前後相鄰的節點)的發訪問

如果只對指向每個節點的指針進行訪問保護,那就和沒有使用互斥量一樣,條件競爭仍會發生——整個數據結構和整個刪除操作需要保護,但指針不需要保護。

這種情況下最簡單的解決方案就是使用互斥量來保護整個,盡管對鏈表的個別操作是安全的,但不意味著你就能走出困境;即使在一個很簡單的接口中,依舊可能遇到條件競爭

例如,構建一個類似於 std::stack 結構的棧除了構造函數和swap()以外,需要對 std::stack 提供五個操作:push()一個新元素進棧,pop()一個元素出棧,top()查看棧頂元素,empty()判斷棧是否是空棧,size()了解棧中有多少個元素。

即使修改了top(),使其返回一個拷貝而非引用,對內部數據使用一個互斥量進行保護,不過這個接口仍存在條件競爭。

這個問題不僅存在於基於互斥量實現的接口中,在無鎖實現的接口中,條件競爭依舊會產生。

這是接口的問題,與其實現方式無關。

一個給定操作需要兩個或兩個以上的互斥量時,另一個潛在的問題將出現:死鎖(deadlock)。

與條件競爭完全相反——不同於兩個線程會互相等待,從而什麽都沒做。

死鎖

但線程有對鎖的競爭:一對線程需要對他們所有的互斥量做一些操作,其中每個線程都有一個互斥量,且等待另一個解鎖。

這樣沒有線程能工作,因為他們都在等待對方釋放互斥量。這種情況就是死鎖,它的最大問題就是由個或個以上的互斥量來一個操作。

避免死鎖的一般建議,就是讓兩個互斥量總以相同的順序上鎖:總在互斥量B之前鎖住互斥量A,就永遠不會死鎖。某些情況下是可以這樣用,因為不同的互斥量用於不同的地方。

不過,事情沒那麽簡單,比如:當有多個互斥量保護同一個類的獨立實例時,一個操作對同一個類的兩個不同實例進行數據的交換操作,為了保證數據交換操作的正確性,就要避免數據被並發修改,並確保每個實例上的互斥量都能鎖住自己要保護的區域。

不過,選擇一個固定的順序(例如,實例提供的第一互斥量作為第一個參數,提供的第二個互斥量為第二個參數),可能會適得其反:在參數交換了之後,兩個線程試圖在相同的兩個實例間進行數據交換時,程序又死鎖了!

std::lock ——可以一次性住多個(個以上)的互斥量,並且沒有副作用(死鎖風險)

交換操作中使用 std::lock() 和 std::lock_guard

#include <iostream>

#include <thread>

#include<mutex>

#include <string>

class some_big_object

{

};

void swap(some_big_object& lhs, some_big_object& rhs);

class X

{

private:

some_big_object some_detail;

std::mutex m;

public:

X(some_big_object const& sd) :some_detail(sd) {}

friend void swap(X& lhs, X& rhs)

{

if (&lhs == &rhs)

return;

std::lock(lhs.m, rhs.m); // 1

std::lock_guard<std::mutex> lock_a(lhs.m, std::adopt_lock); // 2

std::lock_guard<std::mutex> lock_b(rhs.m, std::adopt_lock); // 3

swap(lhs.some_detail, rhs.some_detail);

}

};

① 鎖住兩個互斥量,並且兩個 std:lock_guard 實例已經創建好②③,還有一個

互斥量。提供 std::adopt_lock 參數除了表示 std::lock_guard 對象已經上鎖外,還表示現成的鎖,而非嘗試創建新的鎖。

這樣,就能保證在大多數情況下,函數退出時互斥量能被正確的解鎖(保護操作可能會拋出一個異常),也允許使用一個簡單的“return”作為返回。還有,需要註意的是,當使用 std::lock 去鎖lhs.m或rhs.m時,可能會拋出異常;這種情況下,異常會傳播到 std::lock 之外。

當 std::lock 成功的獲取一個互斥量上的鎖,並且當其嘗試從另一個互斥量上再獲取鎖時,就會有異常拋出,第一個鎖也會隨著異常的產生而自動釋放,所以 std::lock 要麽將兩個鎖都鎖住,要不一個都不鎖。

避免死鎖的進階

雖然鎖是產生死鎖的一般原因,但也不排除死鎖出現在其他地方。

無鎖的情況下,僅需要每個 std::thread 對象調用join(),兩個線程就能產生死鎖。這種情況下,沒有線程可以繼續運行,因為他們正在互相等待。這種情況很常見,一個線程會等待另一個線程,其他線程同時也會等待第一個線程結束,所以三個或更多線程的互相等待也會發生死鎖。

為了避免死鎖,這裏的指導意見為:當機會來臨時,不要拱手讓人(don’t wait for another thread if there’s achance it’s waiting for you)。

以下提供一些的指導建議,如何識別死鎖,並消除其他線程的等待。

避免嵌套鎖第一個建議往往是最簡單的:

一個線程已獲得一個鎖時,再別去獲取第二個(don’t acquire alock if you already hold one)。

如果能堅持這個建議,因為每個線程只持有一個鎖,鎖上就不會產生死鎖。即使互斥鎖造成死鎖的最常見原因,也可能會在其他方面受到死鎖的困擾(比如:線程間的互相等待)。

當你需要獲取多個鎖,使用一個 std::lock 來做這件事(對獲取鎖的操作上鎖),避免產生死鎖。

使用固定順序獲取鎖當硬性條件要求你獲取兩個以上(包括兩個)的鎖,並且不能使用 std::lock 單獨操作來獲取它們;那麽最好在每個線程上,用固定的順序獲取它們獲取它們(鎖)。

獲取兩個互斥量時,避免死鎖的方法:關鍵是如何在線程之間,以一致性的順序獲取鎖。一些情況下,這種方式相對簡單。

unique_lock——靈活的鎖

unique_lock 介紹

std::unqiue_lock 通過對不變量的放松(by relaxing the invariants),會比 std:lock_guard 更加靈活;一個 std::unique_lock 實現不會總是擁有與互斥量相關的數據類型。

首先,就像你能將 std::adopt_lock 作為第二個參數傳入到構造函數,對互斥所進行管理,你也可以把 std::defer_lock 作為第二個參數傳遞進去,為了表明互斥量在結構上應該保持解鎖狀態。

這樣,就可以被後面調用lock()函數的 std::unique_lock 對象(不是互斥量)所獲取,或傳遞 std::unique_lock 對象本身到 std::lock() 中。清單3.6可以很容易被改寫為清單3.9中的代

碼,使用 std::unique_lock 和 std::defer_lock ,而非 std::lock_guard 和 std::adopt_lock 。

代碼長度相同,且幾乎等價,唯一不同的就是: std::unique_lock 會占用比較多的空間,並且比 std::lock_guard 運行的稍慢一些。保證靈活性是要付出代價的,這個代價就允許 std::unique_lock 實例不攜帶互斥量:該信息已被存儲,且已被更新。

unique_lock 構造函數

default 構造函數

新創建的 unique_lock 對象不管理任何 Mutex 對象。

locking 初始化

新創建的 unique_lock 對象管理 Mutex 對象 m,並嘗試調用 m.lock() 對 Mutex 對象進行上鎖,如果此時另外某個 unique_lock 對象已經管理了該 Mutex 對象 m,則當前線程將會被阻塞。

try-locking 初始化

新創建的 unique_lock 對象管理 Mutex 對象 m,並嘗試調用 m.try_lock() 對 Mutex 對象進行上鎖,但如果上鎖不成功,並不會阻塞當前線程。

deferred 初始化

新創建的 unique_lock 對象管理 Mutex 對象 m,但是在初始化的時候並不鎖住 Mutex 對象。 m 應該是一個沒有當前線程鎖住的 Mutex 對象。

adopting 初始化

新創建的 unique_lock 對象管理 Mutex 對象 m, m 應該是一個已經被當前線程鎖住的 Mutex 對象。(並且當前新創建的 unique_lock 對象擁有對鎖(Lock)的所有權)。

locking 一段時間(duration)

新創建的 unique_lock 對象管理 Mutex 對象 m,並試圖通過調用 m.try_lock_for(rel_time) 來鎖住 Mutex 對象一段時間(rel_time)。

locking 直到某個時間點(time point)

新創建的 unique_lock 對象管理 Mutex 對象m,並試圖通過調用 m.try_lock_until(abs_time) 來在某個時間點(abs_time)之前鎖住 Mutex 對像。

copy [deleted]

unique_lock 對象不能被拷貝構造。

移動(move)構造

新創建的 unique_lock 對象獲得了由 x 所管理的 Mutex 對象的所有權(包括當前 Mutex 的狀態)。調用 move 構造之後, x 對象如同通過默認構造函數所創建的,就不再管理任何 Mutex 對象了。

unique_lock 的構造函數參考

#include <iostream> // std::cout

#include <thread> // std::thread

#include <mutex> // std::mutex, std::lock, std::unique_lock

using namespace std;

// std::adopt_lock, std::defer_lock

std::mutex foo, bar;

void task_a() {

std::lock(foo, bar); // simultaneous lock (prevents deadlock)

std::unique_lock<std::mutex> lck1(foo, std::adopt_lock);

std::unique_lock<std::mutex> lck2(bar, std::adopt_lock);

std::cout << "task a\n";

// (unlocked automatically on destruction of lck1 and lck2)

}

void task_b() {

// foo.lock(); bar.lock(); // replaced by:

std::unique_lock<std::mutex> lck1, lck2;

lck1 = std::unique_lock<std::mutex>(bar, std::defer_lock);

lck2 = std::unique_lock<std::mutex>(foo, std::defer_lock);

std::lock(lck1, lck2); // simultaneous lock (prevents deadlock)

std::cout << "task b\n";

// (unlocked automatically on destruction of lck1 and lck2)

}

int main()

{

std::thread th1(task_a);

std::thread th2(task_b);

th1.join();

th2.join();

cin.get();

return 0;

}

unique_lock 移動(move assign)賦值操作

移動情況是鎖的所有權需要從一個域轉到另一個

移動賦值(move assignment)之後,由 A所管理的 Mutex 對象及其狀態將會被新的 std::unique_lock 對象取代。

如果被賦值的對象之前已經獲得了它所管理的 Mutex 對象的鎖,則在移動賦值(move assignment)之前會調用 unlock 函數釋放它所占有的鎖

調用移動賦值(move assignment)之後, A對象如同通過默認構造函數所創建的,也就不再管理任何 Mutex 對象了

例如

#include <iostream> // std::cout

#include <thread> // std::thread

#include <mutex> // std::mutex, std::unique_lock

#include<string>

std::mutex mtx; // mutex for critical section

void print_fifty(std::string c) {

std::unique_lock<std::mutex> lck; // default-constructed

lck = std::unique_lock<std::mutex>(mtx); // move-assigned

std::cout << c;

std::cout << ‘\n‘;

}

int main()

{

std::thread th1(print_fifty, "Move OK !");

th1.join();

std::cin.get();

return 0;

}

unique_lock 主要成員函數

1、 上鎖/解鎖操作:locktry_locktry_lock_fortry_lock_until unlock

2、 修改操作:移動賦值(move assignment),交換(swap)與另一個 std::unique_lock 對象交換它們所管理的 Mutex 對象的所有權),釋放(release)(返回指向它所管理的 Mutex 對象的指針,並釋放所有權)

3、 獲取屬性操作:owns_lock(返回當前 std::unique_lock 對象是否獲得了鎖)、operator bool()(與 owns_lock 功能相同,返回當前 std::unique_lock 對象是否獲得了鎖)、mutex(返回當前 std::unique_lock 對象所管理的 Mutex 對象的指針)

std::unique_lock::lock

上鎖操作,調用它所管理的 Mutex 對象的 lock 函數。如果在調用 Mutex 對象的 lock 函數時該 Mutex 對象已被另一線程鎖住,則當前線程會被阻塞,直到它獲得了鎖。

該函數返回時,當前的 unique_lock 對象便擁有了它所管理的 Mutex 對象的鎖。如果上鎖操作失敗,則拋出 system_error 異常。

// unique_lock::lock/unlock

#include <iostream> // std::cout

#include <thread> // std::thread

#include <mutex> // std::mutex, std::unique_lock, std::defer_lock

std::mutex mtx; // mutex for critical section

void print_thread_id(int id) {

std::unique_lock<std::mutex> lck(mtx, std::defer_lock);

// critical section (exclusive access to std::cout signaled by locking lck):

lck.lock();

std::cout << "thread #" << id << ‘\n‘;

lck.unlock();

}

int main()

{

std::thread threads[10];

// spawn 10 threads:

for (int i = 0; i<10; ++i)

threads[i] = std::thread(print_thread_id, i + 1);

for (auto& th : threads) th.join();

std::cin.get();

return 0;

}

std::unique_lock::try_lock

上鎖操作,調用它所管理的 Mutex 對象的 try_lock 函數,如果上鎖成功,則返回 true,否則返回 false。

#include <iostream> // std::cout

#include <vector> // std::vector

#include <thread> // std::thread

#include <mutex> // std::mutex, std::unique_lock, std::defer_lock

std::mutex mtx; // mutex for critical section

void print_star() {

std::unique_lock<std::mutex> lck(mtx, std::defer_lock);

// print ‘*‘ if successfully locked, ‘#‘ otherwise:

if (lck.try_lock())

std::cout << ‘*‘;

else

std::cout << ‘#‘;

}

int main()

{

std::vector<std::thread> threads;

for (int i = 0; i<500; ++i)

threads.emplace_back(print_star);

for (auto& x : threads) x.join();

std::cin.get();

return 0;

}

std::unique_lock::try_lock_for

上鎖操作,調用它所管理的 Mutex 對象的 try_lock_for 函數,如果上鎖成功,則返回 true,否則返回 false。

#include <iostream> // std::cout

#include <chrono> // std::chrono::milliseconds

#include <thread> // std::thread

#include <mutex> // std::timed_mutex, std::unique_lock, std::defer_lock

std::timed_mutex mtx;

void fireworks() {

std::unique_lock<std::timed_mutex> lck(mtx, std::defer_lock);

// waiting to get a lock: each thread prints "-" every 200ms:

while (!lck.try_lock_for(std::chrono::milliseconds(200))) {

std::cout << "-";

}

// got a lock! - wait for 1s, then this thread prints "*"

std::this_thread::sleep_for(std::chrono::milliseconds(1000));

std::cout << "*\n";

}

int main()

{

std::thread threads[10];

// spawn 10 threads:

for (int i = 0; i<10; ++i)

threads[i] = std::thread(fireworks);

for (auto& th : threads) th.join();

std::cin.get();

return 0;

}

std::unique_lock::owns_lock

返回當前 std::unique_lock 對象是否獲得了鎖。

#include <iostream>       // std::cout
#include <vector>         // std::vector
#include <thread>         // std::thread
#include <mutex>          // std::mutex, std::unique_lock, std::try_to_lock
 
std::mutex mtx;           // mutex for critical section
 
void print_star () {
  std::unique_lock<std::mutex> lck(mtx,std::try_to_lock);
  // print ‘*‘ if successfully locked, ‘x‘ otherwise: 
  if (lck.owns_lock())
    std::cout << ‘*‘;
  else                    
    std::cout << ‘x‘;
}
 
int main ()
{
  std::vector<std::thread> threads;
  for (int i=0; i<500; ++i)
    threads.emplace_back(print_star);
 
  for (auto& x: threads) x.join();
 
  return 0;
}

同步並發操作

當你不僅想要保護數據,還想對單獨的線程進行同步。例如,在第一個線程完成前,可能需要等待另一個線程執行完成。

通常情況下,線程會等待一個特定事件的發生,或者等待某一條件達成(為true)。這可能需要定期檢查“任務完成”標識,或將類似的東西放到共享數據中,但這與理想情況還是差很多。

像這種情況就需要在線程中進行同步,C++標準庫提供了一些工具可用於同步操作,形式上表現為

條件變量(condition variables)和期望(futures)。

等待一個事件或其他條件三種方式

當一個程等待一個程完成任務時,它會有很多選擇

一、它可以持續的檢查共享數據標誌(用於做保護工作的互斥量),直

到另一線程完成工作時對這個標誌進行重設。

不過,就是一種浪費:線程消耗寶貴的執行時間持續的檢查對應標誌,並且當互斥量被等待線程上鎖後,其他線程就沒有辦法獲取鎖,這樣線程就會持續等待。因為以上方式對等待線程限制資源,並且在完成時阻礙對標識的設置。

二、個選擇是在等待線程在檢查間隙,使用 std::this_thread::sleep_for() 進行周期性的間歇

例如

bool flag;

std::mutex m;

void wait_for_flag()

{

std::unique_lock<std::mutex> lk(m);

while(!flag)

{

lk.unlock(); // 1 解鎖互斥量

std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 2 休眠100ms

lk.lock(); // 3 再鎖互斥量

}

}

這個實現就進步很多,因為當線程休眠時,線程沒有浪費執行時間,但是很難確定正確的休

眠時間。太短的休眠和沒有休眠一樣,都會浪費執行時間;太長的休眠時間,可能會讓任務

等待線程醒來。休眠時間過長是很少見的情況,因為這會直接影響到程序的行為,當在高節

奏遊戲(fast-paced game)中,它意味著丟幀,或在一個實時應用中超越了一個時間片。

三、選擇(也是優先的選擇)是,使用C++標準庫提供的工具去等待事件的發生。

通過另一線程觸發等待事件的機制是最基本的喚醒方式(例如:流水線上存在額外的任務時),這種機制就稱為“條件變量”(condition variable)。

從概念上來說,一個條件變量會與多個事件或其他條件相關,並且一個或多個線程會等待條件的達成。

當某些線程被終止時,為了喚醒等待線程(允許等待線程繼續執行)終止的線程將會向等待著的線程廣播“條件達成”的信息。

等待條件達成

C++標準庫對條件變量有兩套實

現: std::condition_variable 和 std::condition_variable_any 。

這兩個實現都包含在 <condition_variable> 頭文件的聲明中。

兩者都需要與一個互斥量一起才能工作(互斥量是為了同步);前者僅限於std::mutex 一起工作,而後者可以和任何滿足最低標準的互斥量一起工作,從而加上了_any的後綴。

因為 std::condition_variable_any 更加通用,這就可能從體積、性能,以及系統資源的使用方面產生額外的開銷,所以 std::condition_variable 一般作為首選的類型,當對靈活性有硬性要求時,我們才會去考慮 std::condition_variable_any 。

所以,如何使用 std::condition_variable 去處理之前提到的情況——當有數據需要處理時,

如何喚醒休眠中的線程對其進行處理?以下清單展示了一種使用條件變量做喚醒的方式。

#include <iostream>                // std::cout
#include <thread>                // std::thread
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable
 
std::mutex mtx; // 全局互斥鎖.
std::condition_variable cv; // 全局條件變量.
bool ready = false; // 全局標誌位.
 
void do_print_id(int id)
{
    std::unique_lock <std::mutex> lck(mtx);
    while (!ready) // 如果標誌位不為 true, 則等待...
        cv.wait(lck); // 當前線程被阻塞, 當全局標誌位變為 true 之後,
    // 線程被喚醒, 繼續往下執行打印線程編號id.
    std::cout << "thread " << id << ‘\n‘;
}
 
void go()
{
    std::unique_lock <std::mutex> lck(mtx);
    ready = true; // 設置全局標誌位為 true.
    cv.notify_all(); // 喚醒所有線程.
}
 
int main()
{
    std::thread threads[10];
    // spawn 10 threads:
    for (int i = 0; i < 10; ++i)
        threads[i] = std::thread(do_print_id, i);
 
    std::cout << "10 threads ready to race...\n";
    go(); // go!
 
  for (auto & th:threads)
        th.join();
 
    return 0;
}

std::condition_variable 構造函數

default (1)

condition_variable();

copy [deleted] (2)

condition_variable (const condition_variable&) = delete;

std::condition_variable 的拷貝構造函數被禁用,只提供了默認構造函數。

std::condition_variable::wait() 介紹

unconditional (1)

void wait (unique_lock<mutex>& lck);

predicate (2)

template <class Predicate>

void wait (unique_lock<mutex>& lck, Predicate pred);

std::condition_variable 提供了兩種 wait() 函數。

當前線程調用 wait() 後將被阻塞(此時當前線程應該獲得了鎖(mutex),不妨設獲得鎖 lck),直到另外某個線程調用 notify_* 喚醒了當前線程。

在線程被阻塞時,該函數會自動調用 lck.unlock() 釋放鎖,使得其他被阻塞在鎖競爭上的線程得以繼續執行。

另外,一旦當前線程獲得通知(notified,通常是另外某個線程調用 notify_* 喚醒了當前線程)wait() 函數也是自動調用 lck.lock(),使得 lck 的狀態和 wait 函數被調用時相同。

在第二種情況下(即設置了 Predicate),只有當 pred 條件為 false 時調用 wait() 才會阻塞當前線程,並且在收到其他線程的通知後只有當 pred true 時才會被解除阻塞.

#include <iostream> // std::cout

#include <thread> // std::thread, std::this_thread::yield

#include <mutex> // std::mutex, std::unique_lock

#include <condition_variable> // std::condition_variable

std::mutex mtx;

std::condition_variable cv;

int cargo = 0;

bool shipment_available()

{

return cargo != 0;

}

// 消費者線程.

void consume(int n)

{

for (int i = 0; i < n; ++i) {

std::unique_lock <std::mutex> lck(mtx);

cv.wait(lck, shipment_available);

std::cout << cargo << ‘\n‘;

cargo = 0;

}

}

int main()

{

std::thread consumer_thread(consume, 10); // 消費者線程.

// 主線程為生產者線程, 生產 10 個物品.

for (int i = 0; i < 10; ++i) {

while (shipment_available())

std::this_thread::yield(); //線程調用yield()方法後,表明自己做的事已經完成,讓出自己的cpu時間給其他線程使用

std::unique_lock <std::mutex> lck(mtx);

cargo = i + 1;

cv.notify_one();

}

consumer_thread.join();

std::cin.get();

return 0;

}

c++11簡單的線程