1. 程式人生 > >c++11 實現半同步半非同步執行緒池

c++11 實現半同步半非同步執行緒池

感受

隨著深入學習,現代c++給我帶來越來越多的驚喜…
c++真的變強大了。

半同步半非同步執行緒池:

其實很好理解,分為三層
同步層:通過IO複用或者其他多執行緒多程序等不斷的將待處理事件新增到佇列中,這個過程是同步進行的。
佇列層:所有待處理事件都會放到這裡。上一層事件放到這裡,下一層從這裡獲取事件
非同步層:事先建立好執行緒,讓執行緒不斷的去處理佇列層的任務,上層不關心這些,它只負責把任務放到佇列裡,所以對上層來說這裡是非同步的。

補充下思路:
主要是後兩層

佇列層:c++11 通過std::function可以將函式封裝為物件,那麼我們一個函式也就是一個任務,通過vector或list等容器來儲存這些”任務”來供後面存取。因為會出現競爭資源的問題,所以我們要加鎖,並且通過條件變數的條件來喚醒其他阻塞在鎖上的執行緒,當然你想避免執行緒阻塞浪費資源可以用帶時間的鎖std::time_mutex。

非同步層:c++11 將執行緒也封裝為了物件,那麼我們建立一個容器儲存執行緒物件,讓他們去佇列層取任務並執行,執行完並不結束該執行緒而是歸還給容器(執行緒池)。

看張圖:
這裡寫圖片描述

如果你不熟悉c++11的內容
以下文章僅供參考
c++11 多執行緒

程式碼

同步佇列:

#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>

template
<typename T> class SynQueue { public: SynQueue(int maxsize): m_maxSize(maxsize), m_needStop(false) { } //新增事件,左值拷貝和右值移動 void Put(const T&x) { //呼叫private內部介面Add Add(x); } void Put(T &&x) { Add(x); } //從佇列中取事件,取所有事件
void Take(std::list<T> &list) { //有wait方法必須用unique_lock //unique_lock有定時等待等功能,lock_guard就僅僅是RAII手法的互斥鎖 //但unique_lock的效能稍低於lock_guard std::unique_lock<std::mutex> locker(m_mutex); //滿足條件則喚醒,不滿足阻塞 m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); }); if(m_needStop) return; list = std::move(m_queue); //喚醒其他阻塞在互斥鎖的執行緒 m_notFull.notify_one(); } //取一個事件 void Take(T &t) { std::unique_lock<std::mutex> locker(m_mutex); m_notEmpty.wait(locker, [this] { return m_needStop || NotEmpty(); }); if(m_needStop) return; t = m_queue.front(); m_queue.pop_front(); m_notFull.notify_one(); t(); } //停止所有執行緒在同步佇列中的讀取 void Stop() { { std::lock_guard<std::mutex> locker(m_mutex); m_needStop = true; } m_notFull.notify_all(); m_notEmpty.notify_all(); } //佇列為空 bool Empty() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.empty(); } //佇列為滿 bool Full() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size() == m_maxSize; } //佇列大小 size_t Size() { std::lock_guard<std::mutex> locker(m_mutex); return m_queue.size(); } private: //往佇列裡新增事件,事件是範型的,c++11我們可以把函式通過std::function封裝為物件。 template<typename F> void Add(F &&x) { std::unique_lock<std::mutex> locker(m_mutex); m_notFull.wait(locker, [this] { return m_needStop || NotFull() ; }); if(m_needStop) return; m_queue.push_back(std::forward<F>(x)); m_notEmpty.notify_one(); } //佇列未滿 bool NotFull() const { bool full = m_queue.size() >= m_maxSize; if(full) std::cout << "緩衝區滿了...請等待" << std::endl; return !full; } //佇列不為空 bool NotEmpty() const { bool empty = m_queue.empty(); if(empty) { std::cout << "緩衝區空了...請等待" << std::endl; std::cout << "執行緒ID:" << std::this_thread::get_id() << std::endl; } return !empty; } private: std::mutex m_mutex; //互斥鎖 std::list<T> m_queue; //佇列,存放任務 std::condition_variable m_notEmpty; //佇列不為空的條件變數 std::condition_variable m_notFull; //佇列不為滿的條件變數 int m_maxSize; //任務佇列最大長度 bool m_needStop; //終止標識 };

執行緒池:

#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include "SynQueue.h"
#include <functional>
#include <thread>
#include <memory>
#include <atomic>

const int MaxTaskCount = 100;

class ThreadPool
{
    public:
        //規定任務型別為void(),我們可以通過c++11 不定引數模板來實現一個可接受任何函式的範型函式模板,這樣就是一個可以接受任何任務的任務隊列了。
        using Task = std::function<void()>;
        //hardware_concurrency檢測硬體效能,給出預設執行緒數
        ThreadPool(int numThreads = std::thread::hardware_concurrency()):
            m_queue(MaxTaskCount) 
        {
            //初始化執行緒,並通過shared_ptr來管理
            Start(numThreads);
        }

        //銷燬執行緒池
        ~ThreadPool(void)
        {
            Stop();
        }

        //終止所有執行緒,call_once保證函式只調用一次
        void Stop()
        {
            std::call_once(m_flag, [this] { StopThreadGroup(); });
        }

        //新增任務,普通版本和右值引用版本
        void AddTask(const Task& task)
        {
            m_queue.Put(task);
        }
        void AddTask(Task && task)
        {
            m_queue.Put(std::forward<Task>(task));
        }

    private:

        //停止執行緒池
        void StopThreadGroup()
        {
            m_queue.Stop();
            m_running = false;
            for(auto thread : m_threadgroup)
            {
                if(thread)
                    thread->join();
            }
            m_threadgroup.clear();
        }


        void Start(int numThreads)
        {
            m_running = true;
            for(int i = 0; i < numThreads; ++i)
            {
                //智慧指標管理,並給出構建執行緒的引數,執行緒呼叫函式和引數
                std::cout << "Init create thread pool" << std::endl;
                m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
            }
        }

        //一次取出佇列中全部事件
        void RunInThread_list()
        {
            while(m_running)
            {
                std::list<Task> list;
                std::cout << "take " << std::endl;
                m_queue.Take(list);
                for(auto &task : list)
                {
                    if(!m_running)
                        return;
                    task();
                }
            }

        }

        //一次只取一個事件
        void RunInThread()
        {
            std::cout << m_queue.Size() << std::endl;
            while(m_running)
            {
                Task task;
                if(!m_running)
                    return;
                m_queue.Take(task);
            }
        }

    private:
        //執行緒池
        std::list<std::shared_ptr<std::thread>> m_threadgroup;
        //任務佇列
        SynQueue<Task>m_queue;
        //原子布林值
        std::atomic_bool m_running;
        //輔助變數->call_once
        std::once_flag m_flag;
};

int main(int argc, char *argv[])
{
    ThreadPool pool(2);

    //建立執行緒向任務佇列新增任務
    std::thread thd1([&pool]{
            for(int i = 0; i < 10; i++)
            {
                auto thdId = std::this_thread::get_id();
                pool.AddTask([thdId](){
                    std::cout << thdId << " thread execute task" << std::endl;
                    });
            }
        });


    std::this_thread::sleep_for(std::chrono::seconds(2));
    pool.Stop();
    thd1.join();

    return EXIT_SUCCESS;
}

這裡寫圖片描述

參考書籍:
深入應用c++11