1. 程式人生 > >[C++]C++ 100行實現執行緒池

[C++]C++ 100行實現執行緒池

一個100行左右的簡單執行緒池。用到了std::mutex和std::thread等新特性。

執行緒池模型

首先把每個函式抽象為一個任務(Task),任務的過程就是呼叫這個Task的run函式。
然後把執行緒池中的執行緒封裝為一個執行緒類(Thread),一直等待排程器分配任務(空閒狀態),如果有任務分配立即進入忙狀態。等任務執行結束再次變為空閒狀態。
最後是一個排程器類(TreadPool),包含任務佇列(隨時新增新任務),和一個包含了Thread的vector(執行緒池中的執行緒)。如果任務佇列非空,排程器每次從中取出一個任務,然後輪詢執行緒池,搜尋空閒執行緒並把這個任務交給執行緒。
模型如下圖所示:

程式碼實現

下面的程式碼實現了上述模型。其中Task類通過睡眠一定的秒數模擬任務,可以看到T1先執行完畢(1秒完畢),T2和T3在之後同時完畢,說明排程非常成功。

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 //linux, g++ -std=c++14 -o t *.cpp -pthread#include <queue>#include <iostream>#include <mutex>#include <thread>#include <vector>#include <unistd.h> class Task{private: int no;public: Task(int n){ no = n; } //可以繼承這個類重寫該方法執行任務 virtual void run(){ sleep(no); //構造時決定執行幾秒,模擬執行緒執行
std::cout << no << "T\n"; }};class Thread{private: std::thread _thread; bool _isfree; Task *_task; std::mutex _locker;public: //構造 Thread() : _isfree(true), _task(nullptr){ _thread = std::thread(&Thread::run, this); _thread.detach(); //放到後臺, join是等待執行緒結束 } //是否空閒 bool isfree(){ return _isfree; } //新增任務 void add_task(Task *task){ if(_isfree){ _locker.lock(); _task = task; _isfree = false; _locker.unlock(); } } //如果有任務則執行任務,否則自旋 void run(){ while(true){ if(_task){ _locker.lock(); _isfree = false; _task->run(); _isfree = true; _task = nullptr; _locker.unlock(); } } }};class ThreadPool{private: std::queue<Task *> task_queue; std::vector<Thread *> _pool; std::mutex _locker;public: //構造執行緒並後臺執行,預設數量為10 ThreadPool(int n = 10){ while(n--){ Thread *t = new Thread(); _pool.push_back(t); } std::thread main_thread(&ThreadPool::run, this); main_thread.detach(); } //釋放執行緒池 ~ThreadPool(){ for(int i = 0;i < _pool.size(); ++i){ delete _pool[i]; } } //新增任務 void add_task(Task *task){ _locker.lock(); task_queue.push(task); _locker.unlock(); } //輪詢 void run(){ while(true){ _locker.lock(); if(task_queue.empty()){ continue; } // 尋找空閒執行緒執行任務 for(int i = 0; i < _pool.size(); ++i){ if(_pool[i]->isfree()){ _pool[i]->add_task(task_queue.front()); task_queue.pop(); break; } } _locker.unlock(); } }};int main(){ ThreadPool tp(2); Task t1(1); Task t2(3); Task t3(2); tp.add_task(&t1); tp.add_task(&t2); tp.add_task(&t3); sleep(4); //等待排程器結束,不然會崩潰 return 0;}