c++11 實現半同步半非同步執行緒池
阿新 • • 發佈:2018-12-31
感受:
隨著深入學習,現代c++給我帶來越來越多的驚喜…
c++真的變強大了。
半同步半非同步執行緒池:
其實很好理解,分為三層
同步層:通過IO複用或者其他多執行緒多程序等不斷的將待處理事件新增到佇列中,這個過程是同步進行的。
佇列層:所有待處理事件都會放到這裡。上一層事件放到這裡,下一層從這裡獲取事件
非同步層:事先建立好執行緒,讓執行緒不斷的去處理佇列層的任務,上層不關心這些,它只負責把任務放到佇列裡,所以對上層來說這裡是非同步的。
補充下思路:
主要是後兩層
佇列層:c++11 通過std::function可以將函式封裝為物件,那麼我們一個函式也就是一個任務,通過vector或list等容器來儲存這些”任務”來供後面存取。因為會出現競爭資源的問題,所以我們要加鎖,並且通過條件變數的條件來喚醒其他阻塞在鎖上的執行緒,當然你想避免執行緒阻塞浪費資源可以用帶時間的鎖std::time_mutex。
非同步層:c++11 將執行緒也封裝為了物件,那麼我們建立一個容器儲存執行緒物件,讓他們去佇列層取任務並執行,執行完並不結束該執行緒而是歸還給容器(執行緒池)。
看張圖:
如果你不熟悉c++11的內容
以下文章僅供參考
c++11 多執行緒
程式碼:
同步佇列:
#include <list>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>
template <typename T>
class SynQueue
{
public:
SynQueue(int maxsize):
m_maxSize(maxsize), m_needStop(false) { }
//新增事件,左值拷貝和右值移動
void Put(const T&x)
{
//呼叫private內部介面Add
Add(x);
}
void Put(T &&x)
{
Add(x);
}
//從佇列中取事件,取所有事件
void Take(std::list<T> &list)
{
//有wait方法必須用unique_lock
//unique_lock有定時等待等功能,lock_guard就僅僅是RAII手法的互斥鎖
//但unique_lock的效能稍低於lock_guard
std::unique_lock<std::mutex> locker(m_mutex);
//滿足條件則喚醒,不滿足阻塞
m_notEmpty.wait(locker, [this]
{ return m_needStop || NotEmpty(); });
if(m_needStop)
return;
list = std::move(m_queue);
//喚醒其他阻塞在互斥鎖的執行緒
m_notFull.notify_one();
}
//取一個事件
void Take(T &t)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notEmpty.wait(locker, [this]
{ return m_needStop || NotEmpty(); });
if(m_needStop)
return;
t = m_queue.front();
m_queue.pop_front();
m_notFull.notify_one();
t();
}
//停止所有執行緒在同步佇列中的讀取
void Stop()
{
{
std::lock_guard<std::mutex> locker(m_mutex);
m_needStop = true;
}
m_notFull.notify_all();
m_notEmpty.notify_all();
}
//佇列為空
bool Empty()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.empty();
}
//佇列為滿
bool Full()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size() == m_maxSize;
}
//佇列大小
size_t Size()
{
std::lock_guard<std::mutex> locker(m_mutex);
return m_queue.size();
}
private:
//往佇列裡新增事件,事件是範型的,c++11我們可以把函式通過std::function封裝為物件。
template<typename F>
void Add(F &&x)
{
std::unique_lock<std::mutex> locker(m_mutex);
m_notFull.wait(locker, [this] {
return m_needStop || NotFull() ; });
if(m_needStop)
return;
m_queue.push_back(std::forward<F>(x));
m_notEmpty.notify_one();
}
//佇列未滿
bool NotFull() const
{
bool full = m_queue.size() >= m_maxSize;
if(full)
std::cout << "緩衝區滿了...請等待" << std::endl;
return !full;
}
//佇列不為空
bool NotEmpty() const
{
bool empty = m_queue.empty();
if(empty)
{
std::cout << "緩衝區空了...請等待" << std::endl;
std::cout << "執行緒ID:" << std::this_thread::get_id() << std::endl;
}
return !empty;
}
private:
std::mutex m_mutex; //互斥鎖
std::list<T> m_queue; //佇列,存放任務
std::condition_variable m_notEmpty; //佇列不為空的條件變數
std::condition_variable m_notFull; //佇列不為滿的條件變數
int m_maxSize; //任務佇列最大長度
bool m_needStop; //終止標識
};
執行緒池:
#include <stdio.h>
#include <stdlib.h>
#include <assert.h>
#include <unistd.h>
#include "SynQueue.h"
#include <functional>
#include <thread>
#include <memory>
#include <atomic>
const int MaxTaskCount = 100;
class ThreadPool
{
public:
//規定任務型別為void(),我們可以通過c++11 不定引數模板來實現一個可接受任何函式的範型函式模板,這樣就是一個可以接受任何任務的任務隊列了。
using Task = std::function<void()>;
//hardware_concurrency檢測硬體效能,給出預設執行緒數
ThreadPool(int numThreads = std::thread::hardware_concurrency()):
m_queue(MaxTaskCount)
{
//初始化執行緒,並通過shared_ptr來管理
Start(numThreads);
}
//銷燬執行緒池
~ThreadPool(void)
{
Stop();
}
//終止所有執行緒,call_once保證函式只調用一次
void Stop()
{
std::call_once(m_flag, [this] { StopThreadGroup(); });
}
//新增任務,普通版本和右值引用版本
void AddTask(const Task& task)
{
m_queue.Put(task);
}
void AddTask(Task && task)
{
m_queue.Put(std::forward<Task>(task));
}
private:
//停止執行緒池
void StopThreadGroup()
{
m_queue.Stop();
m_running = false;
for(auto thread : m_threadgroup)
{
if(thread)
thread->join();
}
m_threadgroup.clear();
}
void Start(int numThreads)
{
m_running = true;
for(int i = 0; i < numThreads; ++i)
{
//智慧指標管理,並給出構建執行緒的引數,執行緒呼叫函式和引數
std::cout << "Init create thread pool" << std::endl;
m_threadgroup.push_back(std::make_shared<std::thread>(&ThreadPool::RunInThread, this));
}
}
//一次取出佇列中全部事件
void RunInThread_list()
{
while(m_running)
{
std::list<Task> list;
std::cout << "take " << std::endl;
m_queue.Take(list);
for(auto &task : list)
{
if(!m_running)
return;
task();
}
}
}
//一次只取一個事件
void RunInThread()
{
std::cout << m_queue.Size() << std::endl;
while(m_running)
{
Task task;
if(!m_running)
return;
m_queue.Take(task);
}
}
private:
//執行緒池
std::list<std::shared_ptr<std::thread>> m_threadgroup;
//任務佇列
SynQueue<Task>m_queue;
//原子布林值
std::atomic_bool m_running;
//輔助變數->call_once
std::once_flag m_flag;
};
int main(int argc, char *argv[])
{
ThreadPool pool(2);
//建立執行緒向任務佇列新增任務
std::thread thd1([&pool]{
for(int i = 0; i < 10; i++)
{
auto thdId = std::this_thread::get_id();
pool.AddTask([thdId](){
std::cout << thdId << " thread execute task" << std::endl;
});
}
});
std::this_thread::sleep_for(std::chrono::seconds(2));
pool.Stop();
thd1.join();
return EXIT_SUCCESS;
}
參考書籍:
深入應用c++11
完