1. 程式人生 > >基於C++11實現執行緒池的工作原理.

基於C++11實現執行緒池的工作原理.

基於C++11實現執行緒池的工作原理.

文章目錄


不久前寫過一篇執行緒池,那時候剛用C++寫東西不久,很多C++標準庫裡面的東西沒怎麼用,今天基於C++11重新寫了一個執行緒池。

簡介

執行緒池(thread pool):一種執行緒的使用模式,執行緒過多會帶來排程開銷,進而影響快取區域性性和整體效能。而執行緒池維護著多個執行緒,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時建立與銷燬執行緒的代價。執行緒池不僅能夠保證核心的充分利用,還能防止過分排程。可用執行緒數量應該取決於可用的併發處理器、處理器核心、記憶體、網路sockets等的數量。

執行緒池的組成

1、執行緒池管理器

建立一定數量的執行緒,啟動執行緒,調配任務,管理著執行緒池。
本篇執行緒池目前只需要啟動(start()),停止方法(stop()),及任務新增方法(addTask).
start()建立一定數量的執行緒池,進行執行緒迴圈.
stop()停止所有執行緒迴圈,回收所有資源.
addTask()新增任務.

2、工作執行緒

執行緒池中執行緒,線上程池中等待並執行分配的任務.
本篇選用條件變數實現等待與通知機制.

3、任務介面,

新增任務的介面,以供工作執行緒排程任務的執行。

4、任務佇列

用於存放沒有處理的任務。提供一種緩衝機制
同時任務佇列具有排程功能,高優先順序的任務放在任務佇列前面;本篇選用priority_queue 與pair的結合用作任務優先佇列的結構.

執行緒池工作的四種情況.

假設我們的執行緒池大小為3,任務佇列目前不做大小限制.

1、主程式當前沒有任務要執行,執行緒池中的任務佇列為空閒狀態.

此情況下所有工作執行緒處於空閒的等待狀態,任務緩衝佇列為空.

2、主程式新增小於等於執行緒池中執行緒數量的任務.

此情況基於情形1,所有工作執行緒已處在等待狀態,主執行緒開始新增三個任務,新增後通知(notif())喚醒執行緒池中的執行緒開始取(take())任務執行. 此時的任務緩衝佇列還是空。

3、主程式新增任務數量大於當前執行緒池中執行緒數量的任務.

此情況發生情形2後面,所有工作執行緒都在工作中,主執行緒開始新增第四個任務,新增後發現現線上程池中的執行緒用完了,於是存入任務緩衝佇列。工作執行緒空閒後主動從任務佇列取任務執行.

4、主程式新增任務數量大於當前執行緒池中執行緒數量的任務,且任務緩衝佇列已滿.

此情況發生情形3且設定了任務緩衝佇列大小後面,主程式新增第N個任務,新增後發現池子中的執行緒用完了,任務緩衝佇列也滿了,於是進入等待狀態、等待任務緩衝佇列中的任務騰空通知。
但是要注意這種情形會阻塞主執行緒,本篇暫不限制任務佇列大小,必要時再來優化.

實現

等待通知機制通過條件變數實現,Logger和CurrentThread,用於除錯,可以無視.

#ifndef _THREADPOOL_HH
#define _THREADPOOL_HH

#include <vector>
#include <utility>
#include <queue>
#include <thread>
#include <functional>
#include <mutex>

#include "Condition.hh"

class ThreadPool{
public:
  static const int kInitThreadsSize = 3;
  enum taskPriorityE { level0, level1, level2, };
  typedef std::function<void()> Task;
  typedef std::pair<taskPriorityE, Task> TaskPair;

  ThreadPool();
  ~ThreadPool();

  void start();
  void stop();
  void addTask(const Task&);
  void addTask(const TaskPair&);

private:
  ThreadPool(const ThreadPool&);//禁止複製拷貝.
  const ThreadPool& operator=(const ThreadPool&);

  struct TaskPriorityCmp
  {
    bool operator()(const ThreadPool::TaskPair p1, const ThreadPool::TaskPair p2)
    {
        return p1.first > p2.first; //first的小值優先
    }
  };

  void threadLoop();
  Task take();

  typedef std::vector<std::thread*> Threads;
  typedef std::priority_queue<TaskPair, std::vector<TaskPair>, TaskPriorityCmp> Tasks;

  Threads m_threads;
  Tasks m_tasks;

  std::mutex m_mutex;
  Condition m_cond;
  bool m_isStarted;
};

#endif

//Cpp

#include <assert.h>

#include "Logger.hh" // debug
#include "CurrentThread.hh" // debug
#include "ThreadPool.hh"

ThreadPool::ThreadPool()
  :m_mutex(),
  m_cond(m_mutex),
  m_isStarted(false)
{

}

ThreadPool::~ThreadPool()
{
  if(m_isStarted)
  {
    stop();
  }
}

void ThreadPool::start()
{
  assert(m_threads.empty());
  m_isStarted = true;
  m_threads.reserve(kInitThreadsSize);
  for (int i = 0; i < kInitThreadsSize; ++i)
  {
    m_threads.push_back(new std::thread(std::bind(&ThreadPool::threadLoop, this)));
  }

}

void ThreadPool::stop()
{
  LOG_TRACE << "ThreadPool::stop() stop.";
  {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_isStarted = false;
    m_cond.notifyAll();
    LOG_TRACE << "ThreadPool::stop() notifyAll().";
  }

  for (Threads::iterator it = m_threads.begin(); it != m_threads.end() ; ++it)
  {
    (*it)->join();
    delete *it;
  }
  m_threads.clear();
}


