1. 程式人生 > >c++11實現一個半同步半非同步執行緒池

c++11實現一個半同步半非同步執行緒池

在處理大量併發任務的時候,如果按照傳統的方式,一個請求一個執行緒來處理請求任務,大量的執行緒建立和銷燬將消耗過多的系統資源,還增加了執行緒上下文切換的開銷,而通過執行緒池技術就可以很好的解決這些問題,執行緒池技術通過在系統中預先建立一定數量的執行緒,當任務請求到來時從執行緒池中分配一個預先建立的執行緒去處理任務,執行緒在完成任務之後還可以重用,不會銷燬,而是等待下次任務的到來.

分層

  • 半同步半非同步執行緒池分為三層:

    1. 同步服務層: 它處理來自上層的任務請求,上層的請求可能是併發的,這些請求不是馬上就會被處理的,而是將這些任務放到一個同步排隊層中,等待處理.

    2. 同步排隊層: 來自上層的任務請求都會加到排隊層中等待處理.

    3. 非同步服務層: 這一層中會有多個執行緒同時處理排隊層中的任務,非同步服務層從同步排隊層中取出任務並行的處理.

這裡寫圖片描述

執行緒池實現

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <atomic>
#include <functional>

using namespace std;

/********************************同步佇列******************************/
template <typename T> class SyncQueue { public: SyncQueue(int maxSize): m_maxSize(maxSize), m_needStop(false) { } //新增事件 void Put(const T& x) { Add(x); } //新增事件 void Put(T && x) { //呼叫內部介面,進行完美轉發 Add(std::forward<T>(x)); } //從佇列中取事件,取所有事件
void Take(std::list<T> &list) { std::unique_lock<std::mutex> locker(m_mutex); //當不滿足任何一個則等待,但是若m_needStop為true是因為任務要終止了所以不阻塞 m_notEmpty.wait(locker, [this]{return (m_needStop || NotEmpty()); }); if (m_needStop) { return; } list = std::move(m_queue); m_notFull.notify_one(); } //取一個事件 void Take(T &t) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this]{return m_needStop || NotEmpty(); }); if (m_needStop) { return; } t = m_queue.front(); m_queue.pop_front(); m_notFull.notify_one(); } //終止同步佇列 void Stop() { { //鎖作用域就在這對大括號內 std::lock_guard<std::mutex> locker(m_mutex); //將終止標誌設為true m_needStop = true; } //喚醒所有程序一一終止 m_notFull.notify_all(); m_notEmpty.notify_all(); } //佇列為空 bool Empty() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.empty(); } //佇列為滿 bool Full() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size() == m_maxSize; } //佇列大小 size_t Size() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size(); } //佇列大小 int Count() { return m_queue.size(); } private: //佇列不為滿 bool NotFull() const { bool full = (m_queue.size() >= m_maxSize); if (full) { cout << "the queue is full, need wait..." << endl; } return !full; } //佇列不為空 bool NotEmpty() const { bool empty = m_queue.empty(); if (empty) { cout << "the queue is empty, need wait..., 非同步層的執行緒ID: " << this_thread::get_id() << endl; } return !empty; } //向佇列中新增事件,若不為滿且終止標誌為false則新增事件 template <typename F> void Add(F && x) { std::unique_lock<std::mutex> locker(m_mutex); //當不滿足任何一個則等待,但是若m_needStop為true是因為任務要終止了所以不阻塞 m_notFull.wait(locker, [this]{return m_needStop || NotFull(); }); if (m_needStop) { return; } m_queue.push_back(std::forward<F>(x)); m_notEmpty.notify_one(); } private: //緩衝區 std::list<T> m_queue; //互斥量 std::mutex m_mutex; //佇列不為空的條件變數 std::condition_variable m_notEmpty; //佇列不為滿的條件變數 std::condition_variable m_notFull; //任務佇列最大長度 int m_maxSize; //終止的標識,當為true時代表同步佇列要終止 bool m_needStop; }; /**************************執行緒池********************************/ //傳遞給同步佇列的最大個數 const int MaxTaskCount = 100; class ThreadPool { public: using Task = std::function<void()>; //建構函式,預設引數hardware_concurrency()獲取CPU核心數量 ThreadPool(int numThreads = std::thread::hardware_concurrency()):m_queue(MaxTaskCount) { cout << "numThreads: " << numThreads << endl; Start(numThreads); } ~ThreadPool() { Stop(); } //保證多執行緒環境下只調用一次StopThreadGroup函式 void Stop() { std::call_once(m_flag, [this]{ StopThreadGroup(); }); } //新增任務,右值完美轉發 void AddTask(Task && task) { m_queue.Put(std::forward<Task> (task)); } //新增任務 void AddTask(const Task && task) { m_queue.Put(task); } private: //建立numThreads個數的執行緒組 void Start(int numThreads) { m_running = true; for (int i = 0; i < numThreads; i++) { //多個執行緒依次的處理 m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this)); } } //取出任務佇列中的全部,依次執行 void RunInThread() { while (m_running) { std::list<Task> list; m_queue.Take(list); for (auto & task : list) { if (!m_running) { return ; } //執行任務 task(); } } } //終止所有任務的執行 void StopThreadGroup() { //終止同步佇列 m_queue.Stop(); m_running = false; for (auto thread : m_threadgroup) { if (thread) { thread->join(); } } m_threadgroup.clear(); } private: //處理任務的執行緒組 std::list<std::shared_ptr<std::thread>> m_threadgroup; //同步佇列 SyncQueue<Task> m_queue; //執行的標誌,flase代表終止 atomic_bool m_running; //保證在函式在多執行緒環境中只被呼叫一次 std::once_flag m_flag; }; int main() { ThreadPool pool; //pool.Start(2); std::thread thd1([&pool] { for (int i = 0; i < 10; i++) { auto thdId = this_thread::get_id(); pool.AddTask([thdId] { cout << "1.thread id: " << thdId << endl; }); } }); std::thread thd2([&pool] { for (int i = 0; i < 10; i++) { auto thdId = this_thread::get_id(); pool.AddTask([thdId] { cout << "2.thread id: " << thdId << endl; }); } }); this_thread::sleep_for(std::chrono::seconds(2)); getchar(); pool.Stop(); thd1.join(); thd2.join(); }

這裡寫圖片描述

物件池

  • 物件池對於建立開銷較大的物件來說很有意義,為了避免重複建立開銷較大的物件,可以通過物件池來優化.

  • 物件池的思路比較簡單,實現建立好一批物件,放到一個集合中,每當程式需要新的物件時,就從物件池中獲取,程式用完該物件後都會把該物件歸還給物件池.這樣會避免重複建立物件,提高程式效能.

#include <string>
#include <functional>
#include <memory>
#include <map>

using namespace std;

//要成為不可複製的類,典型的方法是將類的複製建構函式和賦值運算子設定為private或protected
//為了使ObjectPool為不可複製的類,我們定義了類NonCopyable,只需繼承起則可為不可複製的類
class NonCopyable
{
protected:
    NonCopyable() = default;
    ~NonCopyable() = default;
    NonCopyable(const NonCopyable&) = delete;
    NonCopyable& operator =(const NonCopyable &) = delete;
};

//物件最大個數
const int MaxObjectNum = 10;

template <typename T>
class ObjectPool : NonCopyable
{
    template <typename... Args>
    using Constructor = function<shared_ptr<T> (Args...)>;
private:
    //定義multimap型別的私有成員通過Constructor<Args...>型別獲得字串,則通過字串型別一對多的對應特定的物件.
    multimap<string, shared_ptr<T>> m_object_map;

public:
    //初始化建立物件
    template <typename... Args>
    void Init(size_t num, Args ...args)
    {
        if (num <= 0 || num > MaxObjectNum)
        {
            throw std::logic_error("Object num out of range");
        }

        //Init時的模板型別不同所得到的constructName字串不同
        //所以相同的初始化型別對應m_object_map中的first相同,不同型別的則不同
        auto constructName = typeid(Constructor<Args...>).name();
        //cout << "Init: " << constructName << endl;
        for (size_t i = 0; i < num; i++)
        {
            //刪除器中不直接刪除物件,而是回收到物件池中,以供下次使用
            m_object_map.emplace(constructName, 
                shared_ptr<T>(new T(std::forward<Args>(args)...), [this, constructName](T *p)
            {
                cout << "dis: " << constructName << endl;
                m_object_map.emplace(std::move(constructName),shared_ptr<T>(p));
            }));
        }
    }

    //從物件池獲取一個物件
    template <typename... Args>
    std::shared_ptr<T> Get()
    {
        string constructName = typeid(Constructor<Args...>).name();
        cout << constructName << endl;

        //通過Get的模板型別得到對應的字串,通過該字串找到所有該字串的對應
        auto range = m_object_map.equal_range(constructName);
        //從該型別對應的物件中獲取其中一個
        for (auto it = range.first; it != range.second; it++)
        {
            auto ptr = it -> second;
            m_object_map.erase(it);
            return ptr;
        } 

        return nullptr;
    }
};

#