1. 程式人生 > >C++ 執行緒安全下Lock 類的兩種使用方式

C++ 執行緒安全下Lock 類的兩種使用方式

“不定義,做一個保持好奇心的普通人”

 

꿈을 이루게 될 거예요.

2018.12.19

快三年了:

 

Mutex 又稱互斥量,C++ 11中與 Mutex 相關的類(包括鎖型別)和函式都宣告在 <mutex> 標頭檔案中,所以如果你需要使用 std::mutex,就必須包含 <mutex> 標頭檔案。

<mutex> 標頭檔案介紹

Mutex 系列類(四種)

  • std::mutex,最基本的 Mutex 類。
  • std::recursive_mutex,遞迴 Mutex 類。
  • std::time_mutex,定時 Mutex 類。
  • std::recursive_timed_mutex,定時遞迴 Mutex 類。

Lock 類(兩種)

  • std::lock_guard,與 Mutex RAII 相關,方便執行緒對互斥量上鎖。
  • std::unique_lock,與 Mutex RAII 相關,方便執行緒對互斥量上鎖,但提供了更好的上鎖和解鎖控制。

其他型別

  • std::once_flag
  • std::adopt_lock_t
  • std::defer_lock_t
  • std::try_to_lock_t

函式

  • std::try_lock,嘗試同時對多個互斥量上鎖。
  • std::lock,可以同時對多個互斥量上鎖。
  • std::call_once,如果多個執行緒需要同時呼叫某個函式,call_once 可以保證多個執行緒對該函式只調用一次。

std::mutex

下面以 std::mutex 為例介紹 C++11 中的互斥量用法。

std::mutex 是C++11 中最基本的互斥量,std::mutex 物件提供了獨佔所有權的特性——即不支援遞迴地對 std::mutex 物件上鎖,而 std::recursive_lock 則可以遞迴地對互斥量物件上鎖。

std::mutex 的成員函式

  • 建構函式,std::mutex不允許拷貝構造,也不允許 move 拷貝,最初產生的 mutex 物件是處於 unlocked 狀態的。
  • lock(),呼叫執行緒將鎖住該互斥量。執行緒呼叫該函式會發生下面 3 種情況:(1). 如果該互斥量當前沒有被鎖住,則呼叫執行緒將該互斥量鎖住,直到呼叫 unlock之前,該執行緒一直擁有該鎖。(2). 如果當前互斥量被其他執行緒鎖住,則當前的呼叫執行緒被阻塞住。(3). 如果當前互斥量被當前呼叫執行緒鎖住,則會產生死鎖(deadlock)。
  • unlock(), 解鎖,釋放對互斥量的所有權。
  • try_lock(),嘗試鎖住互斥量,如果互斥量被其他執行緒佔有,則當前執行緒也不會被阻塞。執行緒呼叫該函式也會出現下面 3 種情況,(1). 如果當前互斥量沒有被其他執行緒佔有,則該執行緒鎖住互斥量,直到該執行緒呼叫 unlock 釋放互斥量。(2). 如果當前互斥量被其他執行緒鎖住,則當前呼叫執行緒返回 false,而並不會被阻塞掉。(3). 如果當前互斥量被當前呼叫執行緒鎖住,則會產生死鎖(deadlock)。

 

接下來

重點說一下lock_guard 和 unique_lock

 

※※std::lock_guard

std::lock_gurad 是 C++11 中定義的模板類。定義如下:
template<class Mutex> class lock_guard;

注意:無論是std::mutex還是std::lock_gurad、std::unique_lock 都是類,需要建立自己的物件使用!!!

lock_guard 物件呢通常是用來管理一個 std::mutex 型別的物件,即通過定義一個 lock_guard 一個物件來管理 std::mutex 的上鎖和解鎖。在 lock_guard 初始化的時候進行上鎖,然後在 lock_guard 析構的時候進行解鎖。值得注意的是,lock_guard 物件並不負責管理 std::mutex 物件的生命週期,lock_guard 物件只是簡化了 mutex 物件的上鎖和解鎖操作,方便執行緒對互斥量上鎖,即在某個 lock_guard 物件的宣告週期內,它所管理的鎖物件會一直保持上鎖狀態;而 lock_guard 的生命週期結束之後,它所管理的鎖物件會被解鎖(注:類似 shared_ptr 等智慧指標管理動態分配的記憶體資源 ),也就是說在使用 lock_guard 的過程中,如果 std::mutex 的物件被釋放了,那麼在 lock_guard 析構的時候進行解鎖就會出現空指標錯誤之類。

