1. 程式人生 > >C++11併發學習之六:執行緒池的實現

C++11併發學習之六:執行緒池的實現

為什麼要使用執行緒池?
       目前的大多數網路伺服器,包括Web伺服器、Email伺服器以及資料庫伺服器等都具有一個共同點,就是單位時間內必須處理數目巨大的連線請求,但處理時間卻相對較短。
       傳統多執行緒方案中我們採用的伺服器模型則是一旦接受到請求之後,即建立一個新的執行緒,由該執行緒執行任務。任務執行完畢後,執行緒退出,這就是是“即時建立,即時銷燬”的策略。儘管與建立程序相比,建立執行緒的時間已經大大的縮短,但是如果提交給執行緒的任務是執行時間較短,而且執行次數極其頻繁,那麼伺服器將處於不停的建立執行緒,銷燬執行緒的狀態。
我們將傳統方案中的執行緒執行過程分為三個過程:T1、T2、T3。
T1:執行緒建立時間
T2:執行緒執行時間,包括執行緒的同步等時間
T3:執行緒銷燬時間
       那麼我們可以看出,執行緒本身的開銷所佔的比例為(T1+T3) / (T1+T2+T3)。如果執行緒執行的時間很短的話,這比開銷可能佔到20%-50%左右。如果任務執行時間很長的話,這筆開銷將是不可忽略的。
       除此之外,執行緒池能夠減少建立的執行緒個數。通常執行緒池所允許的併發執行緒是有上界的,如果同時需要併發的執行緒數超過上界,那麼一部分執行緒將會等待。而傳統方案中,如果同時請求數目為2000,那麼最壞情況下,系統可能需要產生2000個執行緒。儘管這不是一個很大的數目,但是也有部分機器可能達不到這種要求。
       因此執行緒池的出現正是著眼於減少執行緒本身帶來的開銷。執行緒池採用預建立的技術,在應用程式啟動之後,將立即建立一定數量的執行緒(N1),放入空閒佇列中。這些執行緒都是處於阻塞(Suspended)狀態,不消耗CPU,但佔用較小的記憶體空間。當任務到來後,緩衝池選擇一個空閒執行緒,把任務傳入此執行緒中執行。當N1個執行緒都在處理任務後,緩衝池自動建立一定數量的新執行緒,用於處理更多的任務。在任務執行完畢後執行緒也不退出,而是繼續保持在池中等待下一次的任務。當系統比較空閒時,大部分執行緒都一直處於暫停狀態,執行緒池自動銷燬一部分執行緒,回收系統資源。
      基於這種預建立技術,執行緒池將執行緒建立和銷燬本身所帶來的開銷分攤到了各個具體的任務上,執行次數越多,每個任務所分擔到的執行緒本身開銷則越小,不過我們另外可能需要考慮進去執行緒之間同步所帶來的開銷
執行緒池適合場景


       事實上,執行緒池並不是萬能的。它有其特定的使用場合。執行緒池致力於減少執行緒本身的開銷對應用所產生的影響,這是有前提的,前提就是執行緒本身開銷與執行緒執行任務相比不可忽略。如果執行緒本身的開銷相對於執行緒任務執行開銷而言是可以忽略不計的,那麼此時執行緒池所帶來的好處是不明顯的,比如對於FTP伺服器以及Telnet伺服器,通常傳送檔案的時間較長,開銷較大,那麼此時,我們採用執行緒池未必是理想的方法,我們可以選擇“即時建立,即時銷燬”的策略。
總之執行緒池通常適合下面的幾個場合:
(1)單位時間內處理任務頻繁而且任務處理時間短
(2)對實時性要求較高。如果接受到任務後在建立執行緒,可能滿足不了實時要求,因此必須採用執行緒池進行預建立。

程式碼非常的簡潔,只有一個頭檔案ThreadPool.h,這裡貼出來作為備份。

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;
    
    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};
 
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

// the destructor joins all threads
inline ThreadPool::~ThreadPool()
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        stop = true;
    }
    condition.notify_all();
    for(std::thread &worker: workers)
        worker.join();
}

#endif

基本使用方法

#include <iostream>
#include "ThreadPool.h"

int main()
{
    // create thread pool with 4 worker threads
    ThreadPool pool(4);

    // enqueue and store future
    auto result = pool.enqueue([](int answer) { return answer; }, 42);

    // get result from future, print 42
    std::cout << result.get() << std::endl; 
}

另一個例子

#include <iostream>
#include "ThreadPool.h"

void func()
{
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout<<"worker thread ID:"<<std::this_thread::get_id()<<std::endl;
}

int main()
{
    ThreadPool pool(4);
    while(1)
    {
       pool.enqueue(fun);
    }
}


可以看出,四個執行緒都在執行。但是如果把func()中的延時放在main()的while迴圈中,就只有一個執行緒在運行了。

執行緒池,最簡單的就是生產者消費者模型了。池裡的每條執行緒,都是消費者,他們消費並處理一個個的任務,而任務佇列就相當於生產者了。

執行緒池最簡單的形式是含有一個固定數量的工作執行緒來處理任務,典型的數量是std::thread::hardware_concurrency()