1. 程式人生 > >【Linux】執行緒池

【Linux】執行緒池

首先,執行緒池是什麼?顧名思義,就是把一堆開闢好的執行緒放在一個池子裡統一管理,就是一個執行緒池。

  其次,為什麼要用執行緒池,難道來一個請求給它申請一個執行緒,請求處理完了釋放執行緒不行麼?也行,但是如果建立執行緒和銷燬執行緒的時間比執行緒處理請求的時間長,而且請求很多的情況下,我們的CPU資源都浪費在了建立和銷燬執行緒上了,所以這種方法的效率比較低,於是,我們可以將若干已經建立完成的執行緒放在一起統一管理,如果來了一個請求,我們從執行緒池中取出一個執行緒來處理,處理完了放回池內等待下一個任務,執行緒池的好處是避免了繁瑣的建立和結束執行緒的時間,有效的利用了CPU資源。

功能:節省大量執行緒建立銷燬成本,避免執行緒建立過多導致程式出錯。

分類:

1.啟動時就建立固定數量的執行緒,一直處理請求------請求比較多,耗時較短。

2.啟動時不建立執行緒,當有請求時找空閒執行緒,如果沒有空閒執行緒,就建立一個工作執行緒,如果工作執行緒等待超時,則工作執行緒退出釋放資源。-------請求一般較少,耗時較長,防止峰值壓力。

執行緒池 = 多個執行緒+任務佇列

用第一種方式實現一個簡單的執行緒池:

#include<iostream>
#include<time.h>
#include<queue>
#include<stdlib.h>
#include<unistd.h>
#include<pthread.h>
using namespace std;


class Task
{
public:
void SetData(void* data)
{
_data = data;
}

bool Run()
{
srand(time(NULL));
int nsec = rand()%5;
cout <<"ID: " << pthread_self() << " run data:-------"
<<(char*)_data << "-------sleep " << nsec << endl;
sleep(nsec);
return true;
}
private:
void* _data;
};


class ThreadPool
{
public:
ThreadPool(int max_thread = 5, int max_queue = 10)
:_max_thread(max_thread)
,_cur_thread(max_thread)
,_stop_flag(0)
,_cap(max_queue)
{
pthread_mutex_init(&_mutex, NULL);
pthread_cond_init(&_empty,www.leyou2.net NULL);
pthread_cond_init(&_full, NULL);
}

~ThreadPool()
{
pthread_mutex_destroy(&_mutex);
pthread_cond_destroy(&_empty);
pthread_cond_destroy(&_full);
}

bool Init()
{
pthread_t tid;
for(int i = 0; i < _max_thread; ++i)
{
int ret = pthread_create(&tid, NULL, thr_start, (void*)this);
if(ret != 0)
{
cout << "pthread_create error" << endl;
return false;
}
pthread_detach(tid);
}
}
bool AddTask(Task* t)
{
pthread_mutex_lock(&_mutex);
while(Full())
{
pthread_cond_wait(&_full, &_mutex);
}

QueuePush(t);
pthread_cond_signal(&_empty);
pthread_mutex_unlock(&_mutex);
return true;
}
bool Stop()
{
pthread_mutex_lock(www.thd178.com/ &_mutex);
if(_stop_flag == 1)
{
pthread_mutex_unlock(&_mutex);
return false;
}
_stop_flag = 1;
while(_cur_thread > 0)
{
pthread_cond_broadcast(&_empty);
pthread_cond_wait(&_full, &_mutex);
}
pthread_mutex_unlock(&_mutex);
return false;
}
private:

bool Empty()
{
return _list.empty();
}

bool Full()
{
return (_cap == _list.size());
}

bool QueuePush(Task*www.michenggw.com task)
{
_list.push(task);
return true;
}

bool QueuePop(Task** task)
{
*task = _list.front();
_list.pop();
return true;
}

static void* thr_start(void *arg)
{
ThreadPool* p = (ThreadPool*)arg;

while(1)
{

pthread_mutex_lock(&p->_mutex);
//如果佇列為空,且不是退出狀態,陷入等待
while(p->Empty() && p->_stop_flag != 1)
pthread_cond_wait(&p->_empty, &p->_mutex);
//如果佇列為空,且處於退出狀態,則退出
//退出前當前執行緒數-1,解鎖
if(p->Empty() && p->_stop_flag == 1)
{
cout << "--------thread exit" << endl;
p->_cur_thread--;
pthread_mutex_unlock(&p->_mutex);
pthread_cond_signal(www.ysyl157.com&p->_full);
pthread_exit(NULL);
}
Task *task;
p->QueuePop(&task);
pthread_mutex_unlock(&p->_mutex);

pthread_cond_signal(&p->_full);
//任務的執行,需要放在解鎖外面,因為我們不知道任務需要執行多長時間
//如果先執行後解鎖,有可能其他執行緒一直無法獲取任務
task->Run();
}
return NULL;
}
private:
int _max_thread;//最大執行緒數量
int _cur_thread;//當前執行緒數量
int _stop_flag;//執行緒池中執行緒的退出標誌
size_t _cap; //佇列最大節點數
queue<Task*> _list;//執行緒池 任務佇列
pthread_mutex_t _mutex;
pthread_cond_t _empty;
pthread_cond_t _full;
};


int main()
{
ThreadPool pool;
pool.Init();
Task task[10];
char* c = "hello world!!!";
for(int i = 0; i < 10; ++i)
{
sleep(1);
task[i].SetData((void*)c);
cout << "add task: " << c <<endl;
pool.AddTask(&task[i]);
}
pool.Stop();
return 0;