1. 程式人生 > >用C++實現簡單執行緒池

用C++實現簡單執行緒池

執行緒池

什麼是執行緒池

  • 在使用執行緒的CS模型中,伺服器端每接收到一個客戶端請求,都會為客戶端建立執行緒資源,當有大量突發性請求時,伺服器來不及為每個客戶端建立執行緒。執行緒每次的建立與銷燬都會耗費伺服器大量資源與時間,可以在伺服器一開始就建立好一堆執行緒,等到客戶端請求來臨直接讓這些執行緒進行處理,這就是執行緒池。
  • 執行緒池是一種多執行緒的處理模式,線上程池啟動之後,向執行緒池中新增任務,執行緒池中的執行緒將會自動處理這些任務

適用場景

  • 需要大量的執行緒來完成任務,並且每個任務的時間比較短。在WEB伺服器上適用執行緒池是非常有必要的,單個任務量小,任務量大。
  • 對效能比較苛刻,需要立即做出應答。
  • 接收突發性的大量請求,但是伺服器不至於產生大量執行緒。

簡單實現

任務模組:適用setData為每個物件設定任務,使用Run來執行任務佇列。

struct Task{
	public:
		void *_data;
	public:
		bool setData(void *data){
			_data = data;
		}

		bool Run(){
			srand(time(NULL));
			int sec = rand() % 3;
			printf("id:%p -- run data -- %s -- sleep:%d\n"
, pthread_self(), _data, sec); sleep(sec); } };

執行緒池模組

class pthreadPool{
	private:
		int _max_thr; //max pthread
		int _cur_thr; //now pthread
		bool _stop_flag; //is stop
		std::queue<Task*> _queue;
		int _cap; //queue capacity
		pthread_mutex_t _lock;
		pthread_cond_t _full;
		pthread_cond_t _empty;
};
  • 使用STL中的佇列來模擬實現任務佇列,_max_thr表示當前執行緒池中最大的執行緒數,
  • _cur_thr是當前的執行緒的數量,_stop_flag是否停止標誌,_cap是佇列的最大長度。

建構函式:對相關互斥鎖和條件變數進行初始化

pthreadPool(int max_thr = 5, int max_queue = 10)
			: _max_thr(max_thr)
			, _cur_thr(0)
			, _stop_flag(false)
			, _cap(max_queue)
		{
			pthread_mutex_init(&_lock, NULL);
			pthread_cond_init(&_full, NULL);
			pthread_cond_init(&_empty, NULL);
		}

初始化執行緒池:建立執行緒池支援的最大個數個執行緒

bool initPool(){
	pthread_t tid;
	int i, ret;
	for(i = 0; i < _max_thr; i++){
		ret = pthread_create(&tid, NULL, thrStart, (void*)this);
		if(ret != 0){
			std::cerr << "pthread_create error" << std::endl;	
			return false;
		}
		pthread_detach(tid);
	}
}

向執行緒池中新增任務

bool addTask(Task *task){
	pthread_mutex_lock(&_lock);
	while(queueIsFull()){
		pthread_cond_wait(&_full, &_lock);
	}
	_queue.push(task);
	pthread_cond_signal(&_empty);
	pthread_mutex_unlock(&_lock);
	return true;
}
  • 在每次向執行緒池中新增任務時,為了防止其他執行緒獲得CPU排程產生的資料安全問題,都需要先對其進行加鎖操作
  • 如果任務佇列滿了就需要等待在_full條件變數上,此處要用while迴圈來判斷,而不能用if
  • 佇列不滿,向佇列中新增任務,新增完畢之後向等待在_empty上的執行緒做出通知,最後釋放鎖,新增完畢

執行緒處理函式

static void *thrStart(void *arg){
	pthreadPool *pool = (pthreadPool*)arg;
	while(1){
		pthread_mutex_lock(&pool->_lock);
		//queue is empty, wait
		while(pool->queueIsEmpty() && !(pool->_stop_flag)){
			pthread_cond_wait(&pool->_empty, &pool->_lock);
		}
		
		if(pool->queueIsEmpty() && pool->_stop_flag){
			std::cout << "-pthread eixt-" << std::endl;
			pool->_cur_thr--;
			pthread_mutex_unlock(&pool->_lock);
			pthread_cond_signal(&pool->_full);
			pthread_exit(NULL);
		}
		
		Task* task;
		pool->queuePop(&task);
		pthread_mutex_unlock(&pool->_lock);
		pthread_cond_signal(&pool->_full);
		task->Run();
	}
}
  • 此處執行緒處理函式宣告為靜態函式,所以在建立執行緒時需要把隱含的this指標傳遞過來
  • 在進行操作之前先進行加鎖操作,如果任務佇列為空並且不退出狀態就等待在_empty
  • 取出任務之後,因為不知道任務具體執行的時間,為了防止死鎖,所以先進行解鎖操作,通知等待在_full上的執行緒可以新增任務
  • 最後執行任務。

判斷是否停止執行緒池

bool stop(){
	pthread_mutex_lock(&_lock);
	if(_stop_flag){
		pthread_mutex_unlock(&_lock);
		return false;
	}
	
	_stop_flag = true;
	while(_cur_thr > 0){
		pthread_cond_broadcast(&_empty);
		pthread_cond_wait(&_full, &_lock);
	}
	pthread_mutex_unlock(&_lock);
	return false;
}