1. 程式人生 > >C++簡易執行緒池的實現

C++簡易執行緒池的實現

文章目錄

1.執行緒池基本瞭解

一種執行緒使用模式。執行緒過多會帶來排程開銷,進而影響快取區域性性和整體效能。而執行緒池維護著多個執行緒,等待著監督管理者分配可併發執行的任務。這避免了在處理短時間任務時建立與銷燬執行緒的代價。執行緒池不僅能夠保證核心的充分利用,還能防止過分排程。可用執行緒數量應該取決於可用的併發處理器、處理器核心、記憶體、網路sockets等的數量。

2.執行緒池的應用場景

1)需要大量的執行緒來完成任務,且完成任務的時間比較短。 WEB伺服器完成網頁請求這樣的任務,使用執行緒池技術是非常合適的。因為單個任務小,而任務數量巨大,你可以想象一個熱門網站的點選次數。 但對於長時間的任務,比如一個Telnet連線請求,執行緒池的優點就不明顯了。因為Telnet會話時間比執行緒的建立時間大多了。
2)對效能要求苛刻的應用,比如要求伺服器迅速響應客戶請求。
3) 接受突發性的大量請求,但不至於使伺服器因此產生大量執行緒的應用。突發性大量客戶請求,在沒有執行緒池情況下,將產生大量執行緒,雖然理論上大部分作業系統執行緒數目最大值不是問題,短時間內產生大量執行緒可能使記憶體到達極限,出現錯誤.

3.實現一個簡單的加法任務的執行緒池

1)首先我們建立一個任務類。

//回撥函式
typedef int(*cal_t)(int,int);

class Task{
  private:
    int x;//運算元1
    int y;//運算元2
    int z;//運算結果
    cal_t handler_task;//運算操作
  public:
    Task(int a,int b,cal_t handler_task_)//建構函式
    :x(a),y(b),handler_task(handler_task_)
    {

    }
   //執行操作
     void Run()
    {
      z = handler_task(x,y);
    }
   //將結果輸出
    void Show()
    {
      cout<<"thread : "<<pthread_self()<<"Task finish,result is : "<<z<<endl;
    }

    ~Task()
    {

    }
};


2)執行緒池類

《1》執行緒池的成員

class  ThreadPool{
   private:
     queue<Task> Task_Queue;//將任務新增在一個佇列中
     bool IsStop;//執行緒池的工作狀態
     int ThreadNum;//執行緒池內的執行緒數量
     //為了執行緒安全所以需要互斥鎖與條件變數保護臨界資源
     pthread_mutex_t lock;
     pthread_cond_t cond;
};

《2》根據測試實現執行緒池內的成員函式

#include"ThreadPool.hpp"

#define NUM 5 //設定執行緒池的執行緒數量
//任務操作
int Add(int x,int y)
{
  return x + y;
}

int main()
{
//建立一個執行緒池類
  ThreadPool *tp =new ThreadPool(NUM);
  //執行緒池的初始化
  tp -> InitThreadPool();
//為了便於理解,我們使用while迴圈先一直往任務佇列新增任務。
  int count =1;
  while(1){
    sleep(1);
    Task t(count,count-1,Add);
    tp->AddTask(t);
    count++;
  }
  return 0;
}

a.執行緒池類的建構函式與解構函式

//建構函式初始化時,將IsStop狀態設定為fasle,否則執行緒池無法工作。
 ThreadPool(int num):ThreadNum(num),IsStop(false)
     {}

b.執行緒池的初始化

 void InitThreadPool()
     {
     //對互斥鎖以及條件變臉初始化。
       pthread_mutex_init(&lock,NULL);
       pthread_cond_init(&cond,NULL);
       //設定變數控制執行緒的建立
       int i = 0;
       for(;i<ThreadNum;i++)
       {
          pthread_t tid;
          pthread_create(&tid,NULL,route,(void *)this);
       }
     }

//解構函式完成互斥鎖以及條件變數的銷燬
~ThreadPool()
     {
          pthread_mutex_destroy(&lock);
          pthread_cond_destroy(&cond);
     }

c.初始化完後實現執行緒建立中的route函式

//C++中static函式內無this指標,C++類對執行緒操作要用static修飾
 static  void *route(void *arg)
     {
        ThreadPool *tp = (ThreadPool*)arg;
        //將執行緒自己分離出來
        pthread_detach(pthread_self());
        while(1){
        //為了保證執行緒安全,將任務佇列加鎖
          tp->LockQueue();
          //當任務佇列為空時,執行緒先進入等待狀態
          if(tp->IsEmpty()){
              tp->IdleThread();
          }
          //當佇列有任務時,建立一個任務類並得到該任務,也由此得出需要一個AddTask();成員函式。
          Task t = tp->GetTask();
          //取到會解鎖
          tp->UnlockQueue();
          //執行任務操作
          t.Run();
          //將任務結果顯示
          t.Show();
        }
     }

d.依次實LockQueue();IsEmpty();IDleThread();GetTask();UnlockQueue();

    //任務佇列加鎖
     void LockQueue()
     {
       pthread_mutex_lock(&lock);
     }
     //任務解鎖解鎖
     void UnlockQueue()
     {
       pthread_mutex_unlock(&lock);
     }
     //判斷任務是否為空
     bool IsEmpty()
     {
       return Task_Queue.size() == 0 ? true : false;
     }
     //加任務
     void AddTask(Task &t)
     {
      Task_Queue.push(t);
      //當得到一個任務時便傳送訊號給一個程序
      NoticOneThread();
      UnlockQueue();
     }
     //傳送訊號給一個程序
      void NoticOneThread()
     {
       pthread_cond_signal(&cond);
     }
        //等待佇列
     void IdleThread()
     {
       pthread_cond_wait(&cond,&lock);
     }
    //得到任務
     Task GetTask()
     {
       Task t = Task_Queue.front();
       Task_Queue.pop();
       return t;
     }

