1. 程式人生 > >muduo_base程式碼剖析之BlockingQueue、BoundedBlockingQueue

muduo_base程式碼剖析之BlockingQueue、BoundedBlockingQueue

BlockingQueue.h 無界佇列

程式碼功能:使用條件變數+互斥鎖,實現無界佇列(即生產者消費者模型),保證對臨界資源(佇列)的訪問是安全的
在這裡插入圖片描述

BlockingQueue類的原始碼
template<typename T>
class BlockingQueue : noncopyable
{
private:
  mutable MutexLock mutex_; //互斥鎖,保護共享的佇列queue_
  Condition         notEmpty_ GUARDED_BY(mutex_); //條件變數
  std::deque<T>     queue_ GUARDED_BY
(mutex_); //無界緩衝區佇列queue_ public: BlockingQueue(): mutex_(),notEmpty_(mutex_),queue_() {} void put(const T& x) { MutexLockGuard lock(mutex_); //修改佇列前,先加鎖 queue_.push_back(x); //向佇列中新增元素 notEmpty_.notify(); //通知當前佇列不為空,take()方法不阻塞 } void put(T&& x) { MutexLockGuard lock
(mutex_); queue_.push_back(std::move(x)); notEmpty_.notify(); } T take() //取出對頭元素 { MutexLockGuard lock(mutex_); // always use a while-loop, due to spurious wakeup while (queue_.empty()) //如果佇列是空 { notEmpty_.wait(); //則一直等待,直到佇列不為空 } assert(!queue_.empty()); //再次判斷佇列不為空
T front(std::move(queue_.front())); //取出佇列中的頭元素,賦值給front變數 queue_.pop_front(); //彈出front return std::move(front); //返回值為取出的對頭元素front } size_t size() const //返回佇列的大小 { MutexLockGuard lock(mutex_); return queue_.size(); } };

示例程式碼:BlockingQueue_test.cc

#include <muduo/base/BlockingQueue.h>
#include <muduo/base/CountDownLatch.h>
#include <muduo/base/Thread.h>

#include <memory>
#include <string>
#include <vector>
#include <stdio.h>
#include <unistd.h>

class Test
{
public:
  Test(int numThreads): latch_(numThreads)
  {
    for (int i = 0; i < numThreads; ++i) 
    {
      char name[32];
      snprintf(name, sizeof name, "work thread %d", i);
      //建立執行緒,繫結threadFunc執行緒函式
      threads_.emplace_back(new muduo::Thread(std::bind(&Test::threadFunc, this), muduo::string(name)));
    }
    for (auto& thr : threads_) 
    {
      thr->start(); //讓執行緒函式threadFunc跑起來
    }
  }

  void run(int times) //只有主執行緒生產產品,主執行緒是生產者
  {
    printf("waiting for count down latch\n");
	
	//1.主執行緒將一直阻塞在count_>0條件上,直到count_!>0
	//2.每個子執行緒啟動後,都會呼叫countDown()函式將count_--
	
    latch_.wait(); 
	//3.當count減為0時,wait被喚醒,繼續執行下面的程式碼
	
    printf("all threads started\n");
    for (int i = 0; i < times; ++i)
    {
      char buf[32];
      snprintf(buf, sizeof buf, "hello %d", i);
      queue_.put(buf); //主執行緒向queue_中放元素
      printf("tid=%d, put data = %s, size = %zd\n", muduo::CurrentThread::tid(), buf, queue_.size());
    }
  }

  void joinAll()
  {
    for (size_t i = 0; i < threads_.size(); ++i)
    {
      queue_.put("stop");
    }

    for (auto& thr : threads_)
    {
      thr->join();
    }
  }

private:
  void threadFunc() 
  {
    printf("tid=%d, %s started\n",
           muduo::CurrentThread::tid(),
           muduo::CurrentThread::name());

    latch_.countDown(); //每個執行緒都對latch_.count_--
    bool running = true;
    while (running)
    {
      //queue_.take():使用條件變數控制佇列,當佇列為空時,一直阻塞
      std::string d(queue_.take()); 
      printf("tid=%d, get data = %s, size = %zd\n", muduo::CurrentThread::tid(), d.c_str(), queue_.size());
      running = (d != "stop");
    }

    printf("tid=%d, %s stopped\n",
           muduo::CurrentThread::tid(),
           muduo::CurrentThread::name());
  }

  muduo::BlockingQueue<std::string> queue_;
  muduo::CountDownLatch latch_;
  std::vector<std::unique_ptr<muduo::Thread>> threads_; // 執行緒容器
};

void testMove()
{
  muduo::BlockingQueue<std::unique_ptr<int>> queue;
  queue.put(std::unique_ptr<int>(new int(42)));
  std::unique_ptr<int> x = queue.take();
  printf("took %d\n", *x);
  *x = 123;
  queue.put(std::move(x));
  std::unique_ptr<int> y = queue.take();
  printf("took %d\n", *y);
}

int main()
{
  Test t(5);
  t.run(100); 
  t.joinAll();

  testMove();

  printf("number of created threads %d\n", muduo::Thread::numCreated());
}

BoundedBlockingQueue 有界環形佇列

有界緩衝區:與無界緩衝區相比,多了一個條件變數notFull成員,並且使用boost庫的環形緩衝區。
在這裡插入圖片描述

template<typename T>
class BoundedBlockingQueue : noncopyable
{
private:
  mutable MutexLock          mutex_;
  Condition                  notEmpty_ ;
  Condition                  notFull_ ;
  boost::circular_buffer<T>  queue_ ; // 環形佇列
  
public:
  explicit BoundedBlockingQueue(int maxSize) //maxSize環形佇列容量
    : mutex_(),notEmpty_(mutex_),notFull_(mutex_),queue_(maxSize)
  {}

  void put(const T& x)
  {
    MutexLockGuard lock(mutex_);
    while (queue_.full())
    {
      notFull_.wait();
    }
    assert(!queue_.full());
    queue_.push_back(x);
    notEmpty_.notify();
  }

  T take()
  {
    MutexLockGuard lock(mutex_);
    while (queue_.empty())
    {
      notEmpty_.wait();
    }
    assert(!queue_.empty());
    T front(queue_.front());
    queue_.pop_front();
    notFull_.notify();
    return front;
  }

  bool empty() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.empty();
  }

  bool full() const
  {
    MutexLockGuard lock(mutex_);
    return queue_.full();
  }

  size_t size() const //有效元素個數
  {
    MutexLockGuard lock(mutex_);
    return queue_.size();
  }

  size_t capacity() const //佇列容量
  {
    MutexLockGuard lock(mutex_);
    return queue_.capacity();
  }
};