c++11執行緒池實現
阿新 • • 發佈:2019-01-14
咳咳。c++11 加入了執行緒庫,從此告別了標準庫不支援併發的歷史。然而 c++ 對於多執行緒的支援還是比較低階,稍微高階一點的用法都需要自己去實現,譬如執行緒池、訊號量等。執行緒池(thread pool)這個東西,在面試上多次被問到,一般的回答都是:“管理一個任務佇列,一個執行緒佇列,然後每次取一個任務分配給一個執行緒去做,迴圈往復。” 貌似沒有問題吧。但是寫起程式來的時候就出問題了。
廢話不多說,先上實現,然後再囉嗦。(dont talk, show me ur code !)
- #ifndef ILOVERS_THREAD_POOL_H
- #define ILOVERS_THREAD_POOL_H
- #include<iostream>
- #include<functional>
- #include<thread>
- #include<condition_variable>
- #include<future>
- #include<atomic>
- #include<vector>
- #include<queue>
- // 名稱空間
- namespace ilovers {
- classTaskExecutor;
- }
- class ilovers::TaskExecutor{
- usingTask= std::function<void()>;
- private:
- // 執行緒池
- std::vector<std::thread> pool;
- // 任務佇列
- std::queue<Task> tasks;
- // 同步
- std::mutex m_task;
- std::condition_variable cv_task;
- // 是否關閉提交
- std::atomic<bool> stop;
- public:
- // 構造
- TaskExecutor(size_t size =4): stop {false}{
- size = size <1?1: size;
- for(size_t i =0; i<
- pool.emplace_back(&TaskExecutor::schedual,this);// push_back(std::thread{...})
- }
- }
- // 析構
- ~TaskExecutor(){
- for(std::thread& thread : pool){
- thread.detach();// 讓執行緒“自生自滅”
- //thread.join(); // 等待任務結束, 前提:執行緒一定會執行完
- }
- }
- // 停止任務提交
- void shutdown(){
- this->stop.store(true);
- }
- // 重啟任務提交
- void restart(){
- this->stop.store(false);
- }
- // 提交一個任務
- template<class F,class...Args>
- auto commit(F&& f,Args&&... args)->std::future<decltype(f(args...))>{
- if(stop.load()){// stop == true ??
- throw std::runtime_error("task executor have closed commit.");
- }
- usingResType=decltype(f(args...));// typename std::result_of<F(Args...)>::type, 函式 f 的返回值型別
- auto task = std::make_shared<std::packaged_task<ResType()>>(
- std::bind(std::forward<F>(f), std::forward<Args>(args)...)
- );// wtf !
- {// 新增任務到佇列
- std::lock_guard<std::mutex> lock {m_task};
- tasks.emplace([task](){// push(Task{...})
- (*task)();
- });
- }
- cv_task.notify_all();// 喚醒執行緒執行
- std::future<ResType> future = task->get_future();
- return future;
- }
- private:
- // 獲取一個待執行的 task
- Task get_one_task(){
- std::unique_lock<std::mutex> lock {m_task};
- cv_task.wait(lock,[this](){return!tasks.empty();});// wait 直到有 task
- Task task {std::move(tasks.front())};// 取一個 task
- tasks.pop();
- return task;
- }
- // 任務排程
- void schedual(){
- while(true){
- if(Task task = get_one_task()){
- task();//
- }else{
- // return; // done
- }
- }
- }
- };
- #endif
- void f()
- {
- std::cout <<"hello, f !"<< std::endl;
- }
- struct G{
- intoperator()(){
- std::cout <<"hello, g !"<< std::endl;
- return42;
- }
- };
- int main()
- try{
- ilovers::TaskExecutor executor {10};
- std::future<void> ff = executor.commit(f);
- std::future<int> fg = executor.commit(G{});
- std::future<std::string> fh = executor.commit([]()->std::string { std::cout <<"hello, h !"<< std::endl;return"hello,fh !";});
- executor.shutdown();
- ff.get();
- std::cout << fg.get()<<" "<< fh.get()<< std::endl;
- std::this_thread::sleep_for(std::chrono::seconds(5));
- executor.restart();// 重啟任務
- executor.commit(f).get();//
- std::cout <<"end..."<< std::endl;
- return0