C++11多執行緒-條件變數(std::condition_variable)
前面我們介紹了執行緒(std::thread)和互斥量(std::mutex),互斥量是多執行緒間同時訪問某一共享變數時,保證變數可被安全訪問的手段。在多執行緒程式設計中,還有另一種十分常見的行為:執行緒同步。執行緒同步是指執行緒間需要按照預定的先後次序順序進行的行為。C++11對這種行為也提供了有力的支援,這就是條件變數。條件變數位於標頭檔案condition_variable下。本章我們將簡要介紹一下該類,在文章的最後我們會綜合運用std::mutex和std::condition_variable,實現一個chan類,該類可在多執行緒間安全的通訊,具有廣泛的應用場景。
1. std::condition_variable
條件變數提供了兩類操作:wait和notify。這兩類操作構成了多執行緒同步的基礎。
1.1 wait
wait是執行緒的等待動作,直到其它執行緒將其喚醒後,才會繼續往下執行。下面通過虛擬碼來說明其用法:
std::mutex mutex; std::condition_variable cv; // 條件變數與臨界區有關,用來獲取和釋放一個鎖,因此通常會和mutex聯用。 std::unique_lock lock(mutex); // 此處會釋放lock,然後在cv上等待,直到其它執行緒通過cv.notify_xxx來喚醒當前執行緒,cv被喚醒後會再次對lock進行上鎖,然後wait函式才會返回。 // wait返回後可以安全的使用mutex保護的臨界區內的資料。此時mutex仍為上鎖狀態 cv.wait(lock)
需要注意的一點是, wait有時會在沒有任何執行緒呼叫notify的情況下返回,這種情況就是有名的ofollow,noindex"> spurious wakeup 。因此當wait返回時,你需要再次檢查wait的前置條件是否滿足,如果不滿足則需要再次wait。wait提供了過載的版本,用於提供前置檢查。
template <typename Predicate> void wait(unique_lock<mutex> &lock, Predicate pred) { while(!pred()) { wait(lock); } }
除wait外, 條件變數還提供了wait_for和wait_until,這兩個名稱是不是看著有點兒眼熟,std::mutex也提供了_for和_until操作。在C++11多執行緒程式設計中,需要等待一段時間的操作,一般情況下都會有xxx_for和xxx_until版本。前者用於等待指定時長,後者用於等待到指定的時間。
1.2 notify
瞭解了wait,notify就簡單多了:喚醒wait在該條件變數上的執行緒。notify有兩個版本:notify_one和notify_all。
- notify_one 喚醒等待的一個執行緒,注意只喚醒一個。
- notify_all 喚醒所有等待的執行緒。使用該函式時應避免出現驚群效應 。
其使用方式見下例:
std::mutex mutex; std::condition_variable cv; std::unique_lock lock(mutex); // 所有等待在cv變數上的執行緒都會被喚醒。但直到lock釋放了mutex,被喚醒的執行緒才會從wait返回。 cv.notify_all(lock)
2. 執行緒間通訊 - chan的實現
有了上面的基礎我們就可以設計我們的執行緒間通訊工具"chan"了。我們的設計目標:
- 線上程間安全的傳遞資料。golang社群有一句經典的話:不要通過共享記憶體來通訊,要通過通訊來共享記憶體。
- 消除執行緒執行緒同步帶來的複雜性。
我們先來看一下chan的實際使用效果, 生產者-消費者(一個生產者,多個消費者)
#include <stdio.h> #include <thread> #include "chan.h"// chan的標頭檔案 using namespace std::chrono; // 消費資料 void consume(chan<int> ch, int thread_id) { int n; while(ch >> n) { printf("[%d] %d\n", thread_id, n); std::this_thread::sleep_for(milliseconds(100)); } } int main() { chan<int> chInt(3); // 消費者 std::thread consumers[5]; for (int i = 0; i < 5; i++) { consumers[i] = std::thread(consume, chInt, i+1); } // 生產資料 for (int i = 0; i < 16; i++) { chInt << i; } chInt.close();// 資料生產完畢 for (std::thread &thr: consumers) { thr.join(); } return 0; }
附: 原始碼
下面附上chan的實現,該程式碼在g++和vc 2015下均編譯通過,其它平臺未驗證。
// chan.simple.h #pragma once #include <condition_variable>// std::condition_variable #include <list>// std::list #include <mutex>// std::mutex template <typename T> class chan { class queue_t { mutable std::mutex mutex_; std::condition_variable cv_; std::list<T> data_; const size_t capacity_;// data_容量 const bool enable_overflow_; bool closed_ = false;// 佇列是否已關閉 size_t pop_count_ = 0;// 計數,累計pop的數量 public: queue_t(size_t capacity) : capacity_(capacity == 0 ? 1 : capacity), enable_overflow_(capacity == 0) { } bool is_empty() const { return data_.empty(); } size_t free_count() const { // capacity_為0時,允許放入一個,但_queue會處於overflow狀態 return capacity_ - data_.size(); } bool is_overflow() const { return enable_overflow_ && data_.size() >= capacity_; } bool is_closed() const { std::unique_lock<std::mutex> lock(this->mutex_); return this->closed_; } // close以後的入chan操作會返回false, 而出chan則在佇列為空後才返回false void close() { std::unique_lock<std::mutex> lock(this->mutex_); this->closed_ = true; if (this->is_overflow()) { // 消除溢位 this->data_.pop_back(); } this->cv_.notify_all(); } template <typename TR> bool pop(TR &data) { std::unique_lock<std::mutex> lock(this->mutex_); this->cv_.wait(lock, [&]() { return !is_empty() || closed_; }); if (this->is_empty()) { return false;// 已關閉 } data = this->data_.front(); this->data_.pop_front(); this->pop_count_++; if (this->free_count() == 1) { // 說明以前是full或溢位狀態 this->cv_.notify_all(); } return true; } template <typename TR> bool push(TR &&data) { std::unique_lock<std::mutex> lock(mutex_); cv_.wait(lock, [this]() { return free_count() > 0 || closed_; }); if (closed_) { return false; } data_.push_back(std::forward<TR>(data)); if (data_.size() == 1) { cv_.notify_all(); } // 當queue溢位,需等待queue回覆正常 if (is_overflow()) { const size_t old = this->pop_count_; cv_.wait(lock, [&]() { return old != pop_count_ || closed_; }); } return !this->closed_; } }; std::shared_ptr<queue_t> queue_; public: explicit chan(size_t capacity = 0) { queue_ = std::make_shared<queue_t>(capacity); } // 支援拷貝 chan(const chan &) = default; chan &operator=(const chan &) = default; // 支援move chan(chan &&) = default; chan &operator=(chan &&) = default; // 入chan,支援move語義 template <typename TR> bool operator<<(TR &&data) { return queue_->push(std::forward<TR>(data)); } // 出chan(支援相容型別的出chan) template <typename TR> bool operator>>(TR &data) { return queue_->pop(data); } // close以後的入chan操作返回false, 而出chan則在佇列為空後才返回false void close() { queue_->close(); } bool is_closed() const { return queue_->is_closed(); } };