在 lock_guard 物件構造時,傳入的 Mutex 物件(即它所管理的 Mutex 物件)會被當前執行緒鎖住。在lock_guard 物件被析構時,它所管理的 Mutex 物件會自動解鎖,由於不需要程式設計師手動呼叫 lock 和 unlock 對 Mutex 進行上鎖和解鎖操作,因此這也是最簡單安全的上鎖和解鎖方式,尤其是在程式丟擲異常後先前已被上鎖的 Mutex 物件可以正確進行解鎖操作,極大地簡化了程式設計師編寫與 Mutex 相關的異常處理程式碼。

例:

//
// Created by zxkj on 2018.12.19
//
 
#include <iostream>
#include <thread>
#include <mutex>
#include <stdexcept>
 
std::mutex mtx;
using std::cout;
using std::endl;
 
void print_event(int x)
{
  if (x % 2 == 0)
    {
      cout << x << "is event" << endl;
    }
  else
    {
      throw(std::logic_error("not event"));
    }
}
 
void print_id(int id)
{
  try
    {
      std::lock_guard<std::mutex> lck(mtx);
      print_event (id);
    }
  catch(std::logic_error&)
    {
      cout << "[exception caught]\n";
    }
 
}
 
int main()
{
  std::thread threads[10];
  for (int i = 0; i < 10; ++i)
    {
      threads[i] = std::thread(print_id, i+1);
    }
 
  for (auto &th : threads)
    {
      th.join ();
    }
 
 
  return 0;
}


 ※※std::unique_lock

unique_lock 和 lock_guard 一樣,對 std::mutex 型別的互斥量的上鎖和解鎖進行管理,一樣也不管理 std::mutex 型別的互斥量的宣告週期。但是它的使用更加的靈活。std::unique_lock 的建構函式的數目相對來說比 std::lock_guard 多,其中一方面也是因為 std::unique_lock 更加靈活,從而在構造 std::unique_lock 物件時可以接受額外的引數。總地來說,std::unique_lock 建構函式如下:

(1) 預設建構函式
    新建立的 unique_lock 物件不管理任何 Mutex 物件。
(2) locking 初始化
    新建立的 unique_lock 物件管理 Mutex 物件 m,並嘗試呼叫 m.lock() 對 Mutex 物件進行上鎖,如果此時另外某個 unique_lock 物件已經管理了該 Mutex 物件 m,則當前執行緒將會被阻塞。
(3) try-locking 初始化
    新建立的 unique_lock 物件管理 Mutex 物件 m,並嘗試呼叫 m.try_lock() 對 Mutex 物件進行上鎖,但如果上鎖不成功,並不會阻塞當前執行緒。
(4) deferred 初始化
    新建立的 unique_lock 物件管理 Mutex 物件 m,但是在初始化的時候並不鎖住 Mutex 物件。 m 應該是一個沒有當前執行緒鎖住的 Mutex 物件。
(5) adopting 初始化
    新建立的 unique_lock 物件管理 Mutex 物件 m, m 應該是一個已經被當前執行緒鎖住的 Mutex 物件。(並且當前新建立的 unique_lock 物件擁有對鎖(Lock)的所有權)。
(6) locking 一段時間(duration)
    新建立的 unique_lock 物件管理 Mutex 物件 m,並試圖通過呼叫 m.try_lock_for(rel_time) 來鎖住 Mutex 物件一段時間(rel_time)。
(7) locking 直到某個時間點(time point)
    新建立的 unique_lock 物件管理 Mutex 物件m,並試圖通過呼叫 m.try_lock_until(abs_time) 來在某個時間點(abs_time)之前鎖住 Mutex 物件。
(8) 拷貝構造 [被禁用]
    unique_lock 物件不能被拷貝構造。
(9) 移動(move)構造
    新建立的 unique_lock 物件獲得了由 x 所管理的 Mutex 物件的所有權(包括當前 Mutex 的狀態)。呼叫 move 構造之後, x 物件如同通過預設建構函式所建立的,就不再管理任何 Mutex 物件了。

#include <iostream>
#include <thread>
#include <mutex>
std::mutex foo,bar;
void task_a() {
	std::lock(foo, bar);//foo和bar已被當前執行緒鎖住
	/*******************************************************
	*adopting 初始化:
	*adopt_lock 是一個常量物件,通常作為引數傳入給unique_lock 或 
	*lock_guard 的建構函式。新建立的 unique_lock 物件管理 Mutex 
	*物件 m, m 應該是一個已經被當前執行緒鎖住的 Mutex 物件。
	*******************************************************/
	std::unique_lock<std::mutex> lck1(foo, std::adopt_lock);
	std::unique_lock<std::mutex> lck2(bar, std::adopt_lock);
	std::cout << "task a\n";
}
void task_b() {
	//新建立的 unique_lock 物件不管理任何 Mutex 物件。
	std::unique_lock<std::mutex> lck1, lck2;
	/******************************************************
	* deferred 初始化:
	*新建立的 unique_lock 物件管理 Mutex 物件 m,但是在初始化
	*的時候並不鎖住 Mutex 物件。 m 應該是一個沒有當前執行緒鎖住的 
	*Mutex 物件。
	******************************************************/
	lck1 = std::unique_lock<std::mutex>(bar, std::defer_lock);
	lck2 = std::unique_lock<std::mutex>(foo, std::defer_lock);
	std::lock(lck1, lck2);
	std::cout << "task b\n";
}
int main() {
	std::thread th1(task_a);
	std::thread th2(task_b);
	th1.join();
	th2.join();
	system("pause");
	return EXIT_SUCCESS;
}

 

