1. 程式人生 > >C++11中的並發

C++11中的並發

type類 ise 動作 tex args 內部存儲 swa unlock 再次

在 C++98 的時代,C++標準並沒有包含多線程的支持,人們只能直接調用操作系統提供的 SDK API 來編寫多線程程序,不同的操作系統提供的 SDK API 以及線程控制能力不盡相同。到了 C++11,終於在標準之中加入了正式的多線程的支持,從而我們可以使用標準形式的類來創建與執行線程,也使得我們可以使用標準形式的鎖、原子操作、線程本地存儲 (TLS) 等來進行復雜的各種模式的多線程編程,而且,C++11 還提供了一些高級概念,比如 promise/future,packaged_task,async 等以簡化某些模式的多線程編程。

一:thread

頭文件<thread>中提供了std::thread 對線程進行封裝。C++11 所定義的線程是和操作系的線程是一一對應的,也就是說我們生成的線程都是直接接受操作系統的調度的,通過操作系統的相關命令(比如 ps -M 命令)是可以看到的,一個進程所能創建的線程數目以及一個操作系統所能創建的總的線程數目等都由運行時操作系統限定。

std::thread類如果在構造函數中指明了線程入口函數的話,則從創建好thread對象那一刻起,線程就開始運行了,此時std::thread對象就表示一個正在執行的線程了。但是std::thread創建的線程,線程入口函數的返回值被忽略了,如果線程中拋出了異常,則會直接調用std::terminate結束進程。

std::thread也可以與底層線程不關聯,比如使用默認構造函數創建的std::thread對象、被移動了的std::thread對象、detach或join過的std::thread對象。沒有兩個std::thread對象會表示同一個線程,std::thread不支持復制構造或復制賦值的,只能移動構造或移動賦值。

std::thread有可能拋出std::system_error異常,表示線程無法啟動。該異常表示要麽是std::errc::resource_unavailable_try_again,要麽就是底層實現創建線程時發生了問題。

創建std::thread同時指明線程入口函數的構造函數是:

template< class Function, class... Args > 
explicit thread( Function&& f, Args&&... args );

調用線程入口函數f時,參數是按值復制或移動傳遞的。因此如果需要傳遞一個引用給入口函數的話,需要使用std::ref或std::cref封裝。

std::thread中定義了native_handle_type類型,它具體是什麽類型取決於底層的線程庫。 native_handle_type是連接 std::thread 和操作系統線程庫之間的橋梁,在 g++(libstdc++) for Linux 裏面,native_handle_type 其實就是 pthread 裏面的 pthread_t 類型,當 std::thread 類的功能不能滿足我們的要求的時候(比如改變某個線程的優先級),可以通過 std::thread 類實例的 native_handle() 返回值作為參數來調用相關的 pthread 函數達到目的。

std::thread中還定義了內部id類std::thread::id,id對象表示std::thread對象的唯一標識,如果std::thread目前沒有關聯的線程,則其id值為一個默認值std::thread::id()。std::thread定義了get_id成員函數返回其id值。

std::thread對象如果有關聯的活動線程的話,則稱其為joinable的,成員函數joinable()可用於查看std::thread是否為joinable的。如果std::thread是joinable的,則其get_id() != std::thread::id()。如果std::thread其底層線程已經執行完,但是尚未被join,則認為其依然是joinable的。下面幾種情況,std::thread::joinable會返回false:std::thread是通過默認構造函數創建(未指定線程入口函數);std::thread被移動了(復制或賦值給其他std::thread);std::thread調用了detach之後;std::thread調用了join之後。

std::thread成員函數detach()斷開std::thread對象與底層線程的關聯,底層線程退出後,其資源自動free掉。如果當前std::thread對象沒有關聯底層線程,則調用detach會拋出std::system_error異常。調用了detach之後,就不能再該std::thread對象上調用join了;

std::thread成員函數join,能阻塞當前線程的執行,直到執行join的std::thread其底層線程執行完成。針對同一個std::thread對象在多個另外線程中執行join是未定義行為;如果發生錯誤則會拋出std::system_error異常,比如若std::thread的joinable為false,則是invalid_argument錯誤;如果線程內部自己調用join,則是resource_deadlock_would_occur錯誤。

