1. 程式人生 > >用C++11實現一個有界的阻塞隊列

用C++11實現一個有界的阻塞隊列

ide true 多線程編程 from ces locker sid const read

對於一個無界的阻塞隊列而言,其實現非常簡單,即用一個鎖(鎖隊列)+ 一個條件變量(判空)即可。那麽對於一個有界阻塞隊列而言,其隊列的容量有上限,其實只要再加一個條件變量用來判斷是否滿即可。
綜上,我們需要

  • mutex: 保護隊列的讀寫操作
  • notEmptyCV: 條件變量,在take時wait, 在put之後notify
  • notFullCV: 條件變量, 在put時wait, 在take之後notify.

C++11提供了豐富的多線程編程庫,包括加鎖解鎖、同步原語等封裝。我的實現如下:

#pragma once
// Must use it higher than C++11

#include <condition_variable>
#include <mutex>
#include <queue>
#include <vector>
#include <assert.h>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;

template<typename T>
class BoundedBlockingQueue {
public:
    // make class non-copyable
    BoundedBlockingQueue(const BoundedBlockingQueue<T>&) = delete;
    BoundedBlockingQueue& operator=(const BoundedBlockingQueue<T>&) = delete;

    explicit BoundedBlockingQueue<T>(size_t maxSize)
        : mtx_(),
        maxSize_(maxSize)
    {

    }

    void put(const T& x) {
    //  std::cout << std::this_thread::get_id() << " puting" << x << std::endl;
        std::unique_lock<std::mutex> locker(mtx_);
        notFullCV_.wait(locker, [this]() {return queue_.size() < maxSize_; });          
        assert(locker.owns_lock());
        assert(queue_.size() < maxSize_);

        queue_.push(x);
        notEmptyCV_.notify_one();
    }

    T take() {
    //  std::cout << std::this_thread::get_id() << " taking" << std::endl;
        std::unique_lock<std::mutex> locker(mtx_);
        notEmptyCV_.wait(locker, [this]() {return !queue_.empty(); });
        assert(locker.owns_lock());
        assert(!queue_.empty());

        T front(queue_.front());
        queue_.pop();
        notFullCV_.notify_one();

        return front;
    }

    // with time out
    // @param timeout: max wait time, ms
    // @param outRes: reference result if take successfully
    // @return take successfully or not
    bool take(int timeout, T& outRes) {
        std::unique_lock<std::mutex> locker(mtx_);
        notEmptyCV_.wait_for(locker, timeout*1ms, [this]() {return !queue_.empty(); });
        assert(locker.owns_lock());
        if(queue_.empty()) return false;
        
        outRes = queue_.front(); queue_.pop();
        notFullCV_.notify_one();

        return true;
    }

    // Checking BlockingQueue status from outside
    // DO NOT use it as internal call, which will cause DEADLOCK
    bool empty() const {
        std::unique_lock<std::mutex> locker(mtx_);
        return queue_.empty();
    }

    size_t size() const {
        std::unique_lock<std::mutex> locker(mtx_);
        return queue_.size();
    }

    size_t maxSize() const {
        return maxSize_;
    }

private:
    mutable std::mutex mtx_;
    std::condition_variable notEmptyCV_;
    std::condition_variable notFullCV_;
    size_t maxSize_;
    std::queue<T> queue_;
};

用C++11實現一個有界的阻塞隊列