1. 程式人生 > >muduo原始碼分析:ThreadPool 執行緒池的實現

muduo原始碼分析:ThreadPool 執行緒池的實現

原始碼:

https://github.com/chenshuo/muduo/blob/master/muduo/base/ThreadPool.h

https://github.com/chenshuo/muduo/blob/master/muduo/base/ThreadPool.cc

執行緒池ThreadPool用到了前面分析的Thread、MutexLock、Condition。ThreadPool可以設定工作執行緒的數量,並向任務佇列放入任務。放入到任務佇列中的任務將由某個工作執行緒執行。


ThreadPool.h

 public:
  typedef boost::function<void ()> Task;

  explicit ThreadPool(const string& nameArg = string("ThreadPool"));
  ~ThreadPool();

  // Must be called before start().
  void setMaxQueueSize(int maxSize) { maxQueueSize_ = maxSize; }	//設定執行緒池執行緒最大數目大小
  void setThreadInitCallback(const Task& cb)				//設定執行緒執行前的回撥函式
  { threadInitCallback_ = cb; }

  void start(int numThreads);        //啟動執行緒池,numThreads是執行緒池的容量
  void stop();                       //終止執行緒池

  const string& name() const
  { return name_; }

  size_t queueSize() const;

  // Could block if maxQueueSize > 0
  void run(const Task& f);
#ifdef __GXX_EXPERIMENTAL_CXX0X__
  void run(Task&& f);
#endif

成員變數

private:
  bool isFull() const REQUIRES(mutex_);			//判滿
  void runInThread();					//執行緒池的執行緒執行函式
  Task take();						//取任務函式

  mutable MutexLock mutex_;				//mutable表示在const函式也可以改變它
  Condition notEmpty_ GUARDED_BY(mutex_);		//任務佇列queue_不為空了,有任務可以執行了,進而喚醒等待的執行緒。
  Condition notFull_ GUARDED_BY(mutex_);		//任務佇列queue_不滿了,有空間可以使用了,進而喚醒等待的執行緒。
  string name_;
  Task threadInitCallback_;				//執行緒初始化回撥函式
  boost::ptr_vector<muduo::Thread> threads_;	        //工作執行緒容器(執行緒陣列)
  std::deque<Task> queue_ GUARDED_BY(mutex_);	        //任務佇列
  size_t maxQueueSize_;				        //佇列最大大小
  bool running_;					//執行緒池執行標誌

 

使用boost::ptr_vector存放Thead。

每個Task都是typedef boost::function<void ()> Task; 所有任務都放到queue_中。需要使用條件變數來維護執行緒將的同步,比如:通知其他執行緒有任務到來了,可以向任務佇列放任務了等等。 


 

ThreadPool::ThreadPool()

ThreadPool::ThreadPool(const string& nameArg)
  : mutex_(),
    notEmpty_(mutex_),
    notFull_(mutex_),
    name_(nameArg),
    maxQueueSize_(0),
    running_(false)
{
}

建構函式對成員變數進行初始化(使用初始化列表)。
 

ThreadPool::~ThreadPool()

ThreadPool::~ThreadPool()
{
  if (running_)
  {
    stop();
  }
}

解構函式會呼叫stop, 喚醒所有休眠的執行緒,然後等待所有執行緒處理完。

 

ThreadPool::stop()

void ThreadPool::stop()    //終止執行緒池
{
  { // new scope
  MutexLockGuard lock(mutex_); // ctor of MutexLockGuard will lock mutex_
  running_ = false;
  notEmpty_.notifyAll();     // 喚醒所有休眠的工作執行緒
  } // dtor of MutexLockGuard will unlock mutex_
  for_each(threads_.begin(),
           threads_.end(),
           boost::bind(&muduo::Thread::join, _1)); // 等待所有工作執行緒結束
}

 

ThreadPool::start()

void ThreadPool::start(int numThreads)    //引數為執行緒數量,會建立相應數量的執行緒,執行緒函式為ThreadPool::runInThread
{
  assert(threads_.empty());
  running_ = true;              //啟動標誌
  threads_.reserve(numThreads); // 保證threads_容量至少為numThreads
  for (int i = 0; i < numThreads; ++i)
  {
    char id[32];
    snprintf(id, sizeof id, "%d", i+1);

//建立工作執行緒並加入執行緒陣列,構造Thread(this.runInThread,name+id)並加入執行緒陣列。執行緒函式是ThreadPool::runInThread 
    threads_.push_back(new muduo::Thread(
          boost::bind(&ThreadPool::runInThread, this), name_+id));
    threads_[i].start();    //啟動每個執行緒,但是由於執行緒執行的函式是runInThread,所以會阻塞
  }
  if (numThreads == 0 && threadInitCallback_)
  {
    threadInitCallback_();
  }
}

