用C++11實現一個有界的阻塞隊列
阿新 • • 發佈:2018-08-27
ide true 多線程編程 from ces locker sid const read
對於一個無界的阻塞隊列而言,其實現非常簡單,即用一個鎖(鎖隊列)+ 一個條件變量(判空)即可。那麽對於一個有界阻塞隊列而言,其隊列的容量有上限,其實只要再加一個條件變量用來判斷是否滿即可。
綜上,我們需要
- mutex: 保護隊列的讀寫操作
- notEmptyCV: 條件變量,在take時wait, 在put之後notify
- notFullCV: 條件變量, 在put時wait, 在take之後notify.
C++11提供了豐富的多線程編程庫,包括加鎖解鎖、同步原語等封裝。我的實現如下:
#pragma once // Must use it higher than C++11 #include <condition_variable> #include <mutex> #include <queue> #include <vector> #include <assert.h> #include <iostream> #include <thread> using namespace std::chrono_literals; template<typename T> class BoundedBlockingQueue { public: // make class non-copyable BoundedBlockingQueue(const BoundedBlockingQueue<T>&) = delete; BoundedBlockingQueue& operator=(const BoundedBlockingQueue<T>&) = delete; explicit BoundedBlockingQueue<T>(size_t maxSize) : mtx_(), maxSize_(maxSize) { } void put(const T& x) { // std::cout << std::this_thread::get_id() << " puting" << x << std::endl; std::unique_lock<std::mutex> locker(mtx_); notFullCV_.wait(locker, [this]() {return queue_.size() < maxSize_; }); assert(locker.owns_lock()); assert(queue_.size() < maxSize_); queue_.push(x); notEmptyCV_.notify_one(); } T take() { // std::cout << std::this_thread::get_id() << " taking" << std::endl; std::unique_lock<std::mutex> locker(mtx_); notEmptyCV_.wait(locker, [this]() {return !queue_.empty(); }); assert(locker.owns_lock()); assert(!queue_.empty()); T front(queue_.front()); queue_.pop(); notFullCV_.notify_one(); return front; } // with time out // @param timeout: max wait time, ms // @param outRes: reference result if take successfully // @return take successfully or not bool take(int timeout, T& outRes) { std::unique_lock<std::mutex> locker(mtx_); notEmptyCV_.wait_for(locker, timeout*1ms, [this]() {return !queue_.empty(); }); assert(locker.owns_lock()); if(queue_.empty()) return false; outRes = queue_.front(); queue_.pop(); notFullCV_.notify_one(); return true; } // Checking BlockingQueue status from outside // DO NOT use it as internal call, which will cause DEADLOCK bool empty() const { std::unique_lock<std::mutex> locker(mtx_); return queue_.empty(); } size_t size() const { std::unique_lock<std::mutex> locker(mtx_); return queue_.size(); } size_t maxSize() const { return maxSize_; } private: mutable std::mutex mtx_; std::condition_variable notEmptyCV_; std::condition_variable notFullCV_; size_t maxSize_; std::queue<T> queue_; };
用C++11實現一個有界的阻塞隊列