std::thread成員函數swap可以將當前std::thread與其他std::thread對象的底層句柄進行互換。相當於二者互換了身份,各自掌握對方的線程。

如果std::thread執行析構時,其依然關聯著底層線程(也就是joinable返回true),則會調用std::terminate。

如果將std::thread對象A移動賦值給對象B,則若B依然關聯底層線程(joinable返回true)的話,則會調用std::terminate。移動賦值之後,B就掌握了A的線程,而A就成了默認構造狀態。

std::thread還有一個靜態成員函數hardware_concurrency,用於返回當前底層實現支持的最大線程並發數,其值只能用於參考。

除了定義std::thread類,<thread>頭文件中還定義了命名空間this_thread表示當前線程,並在其中定義了4個輔助函數:yield、get_id、sleep_for、sleep_until。這四個函數一般是在線程執行函數中調用。

std::this_thread::yield用於提示底層實現調度執行其他線程。它的具體實現依賴於底層實現,特別是操作系統當前使用的調度策略。比如對於實時先入先出調度策略(如linux中的SCHED_FIFO)而言,該函數會掛起當前線程,將其插入到相同優先級的就緒隊列末尾(如果當前沒有相同優先級的其他線程,則yield無效果);

std::this_thread::get_id用戶獲取當前線程的id;

std::this_thread::sleep_for和std::this_thread::sleep_until用於將當前線程的執行休眠一段時間;

二:鎖

頭文件<mutex>中定義了std::mutex類,用於表示非遞歸鎖。如果某個線程已經擁有了std::mutex,仍然調用std::mutex::lock的話,是未定義行為;如果std::mutex析構時,仍有任一線程擁有它,則是未定義行為;如果某線程結束時,依然擁有某個std::mutex,則是未定義行為。std::mutex不可復制,不可移動,它只有一個默認構造函數。

std::mutex定義了native_handle_type類型,類似於std::thread::native_handle_type,它取決於具體的底層實現,成員函數std::mutex::native_handle用於返回該鎖的底層實現句柄;

std::mutex定義了lock、try_lock和unlock成員函數函數。一般情況下,不直接調用這些函數,而是使用std::unique_lock或std::lock_guard等以RAII的方式管理鎖。

<mutex>中還定義了std::timed_mutex,它類似於std::mutex,只不過另外提供了try_lock_for和try_lock_until函數,這倆函數要麽在能立即持有鎖時返回true,要麽最多阻塞一段時間後返回false。

<mutex>中還定義了recursive_mutex、recursive_timed_mutex兩種遞歸鎖,以及shared_mutex、shared_timed_mutex兩種共享鎖(讀寫鎖)。

<mutex>中定義了lock_guard,用於對鎖進行RAII式的封裝,創建lock_guard時會持有鎖,析構lock_guard時會釋放鎖。lock_guard不可復制。

int g_i = 0;
std::mutex g_i_mutex;  // protects g_i
 
void safe_increment() {
    std::lock_guard<std::mutex> lock(g_i_mutex);
    ++g_i;
 
    std::cout << std::this_thread::get_id() << ": " << g_i << \n;
 
    // g_i_mutex is automatically released when lock goes out of scope
}

std::thread t1(safe_increment);
std::thread t2(safe_increment);

t1.join();
t2.join();

<mutex>中還提供了defer_lock_t、try_to_lock_t、adopt_lock_t以及unique_lock、shared_lock,結合std::lock或std::try_lock函數,可以方便的對上面的鎖進行封裝,最常見的就是封裝多個鎖,以避免死鎖的發生。具體可參考en.cppreference.com中的例子。

<mutex>中還提供了std::call_once函數,保證某個函數即使在多個線程中同時調用時,也只被調用一次。

template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );

如果調用call_once時flag已經被設置,說明函數f已經被調用過了,這種情況下call_once直接返回;如果flag未被設置,則調用call_once時會直接調用std?::?forward<Callable>(f),並向其傳遞std?::?forward<Args>(args)...參數。如果此時f內拋出了異常,則異常會傳遞給call_once的調用者,並且不會設置flag,這樣可以使得後續使用同一標誌調用call_once時能繼續調用f函數。