引數為執行緒數量,會建立相應數量的執行緒,執行體為ThreadPool::runInThread

void ThreadPool::runInThread()    //執行緒函式
{
  try
  {
    if (threadInitCallback_)    //如果設定了就執行,進行一些初始化設定
    {
      threadInitCallback_();
    }
    while (running_)        //當執行緒池啟動之後,就在while迴圈中不停地取任務執行
    {
      Task task(take());    //從任務佇列取出一個任務
      if (task)
      {
        task();            //執行該任務
      }
    }
  }
  catch (const Exception& ex)    //異常處理
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    fprintf(stderr, "stack trace: %s\n", ex.stackTrace());
    abort();
  }
  catch (const std::exception& ex)
  {
    fprintf(stderr, "exception caught in ThreadPool %s\n", name_.c_str());
    fprintf(stderr, "reason: %s\n", ex.what());
    abort();
  }
  catch (...)
  {
    fprintf(stderr, "unknown exception caught in ThreadPool %s\n", name_.c_str());
    throw; // rethrow
  }
}

獲取一個task

ThreadPool::Task ThreadPool::take()	//獲取一個task
{
  MutexLockGuard lock(mutex_);
  // always use a while-loop, due to spurious wakeup
  while (queue_.empty() && running_)
  {
    notEmpty_.wait();	//沒有任務,則等待(利用條件變數)
  }
  Task task;
  if (!queue_.empty())	//有任務了,就返回一個任務
  {
    task = queue_.front();	//獲取任務
    queue_.pop_front();
    if (maxQueueSize_ > 0)
    {
      notFull_.notify();	//當解決了一個任務之後,任務佇列肯定不是滿的,通知某個等待向佇列放入task執行緒。
    }
  }
  return task;			//返回任務
}

條件變數的wait操作使用while包裹,預防“虛假喚醒”(如被其他執行緒搶佔了)。

 


向執行緒池新增task

void ThreadPool::run(const Task& task)	//向執行緒池新增task
{
  if (threads_.empty()
  {
    task();			//如果沒有子執行緒,就在主執行緒中執行該task
  }
  else
  {
    MutexLockGuard lock(mutex_);
    while (isFull())	    //如果task佇列queue_滿了,就等待
    {
      notFull_.wait();
    }
    assert(!isFull());

    queue_.push_back(task);	//將任務加入佇列
    notEmpty_.notify();		//當添加了某個任務之後,任務佇列肯定不是空的,通知某個等待從queue_中取task的執行緒
  }
}

 

使用示例

struct Foo {
public:
    void DoWork() {
        std::cout << "run member function in thread:" << CurrentThread::tid() << std::endl;
    }
    void operator() (){
        std::cout << "run functor in thread:" << CurrentThread::tid() << std::endl;
    }
};

void Task1()
{
    std::cout << "function run in thread:"  << CurrentThread::tid() << std::endl;
}


int main()
{
    ThreadPool tp("TestThreadPool");
    tp.setMaxQueueSize(10);

    tp.start(4); // 啟動4個工作執行緒,啟動之後,由於任務佇列queue_為空,所以所有工作執行緒都休眠了
    tp.run(Task1); // 放入一個task,會喚醒某個工作執行緒

    Foo f;
    tp.run(boost::bind(&Foo::DoWork, &f));
    tp.run(f);

    tp.run( [](){ std::cout << "lambda function run in thread:" << CurrentThread::tid() << std::endl; });

    typedef void(*pFunc)();
    pFunc pf = Task1;
    tp.run(pf);
}

可以看到,ThreadPool可以很方便的將某個task放到任務佇列中,該task會由某個執行緒執行。task使用boost::function表示,可以方便地將函式指標、普通函式、成員函式(結合boost::bind)、lambda、過載了函式呼叫運算子‘()’的類的物件(這些統稱為可呼叫物件)放入到任務隊列當中,非常方便。