基於C++11 thread 實現執行緒池
阿新 • • 發佈:2018-12-22
轉自:https://blog.csdn.net/u013507368/article/details/48130151
這裡基於C++11 thread實現執行緒池,執行緒池不可拷貝。
1 nocopyable類
nocopyable 是不可拷貝的基類:派生類只要繼承它,就同樣不可拷貝(拷貝建構與拷貝賦值皆被刪除)。實現如下
//nocopyable.h
#ifndef NOCOPYABLE_H
#define NOCOPYABLE_H

namespace fivestar
{

/// Mix-in base class that disables copying.
///
/// A class that derives from nocopyable cannot be copy-constructed or
/// copy-assigned, because the base-class copy operations are deleted.
/// Default construction and destruction remain available.
class nocopyable
{
public:
    nocopyable() = default;
    ~nocopyable() = default;

    // Deleted copy operations are declared public so that an attempted copy
    // is diagnosed as "use of deleted function" rather than as an access
    // violation on a private member.
    nocopyable(const nocopyable& x) = delete;
    nocopyable& operator=(const nocopyable& x) = delete;
};

} // namespace fivestar

#endif // NOCOPYABLE_H
2 ThreadPool類
//ThreadPool.h
#ifndef THREADPOOL_H #define THREADPOOL_H #include <thread> #include <mutex> #include <functional> #include <string> #include <condition_variable> #include <deque> #include <vector> #include <memory> #include "nocopyable.h" namespace fivestar { class ThreadPool:public nocopyable { public: typedef std::function<void()> Task; explicit ThreadPool(const std::string &name = std::string()); ~ThreadPool(); void start(int numThreads);//設定執行緒數,建立numThreads個執行緒 void stop();//執行緒池結束 void run(const Task& f);//任務f線上程池中執行 void setMaxQueueSize(int maxSize) { _maxQueueSize = maxSize; }//設定任務佇列可存放最大任務數 private: bool isFull();//任務佇列是否已滿 void runInThread();//執行緒池中每個thread執行的function Task take();//從任務佇列中取出一個任務 std::mutex _mutex; std::condition_variable _notEmpty; std::condition_variable _notFull; std::string _name; std::vector<std::thread> _threads; std::deque<Task> _queue; size_t _maxQueueSize; bool _running; }; } #endif // THREADPOOL_H
//ThreadPool.cpp
#include "ThreadPool.h"

#include <cassert>
#include <cstdio>
#include <cstdlib>
#include <exception>
#include <utility>

using namespace fivestar;

/// Creates an idle pool: no threads, unbounded queue, not running.
ThreadPool::ThreadPool(const std::string &name)
    : _name(name),
      _maxQueueSize(0),
      _running(false)
{
}

/// Joins the workers if the pool was started and not yet stopped.
ThreadPool::~ThreadPool()
{
    if (_running)
    {
        stop();
    }
}

/// Spawns numThreads workers, each running runInThread().
/// Must be called at most once (asserts that no worker exists yet).
void ThreadPool::start(int numThreads)
{
    assert(_threads.empty());
    assert(numThreads >= 0);
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _running = true;
    }
    if (numThreads > 0)
    {
        // Guarded so a non-positive count can never be converted into a huge
        // unsigned reservation request.
        _threads.reserve(static_cast<size_t>(numThreads));
    }
    for (int i = 0; i < numThreads; ++i)
    {
        _threads.emplace_back(&ThreadPool::runInThread, this);
    }
}

/// Marks the pool as stopped, wakes every waiter and joins the workers.
/// Tasks still sitting in the queue are not executed. Calling stop() more
/// than once is harmless.
void ThreadPool::stop()
{
    {
        std::lock_guard<std::mutex> lock(_mutex);
        _running = false;
        _notEmpty.notify_all();
        // Producers blocked in run() on a full queue must be released too,
        // otherwise they would wait forever once the workers are gone.
        _notFull.notify_all();
    }
    for (std::thread &worker : _threads)
    {
        // join() on an already-joined thread throws std::system_error.
        if (worker.joinable())
        {
            worker.join();
        }
    }
}

