1. 程式人生 > >C++11中多線程庫

C++11中多線程庫

標準 value 生命周期 通過 死鎖 strong () 四種 ...

一、linux 線程同步

線程是在操作系統層面支持的,所以多線程的學習建議還是先找一本linux系統編程類的書,了解linux提供多線程的API。完全完全使用系統調用編寫多線程程序是痛苦,現在也有很多封裝好的多線程庫,但是了解多線程系統對學習編寫多線程程序非常有好處。總的來說linux提供了四類系統用於多程序程序,分別線程的創建、銷毀(thread),用於線程同步的(互斥量(mutex)、條件量(cond),信號量(sem))。

  • 互斥量通過鎖的機制實現線程間的同步。互斥量是一種特殊的變量,可以對它進行加鎖、解鎖操作。通過互斥量可以保證同一時刻只有一個線程訪問線程之間共享的資源。(互斥量對應的操作的加鎖與解鎖)
  • 條件變量的使用需要結合互斥量、條件變量、條件。線程查看條件時需要用互斥量加鎖,當條件滿足線程執行某種操作,條件不滿足時,條件變量(wait)操作自動阻塞該線程。當有另外的線程修改了條件時,會激活阻塞的線程,阻塞線程重新評價條件。條件的檢測必須在互斥所的保護下進行。條件變量對應的操作是wait,try_wait。
  • 互斥量只有鎖和解鎖兩種狀態,信號量可以理解為有多個狀態的特殊的變量,有等待信號量(wait)和釋放(release)兩種操作,分別對應信號量減1和加1。

  參考:Linux 線程同步的三種方法

二、c++11線程同步

  c++11從語言層面支持多線程操作,當然本質上是對系統調用的封裝,但是極大的方便了開發人員。

1、<thread>

線程類thread,使用RAII 風格管理線程的創建和銷毀。創建線程時傳入線程要執行的代碼段(函數、lamda表達式)和參數,thread析構函數會自動銷毀線程。

2、<mutex>

a.操作系統提供mutex可以設置屬性,c++11根據mutext的屬性提供四種的互斥量,分別是

  • std::mutex,最常用,普遍的互斥量(默認屬性), 
  • std::recursive_mutex ,允許同一線程使用recursive_mutext多次加鎖,然後使用相同次數的解鎖操作解鎖。mutex多次加鎖會造成死鎖
  • std::timed_mutex,在mutex上增加了時間的屬性。增加了兩個成員函數try_lock_for(),try_lock_until(),分別接收一個時間範圍,再給定的時間內如果互斥量被鎖主了,線程阻塞,超過時間,返回false。
  • std::recursive_timed_mutex,增加遞歸和時間屬性

b. mutex成員函數加鎖解鎖

  • lock(),互斥量加鎖,如果互斥量已被加鎖,線程阻塞
  • bool try_lock(),嘗試加鎖,如果互斥量未被加鎖,則執行加鎖操作,返回true;如果互斥量已被加鎖,返回false,線程不阻塞。
  • void unlock(),解鎖互斥量

c. mutex RAII式的加鎖解鎖

  • std::lock_guard,管理mutex的類。對象構建時傳入mutex,會自動對mutex加入,直到離開類的作用域,析構時完成解鎖。RAII式的棧對象能保證在異常情形下mutex可以在lock_guard對象析構被解鎖。
  • std::unique_lock 與 lock_guard功能類似,但是比lock_guard的功能更強大。比如std::unique_lock維護了互斥量的狀態,可通過bool owns_lock()訪問,當locked時返回true,否則返回false

3、condition_variable

條件變量的使用要結合條件、互斥量、條件變量三者一起使用。線程在檢測條件之前使用mutex加鎖,滿足某種條件時線程使用條件變量的wait操作進入阻塞狀態。當其它的線程修改條件,激活該條件變量阻塞的線程,阻塞的線程的重新加鎖檢測條件。條件變量提供wait和notify兩種操作。

 1 // condition_variable example
 2 #include <iostream>           // std::cout
 3 #include <thread>             // std::thread
 4 #include <mutex>              // std::mutex, std::unique_lock
 5 #include <condition_variable> // std::condition_variable
 6 
 7 std::mutex mtx;
 8 std::condition_variable cv;
 9 bool ready = false;