void ThreadPool::threadLoop()
{
  LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " start.";
  while(m_isStarted)
  {
    Task task = take();
    if(task)
    {
      task();
    }
  }
  LOG_TRACE << "ThreadPool::threadLoop() tid : " << CurrentThread::tid() << " exit.";
}

void ThreadPool::addTask(const Task& task)
{
  std::unique_lock<std::mutex> lock(m_mutex);
  /*while(m_tasks.isFull())
    {//when m_tasks have maxsize
      cond2.wait();
    }
  */
  TaskPair taskPair(level2, task);
  m_tasks.push(taskPair);
  m_cond.notify();
}

void ThreadPool::addTask(const TaskPair& taskPair)
{
  std::unique_lock<std::mutex> lock(m_mutex);
  /*while(m_tasks.isFull())
    {//when m_tasks have maxsize
      cond2.wait();
    }
  */
  m_tasks.push(taskPair);
  m_cond.notify();
}

ThreadPool::Task ThreadPool::take()
{
  std::unique_lock<std::mutex> lock(m_mutex);
  //always use a while-loop, due to spurious wakeup
  while(m_tasks.empty() && m_isStarted)
  {
    LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wait.";
    m_cond.wait(lock);
  }

  LOG_TRACE << "ThreadPool::take() tid : " << CurrentThread::tid() << " wakeup.";

  Task task;
  Tasks::size_type size = m_tasks.size();
  if(!m_tasks.empty() && m_isStarted)
  {
    task = m_tasks.top().second;
    m_tasks.pop();
    assert(size - 1 == m_tasks.size());
    /*if (TaskQueueSize_ > 0)
    {
      cond2.notify();
    }*/
  }

  return task;

}

測試程式

start() 、stop()

測試執行緒池基本的建立退出工作,及檢測資源是否正常回收.

int main()
{
  {
  ThreadPool threadPool;
  threadPool.start();

  getchar();
  }

  getchar();

  return 0;
}
./test.out 
2018-11-25 16:50:36.054805 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3680 start.
2018-11-25 16:50:36.054855 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3680 wait.
2018-11-25 16:50:36.055633 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3679 start.
2018-11-25 16:50:36.055676 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3679 wait.
2018-11-25 16:50:36.055641 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3681 start.
2018-11-25 16:50:36.055701 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3681 wait.
2018-11-25 16:50:36.055736 [TRACE] [ThreadPool.cpp:53] [threadLoop] ThreadPool::threadLoop() tid : 3682 start.
2018-11-25 16:50:36.055746 [TRACE] [ThreadPool.cpp:72] [take] ThreadPool::take() tid : 3682 wait.

2018-11-25 16:51:01.411792 [TRACE] [ThreadPool.cpp:36] [stop] ThreadPool::stop() stop.
2018-11-25 16:51:01.411863 [TRACE] [ThreadPool.cpp:39] [stop] ThreadPool::stop() notifyAll().
2018-11-25 16:51:01.411877 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3680 wakeup.
2018-11-25 16:51:01.411883 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3680 exit.
2018-11-25 16:51:01.412062 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3682 wakeup.
2018-11-25 16:51:01.412110 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3682 exit.
2018-11-25 16:51:01.413052 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3679 wakeup.
2018-11-25 16:51:01.413098 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3679 exit.
2018-11-25 16:51:01.413112 [TRACE] [ThreadPool.cpp:76] [take] ThreadPool::take() tid : 3681 wakeup.
2018-11-25 16:51:01.413141 [TRACE] [ThreadPool.cpp:62] [threadLoop] ThreadPool::threadLoop() tid : 3681 exit.

addTask()、PriorityTaskQueue

測試新增任務介面,及優先任務佇列.

主執行緒首先添加了5個普通任務、 1s後新增一個高優先順序任務,當前3個執行緒中的最先一個空閒後,會最先執行後面新增的priorityFunc().

std::mutex g_mutex;

void priorityFunc()
{
  for (int i = 1; i < 4; ++i)
  {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      std::lock_guard<std::mutex> lock(g_mutex);
      LOG_DEBUG << "priorityFunc() [" << i << "at thread [ " << CurrentThread::tid() << "] output";// << std::endl;
  }

}

void testFunc()
{
  // loop to print character after a random period of time
  for (int i = 1; i < 4; ++i)
  {
      std::this_thread::sleep_for(std::chrono::seconds(1));
      std::lock_guard<std::mutex> lock(g_mutex);
      LOG_DEBUG << "testFunc() [" << i << "] at thread [ " << CurrentThread::tid() << "] output";// << std::endl;
  }

}


int main()
{
  ThreadPool threadPool;
  threadPool.start();

  for(int i = 0; i < 5 ; i++)
    threadPool.addTask(testFunc);

  std::this_thread::sleep_for(std::chrono::seconds(1));

  threadPool.addTask(ThreadPool::TaskPair(ThreadPool::level0, priorityFunc));

  getchar();
  return 0;
}
./test.out 
2018-11-25 18:24:20.886837 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4121 start.
2018-11-25 18:24:20.886893 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid : 4121 wakeup.
2018-11-25 18:24:20.887580 [TRACE] [ThreadPool.cpp:56] [threadLoop] ThreadPool::threadLoop() tid : 4120 start.
2018-11-25 18:24:20.887606 [TRACE] [ThreadPool.cpp:103] [take] ThreadPool::take() tid :