/// Executes f inline when no worker thread exists; otherwise queues it,
/// blocking while a bounded queue is full. A task submitted after stop()
/// is dropped, because no worker is left to execute it.
void ThreadPool::run(const Task &f)
{
    if (_threads.empty())
    {
        f();
        return;
    }

    std::unique_lock<std::mutex> lock(_mutex);
    while (isFull() && _running)
    {
        _notFull.wait(lock);
    }
    if (!_running)
    {
        return;
    }
    assert(!isFull());
    _queue.push_back(f);
    _notEmpty.notify_one();
}

/// Blocks until a task is available or the pool is stopped.
/// Returns an empty Task when woken by stop() with an empty queue.
ThreadPool::Task ThreadPool::take()
{
    std::unique_lock<std::mutex> lock(_mutex);
    while (_queue.empty() && _running)
    {
        _notEmpty.wait(lock);
    }

    Task task;
    if (!_queue.empty())
    {
        // Move instead of copy: the queue slot is popped right afterwards.
        task = std::move(_queue.front());
        _queue.pop_front();
        if (_maxQueueSize > 0)
        {
            _notFull.notify_one();
        }
    }
    return task;
}

/// True when a bound is configured and reached. Caller must hold _mutex.
bool ThreadPool::isFull()
{
    return _maxQueueSize > 0 && _queue.size() >= _maxQueueSize;
}

/// Worker loop: keeps taking and executing tasks until the pool is stopped.
/// Any exception escaping a task is reported on stderr and terminates the
/// process.
void ThreadPool::runInThread()
{
    // Read the flag under the mutex that stop() holds while clearing it, so
    // the loop condition is never an unsynchronised read. The explicit bool
    // return type keeps this valid whether _running is a bool or an atomic.
    const auto stillRunning = [this]() -> bool {
        std::lock_guard<std::mutex> lock(_mutex);
        return _running;
    };

    try
    {
        while (stillRunning())
        {
            Task task = take();
            if (task)
            {
                task();
            }
        }
    }
    catch (const std::exception &ex)
    {
        std::fprintf(stderr, "exception caught in ThreadPool %s\n", _name.c_str());
        std::fprintf(stderr, "reason: %s\n", ex.what());
        std::abort();
    }
    catch (...)
    {
        std::fprintf(stderr, "exception caught in ThreadPool %s\n", _name.c_str());
        // Same policy as the std::exception branch: a worker that silently
        // disappears leaves the pool short of threads and can leave
        // producers blocked on a full queue forever.
        std::abort();
    }
}
注意:
1. 若要限制任務佇列的長度,須在為執行緒池新增任務之前呼叫setMaxQueueSize,設定任務佇列可存放的最大任務數;若未呼叫(預設值為0),任務佇列不設上限,run()不會因佇列已滿而阻塞
2 若不呼叫start建立執行緒,則執行緒池退化為單執行緒
#include <cstdio>
#include <iostream>

#include "ThreadPool.h"
using namespace std;
// Demo task: writes one line containing the task index i to stdout.
void Test(int i)
{
    static const char kLineFormat[] = "I love you %d time\n";
    printf(kLineFormat, i);
}
// Demo driver: a two-thread pool with a queue bound of 10 runs Test(0..9).
int main()
{
    fivestar::ThreadPool threadPool;
    threadPool.setMaxQueueSize(10);
    threadPool.start(2);

    int submitted = 0;
    while (submitted < 10)
    {
        const int index = submitted;
        // The closure captures the index by value, so each queued task keeps
        // its own number.
        threadPool.run([index] { Test(index); });
        ++submitted;
    }

    // Wait for a key press so the workers get time to print before exit.
    getchar();
    return 0;
}