1. 程式人生 > >C++實現執行緒安全的佇列

C++實現執行緒安全的佇列

  C++標準庫已經提供了std::queue這一佇列容器,但不是執行緒安全的。std::queue這個容器已經提供了pop(),push(),empty()等這些讀寫操作容器的函式,只要在這些函式上面加個鎖,就可以使其執行緒安全。
  在C++原有容器上面進行簡單封裝即可實現一個執行緒安全的佇列,實現程式碼如下:

#include <iostream>
#include <string>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <memory>
template<class T, class Container = std::queue<T>> class ThreadSafeQueue { public: ThreadSafeQueue() = default; template <class Element> void Push(Element&& element) { std::lock_guard<std::mutex> lock(mutex_); queue_.push(std::forward<Element>(element)); not_empty_cv_.notify_one(); } void
WaitAndPop(T& t) { std::unique_lock<std::mutex> lock(mutex_); not_empty_cv_.wait(lock, []() { return !queue_.empty(); }); t = std::move(queue_.front()); queue_.pop() } std::shared_ptr<T> WaitAndPop() { std::unique_lock<std::mutex> lock(mutex_); not_empty_cv_.wait(lock, [this
]() { return !queue_.empty(); }); std::shared_ptr<T> t_ptr = std::make_shared<T>(queue_.front()); queue_.pop(); return t_ptr; } bool TryPop(T& t) { std::lock_guard<std::mutex> lock(mutex_); if (queue_.empty()) { return false; } t = std::move(queue_.front()); queue_.pop() return true; } std::shared_ptr<T> TryPop() { std::lock_guard<std::mutex> lock(mutex_); if (queue_.empty()) { return std::shared_ptr<T>(); } t = std::move(queue_.front()); std::shared_ptr<T> t_ptr = std::make_shared<T>(queue_.front()); queue_.pop(); return t_ptr; } bool IsEmpty() const { std::lock_guard<std::mutex> lock(mutex_); return queue_.empty(); } private: ThreadSafeQueue(const ThreadSafeQueue&) = delete; ThreadSafeQueue& operator=(const ThreadSafeQueue&) = delete; ThreadSafeQueue(ThreadSafeQueue&&) = delete; ThreadSafeQueue& operator=(ThreadSafeQueue&&) = delete; private: Container queue_; std::condition_variable not_empty_cv_; mutable std::mutex mutex_; };

程式碼分析:

1. 條件變數std::condition_variable的使用。

  使用條件變數的原因是為了實現WaitAndPop()這個函式。這個函式的作用是如果容器中有資料則進行Pop,如果沒有資料則進行等待。
每次在Pop資料的時候都會呼叫條件變數 not_empty_cv_.wait();,如果滿足wait()的條件則程式阻塞到這裡,等待其他執行緒Push資料之後進行not_empty_cv_.notify_one();來喚醒wait()。

2. 條件變數wait函式的作用。

  void WaitAndPop(T& t) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_cv_.wait(lock, []() {
      return !queue_.empty();
    });

    t = std::move(queue_.front());
    queue_.pop()
  }

  在上述程式碼中,wait()之前首先會使用std::unique_lock獲取互斥元mutex_,然後當代碼阻塞到wait()這裡的時候,Push函式也要獲取互斥元mutex_才能插入資料,這裡看起來十分奇怪。實際上wait()函式將執行緒阻塞到這裡的時候,會解鎖互斥元,所以其他執行緒仍然可以正常獲取mutex_。當其他執行緒進行notify_one()的時候,會喚醒剛才阻塞等待的執行緒,該執行緒會重新上鎖,然後判斷跳出wait()的條件是否滿足,如果滿足則跳出wait(),進行後面操作,否則繼續進行阻塞等待的動作。

3. 使用wait()時需要傳入鎖std::unique_lock。

  給條件變數wait()函式傳入的鎖是std::unique_lock,而不是std::lock_guard。因為執行緒在執行wait()函式的時候,如果進入等待,就要解鎖,被喚醒後又會重新加鎖,std::lock_guard功能比較簡單,不能滿足wait()的要求,所以要用std::unique_lock。

4. 使用wait()時需要傳入判斷條件,來防止假喚醒。

    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_cv_.wait(lock, []() {
      return !queue_.empty();
    })

  這裡的判斷條件是lambda表示式[](){ return !queue_.empty(); }這個匿名函式就是用來判斷佇列是否不為空。這個條件判斷其實就是退出等待的條件。wait(lock, 退出等待條件函式),這條語句實際就是英文的表達習慣wait until ...,意思是執行緒進行等待直到滿足退出等待條件為止
  如果這裡沒有加判斷條件而是直接呼叫wait(lock);,這樣會導致兩個嚴重的問題:

  1. 假喚醒問題,這種問題導致容器中還沒有資料就進行了Pop。
  2. 如果容器中已經有了資料,在Pop的時候還要等wait()收到notify才能Pop,這樣導致了即使有資料也不能取出的問題。

5. 成員變數std::mutex要用mutable修飾。

  mutable std::mutex mutex_;

  mutable的作用是突破const的限制,使得一個成員函式在const修飾的時候,依然可以更改mutable修飾的成員變數。因為成員變數mutex_,會不斷地加鎖解鎖,所以互斥元必須是可變的。

6. 使用引用摺疊來簡化程式碼。

  template <class Element>
  void Push(Element&& element) {
    std::lock_guard<std::mutex> lock(mutex_);
    queue_.push(std::forward<Element>(element));
    not_empty_cv_.notify_one();
  }

這段程式碼可以替換為:

void Push(const T& t) {
  std::lock_guard<std::mutex> lock(mutex_);
  queue_.push(t);
  not_empty_cv_.notify_one();
}

void Push(T&& t) {
  std::lock_guard<std::mutex> lock(mutex_);
  queue_.push(std::move(t));
  not_empty_cv_.notify_one();
}