10 
11 void print_id (int id) {
12   std::unique_lock<std::mutex> lck(mtx);
13   while (!ready) cv.wait(lck);
14   // ...
15   std::cout << "thread " << id << \n;
16 }
17 
18 void go() {
19   std::unique_lock<std::mutex> lck(mtx);
20   ready = true;
21   cv.notify_all();
22 }
23 
24 int main ()
25 {
26   std::thread threads[10];
27   // spawn 10 threads:
28   for (int i=0; i<10; ++i)
29     threads[i] = std::thread(print_id,i);
30 
31   std::cout << "10 threads ready to race...\n";
32   go();                       // go!
33 
34   for (auto& th : threads) th.join();
35 
36   return 0;
37 }

4、信號量(CSemaphore)

C++11多線程庫沒有提供信號量的類,但是很容易通過條件變量、互斥量自己實現。

//信號量類
class CSemaphore {
private:
    std::condition_variable cv;
    std::mutex mutex;
    int value;
public:
    CSemaphore(int init) :
            value(init) {
    }

    void wait() {
        std::unique_lock<std::mutex> lock(mutex);
        while (value < 1) {
            cv.wait(lock);
        }
        value--;
    }

    bool try_wait() {
        std::unique_lock<std::mutex> lock(mutex);
        if (value < 1)
            return false;
        value--;
        return true;
    }

    void post() {
        {
            std::unique_lock<std::mutex> lock(mutex);
            value++;
        }
        cv.notify_one();
    }
};

5、原子操作<atomic>

針對多線程的共享數據的存儲讀寫,多線程指令交叉可能造成未知的錯誤(undefine行為),需要限制並發程序以某種特定的順序執行,除了前面介紹的互斥量加鎖的操縱,還可以使用C++11中提供的原則操作(atomic)。原子操作使得某個線程對共享數據的操作要不一步完成,要不不做。

a. std::atomic_flag是一個bool原子類型有兩個狀態:set(flag=true) 和 clear(flag=false),必須被ATOMIC_FLAG_INIT初始化此時flag為clear狀態,相當於靜態初始化。一旦atomic_flag初始化後只有三個操作:test_and_set,clear,析構,均是原子化操作。atomic_flag::test_and_set檢查flag是否被設置,若被設置直接返回true,若沒有設置則設置flag為true後再返回false。atomic_clear()清楚flag標誌即flag=false。不支持拷貝、賦值等操作,這和所有atomic類型一樣,因為兩個原子類型之間操作不能保證原子化。atomic_flag的可操作性不強導致其應用局限性,還不如atomic<bool>。

b.atomic<T>模板類。T必須滿足trivially copy type。定義了拷貝/移動/賦值函數;沒有虛成員;基類或其它任何非static成員都是trivally copyable。典型的內置類型bool、int等屬於trivally copyable type。註意某些原子操作可能會失敗,比如atomic<float>、atomic<double>,,沒有原子算術操作針對浮點數。

atomic<T>特別針對整數和指針做了特化。整數包括har, signed char, unsigned char, short, unsigned short, int, unsigned int, long, unsigned long, long long, unsigned long long, char16_t, char32_t, wchar_t。由於在實際中,用得比較多的原子類型是整數,下面以整數原子類型介紹原子類型的操作函數

  • 構造函數。構造函數傳入一個T類型的整數,初始化一個std::atomic對象,拷貝構造函數禁用。std::atomic <int> foo = 0;
  • std::atomic::operator=(T val),賦值操作函數。一個類型為T的變量可以賦值給相應的原子類型變量,相當於隱式轉換,並且該操作是原子的。
  • std::atomic::is_lock_free,判斷std:atomic對象是否具備lock-free特性,在多個線程範文該對象時不會導致線程阻塞。(可能使用某種事務內存transactional memory 方法實現 lock-free 的特性)。
  • store 修改被封裝的值,sync指定內存序,默認為順序一致性
  • load 和store相對應,讀取被封裝的值
  • exchange。讀取並修改被封裝的值,exchange 會將 val 指定的值替換掉之前該原子對象封裝的值,並返回之前該原子對象封裝的值,整個過程是原子的(因此exchange 操作也稱為 read-modify-write 操作)。sync參數指定內存序(Memory Order)