std::once_flag flag1, flag2;
 
void simple_do_once() {
    std::call_once(flag1, [](){ std::cout << "Simple example: called once\n"; });
}
 
void may_throw_function(bool do_throw) {
  if (do_throw) {
    std::cout << "throw: call_once will retry\n"; // this may appear more than once
    throw std::exception();
  }
  std::cout << "Didn‘t throw, call_once will not attempt again\n"; // guaranteed once
}
 
void do_once(bool do_throw) {
  try {
    std::call_once(flag2, may_throw_function, do_throw);
  }
  catch (...) {
  }
}
 
std::thread st1(simple_do_once);
std::thread st2(simple_do_once);
std::thread st3(simple_do_once);
std::thread st4(simple_do_once);
st1.join();
st2.join();
st3.join();
st4.join();

std::thread t1(do_once, true);
std::thread t2(do_once, true);
std::thread t3(do_once, false);
std::thread t4(do_once, true);
t1.join();
t2.join();
t3.join();
t4.join();

上面代碼的結果是:

Simple example: called once
throw: call_once will retry
throw: call_once will retry
Didnt throw, call_once will not attempt again

三:條件變量

<condition_variable>頭文件中提供了condition_variable類以對條件變量進行支持。條件變量也是一種同步原語,它可以使多個線程阻塞,直到另一個線程修改了某共享變量並且對條件變量進行通知之後,才解除阻塞。

修改共享變量的線程需要:持有某種鎖(一般是通過std::lock_guard),在持有鎖的情況下修改變量,在std::condition_variable上執行notify_one或notify_all(執行通知時不需要持有鎖)。即使共享變量是原子的,也需要在持有鎖的情況下進行修改,以便能夠正確的通知到等待條件變量的線程。

等待條件變量std::condition_variable的線程需要:持有一個std::unique_lock<std::mutex>(該鎖也是修改共享變量線程需要持有的鎖);執行wait、wait_for或wait_until,這些等待操作會原子的釋放鎖並掛起線程;當條件變量得到通知時,或超時時間到時,或者發生虛假喚醒時,線程醒來並且原子性的獲取鎖。此時線程應該檢查條件,如果條件未滿足(虛假喚醒、超時時間到時)繼續等待。

std::condition_variable只能與std::unique_lock<std::mutex>一起使用,這種約束使得在一些平臺上能夠獲得最大效率。<condition_variable>中提供了std::condition_variable_any條件變量,可以與任意類型的鎖(如std::shared_lock)一起工作。

std::condition_variable不能進行復制構造、移動構造,也不能復制賦值或移動賦值。

類似於std::thread和std::mutex,std::condition_variable也提供了native_handle_type類型和native_handle函數,用於返回條件變量底層實現的句柄。

std::condition_variable的wait成員函數:

void wait( std::unique_lock<std::mutex>& lock );

template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );

wait會阻塞當前線程,直到其他線程在相同條件變量上調用了nofify_one或notify_all,或者直到發生了虛假喚醒。調用wait之前需要先鎖住lock,調用wait會原子的釋放lock,阻塞當前線程,將當前線程添加到等待條件變量(*this)的線程列表中。當在條件變量上調用nofify_one或notify_all,或是發生虛假喚醒時,當前線程解除阻塞,並且再次鎖住lock。

第二個重載實際上相當於:

while (!pred()) {
    wait(lock);
}

註意lock只能是std::unique_lock<std::mutex>。如果lock沒有鎖住就調用wait,這是未定義行為;如果鎖住的lock與其他等待相同條件變量的線程使用lock不是同一個lock,這也是未定義行為。如果在notify_one之後才調用wait,則該wait不會喚醒;

std::condition_variable cv;
std::mutex cv_m; // This mutex is used for three purposes:
                 // 1) to synchronize accesses to i
                 // 2) to synchronize accesses to std::cerr
                 // 3) for the condition variable cv
int i = 0;
 
void waits() {
    std::unique_lock<std::mutex> lk(cv_m);
    std::cerr << "Waiting... \n";
    cv.wait(lk, []{return i == 1;});
    std::cerr << "...finished waiting. i == 1\n";
}
 
