1. 程式人生 > >c++11執行緒池實現

c++11執行緒池實現

咳咳。c++11 加入了執行緒庫,從此告別了標準庫不支援併發的歷史。然而 c++ 對於多執行緒的支援還是比較低階,稍微高階一點的用法都需要自己去實現,譬如執行緒池、訊號量等。執行緒池(thread pool)這個東西,在面試上多次被問到,一般的回答都是:“管理一個任務佇列,一個執行緒佇列,然後每次取一個任務分配給一個執行緒去做,迴圈往復。” 貌似沒有問題吧。但是寫起程式來的時候就出問題了。

廢話不多說,先上實現,然後再囉嗦。(dont talk, show me ur code !)

  1. #ifndef ILOVERS_THREAD_POOL_H
  2. #define ILOVERS_THREAD_POOL_H
  3. #include<iostream>
  4. #include<functional>
  5. #include<thread>
  6. #include<condition_variable>
  7. #include<future>
  8. #include<atomic>
  9. #include<vector>
  10. #include<queue>
  11. // 名稱空間
  12. namespace ilovers {
  13. classTaskExecutor;
  14. }
  15. class ilovers::TaskExecutor{
  16. usingTask= std::function<void()>;
  17. private:
  18. // 執行緒池
  19. std::vector<std::thread> pool;
  20. // 任務佇列
  21. std::queue<Task> tasks;
  22. // 同步
  23. std::mutex m_task;
  24. std::condition_variable cv_task;
  25. // 是否關閉提交
  26. std::atomic<bool> stop;
  27. public:
  28. // 構造
  29. TaskExecutor(size_t size =4): stop {false}{
  30. size = size <1?1: size;
  31. for(size_t i =0; i<
    size;++i){
  32. pool.emplace_back(&TaskExecutor::schedual,this);// push_back(std::thread{...})
  33. }
  34. }
  35. // 析構
  36. ~TaskExecutor(){
  37. for(std::thread& thread : pool){
  38. thread.detach();// 讓執行緒“自生自滅”
  39. //thread.join(); // 等待任務結束, 前提:執行緒一定會執行完
  40. }
  41. }
  42. // 停止任務提交
  43. void shutdown(){
  44. this->stop.store(true);
  45. }
  46. // 重啟任務提交
  47. void restart(){
  48. this->stop.store(false);
  49. }
  50. // 提交一個任務
  51. template<class F,class...Args>
  52. auto commit(F&& f,Args&&... args)->std::future<decltype(f(args...))>{
  53. if(stop.load()){// stop == true ??
  54. throw std::runtime_error("task executor have closed commit.");
  55. }
  56. usingResType=decltype(f(args...));// typename std::result_of<F(Args...)>::type, 函式 f 的返回值型別
  57. auto task = std::make_shared<std::packaged_task<ResType()>>(
  58. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  59. );// wtf !
  60. {// 新增任務到佇列
  61. std::lock_guard<std::mutex> lock {m_task};
  62. tasks.emplace([task](){// push(Task{...})
  63. (*task)();
  64. });
  65. }
  66. cv_task.notify_all();// 喚醒執行緒執行
  67. std::future<ResType> future = task->get_future();
  68. return future;
  69. }
  70. private:
  71. // 獲取一個待執行的 task
  72. Task get_one_task(){
  73. std::unique_lock<std::mutex> lock {m_task};
  74. cv_task.wait(lock,[this](){return!tasks.empty();});// wait 直到有 task
  75. Task task {std::move(tasks.front())};// 取一個 task
  76. tasks.pop();
  77. return task;
  78. }
  79. // 任務排程
  80. void schedual(){
  81. while(true){
  82. if(Task task = get_one_task()){
  83. task();//
  84. }else{
  85. // return; // done
  86. }
  87. }
  88. }
  89. };
  90. #endif
  91. void f()
  92. {
  93. std::cout <<"hello, f !"<< std::endl;
  94. }
  95. struct G{
  96. intoperator()(){
  97. std::cout <<"hello, g !"<< std::endl;
  98. return42;
  99. }
  100. };
  101. int main()
  102. try{
  103. ilovers::TaskExecutor executor {10};
  104. std::future<void> ff = executor.commit(f);
  105. std::future<int> fg = executor.commit(G{});
  106. std::future<std::string> fh = executor.commit([]()->std::string { std::cout <<"hello, h !"<< std::endl;return"hello,fh !";});
  107. executor.shutdown();
  108. ff.get();
  109. std::cout << fg.get()<<" "<< fh.get()<< std::endl;
  110. std::this_thread::sleep_for(std::chrono::seconds(5));
  111. executor.restart();// 重啟任務
  112. executor.commit(f).get();//
  113. std::cout <<"end..."<< std::endl;
  114. return0