C++11學習筆記-----互斥量以及條件變數的使用
在多執行緒環境中,當多個執行緒同時訪問共享資源時,由於作業系統CPU排程的緣故,經常會出現一個執行緒執行到一半突然切換到另一個執行緒的情況。以多個執行緒同時對一個共享變數做加法運算為例,自增的彙編指令大致如下,先將變數值存放在某個暫存器中(eax),然後對暫存器進行加一,隨後將結果回寫到變數記憶體上
mov [#address#] eax; // 這裡#address#簡要表示目標變數的地址 // 1
inc eax; // 2
mov eax [#address#]; // 3
假設存在兩個執行緒同時對變數a進行加法操作,a初值為0,如果其中一個執行緒在第一步執行完後被切走,那麼最終a的結果可能不是2而是1
由圖片可知,由於cpu排程的緣故,多執行緒下同時對共享變數進行操作,可能會導致最終的結果並不是期望值。所以,為了保護共享變數,保證同一時刻只能允許一個執行緒對共享變數進行操作,就需要藉助互斥量的協助
Linux下的原生互斥量
Linux下提供了原生互斥量api,定義在標頭檔案<pthread.t>中。互斥量,形象點理解就是一把鎖,在對共享變數進行操作之前,先上鎖,只有獲得鎖的這個執行緒能夠繼續執行,而其他執行緒執行到上鎖語句時,會阻塞在那裡直到獲得鎖的那個執行緒執行解鎖操作,隨後繼續爭搶鎖,搶到鎖的執行緒接著執行,沒有搶到鎖的執行緒繼續阻塞
示例:利用互斥鎖解決多執行緒共享變數問題
在上一篇中提到了建立10個執行緒同時對一個共享變數進行自增,發現結果和預期不同,接下來利用互斥量解決這一問題
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <iostream>
#include <vector>
long long int total = 0;
pthread_mutex_t m;
void* thread_task(void* arg)
{
for(int i = 0; i < 10000; ++i)
{
/* 對total進行加法之前先上鎖,保證同一時刻只能有一個執行緒執行++total */
::pthread_mutex_lock(&m);
++total;
/* 解鎖 */
::pthread_mutex_unlock(&m);
}
::pthread_exit(nullptr);
}
int main()
{
/* 初始化互斥量 */
::pthread_mutex_init(&m, nullptr);
std::vector<pthread_t> tids;
for(int i = 0; i < 10; ++i)
{
pthread_t tid;
::pthread_create(&tid, nullptr, thread_task, nullptr);
tids.emplace_back(tid);
}
for(auto& tid : tids)
::pthread_join(tid, nullptr);
/* 釋放互斥量 */
::pthread_mutex_destroy(&m);
std::cout << total << std::endl;
return 0;
}
C++11下的互斥量和條件變數
互斥量
對比linux原生的庫函式,C++11提供的互斥量突出的特點有
- 無需考慮互斥量的初始化和銷燬,在類的構造和解構函式中管理,無需使用者操心
- 採用RAII對互斥量進行了不同封裝,提供了更加友好的上鎖機制
C++11提供的互斥量位於<mutex>標頭檔案中,提供的介面有
- lock,上鎖
- try_lock,嘗試上鎖,如果失敗則返回false
- unlock,解鎖
這三個函式和linux下的介面差不多,其實也沒什麼不同嘛~。事實上,多數程式都不直接使用std::mutex,標準庫採用RAII(資源獲取時就進行初始化)對std::mutex進行了封裝,使用起來當然是方便得不得了
簡單的鎖機制lock_guard
最簡單的封裝是std::lock_guard,單純利用RAII,構造時上鎖,析構時解鎖,使用示例為
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>
int main()
{
long long int total = 0;
std::mutex m;
std::vector<std::thread> threads;
for(int i = 0; i < 10; ++i)
{
threads.emplace_back(
[&m, &total]
{
for(int i = 0; i < 10000; ++i)
{
{
std::lock_guard<std::mutex> lock(m);
++total;
}
}
}
);
}
for(auto& th : threads)
th.join();
std::cout << total << std::endl;
return 0;
}
想對於共享資料的提供保護,使用std::lock_guard是完全沒有問題的,進入共享區前上鎖,離開後解鎖
更靈活的鎖unique_lock
稍微複雜的封裝是std::unique_lock,它提供了更靈活的上鎖機制,即通過建構函式的引數進行設定,分別可以
- 直接上鎖
- 延遲上鎖,僅儲存互斥量,不進行上鎖工作
- 嘗試上鎖
但是多數情況下采用預設的直接上鎖就可以了,而在std::unique_lock的生存期間,使用者也可以對其進行解鎖再上鎖等工作,這個作用體現在和條件變數的配合上
條件變數
標準庫中條件變數位於標頭檔案<condition_variable>中
其中有三個介面用於阻塞當前執行緒,常用的是wait
void wait(std::unique_lock<std::mutex>& lock);
template <class Predicate>
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
原子操作釋放鎖lock,阻塞當前執行緒,並將當前執行緒新增到*this上的等待執行緒列表,等待notify_one或者notify_all呼叫時結束阻塞(第二個過載當pred返回true時也會結束阻塞)
此外,還有兩個介面用於通知一個或多個等待執行緒,將其從阻塞狀態變為非阻塞
void notify_one() noexcept;
void notify_all() noexcept;
示例,利用互斥量和條件變數實現執行緒池
執行緒池工作原理
執行緒池的工作原理是預先建立若干執行緒,同時維護一個任務佇列,每個執行緒不斷地從任務佇列中取出任務並執行,使用者可以隨時向任務佇列中新增新任務。當任務佇列為空,執行緒池中的執行緒要麼執行自己那個沒有結束的任務,要麼處於睡眠狀態
在這個問題模型中,任務佇列就相當於共享變數,同一時刻只能有一個執行緒訪問任務佇列並從中取出任務,而新增任務時也需要避免新增和取出同時進行,這就需要互斥量的協助,凡是涉及到對任務佇列的存和取,都需要事先上鎖。
另外,如果任務佇列為空,那麼每個執行緒都不斷的上鎖,取任務(發現為空),解鎖,再上鎖,取任務(發現為空),解鎖…這樣的busy loop會極大消耗cpu,造成了不必要的開銷,所以需要引入條件變數,當任務佇列為空時,採用條件變數令執行緒睡眠
執行緒池定義
可以明確的是,執行緒池除了構造解構函式外,需要提供一個介面用於呼叫者新增任務,所以執行緒池的定義可以明確如下
#include <future>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
#include <queue>
class ThreadPool
{
public:
ThreadPool(std::size_t threadNums);
~ThreadPool();
void stop() { quit_ = true; }
public:
/* 用於新增任務,std::future<>用於儲存函式f的執行結果 */
template <class F, class... Args>
auto enqueue(F&& f, Args... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
private:
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::atomic<bool> quit_;
std::mutex mutex_;
std::condition_variable cond_;
};
建構函式
當執行緒池構造時,建立threadNums個執行緒,每個執行緒都從任務佇列中取出任務然後執行
ThreadPool::ThreadPool(std::size_t threadNums)
: quit_(false)
{
for(std::size_t i = 0; i < threadNums; ++i)
{
threads_.emplace_back(
[this]
{
while(!this->quit_)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->mutex_);
/* 利用條件變數,等待直到執行緒池退出或者任務佇列不為空 */
cond_.wait(lock, [this]() { return this->quit_ || !this->tasks_.empty(); });
if(this->quit_) return;
task = this->tasks_.front();
this->tasks_.pop();
}
task();
}
}
);
}
}
解構函式
解構函式用於回收執行緒資源
ThreadPool::~ThreadPool()
{
stop();
cond_.notify_all();
for(auto& th : threads_)
th.join();
}
新增任務
enqueue函式用於新增任務,涉及到了一些std::future的內容,這裡先簡單看看
/* class... 表示不定長引數列表 */
template <class F, class... Args>
/*
* auto會根據->後的內容自動推導返回型別
* std::future用於儲存函式執行結果
* std::result_of用於獲取函式執行結果
* std::packaged_task<T>是一個函式包,類似std::function,用於包裝函式
* std::packaged_task<T>::get_future用於返回函式執行結果
* std::unique_lock<std::mutex> 上鎖(這裡也可以用std::lock_guard
*/
auto ThreadPool::enqueue(F&& f, Args... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
std::unique_lock<std::mutex> lock(mutex_);
tasks_.push([task]() { (*task)(); });
return res;
}
測試程式碼
int main()
{
ThreadPool pool(4);
std::vector<std::future<int>> results;
for(int i = 0; i < 10; ++i)
{
results.emplace_back(
pool.enqueue(
[i]
{
/* std::this_thread::sleep_for(std::chrono::seconds(1)); */
return i * i;
}
)
);
}
for(auto&& result : results)
std::cout << result.get() << std::endl;
return 0;
}
小結
互斥鎖是多執行緒環境中不可缺少的重要部分,用於保護共享資源免受cpu排程的危害。另外,條件變數和互斥鎖配合使用可以避免busy loop帶來的不必要損耗