void signals() {
    std::this_thread::sleep_for(std::chrono::seconds(1));
    {
        std::lock_guard<std::mutex> lk(cv_m);
        std::cerr << "Notifying...\n";
    }
    cv.notify_all();
 
    std::this_thread::sleep_for(std::chrono::seconds(1));
 
    {
        std::lock_guard<std::mutex> lk(cv_m);
        i = 1;
        std::cerr << "Notifying again...\n";
    }
    cv.notify_all();
}
 
int main() {
    std::thread t1(waits), t2(waits), t3(waits), t4(signals);
    t1.join(); 
    t2.join(); 
    t3.join();
    t4.join();
}

結果是:

Waiting...
Waiting...
Waiting...
Notifying...
Notifying again...
...finished waiting. i == 1
...finished waiting. i == 1
...finished waiting. i == 1

wait_for和wait_until類似於wait,只不過它們能指定等待的時間。

如果當前有任一線程等待條件變量,則notify_one函數會喚醒其中的一個。註意,調用notify_one的線程沒必要鎖住等待條件變量線程使用的lock,實際上這麽做有時候是有害無益的,因為這會導致被喚醒的線程重新阻塞,直到發起notify的線程釋放該lock。不過在需要精確調度事件的場景中,這樣做也是有必要的。比如如果等待線程在條件滿足之後會結束進程,這就會導致條件變量被析構,如果在解鎖之後notify之前發生了虛假喚醒,將會導致在已析構的條件變量上調用notify。

notify_all將會喚醒所有等待在條件變量上的線程。

std::condition_variable只能和std::unique_lock<std::mutex>一起工作,如果需要能夠使用其他鎖,則可以使用std::condition_variable_any。該類是不可復制,也不可移動的。如果std::condition_variable_any也使用std::unique_lock<std::mutex>的話,其效率沒有std::condition_variable高。

<condition_variable>中還提供了std::notify_all_at_thread_exit函數:

void notify_all_at_thread_exit( std::condition_variable& cond,
                                std::unique_lock<std::mutex> lk );

該函數用於通知其他線程當前線程已經結束。該函數的操作是:將已經獲取的鎖lk的擁有權轉移到內部存儲,然後當前線程退出時,調用條件變量的notify_all:

lk.unlock();
cond.notify_all();

lk會一直鎖住,直到線程退出。該函數一般用於detached線程退出時調用。

四:Futures

std::thread有個缺點,就是無法獲取調用函數的返回值或者捕獲其拋出的異常。因此,<future>提供了一系列用於獲取異步線程的返回值或者其拋出的異常的機制。這些返回值通過共享狀態(shared state)進行通信,線程將返回值或拋出的異常存儲在共享狀態中,然後其他線程通過操作與這些共享狀態關聯的std::future或std::shared_future來獲取內部的值。

std::promise提供了這樣一種機制:它關聯一個共享狀態(shared state),該共享狀態中保存了一些狀態信息,並用於存儲值或異常。該值或異常之後的某個時刻會被std::promise創建的std::future對象異步的取出。std::promise的使用是一次性的,而且它不支持復制,只支持移動,當對象A移動構造/賦值給對象B後,A就沒有關聯的共享狀態了。

std::promise對共享狀態可以做三件事:

標記為ready,std::promise將值或異常存儲到共享狀態中,將狀態標記為ready。這樣就解除了其他線程在關聯該共享狀態上的期值上的阻塞;

release,std::promise放棄與共享狀態的關聯。如果當前是共享狀態最後的引用,則共享狀態被銷毀。如果該共享狀態是std::async創建的,並且尚未ready,則該動作會阻塞;

abandon,std::promise將以std::future_errc::broken_promise為錯誤碼的std::future_error的異常存儲在共享狀態中,然後將共享狀態標記為ready後release。可以這樣理解,promise意思是許諾,許諾將來會把值存儲在共享狀態中,現在它放棄了這個諾言,因而是broken_promise。

