1. 程式人生 > >C++11學習筆記-----互斥量以及條件變數的使用

C++11學習筆記-----互斥量以及條件變數的使用

在多執行緒環境中,當多個執行緒同時訪問共享資源時,由於作業系統CPU排程的緣故,經常會出現一個執行緒執行到一半突然切換到另一個執行緒的情況。以多個執行緒同時對一個共享變數做加法運算為例,自增的彙編指令大致如下,先將變數值存放在某個暫存器中(eax),然後對暫存器進行加一,隨後將結果回寫到變數記憶體上

mov [#address#] eax;    // 這裡#address#簡要表示目標變數的地址   // 1
inc eax;    // 2
mov eax [#address#];    // 3

假設存在兩個執行緒同時對變數a進行加法操作,a初值為0,如果其中一個執行緒在第一步執行完後被切走,那麼最終a的結果可能不是2而是1

由圖片可知,由於cpu排程的緣故,多執行緒下同時對共享變數進行操作,可能會導致最終的結果並不是期望值。所以,為了保護共享變數,保證同一時刻只能允許一個執行緒對共享變數進行操作,就需要藉助互斥量的協助

Linux下的原生互斥量

Linux下提供了原生互斥量api,定義在標頭檔案<pthread.t>中。互斥量,形象點理解就是一把鎖,在對共享變數進行操作之前,先上鎖,只有獲得鎖的這個執行緒能夠繼續執行,而其他執行緒執行到上鎖語句時,會阻塞在那裡直到獲得鎖的那個執行緒執行解鎖操作,隨後繼續爭搶鎖,搶到鎖的執行緒接著執行,沒有搶到鎖的執行緒繼續阻塞

示例:利用互斥鎖解決多執行緒共享變數問題

在上一篇中提到了建立10個執行緒同時對一個共享變數進行自增,發現結果和預期不同,接下來利用互斥量解決這一問題

#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>

#include <iostream>
#include <vector>

long long int total = 0;
pthread_mutex_t m;

void* thread_task(void* arg)
{
    for(int i = 0; i < 10000; ++i)
    {
        /* 對total進行加法之前先上鎖,保證同一時刻只能有一個執行緒執行++total */
::pthread_mutex_lock(&m); ++total; /* 解鎖 */ ::pthread_mutex_unlock(&m); } ::pthread_exit(nullptr); } int main() { /* 初始化互斥量 */ ::pthread_mutex_init(&m, nullptr); std::vector<pthread_t> tids; for(int i = 0; i < 10; ++i) { pthread_t tid; ::pthread_create(&tid, nullptr, thread_task, nullptr); tids.emplace_back(tid); } for(auto& tid : tids) ::pthread_join(tid, nullptr); /* 釋放互斥量 */ ::pthread_mutex_destroy(&m); std::cout << total << std::endl; return 0; }

C++11下的互斥量和條件變數

互斥量

對比linux原生的庫函式,C++11提供的互斥量突出的特點有

  • 無需考慮互斥量的初始化和銷燬,在類的構造和解構函式中管理,無需使用者操心
  • 採用RAII對互斥量進行了不同封裝,提供了更加友好的上鎖機制

C++11提供的互斥量位於<mutex>標頭檔案中,提供的介面有

  • lock,上鎖
  • try_lock,嘗試上鎖,如果失敗則返回false
  • unlock,解鎖

這三個函式和linux下的介面差不多,其實也沒什麼不同嘛~。事實上,多數程式都不直接使用std::mutex,標準庫採用RAII(資源獲取時就進行初始化)對std::mutex進行了封裝,使用起來當然是方便得不得了

簡單的鎖機制lock_guard

最簡單的封裝是std::lock_guard,單純利用RAII,構造時上鎖,析構時解鎖,使用示例為

#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

int main()
{
    long long int total = 0;
    std::mutex m;
    std::vector<std::thread> threads;    
    for(int i = 0; i < 10; ++i)
    {
        threads.emplace_back(
                        [&m, &total]
                        {
                            for(int i = 0; i < 10000; ++i)
                            {
                                {
                                    std::lock_guard<std::mutex> lock(m);
                                    ++total;
                                }
                            }
                        }
                    );
    }
    for(auto& th : threads)
      th.join();

    std::cout << total << std::endl;
    return 0;
}

想對於共享資料的提供保護,使用std::lock_guard是完全沒有問題的,進入共享區前上鎖,離開後解鎖

更靈活的鎖unique_lock

稍微複雜的封裝是std::unique_lock,它提供了更靈活的上鎖機制,即通過建構函式的引數進行設定,分別可以

  • 直接上鎖
  • 延遲上鎖,僅儲存互斥量,不進行上鎖工作
  • 嘗試上鎖

但是多數情況下采用預設的直接上鎖就可以了,而在std::unique_lock的生存期間,使用者也可以對其進行解鎖再上鎖等工作,這個作用體現在和條件變數的配合上

條件變數

標準庫中條件變數位於標頭檔案<condition_variable>中

其中有三個介面用於阻塞當前執行緒,常用的是wait

void wait(std::unique_lock<std::mutex>& lock);
template <class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);

原子操作釋放鎖lock,阻塞當前執行緒,並將當前執行緒新增到*this上的等待執行緒列表,等待notify_one或者notify_all呼叫時結束阻塞(第二個過載當pred返回true時也會結束阻塞)

此外,還有兩個介面用於通知一個或多個等待執行緒,將其從阻塞狀態變為非阻塞

void notify_one() noexcept;
void notify_all() noexcept;

示例,利用互斥量和條件變數實現執行緒池

執行緒池工作原理

執行緒池的工作原理是預先建立若干執行緒,同時維護一個任務佇列,每個執行緒不斷地從任務佇列中取出任務並執行,使用者可以隨時向任務佇列中新增新任務。當任務佇列為空,執行緒池中的執行緒要麼執行自己那個沒有結束的任務,要麼處於睡眠狀態

在這個問題模型中,任務佇列就相當於共享變數,同一時刻只能有一個執行緒訪問任務佇列並從中取出任務,而新增任務時也需要避免新增和取出同時進行,這就需要互斥量的協助,凡是涉及到對任務佇列的存和取,都需要事先上鎖。

另外,如果任務佇列為空,那麼每個執行緒都不斷的上鎖,取任務(發現為空),解鎖,再上鎖,取任務(發現為空),解鎖…這樣的busy loop會極大消耗cpu,造成了不必要的開銷,所以需要引入條件變數,當任務佇列為空時,採用條件變數令執行緒睡眠

執行緒池定義

可以明確的是,執行緒池除了構造解構函式外,需要提供一個介面用於呼叫者新增任務,所以執行緒池的定義可以明確如下

#include <future>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>

class ThreadPool
{
    public:
        ThreadPool(std::size_t threadNums);
        ~ThreadPool();

        void stop() { quit_ = true; }
    public:
        /* 用於新增任務,std::future<>用於儲存函式f的執行結果 */
        template <class F, class... Args>
        auto enqueue(F&& f, Args... args)
            -> std::future<typename std::result_of<F(Args...)>::type>;
    private:
        std::vector<std::thread> threads_;
        std::queue<std::function<void()>> tasks_;
        std::atomic<bool> quit_;
        std::mutex mutex_;
        std::condition_variable cond_;
};

建構函式

當執行緒池構造時,建立threadNums個執行緒,每個執行緒都從任務佇列中取出任務然後執行

ThreadPool::ThreadPool(std::size_t threadNums)
        : quit_(false)
{
    for(std::size_t i = 0; i < threadNums; ++i)
    {
        threads_.emplace_back(
                        [this]
                        {
                            while(!this->quit_)
                            {
                                std::function<void()> task;
                                {
                                    std::unique_lock<std::mutex> lock(this->mutex_);
                                    /* 利用條件變數,等待直到執行緒池退出或者任務佇列不為空 */
                                    cond_.wait(lock, [this]() { return this->quit_ || !this->tasks_.empty(); });
                                    if(this->quit_) return;
                                    task = this->tasks_.front();
                                    this->tasks_.pop();
                                }
                                task();
                            }
                        }
                    );     
    }
}

解構函式

解構函式用於回收執行緒資源

ThreadPool::~ThreadPool()
{
    stop();
    cond_.notify_all();
    for(auto& th : threads_)
      th.join();
}

新增任務

enqueue函式用於新增任務,涉及到了一些std::future的內容,這裡先簡單看看

/* class... 表示不定長引數列表 */
template <class F, class... Args>
/* 
 * auto會根據->後的內容自動推導返回型別
 * std::future用於儲存函式執行結果
 * std::result_of用於獲取函式執行結果
 * std::packaged_task<T>是一個函式包,類似std::function,用於包裝函式
 * std::packaged_task<T>::get_future用於返回函式執行結果
 * std::unique_lock<std::mutex> 上鎖(這裡也可以用std::lock_guard 
 */
auto ThreadPool::enqueue(F&& f, Args... args)
        -> std::future<typename std::result_of<F(Args...)>::type>
{
   using return_type = typename std::result_of<F(Args...)>::type;
   auto task = std::make_shared<std::packaged_task<return_type()>>(
                    std::bind(std::forward<F>(f), std::forward<Args>(args)...)
               );
   std::future<return_type> res = task->get_future();
   std::unique_lock<std::mutex> lock(mutex_);
   tasks_.push([task]() { (*task)(); });
   return res;
}

測試程式碼

int main()
{
    ThreadPool pool(4);
    std::vector<std::future<int>> results;
    for(int i = 0; i < 10; ++i)
    {
        results.emplace_back(
                    pool.enqueue(
                            [i]
                            {
                                /* std::this_thread::sleep_for(std::chrono::seconds(1)); */
                                return i * i;
                            }
                        )
                    );
    }
    for(auto&& result : results)
      std::cout << result.get() << std::endl;
    return 0;
}

小結

互斥鎖是多執行緒環境中不可缺少的重要部分,用於保護共享資源免受cpu排程的危害。另外,條件變數和互斥鎖配合使用可以避免busy loop帶來的不必要損耗