總結:

    1. unique_lock比lock_guard使用更加靈活,功能更加強大。使用unique_lock需要付出更多的時間、效能成本。std::unique_lock也可以提供自動加鎖、解鎖功能

    2. std::lock_guard 在建構函式中進行加鎖,解構函式中進行解鎖;是RAII模板類的簡單實現,功能簡單。

    3.大部分情況下,兩者的功能是一樣的,不過unique_lock 比lock_guard 更靈活.unique_lock提供了lock, unlock, try_lock等介面.
lock_guard沒有多餘的介面,建構函式時拿到鎖,解構函式時釋放鎖,lock_guard 比unique_lock 要省時.

   4. lock_guard 同一時間鎖住兩個mutex, 再建立guards用來管理鎖的釋放工作;

       unique_lock  先建立guards, 再同時鎖住兩個鎖。

 

 ※※std::condition_variable

<condition_variable>是C++標準程式庫中的一個頭檔案,定義了C++11標準中的一些用於併發程式設計時表示條件變數的類與方法等。

互斥鎖std::mutex是一種最常見的執行緒間同步的手段,但是在有些情況下不太高效。

假設想實現一個簡單的消費者生產者模型,一個執行緒往佇列中放入資料,一個執行緒往佇列中取資料,取資料前需要判斷一下佇列中確實有資料,由於這個佇列是執行緒間共享的,所以,需要使用互斥鎖進行保護,一個執行緒在往佇列新增資料的時候,另一個執行緒不能取,反之亦然。用互斥鎖實現如下:

#include <iostream>
#include <deque>
#include <thread>
#include <mutex>

std::deque<int> q;
std::mutex mu;

void function_1() {
    int count = 10;
    while (count > 0) {
        std::unique_lock<std::mutex> locker(mu);
        q.push_front(count);
        locker.unlock();
        std::this_thread::sleep_for(std::chrono::seconds(1));
        count--;
    }
}

void function_2() {
    int data = 0;
    while ( data != 1) {
        std::unique_lock<std::mutex> locker(mu);
        if (!q.empty()) {
            data = q.back();
            q.pop_back();
            locker.unlock();
            std::cout << "t2 got a value from t1: " << data << std::endl;
        } else {
            locker.unlock();
        }
    }
}
int main() {
    std::thread t1(function_1);
    std::thread t2(function_2);
    t1.join();
    t2.join();
    return 0;
}

//輸出結果
//t2 got a value from t1: 10
//t2 got a value from t1: 9
//t2 got a value from t1: 8
//t2 got a value from t1: 7
//t2 got a value from t1: 6
//t2 got a value from t1: 5
//t2 got a value from t1: 4
//t2 got a value from t1: 3
//t2 got a value from t1: 2
//t2 got a value from t1: 1

可以看到,互斥鎖其實可以完成這個任務,但是卻存在著效能問題。

首先,function_1函式是生產者,在生產過程中,std::this_thread::sleep_for(std::chrono::seconds(1));表示延時1s,所以這個生產的過程是很慢的;function_2函式是消費者,存在著一個while迴圈,只有在接收到表示結束的資料的時候,才會停止,每次迴圈內部,都是先加鎖,判斷佇列不空,然後就取出一個數,最後解鎖。所以說,在1s內,做了很多無用功!這樣的話,CPU佔用率會很高,可能達到100%(單核)。

這就引出了條件變數(condition variable),c++11中提供了#include <condition_variable>標頭檔案,其中的std::condition_variable可以和std::mutex結合一起使用,其中有兩個重要的介面,notify_one()wait()wait()可以讓執行緒陷入休眠狀態,在消費者生產者模型中,如果生產者發現佇列中沒有東西,就可以讓自己休眠,但是不能一直不幹活啊,notify_one()就是喚醒處於wait中的其中一個條件變數(可能當時有很多條件變數都處於wait狀態)。那什麼時刻使用notify_one()比較好呢,當然是在生產者往佇列中放資料的時候了,佇列中有資料,就可以趕緊叫醒等待中的執行緒起來幹活了。

使用條件變數修改後如下:

#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>