e.暫停分析
貌似我們已經實現了一個加法的簡易執行緒池,首先建立一個執行緒池物件,然後進行初始化,建立設定數的執行緒,並且它們都處於等待狀態,當有任務時,它們依次被喚醒,執行任務。但是我們測試用的是while(1)假如沒有了任務,執行緒便一直處於了等待狀態不會退出,於是我們還要實現一個Stop()成員函式,若停止了廣播NoticAllThread()中的執行緒讓他們依次退出,並修改IdleThread()以及AddTask()函式進行IsStop條件判斷。

 void Stop()
     {
       LockQueue();
       IsStop = true;
       UnlockQueue();
       while(ThreadNum > 0){
         NoticAllThread();
       }
     }
 void IdleThread()
     {
       if(IsStop){
         UnlockQueue();
         ThreadNum--;
         pthread_exit((void *)0);
         cout<<"pthread "<<pthread_self() <<"quit"<<endl;
         return ;
       }
       pthread_cond_wait(&cond,&lock);
     }
       void AddTask(Task &t)
     {
       if(IsStop)
       {
         UnlockQueue();
         return ;
       }
      Task_Queue.push(t);
      NoticOneThread();
      UnlockQueue();
     }
    
     void NoticAllThread()
     {
       pthread_cond_broadcast(&cond);
     }


3) 完整程式碼以及測試

標頭檔案

#ifndef __THREADPOLL_HPP__
#define __THREADPOLL_HPP__ 

#include<iostream>
#include<pthread.h>
#include<unistd.h>
#include<queue>

using namespace std;


typedef int(*cal_t)(int,int);

class Task{
  private:
    int x;
    int y;
    int z;
    cal_t handler_task;
  public:
    Task(int a,int b,cal_t handler_task_)
    :x(a),y(b),handler_task(handler_task_)
    {

    }

    void Run()
    {
      z = handler_task(x,y);
    }

    void Show()
    {
      cout<<"thread : "<<pthread_self()<<"Task finish,result is : "<<z<<endl;
    }

    ~Task()
    {

    }
};

class  ThreadPool{
   private:
     queue<Task> Task_Queue;
     bool IsStop;
     int ThreadNum;
     pthread_mutex_t lock;
     pthread_cond_t cond;



  private:
    static  void *route(void *arg)
     {
        ThreadPool *tp = (ThreadPool*)arg;
        pthread_detach(pthread_self());
        while(1){
          tp->LockQueue();
          if(tp->IsEmpty()){
              tp->IdleThread();
          }
          Task t = tp->GetTask();
          tp->UnlockQueue();
          
          t.Run();
          t.Show();
        }
     }

     void NoticOneThread()
     {
       pthread_cond_signal(&cond);
     }

     void NoticAllThread()
     {
       pthread_cond_broadcast(&cond);
     }
   public:
     ThreadPool(int num):ThreadNum(num),IsStop(false)
     {}

     void InitThreadPool()
     {
       pthread_mutex_init(&lock,NULL);
       pthread_cond_init(&cond,NULL);
       int i = 0;
      
       
       for(;i<ThreadNum;i++)
       {
          pthread_t tid;
          pthread_create(&tid,NULL,route,(void *)this);
       }
     }

     ~ThreadPool()
     {
          pthread_mutex_destroy(&lock);
          pthread_cond_destroy(&cond);
     }
    

     void LockQueue()
     {
       pthread_mutex_lock(&lock);
     }

     void UnlockQueue()
     {
       pthread_mutex_unlock(&lock);
     }
     
     bool IsEmpty()
     {
       return Task_Queue.size() == 0 ? true : false;
     }

     void AddTask(Task &t)
     {
       if(IsStop)
       {
         UnlockQueue();
         return ;
       }
      Task_Queue.push(t);
      NoticOneThread();
      UnlockQueue();
     }
        
     void IdleThread()
     {
       if(IsStop){
         UnlockQueue();
         ThreadNum--;
         pthread_exit((void *)0);
         cout<<"pthread "<<pthread_self() <<"quit"<<endl;
         return ;
       }
       pthread_cond_wait(&cond,&lock);
     }

     Task GetTask()
     {
       Task t = Task_Queue.front();
       Task_Queue.pop();
       return t;
     }

     void Stop()
     {
       LockQueue();
       IsStop = true;
       UnlockQueue();
       while(ThreadNum > 0){
         NoticAllThread();
       }
     }

};



#endif 

測試檔案
我們只設置九個任務。

#include"ThreadPool.hpp"

#define NUM 5

int Add(int x,int y)
{
  return x + y;
}

int main()
{
  ThreadPool *tp =new ThreadPool(NUM);
  tp -> InitThreadPool();

  int count =1;
  int u = 0;
  for(;u<1;u++){
    sleep(1);
    Task t(count,count-1,Add);
    tp->AddTask(t);
    count++;
  }
  return 0;
}

結果
在gdb下觀察:
1)首先建立5個執行緒。
在這裡插入圖片描述
2)5個執行緒桉序執行任務。
在這裡插入圖片描述
3)沒有任務了執行緒退出。
在這裡插入圖片描述