針對整型特化,增加的一些操作函數:

  • fetch_add,將原子對象封裝的值增加某個值,並返回原子對象的舊值
  • fetch_sub,將原子對象封裝的值減少某個值,並返回原子對象的舊值
  • fetch_and,將原子對象封裝的值與某個值相與,並返回原子對象的舊值
  • fetch_or
  • fetch_xor
  • 支持operator++,原子對象自增
  • 支持operator--,原子對象自減

6、future

參考:C++並發實戰13:std::future、std::async、std::promise、std::packaged_task

多線程程序設計時,一方面要註意多線程共享變量的訪問的安全性,另一方面有些異步任務之間會有結果的傳遞。C++11標準提供了幾種異步任務處理機制。通常thread不能直接返回執行的結構(可以通過傳遞應用,指針),而在異步處理當中很多時候一個線程(privider)創建某個線程(executor)處理某個任務,provider在某個時候獲取executor執行結果,如果executor沒有完成任務,provider線程就會阻塞等待,直到executor線程完成任務,返回結果。

std::future可用於異步任務中獲取任務結果,但是它只是獲取結果而已,真正的異步調用需要配合std::async,std::packaged_task,std::promise。async是個模板函數,packaged_task和promise是模板類,通常模板實例化參數是任務函數。

a. aysnc函數+future 模式

 std::future<bool> fut = std::async (is_prime,313222313);

這裏裏async自動創建一個後臺線程,執行任務is_prime函數,並將計算結果保存在myFuture中,這裏future的模板參數要和任務task返回類型一致為bool.

b.packaged_task+future

std::packaged_task內部包含了兩個最基本的元素。一、被包裝的任務,任務是一個可調用的對象,函數對象、函數指針。二、共享狀態(shared state),用於保存任務的返回值,使用std::future對象異步訪問共享狀態。

可以通過 std::packged_task::get_future 來獲取與共享狀態相關聯的 std::future 對象。在調用該函數之後,兩個對象共享相同的共享狀態,具體解釋如下:

  • std::packaged_task 對象是異步 Provider,它在某一時刻通過調用被包裝的任務來設置共享狀態的值。
  • std::future 對象是一個異步返回對象,通過它可以獲得共享狀態的值,當然在必要的時候需要等待共享狀態標誌變為 ready.

std::packaged_task 的共享狀態的生命周期一直持續到最後一個與之相關聯的對象被釋放或者銷毀為止。

具體實例參考:http://www.cplusplus.com/reference/future/packaged_task/

c.promise + future

aync和packaged_task,是provider線程獲取executor線程的結果。promise是provider線程通過future對象項executor線程傳遞參數。

promise 對象可以保存某一類型 T 的值,該值可被 future 對象讀取(可能在另外一個線程中)。在 promise 對象構造時可以和一個共享狀態(通常是std::future)相關聯,並可以在相關聯的共享狀態(std::future)上保存一個類型為 T 的值。

可以通過 get_future 來獲取與該 promise 對象相關聯的 future 對象,調用該函數之後,兩個對象共享相同的共享狀態(shared state)

  • promise 對象是異步 Provider,它可以在某一時刻設置共享狀態的值。
  • future 對象可以異步返回共享狀態的值,或者在必要的情況下阻塞調用者並等待共享狀態標誌變為 ready,然後才能獲取共享狀態的值。
 1 #include <iostream>       // std::cout
 2 #include <functional>     // std::ref
 3 #include <thread>         // std::thread
 4 #include <future>         // std::promise, std::future
 5 
 6 void print_int(std::future<int>& fut) {
 7     int x = fut.get(); // 獲取共享狀態的值.
 8     std::cout << "value: " << x << \n; // 打印 value: 10.
 9 }
10 
11 int main ()
12 {
13     std::promise<int> prom; // 生成一個 std::promise<int> 對象.
14     std::future<int> fut = prom.get_future(); // 和 future 關聯.
15     std::thread t(print_int, std::ref(fut)); // 將 future 交給另外一個線程t.
16     prom.set_value(10); // 設置共享狀態的值, 此處和線程t保持同步.
17     t.join();
18     return 0;
19 }

C++11中多線程庫