1. 程式人生 > >c++11實現執行緒池

c++11實現執行緒池

測試程式
//
// main.cpp
//
#include <iostream> // std::cout, std::endl

#include <vector>   // std::vector
#include <string>   // std::string
#include <future>   // std::future
#include <thread>   // std::this_thread::sleep_for
#include <chrono>   // std::chrono::seconds

#include "ThreadPool.h"
int main() { // 建立一個能夠併發執行四個執行緒的執行緒池 ThreadPool pool(4); // 建立併發執行執行緒的結果列表 std::vector< std::future<std::string> > results; // 啟動八個需要執行的執行緒任務 for(int i = 0; i < 8; ++i) { // 將併發執行任務的返回值新增到結果列表中 results.emplace_back( // 將下面的列印任務新增到執行緒池中併發執行 pool.enqueue([i] { std
::cout << "hello " << i << std::endl; // 上一行輸出後, 該執行緒會等待1秒鐘 std::this_thread::sleep_for(std::chrono::seconds(1)); // 然後再繼續輸出並返回執行情況 std::cout << "world " << i << std::endl; return std::string
("---thread ") + std::to_string(i) + std::string(" finished.---"); }) ); } // 輸出執行緒任務的結果 for(auto && result: results) std::cout << result.get() << ' '; std::cout << std::endl; return 0; }

執行緒池類

// 
// ThreadPool.hpp
// ThreadPool
// 
// Original Author: Jakob Progsch, Václav Zeman
// Modified By:     https://www.shiyanlou.com
// Original Link:   https://github.com/progschj/ThreadPool
//

#ifndef ThreadPool_hpp
#define ThreadPool_hpp
#include <vector>               // std::vector
#include <queue>                // std::queue
#include <memory>               // std::make_shared
#include <stdexcept>            // std::runtime_error
#include <thread>               // std::thread
#include <mutex>                // std::mutex,        std::unique_lock
#include <condition_variable>   // std::condition_variable
#include <future>               // std::future,       std::packaged_task
#include <functional>           // std::function,     std::bind
#include <utility>              // std::move,         std::forward

class ThreadPool {
public:
    inline ThreadPool(size_t threads) : stop(false) {
         // 啟動 threads 數量的工作執行緒(worker)
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            // 此處的 lambda 表示式捕獲 this, 即執行緒池例項
            [this]
            {
                // 迴圈避免虛假喚醒
                for(;;)
                {
                    // 定義函式物件的容器, 儲存任意的返回型別為 void 引數表為空的函式
                    std::function<void()> task;

                    // 臨界區
                    {
                        // 建立互斥鎖
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        // 阻塞當前執行緒, 直到 condition_variable 被喚醒
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        // 如果當前執行緒池已經結束且等待任務佇列為空, 則應該直接返回
                        if(this->stop && this->tasks.empty())
                            return;
                        // 否則就讓任務佇列的隊首任務作為需要執行的任務出隊
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }
                    // 執行當前任務
                    task();
                }
            }
        );
    }
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type> {
          // 推導任務返回型別
    using return_type = typename std::result_of<F(Args...)>::type;

    // 獲得當前任務
    auto task = std::make_shared< std::packaged_task<return_type()> >(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...)
    );
    // 獲得 std::future 物件以供實施執行緒同步
    std::future<return_type> res = task->get_future();

    // 臨界區
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        // 禁止線上程池停止後加入新的執行緒
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");
        // 將執行緒新增到執行任務佇列中
        tasks.emplace([task]{ (*task)(); });
    }
    // 通知一個正在等待的執行緒
    condition.notify_one();
    return res;
    }
    inline ~ThreadPool() {
// 臨界區
    {
        // 建立互斥鎖
        std::unique_lock<std::mutex> lock(queue_mutex);
        // 設定執行緒池狀態
        stop = true;
    }
    // 通知所有等待執行緒
    condition.notify_all();
    // 使所有非同步執行緒轉為同步執行, 此處迴圈為 c++11 新提供的迴圈語法 for(value:values)
    for(std::thread &worker: workers)
        worker.join();
    }
private:
    // 需要持續追蹤執行緒來保證可以使用 join
    std::vector< std::thread > workers;
    // 任務佇列
    std::queue< std::function<void()> > tasks;
    // 同步相關
    std::mutex queue_mutex;             // 互斥鎖
    std::condition_variable condition;  // 互斥條件變數
    // 停止相關
    bool stop;
};
#endif /* ThreadPool_hpp */