1. 程式人生 > >[C++]固定大小執行緒池

[C++]固定大小執行緒池

執行緒池模型

執行緒池是併發程式設計中常用的模型。
執行緒是一種非常寶貴的資源,建立、銷燬執行緒都是非常消耗時間的操作,所以我們的一個思路是在程式start up的時候,建立一個儲存有多個執行緒的快取,這樣程式執行時就不會頻繁的發生建立和銷燬執行緒的操作,從而提高了併發的效率。
儲存有多個執行緒的這個快取,我們一般稱其為執行緒池(ThreadPool)。如果執行緒池的中執行緒的數量可以動態變化,我們稱其為動態大小的執行緒池,這裡討論並實現的是固定大小的執行緒池

執行緒池中維護多個執行緒(thread)的同時,維護一個任務佇列。所謂的任務佇列,就是需要我們去併發執行的一個個的任務,說的通俗一點,就是等待執行的一個個函式。一旦任務佇列不空,取出一個任務給定某個空閒的執行緒去執行該任務。
這是一個典型的生產者和消費者的模型,所以我們要用到mutex和conditon variable原語。除此之外,我們還應該對“任務”有一個合理的抽象。
所以執行緒池這個類的資料成員應該有如下幾個:

1.mutex_t            mutex
2.condition_t        cond
3.vector_t<thread_t> threadVec
4.queue_t<task_t>    taskQ

其中thread_t在C++11中可以使用std::thread來替代,可是這裡的task_t我們還沒有定義。
實際上,task_t是對某個執行過程的抽象,所以我們可以用C++11中的function語義來替代:

using task_t = function<void()>

資料成員定義完畢,那麼一個執行緒池類應該支援什麼樣的操作呢。

1.Start()
2.Run(Task)
3.Stop()

Start

Start操作是抽象了初始化的操作,在構造出一個執行緒池物件之後,我們需要對taskQ和threadVec做出初始化操作。taskQ的初始化實際上就是置空,threadVec的初始化有些複雜,我們需要建立一些std::thread的物件。
如果你瞭解thread的建立你可能會有一個困惑,建立thread時需要給定一個函式指標,意味著該thread將併發執行該函式指標指向的函式,那麼這裡的這個執行緒需要執行的函式是什麼呢?
我們再來明確一下執行緒池的作用,一旦任務佇列不空,我們就取出一個任務扔給某個空閒的執行緒,讓該執行緒去執行任務。
我們稱這些執行緒為worker,對應生產者消費者模型中的消費者,它消費生產佇列(taskQ)中的物品(某個具體的task),所以建立thread給定的函式指標指向的就是:消費者從生產佇列取出task並執行的這個過程。

Start()
{
    for(i < nThreadCount)
        create thread(fetch_and_consume)
        threadVec.push(thread)
        thread.run()
 }

fetch_and_consume()
{
    lock(mutex)
    take one task
    execute the task
 }

Run(task)

這個函式抽象的是生產者向生產佇列中加入task的過程,暴露給使用執行緒池的使用者使用。

Run(task)
{
    lock(mutex)
    taskQ.Add(task)
    cond.notify()
 }  

Stop()

Stop會停止執行緒池的運作,一般線上程池的解構函式中主動呼叫,為了防止執行緒池析構時各worker執行緒還沒完成他們的task,所以我們一般會在Stop中join各個worker執行緒。

Stop()
{
    for all worker in threadVec
        worker.join()
 }

實現

#ifndef  SIXDAY_THREAD_POOL_H
#define  SIXDAY_THREAD_POOL_H

#include <thread>
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <cstdint>

namespace sixday
{
    class FixSizeThreadPool
    {
    public:

        using Task = std::function<void()>;

    public:

        explicit FixSizeThreadPool(int32_t nFixedSize);
        ~FixSizeThreadPool();

        FixSizeThreadPool() = delete;
        FixSizeThreadPool(const FixSizeThreadPool&) = delete;
        FixSizeThreadPool(const FixSizeThreadPool&&) = delete;
        FixSizeThreadPool& operator=(const FixSizeThreadPool&) = delete;
        FixSizeThreadPool& operator=(const FixSizeThreadPool&&) = delete;

        void Start();
        void Run(const Task& task);
        void Stop();

    private:

        std::vector<std::thread> m_Worker;

        std::queue<Task> m_TaskQ;

        std::mutex m_Mutex;

        std::condition_variable m_Cond;

        bool m_bIsRunning;

        int32_t m_nWorkerCount;

        void RunInThread();

        Task Take();
    };
}

#endif // ! SIXDAY_THREAD_POOL_H
#include "ThreadPool.h"
#include <cassert>

namespace sixday
{
    FixSizeThreadPool::FixSizeThreadPool(int32_t nFixedSize)
    {
        assert(nFixedSize > 0);
        m_Worker.reserve(nFixedSize);
        m_nWorkerCount = nFixedSize;
        m_bIsRunning = false;
    }

    FixSizeThreadPool::~FixSizeThreadPool()
    {
        if (m_bIsRunning)
        {
            Stop();
        }
    }

    void FixSizeThreadPool::Start()
    {
        assert(m_nWorkerCount > 0);
        m_bIsRunning = true;
        for (int32_t i = 0; i < m_nWorkerCount; ++i)
        {
            auto func = std::bind(&FixSizeThreadPool::RunInThread, this);
            m_Worker.push_back(std::thread(func));
        }
    }

    void FixSizeThreadPool::Run(const Task & task)
    {   
        std::unique_lock<std::mutex> lock(m_Mutex);
        m_TaskQ.push(task);
        m_Cond.notify_one();
    }

    void FixSizeThreadPool::Stop()
    {
        {
            std::unique_lock<std::mutex> lock(m_Mutex);
            m_bIsRunning = false;
            m_Cond.notify_all();
        }

        for (auto& thread : m_Worker)
        {
            thread.join();
        }
    }

    void FixSizeThreadPool::RunInThread()
    {
        while (m_bIsRunning)
        {
            Task task = Take();
            if (task != nullptr)
            {
                task();
            }
        }
    }

    FixSizeThreadPool::Task FixSizeThreadPool::Take()
    {
        std::unique_lock<std::mutex> lock(m_Mutex);
        while (m_TaskQ.empty() && !m_bIsRunning)
        {
            m_Cond.wait(lock);
        }

        Task task = nullptr;
        if (!m_TaskQ.empty())
        {
            task = m_TaskQ.front();
            m_TaskQ.pop();
        }
        return task;
    }
}

//Main.cpp

#include "CountDownLatch.h"
#include "ThreadPool.h"
#include <cstdio>
using namespace sixday;

static const int32_t LoopMax = 1000000;
void PrintHello()
{
    for(int32_t i = 0 ; i < LoopMax; ++i)
        printf("hello\n");
}

void PrintWorld()
{
    for (int32_t i = 0; i < LoopMax; ++i)
        printf("world\n");
}

void PrintSay()
{
    for (int32_t i = 0; i < LoopMax; ++i)
        printf("say\n");
}

void PrintName()
{
    for (int32_t i = 0; i < LoopMax; ++i)
        printf("fancy\n");
}

int main()
{
    CountDownLatch latch(1);

    FixSizeThreadPool threadpool(5);
    threadpool.Start();
    threadpool.Run(PrintHello);
    threadpool.Run(PrintWorld);
    threadpool.Run(PrintName);
    threadpool.Run(PrintSay);

    latch.Wait();
    return 0;
}