c++11多執行緒與執行緒池
阿新 • • 發佈:2018-12-09
最近需要開發一個高效能運算庫,涉及到c++多執行緒的應用,上次做類似的事情已經是4年多以前了,印象中還頗有些麻煩。悔當初做了就算了,也沒想著留點記錄什麼的。這次又研究了一番,發現用上c++11特性之後,現在已經比較簡單了,在此記錄一下。
最簡單的多執行緒情況,不涉及公共變數,各個執行緒之間獨立執行,主執行緒只負責傳入引數並接收執行結果。這種情況也是多執行緒效能最好的場景,因為不涉及鎖的問題。之前c++標準庫對多執行緒支援並不好,要麼用boost執行緒庫,要麼用作業系統提供的執行緒自己輪。自從c++11之後,標準庫對多執行緒支援好多了。下面給出一個簡單的多執行緒示例程式碼。
#include <thread> #include <future> #include <vector> #include <iostream> using namespace std; struct result { vector<double> x; double y; }; void f1(promise<result> &res) { vector<double> vec(2); vec[0] = 1.5; vec[1] = 2.2; res.set_value({ vec, 4.7 }); } double f2(double x) { return sin(x); } int main(int argc, char* argv[]) { promise<result> res1; packaged_task<double(double)> res2(f2); future<result> ft1 = res1.get_future(); future<double> ft2 = res2.get_future(); thread th1(f1, ref(res1)); // 執行緒引數均為copy,ref代表引用,必須顯式給出 thread th2(move(res2), 3.14/6); // 為了提高效能採用move ft1.wait(); ft2.wait(); result r1 = ft1.get(); double r2 = ft2.get(); th1.join(); // 這兩句對此例不必要,僅為了邏輯自洽給出 th2.join(); cout << "result: x = (" << r1.x[0] << ", " << r1.x[1] << "), y = " << r1.y << endl; cout << "sin(pi/6) = " << r2 << endl; return 0; }
這段程式碼分別用了promise和packaged_task兩種方式來新建執行緒並獲得返回值。注意新建的執行緒通過複製引數來實現,因此對於引用的引數要用std::ref來宣告,否則無法獲得返回值。
對於大規模的計算,通常都把計算劃分為小塊,然後塞給新執行緒。小塊計算的數量會很多,頻繁建立和銷燬執行緒的開銷也很大,因此通常的做法是利用執行緒池。顧名思義,執行緒池中有若干執行緒,當有新任務來,就把任務分給執行緒執行,執行結束後該執行緒再等候下一個任務。下面給出執行緒池的實現,
ThreadPool.hpp
#pragma once #ifndef THREAD_POOL_H #define THREAD_POOL_H #include <vector> #include <queue> #include <thread> #include <atomic> #include <condition_variable> #include <future> namespace ThreadPool { #define MAX_THREAD_NUM 8 //執行緒池,可以提交變參函式或lambda表示式的匿名函式執行,可以獲取執行返回值 //支援類成員函式,支援類靜態成員函式或全域性函式,Operator()函式等 class ThreadPool { typedef std::function<void()> Task; private: std::vector<std::thread> m_pool; // 執行緒池 std::queue<Task> m_tasks; // 任務佇列 std::mutex m_lock; // 同步鎖 std::condition_variable m_cv; // 條件阻塞 std::atomic<bool> m_isStoped; // 是否關閉提交 std::atomic<int> m_idleThreadNum; //空閒執行緒數量 public: ThreadPool(int size = MAX_THREAD_NUM) { size = size > MAX_THREAD_NUM ? MAX_THREAD_NUM : size; m_idleThreadNum = size; for (int i = 0; i < size; i++) { //初始化執行緒數量 m_pool.emplace_back(&ThreadPool::scheduler, this); } } ~ThreadPool() { Close(); while (!m_tasks.empty()) { m_tasks.pop(); } m_cv.notify_all(); // 喚醒所有執行緒執行 for (std::thread& thread : m_pool) { if (thread.joinable()) { thread.join(); // 等待任務結束,前提是執行緒一定會執行完 } } m_pool.clear(); } // 開啟執行緒池,重啟任務提交 void ReOpen() { if (m_isStoped) m_isStoped.store(false); m_cv.notify_all(); } // 關閉執行緒池,停止提交新任務 void Close() { if (!m_isStoped) m_isStoped.store(true); } // 判斷執行緒池是否被關閉 bool IsClosed() const { return m_isStoped.load(); } // 獲取當前任務佇列中的任務數 int GetTaskSize() { return m_tasks.size(); } // 獲取當前空閒執行緒數 int IdleCount() { return m_idleThreadNum; } // 提交任務並執行 // 呼叫方式為 std::future<returnType> var = threadpool.Submit(...) // var.get() 會等待任務執行完,並獲取返回值 // 其中 ... 可以直接用函式名+函式引數代替,例如 threadpool.Submit(f, 0, 1) // 但如果要呼叫類成員函式,則最好用如下方式 // threadpool.Submit(std::bind(&Class::Func, &classInstance)) 或 // threadpool.Submit(std::mem_fn(&Class::Func), &classInstance) template<class F, class... Args> auto Submit(F&& f, Args&&... args)->std::future<decltype(f(args...))> { using RetType = decltype(f(args...)); // typename std::result_of<F(Args...)>::type, 函式 f 的返回值型別 std::shared_ptr<std::packaged_task<RetType()>> task = std::make_shared<std::packaged_task<RetType()>>( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<RetType> future = task->get_future(); // 封裝任務並新增到佇列 addTask([task](){ (*task)(); }); return future; } private: // 消費者 Task getTask() { std::unique_lock<std::mutex> lock(m_lock); // unique_lock 相比 lock_guard 的好處是:可以隨時 unlock() 和 lock() while (m_tasks.empty() && !m_isStoped) { m_cv.wait(lock); } // wait 直到有 task if (m_isStoped) { return Task(); } assert(!m_tasks.empty()); Task task = std::move(m_tasks.front()); // 取一個 task m_tasks.pop(); m_cv.notify_one(); return task; } // 生產者 void addTask(Task task) { std::lock_guard<std::mutex> lock{ m_lock }; //對當前塊的語句加鎖, lock_guard 是 mutex 的 stack 封裝類,構造的時候 lock(),析構的時候 unlock() m_tasks.push(task); m_cv.notify_one(); // 喚醒一個執行緒執行 } // 工作執行緒主迴圈函式 void scheduler() { while (!m_isStoped.load()) { // 獲取一個待執行的 task Task task(getTask()); if (task) { m_idleThreadNum--; task(); m_idleThreadNum++; } } } }; } #endif
該執行緒池的使用如下,
ThreadPool.cpp:
#include "ThreadPool.hpp" #include <iostream> struct gfun { int operator()(int n) { printf("%d hello, gfun ! %d\n", n, std::this_thread::get_id()); return 42; } }; class Test { public: int GetThreadId(std::string a, double b) { std::this_thread::sleep_for(std::chrono::milliseconds(10000)); std::thread::id i = std::this_thread::get_id(); std::cout << "In Test, thread id: " << i << std::endl; std::cout << "a: " << a.c_str() << ", b = " << b << std::endl; return i.hash(); } }; int main() { ThreadPool::ThreadPool worker{ 4 }; Test t; std::cout << "at the beginning: " << std::endl; std::cout << "idle threads: " << worker.IdleCount() << std::endl; std::cout << "tasks: " << worker.GetTaskSize() << std::endl; std::future<int> f1 = worker.Submit(std::bind(&Test::GetThreadId, &t, "123", 456.789)); std::cout << "after submit 1 task: " << std::endl; std::cout << "idle threads: " << worker.IdleCount() << std::endl; std::cout << "tasks: " << worker.GetTaskSize() << std::endl; std::future<int> f2 = worker.Submit(std::mem_fn(&Test::GetThreadId), &t, "789", 123.456); std::cout << "after submit 2 task: " << std::endl; std::cout << "idle threads: " << worker.IdleCount() << std::endl; std::cout << "tasks: " << worker.GetTaskSize() << std::endl; std::future<int> f3 = worker.Submit(gfun{}, 0); std::cout << "f1 = " << f1.get() << ", f2 = " << f2.get() << ", f3 = " << f3.get() << std::endl; std::cout << "after all task: " << std::endl; std::cout << "idle threads: " << worker.IdleCount() << std::endl; std::cout << "tasks: " << worker.GetTaskSize() << std::endl; return 0; }
注意上面的例子中,執行緒池的執行緒數是一開始定義好的,此後也不會改變。但實際上,執行緒池中的執行緒數可以根據任務的多少動態調節。對於大規模計算的需求而言,cpu核數是固定的,要計算的任務數基本也是固定的,所以沒必要這樣做;如果是伺服器處理網路請求,則可以採用執行緒數動態調節的方式,增加可擴充套件性。
完整的vs2013工程包可在如下地址下載