std::deque<int> q;
std::mutex mu;
std::condition_variable cond;

void function_1() {
    int count = 10;
    while (count > 0) {
        std::unique_lock<std::mutex> locker(mu);
        q.push_front(count);
        locker.unlock();
        cond.notify_one();  // Notify one waiting thread, if there is one.
        std::this_thread::sleep_for(std::chrono::seconds(1));
        count--;
    }
}

void function_2() {
    int data = 0;
    while ( data != 1) {
        std::unique_lock<std::mutex> locker(mu);
        while(q.empty())
            cond.wait(locker); // Unlock mu and wait to be notified
        data = q.back();
        q.pop_back();
        locker.unlock();
        std::cout << "t2 got a value from t1: " << data << std::endl;
    }
}
int main() {
    std::thread t1(function_1);
    std::thread t2(function_2);
    t1.join();
    t2.join();
    return 0;
}

上面的程式碼有三個注意事項:

  1. function_2中,在判斷佇列是否為空的時候,使用的是while(q.empty()),而不是if(q.empty()),這是因為wait()從阻塞到返回,不一定就是由於notify_one()函式造成的,還有可能由於系統的不確定原因喚醒(可能和條件變數的實現機制有關),這個的時機和頻率都是不確定的,被稱作偽喚醒,如果在錯誤的時候被喚醒了,執行後面的語句就會錯誤,所以需要再次判斷佇列是否為空,如果還是為空,就繼續wait()阻塞。
  2. 在管理互斥鎖的時候,使用的是std::unique_lock而不是std::lock_guard,而且事實上也不能使用std::lock_guard,這需要先解釋下wait()函式所做的事情。可以看到,在wait()函式之前,使用互斥鎖保護了,如果wait的時候什麼都沒做,豈不是一直持有互斥鎖?那生產者也會一直卡住,不能夠將資料放入佇列中了。所以,wait()函式會先呼叫互斥鎖的unlock()函式,然後再將自己睡眠,在被喚醒後,又會繼續持有鎖,保護後面的佇列操作。lock_guard沒有lockunlock介面,而unique_lock提供了。這就是必須使用unique_lock的原因。
  3. 使用細粒度鎖,儘量減小鎖的範圍,在notify_one()的時候,不需要處於互斥鎖的保護範圍內,所以在喚醒條件變數之前可以將鎖unlock()

還可以將cond.wait(locker);換一種寫法,wait()的第二個引數可以傳入一個函式表示檢查條件,這裡使用lambda函式最為簡單,如果這個函式返回的是truewait()函式不會阻塞會直接返回,如果這個函式返回的是falsewait()函式就會阻塞著等待喚醒,如果被偽喚醒,會繼續判斷函式返回值。

void function_2() {
    int data = 0;
    while ( data != 1) {
        std::unique_lock<std::mutex> locker(mu);
        cond.wait(locker, [](){ return !q.empty();} );  // Unlock mu and wait to be notified
        data = q.back();
        q.pop_back();
        locker.unlock();
        std::cout << "t2 got a value from t1: " << data << std::endl;
    }
}

 除了notify_one()函式,c++還提供了notify_all()函式,可以同時喚醒所有處於wait狀態的條件變數。

 

※※提供一個執行緒安全的堆疊例項(.hpp):

#ifndef THRAEDSAFE_STACK_HPP
#define THRAEDSAFE_STACK_HPP

#include <mutex>
#include <condition_variable>
#include <stack>
#include <memory>

template<typename T>
class threadsafe_stack
{
private:
    mutable std::mutex mut;
    std::stack<T> data_stack;
    std::condition_variable data_cond;
public:
    threadsafe_stack()
    {}
    threadsafe_stack(threadsafe_stack const& other)
    {
        std::lock_guard<std::mutex> lk(other.mut);
        data_stack=other.data_stack;
    }

    void push(T new_value)
    {
        std::lock_guard<std::mutex> lk(mut);
        data_stack.push(new_value);
        data_cond.notify_one();
    }

    void wait_and_pop(T& value)
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_stack.empty();});
        value=data_stack.top();
        data_queue.pop();
    }

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> lk(mut);
        data_cond.wait(lk,[this]{return !data_stack.empty();});
        std::shared_ptr<T> res(std::make_shared<T>(data_stack.top()));
        data_queue.pop();
        return res;
    }

    bool try_pop(T& value)
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_stack.empty())
            return false;
        value=data_stack.top();
        data_stack.pop();
        return true;
    }

    std::shared_ptr<T> try_pop()
    {
        std::lock_guard<std::mutex> lk(mut);
        if(data_stack.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res(std::make_shared<T>(data_stack.top()));
        data_stack.pop();
        return res;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lk(mut);
        return data_stack.empty();
    }
};

#endif

 

ok~

未來再見!