如果當前關聯的共享狀態是ready的,std::promise的析構函數會release共享狀態;如果共享狀態不是ready的,則析構函數以std::future_errc::broken_promise為錯誤碼將異常std::future_error存儲到共享狀態中,並將其標記為ready後release。

std::promise的set_value是原子的將值存儲到共享狀態中,並且將其置為ready;set_exception用於在共享狀態中存儲異常;而set_value_at_thread_exit將值存儲到共享狀態後並不會立即將其置為ready,而是在線程退出時才將其置為ready;set_exception_at_thread_exit的行為類似。set_value、set_exception、set_value_at_thread_exit或set_exception_at_thread_exit的行為,就好像他們在更新promise狀態時獲取一個鎖。如果當前沒有關聯的共享狀態,或者共享狀態中已經存儲了值或異常,則這些操作就會拋出異常。

std::promise處於promise-future通信信道的寫端,在共享狀態中寫入了值之後,任何在該共享狀態上等待的操作(比如std::future::get)就會成功返回。std::promise的get_future成員函數,返回一個關聯相同共享狀態的std::future對象。如果當前沒有關聯的共享狀態,或是get_future已經被調用過一次了,則會拋出異常。如果需要在promise-future傳輸信道上有多個讀端,則可以使用std::future::share。調用get_future並不會和set_value、set_exception、set_value_at_thread_exit或set_exception_at_thread_exit產生data race。

std::future用於訪問異步操作的結果,異步操作(通過std::async,std::package_task或std::promise創建)可以提供給異步操作的創建者一個std::future,然後創建者使用std::future的各種方法訪問、等待,或從std::future中取值。當異步操作尚未提供一個值時,這些操作可能會導致阻塞。

std::future的valid成員函數可以判斷該std::future對象是否關聯了共享狀態。默認構造的std::future就沒有關聯的共享狀態,因而其valid返回false;std::future不支持復制(構造或賦值),只支持移動(構造或賦值)。當移動A到B時,A的valid就會返回false;

std::future的析構函數會release共享狀態,如果當前是關聯到共享狀態最後一個引用,則該共享狀態被銷毀;std::future斷開與共享狀態的關聯。如果當前std::future由std::async返回,且共享狀態尚未ready,且當前std::future是該共享狀態的最後引用,則析構時會阻塞。

std::future的get操作將阻塞到future能夠從共享狀態中取到值。如果當前未關聯共享狀態,則調用get是未定義行為;調用get後,共享狀態被release了,且valid會返回false。如果是常規的future,如std::future<int>,則get從共享狀態中取到的值,相當於執行了std::move操作;如果模板實參為T&,如std::future<int&>,則get返回的是共享狀態中保存的值的引用;如果共享狀態中保存的是異常的話,則調用get將會拋出該異常;

std::future的wait,wait_for和wait_until用於從共享狀態中等待其變為ready;

std::future的share操作將共享狀態轉移給std::shared_future,調用完share之後,當前future的valid返回false。多個std::shared_future可以關聯同一個共享狀態,std::shared_future支持復制,多線程通過自己的shared_future副本同時訪問同一個共享狀態是安全的。

<future>提供的std::packaged_task,可以封裝任意可調用對象,而且其本身也具有operator()成員函數,所以它可以作為std::thread的調用對象,與普通可調用對象不同的是,std::packaged_task關聯了共享狀態,它的返回值或者拋出的異常可以存儲到該共享狀態中。該值或異常就可以通過get_future函數(get_future只能調用一次)返回的std::future進行訪問。

std::packaged_task不可復制,只能移動;類似於std::promise,如果共享狀態在ready之前,std::packaged_task被析構了,則該共享狀態中會存儲一個以std::future_errc::broken_promise為錯誤碼的std::future_error異常。

<future>提供的std::async模板函數可以異步的調用f,返回一個std::future,用於獲取f的返回值,或者其拋出的異常。std::async調用f的方式有兩種,一種是std::launch::async,表示啟動一個新線程執行f;另一種是std::launch::deferred,表示在獲取f返回值(通過future)時,才同步的執行f(延遲計算)。

https://en.cppreference.com/w/cpp/thread

https://www.ibm.com/developerworks/cn/linux/1412_zhupx_thread/index.html

C++11中的並發