C++實現執行緒安全的佇列
C++標準庫已經提供了std::queue這一佇列容器,但不是執行緒安全的。std::queue這個容器已經提供了pop(),push(),empty()等這些讀寫操作容器的函式,只要在這些函式上面加個鎖,就可以使其執行緒安全。
在C++原有容器上面進行簡單封裝即可實現一個執行緒安全的佇列,實現程式碼如下:
#include <iostream>
#include <string>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <memory>
template<class T, class Container = std::queue<T>>
class ThreadSafeQueue {
public:
ThreadSafeQueue() = default;
template <class Element>
void Push(Element&& element) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::forward<Element>(element));
not_empty_cv_.notify_one();
}
void WaitAndPop(T& t) {
std::unique_lock<std::mutex> lock(mutex_);
not_empty_cv_.wait(lock, []() {
return !queue_.empty();
});
t = std::move(queue_.front());
queue_.pop()
}
std::shared_ptr<T> WaitAndPop() {
std::unique_lock<std::mutex> lock(mutex_);
not_empty_cv_.wait(lock, [this ]() {
return !queue_.empty();
});
std::shared_ptr<T> t_ptr = std::make_shared<T>(queue_.front());
queue_.pop();
return t_ptr;
}
bool TryPop(T& t) {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty()) {
return false;
}
t = std::move(queue_.front());
queue_.pop()
return true;
}
std::shared_ptr<T> TryPop() {
std::lock_guard<std::mutex> lock(mutex_);
if (queue_.empty()) {
return std::shared_ptr<T>();
}
t = std::move(queue_.front());
std::shared_ptr<T> t_ptr = std::make_shared<T>(queue_.front());
queue_.pop();
return t_ptr;
}
bool IsEmpty() const {
std::lock_guard<std::mutex> lock(mutex_);
return queue_.empty();
}
private:
ThreadSafeQueue(const ThreadSafeQueue&) = delete;
ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete;
ThreadSafeQueue(ThreadSafeQueue&&) = delete;
ThreadSafeQueue& operator=(ThreadSafeQueue&&) = delete;
private:
Container queue_;
std::condition_variable not_empty_cv_;
mutable std::mutex mutex_;
};
程式碼分析:
1. 條件變數std::condition_variable的使用。
使用條件變數的原因是為了實現WaitAndPop()這個函式。這個函式的作用是如果容器中有資料則進行Pop,如果沒有資料則進行等待。
每次在Pop資料的時候都會呼叫條件變數 not_empty_cv_.wait();
,如果滿足wait()的條件則程式阻塞到這裡,等待其他執行緒Push資料之後進行not_empty_cv_.notify_one();
來喚醒wait()。
2. 條件變數wait函式的作用。
void WaitAndPop(T& t) {
std::unique_lock<std::mutex> lock(mutex_);
not_empty_cv_.wait(lock, []() {
return !queue_.empty();
});
t = std::move(queue_.front());
queue_.pop()
}
在上述程式碼中,wait()
之前首先會使用std::unique_lock獲取互斥元mutex_,然後當代碼阻塞到wait()這裡的時候,Push函式也要獲取互斥元mutex_才能插入資料,這裡看起來十分奇怪。實際上wait()
函式將執行緒阻塞到這裡的時候,會解鎖互斥元,所以其他執行緒仍然可以正常獲取mutex_。當其他執行緒進行notify_one()
的時候,會喚醒剛才阻塞等待的執行緒,該執行緒會重新上鎖,然後判斷跳出wait()的條件是否滿足,如果滿足則跳出wait(),進行後面操作,否則繼續進行阻塞等待的動作。
3. 使用wait()時需要傳入鎖std::unique_lock。
給條件變數wait()函式傳入的鎖是std::unique_lock
,而不是std::lock_guard
。因為執行緒在執行wait()函式的時候,如果進入等待,就要解鎖,被喚醒後又會重新加鎖,std::lock_guard功能比較簡單,不能滿足wait()的要求,所以要用std::unique_lock。
4. 使用wait()時需要傳入判斷條件,來防止假喚醒。
std::unique_lock<std::mutex> lock(mutex_);
not_empty_cv_.wait(lock, []() {
return !queue_.empty();
})
這裡的判斷條件是lambda表示式[](){ return !queue_.empty(); }
這個匿名函式就是用來判斷佇列是否不為空。這個條件判斷其實就是退出等待的條件。wait(lock, 退出等待條件函式)
,這條語句實際就是英文的表達習慣wait until ...
,意思是執行緒進行等待直到滿足退出等待條件為止
。
如果這裡沒有加判斷條件而是直接呼叫wait(lock);
,這樣會導致兩個嚴重的問題:
- 假喚醒問題,這種問題導致容器中還沒有資料就進行了Pop。
- 如果容器中已經有了資料,在Pop的時候還要等wait()收到notify才能Pop,這樣導致了即使有資料也不能取出的問題。
5. 成員變數std::mutex要用mutable修飾。
mutable std::mutex mutex_;
mutable
的作用是突破const的限制,使得一個成員函式在const修飾的時候,依然可以更改mutable修飾的成員變數。因為成員變數mutex_,會不斷地加鎖解鎖,所以互斥元必須是可變的。
6. 使用引用摺疊來簡化程式碼。
template <class Element>
void Push(Element&& element) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::forward<Element>(element));
not_empty_cv_.notify_one();
}
這段程式碼可以替換為:
void Push(const T& t) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(t);
not_empty_cv_.notify_one();
}
void Push(T&& t) {
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(std::move(t));
not_empty_cv_.notify_one();
}