簡單C++執行緒池
Java 中有一個很方便的 ThreadPoolExecutor,可以用做執行緒池。想找一下 C++ 的類似設施,尤其是能方便理解底層原理可上手的。網上找到的 demo,基本都是介紹的 projschj 的C++11執行緒池。這份原始碼最後的commit日期是2014年,現在是2021年了,本文將在閱讀原始碼的基礎上,對這份程式碼進行一些改造。關於執行緒池,目前網上講解最好的一篇文章是這篇 Java執行緒池實現原理及其在美團業務中的實踐,值得一讀。
改造後的原始碼在 https://gitee.com/zhcpku/ThreadPool 進行提供。
projschj 的程式碼
1. 資料結構
主要包含兩個部分,一組執行執行緒、一個任務佇列。執行執行緒空閒時,總是從任務佇列中取出任務執行。具體執行邏輯後面會進行解釋。
class ThreadPool {
// ...
private:
using task_type = std::function<void()>;
// need to keep track of threads so we can join them
std::vector<std::thread> workers;
// the task queue
std::queue<task_type> tasks;
};
2. 同步機制
這裡包括一把鎖、一個條件變數,還有一個bool變數:
- 鎖用於保護任務佇列、條件變數、bool變數的訪問;
- 條件變數用於喚醒執行緒,通知任務到來、或者執行緒池停用;
- bool變數用於停用執行緒池;
class ThreadPool {
// ...
private:
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
3. 執行緒池啟動
啟動執行緒池,首先要做的是構造指定數量的執行緒出來,然後讓每個執行緒開始執行。
對於每個執行緒,執行邏輯是一樣的:嘗試從任務佇列中獲取任務並執行,如果拿不到任務、並且執行緒池沒有被停用,則睡眠等待。
這裡執行緒等待任務使用的是條件變數,而不是訊號量或者自旋鎖等其他設施,是為了讓執行緒睡眠,避免CPU空轉浪費。
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t thread_num)
: stop(false)
{
for (size_t i = 0; i < thread_num; ++i) {
workers.emplace_back([this] {
for (;;) {
task_type task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(
lock, [this] { return this->stop || !this->tasks.empty(); });
if (this->stop && this->tasks.empty()) {
return;
}
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
});
}
}
4.停用執行緒池
執行緒的停用,需要讓每一個執行緒停下來,並且等到每個執行緒都停止再退出主執行緒才是比較安全的操作。
停止分三步:設定停止標識、通知到每一個執行緒(睡眠的執行緒需要喚醒)、等到每一個執行緒停止。
// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread& worker : workers) {
worker.join();
}
}
5. 提交新任務
這是整個執行緒池的核心,也是寫的最複雜,用C++新特性最多的地方,包括但不限於:
自動型別推導、變長模板函式、右值引用、完美轉發、原地構造、智慧指標、future、bind ……
順帶提一句,要是早有變長模板引數,std::min / std::max 也不至於只能比較兩個數大小,再多就得用大括號包起來作為 initialize_list 傳進去了。
這裡提交任務時,由於我們的任務型別定義為一個無參無返回值的函式物件,所以需要先通過 std::bind 把函式及其引數打包成一個 對應型別的可呼叫物件,返回值將通過 future 非同步獲取。然後是要把這個任務插入任務佇列末尾,因為任務佇列被多執行緒併發訪問,所以需要加鎖。
另外需要處理的兩個情況,一個是執行緒睡眠時,新入隊任務需要主要喚醒執行緒;另一個是執行緒池要停用時,入隊操作是非法的。
// add new work item to the pool
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if (stop) {
throw std::runtime_error("enqueue on stopped ThreadPool");
}
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return res;
}
改造
以上程式碼已經足以闡釋執行緒池基本原理了,以下改進主要從可靠性、易用性、使用場景等方面進行改進。
1. non-copyable
執行緒池本身應該是不可複製的,這裡我們通過刪除拷貝建構函式和賦值操作符,以及其對用的右值引用版本來實現:
class ThreadPool {
// ...
private:
// non-copyable
ThreadPool(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;
};
2. default-thread-number
除了手動指定執行緒個數,更合適的做法是主動探測CPU支援的物理執行緒數,並以此作為執行執行緒個數:
class ThreadPool {
public:
explicit ThreadPool(size_t thread_num = std::thread::hardware_concurrency());
size_t ThreadCount() { return workers.size(); }
// ...
};
3. 延遲建立執行緒
執行緒不必一次就創建出來,可以等到任務到來的時候再建立,降低資源佔用。
// TBD
4. 臨時執行緒數量擴充
執行緒池的應用場景主要針對的是CPU密集型應用,但是遇到IO密集型場景,也要保證可用性。如果我們的執行緒個數固定的話,會出現一些問題,比如:
- 幾個IO任務佔據了執行緒,並且進入了睡眠,這個時候CPU空閒,但是後面的任務卻得不到處理,任務佇列越來越長;
- 幾個執行緒在睡眠等待某個訊號或者資源,但是這個訊號或資源的提供者是任務佇列中的某個任務,沒有空閒執行緒,提供者永遠提供此訊號或資源。
因此我們需要一種機制,臨時擴充執行緒數量,從執行緒池中的睡眠執行緒手中“搶回”CPU。
其實,更好的解決辦法是改造執行緒池,使用固定個數的執行緒,然後把任務打包到協程中執行,當遇到IO的時候協程主動讓出CPU,這樣其他任務就能上CPU運行了。畢竟,多執行緒擅長處理的是CPU密集型任務,多協程才是處理IO密集型任務的。…… 這不就是協程庫了嘛!比如 libco、libgo 就是這種解決方案。
// TBD
5. 執行緒池停用啟動
上面的執行緒池,其啟動停止時機分別是構造和析構的時候,還是太粗糙了。我們為其提供手動啟動、停止的函式,並支援停止之後重新啟動:
// TBD
總結
不幹了,2021年了,研究協程庫去了!
參考文獻