1. 程式人生 > >執行緒池原理及C語言實現執行緒池

執行緒池原理及C語言實現執行緒池

備註:該執行緒池原始碼參考自傳直播客培訓視訊配套資料;
原始碼:https://pan.baidu.com/s/1zWuoE3q0KT5TUjmPKTb1lw 密碼:pp42
引言:執行緒池是一種多執行緒處理形式,大多用於高併發伺服器上,它能合理有效的利用高併發伺服器上的執行緒資源;

在Unix網路程式設計中,執行緒與程序用於處理各項分支子功能,我們通常的操作是:接收訊息 ==> 訊息分類 ==> 執行緒建立 ==> 傳遞訊息到子執行緒 ==> 執行緒分離 ==> 在子執行緒中執行任務 ==> 任務結束退出;
對大多數小型區域網的通訊來說,上述方法足夠滿足需求;但當我們的通訊範圍擴大到廣域網或大型區域網通訊中時,我們將面臨大量訊息頻繁請求伺服器;在這種情況下,建立與銷燬執行緒都已經成為一種奢侈的開銷,特別對於嵌入式伺服器來說更應保證記憶體資源的合理利用;
因此,執行緒池技術應運而生;執行緒池允許一個執行緒可以多次複用,且每次複用的執行緒內部的訊息處理可以不相同,將建立與銷燬的開銷省去而不必來一個請求開一個執行緒;

結構講解:
執行緒池是一個抽象的概念,其內部由任務佇列,一堆執行緒,管理者執行緒組成
這裡寫圖片描述
我們將以上圖為例,實現一個最基礎的執行緒池,接下來將分部分依次講解;講解順序為:1.執行緒池總體結構 2.執行緒陣列 3.任務佇列 4.管理者執行緒 5.使用執行緒池介面的例子

一、執行緒池總體結構

這裡講解執行緒池在邏輯上的結構體;看下方程式碼,該結構體threadpool_t中包含執行緒池狀態資訊,任務佇列資訊以及多執行緒操作中的互斥鎖;在任務結構體中包含了一個可以放置多種不同任務函式的函式指標,一個傳入該任務函式的void*型別的引數
注意:在使用時需要將你的訊息分類處理函式

裝入任務的(*function);然後放置到任務佇列並通知空閒執行緒;

執行緒池狀態資訊:描述當前執行緒池的基本資訊,如是否開啟、最小執行緒數、最大執行緒數、存活執行緒數、忙執行緒數、待銷燬執行緒數等… …
任務佇列資訊:描述當前任務佇列基本資訊,如最大任務數、佇列不為滿條件變數、佇列不為空條件變數等… …
多執行緒互斥鎖:保證在同一時間點上只有一個執行緒在任務佇列中取任務並修改任務佇列資訊、修改執行緒池資訊;
函式指標:在打包訊息階段,將分類後的訊息處理函式放在(*function);
void*型別引數:用於傳遞訊息處理函式需要的資訊;

/*任務*/
typedef
struct { void *(*function)(void *); void *arg; } threadpool_task_t; /*執行緒池管理*/ struct threadpool_t{ pthread_mutex_t lock; /* 鎖住整個結構體 */ pthread_mutex_t thread_counter; /* 用於使用忙執行緒數時的鎖 */ pthread_cond_t queue_not_full; /* 條件變數,任務佇列不為滿 */ pthread_cond_t queue_not_empty; /* 任務佇列不為空 */ pthread_t *threads; /* 存放執行緒的tid,實際上就是管理了線 陣列 */ pthread_t admin_tid; /* 管理者執行緒tid */ threadpool_task_t *task_queue; /* 任務佇列 */ /*執行緒池資訊*/ int min_thr_num; /* 執行緒池中最小執行緒數 */ int max_thr_num; /* 執行緒池中最大執行緒數 */ int live_thr_num; /* 執行緒池中存活的執行緒數 */ int busy_thr_num; /* 忙執行緒,正在工作的執行緒 */ int wait_exit_thr_num; /* 需要銷燬的執行緒數 */ /*任務佇列資訊*/ int queue_front; /* 隊頭 */ int queue_rear; /* 隊尾 */ int queue_size; /* 存在的任務數 */ int queue_max_size; /* 佇列能容納的最大任務數 */ /*執行緒池狀態*/ int shutdown; /* true為關閉 */ };
**/*建立執行緒池*/**
threadpool_t *
threadpool_create(int min_thr_num, int max_thr_num, int queue_max_size)
{               /*  最小執行緒數           最大執行緒數         最大任務數*/
   int i;
   threadpool_t *pool = NULL;
   do
   {
      /* 執行緒池空間開闢 */
      if ((pool=(threadpool_t *)malloc(sizeof(threadpool_t))) == NULL)
      {
        printf("malloc threadpool false; \n");
        break;   
      }
      /*資訊初始化*/
      pool->min_thr_num = min_thr_num;
      pool->max_thr_num = max_thr_num;
      pool->busy_thr_num = 0;
      pool->live_thr_num = min_thr_num;
      pool->wait_exit_thr_num = 0;
      pool->queue_front = 0;
      pool->queue_rear = 0;
      pool->queue_size = 0;
      pool->queue_max_size = queue_max_size;
      pool->shutdown = false;

      /* 根據最大執行緒數,給工作執行緒陣列開空間,清0 */
      pool->threads = (pthread_t *)malloc(sizeof(pthread_t)*max_thr_num);
      if (pool->threads == NULL)
      {
         printf("malloc threads false;\n");
         break;
      }
      memset(pool->threads, 0, sizeof(pthread_t)*max_thr_num);

      /* 佇列開空間 */
      pool->task_queue = 
      (threadpool_task_t *)malloc(sizeof(threadpool_task_t)*queue_max_size);
      if (pool->task_queue == NULL)
      {
         printf("malloc task queue false;\n");
         break;
      }

      /* 初始化互斥鎖和條件變數 */
      if ( pthread_mutex_init(&(pool->lock), NULL) != 0           ||
           pthread_mutex_init(&(pool->thread_counter), NULL) !=0  || 
       pthread_cond_init(&(pool->queue_not_empty), NULL) !=0  ||
       pthread_cond_init(&(pool->queue_not_full), NULL) !=0)
      {
         printf("init lock or cond false;\n");
         break;
      }

      /* 啟動min_thr_num個工作執行緒 */
      for (i=0; i<min_thr_num; i++)
      {
         /* pool指向當前執行緒池  threadpool_thread函式在後面講解 */
         pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
         printf("start thread 0x%x... \n", (unsigned int)pool->threads[i]);
      }
      /* 管理者執行緒 admin_thread函式在後面講解 */
      pthread_create(&(pool->admin_tid), NULL, admin_thread, (void *)pool);

      return pool;
   } while(0);

   /* 釋放pool的空間 */
   threadpool_free(pool);
   return NULL;
}

二、執行緒陣列

執行緒陣列實際上是線上程池初始化時開闢的一段存放一堆執行緒tid的空間,在邏輯上形成一個池,裡面放置著提前建立的執行緒;這段空間中包含了正在工作的執行緒等待工作的執行緒(空閒執行緒),等待被銷燬的執行緒申明但沒有初始化的執行緒空間
這裡寫圖片描述

/*工作執行緒*/
void *
threadpool_thread(void *threadpool)
{
  threadpool_t *pool = (threadpool_t *)threadpool;
  threadpool_task_t task;

  while (true)
  {
    pthread_mutex_lock(&(pool->lock));

    /* 無任務則阻塞在 “任務佇列不為空” 上,有任務則跳出 */
    while ((pool->queue_size == 0) && (!pool->shutdown))
    { 
       printf("thread 0x%x is waiting \n", (unsigned int)pthread_self());
       pthread_cond_wait(&(pool->queue_not_empty), &(pool->lock));

       /* 判斷是否需要清除執行緒,自殺功能 */
       if (pool->wait_exit_thr_num > 0)
       {
          pool->wait_exit_thr_num--;
          /* 判斷執行緒池中的執行緒數是否大於最小執行緒數,是則結束當前執行緒 */
          if (pool->live_thr_num > pool->min_thr_num)
          {
             printf("thread 0x%x is exiting \n", (unsigned int)pthread_self());
             pool->live_thr_num--;
             pthread_mutex_unlock(&(pool->lock));
             pthread_exit(NULL);//結束執行緒
          }
       }
    }

    /* 執行緒池開關狀態 */
    if (pool->shutdown) //關閉執行緒池
    {
       pthread_mutex_unlock(&(pool->lock));
       printf("thread 0x%x is exiting \n", (unsigned int)pthread_self());
       pthread_exit(NULL); //執行緒自己結束自己
    }

    //否則該執行緒可以拿出任務
    task.function = pool->task_queue[pool->queue_front].function; //出隊操作
    task.arg = pool->task_queue[pool->queue_front].arg;

    pool->queue_front = (pool->queue_front + 1) % pool->queue_max_size;  //環型結構
    pool->queue_size--;

    //通知可以新增新任務
    pthread_cond_broadcast(&(pool->queue_not_full));

    //釋放執行緒鎖
    pthread_mutex_unlock(&(pool->lock));

    //執行剛才取出的任務
    printf("thread 0x%x start working \n", (unsigned int)pthread_self());
    pthread_mutex_lock(&(pool->thread_counter));            //鎖住忙執行緒變數
    pool->busy_thr_num++;
    pthread_mutex_unlock(&(pool->thread_counter));

    (*(task.function))(task.arg);                           //執行任務

    //任務結束處理
    printf("thread 0x%x end working \n", (unsigned int)pthread_self());
    pthread_mutex_lock(&(pool->thread_counter));
    pool->busy_thr_num--;
    pthread_mutex_unlock(&(pool->thread_counter));
  }

  pthread_exit(NULL);
}

三、任務佇列

任務佇列的存在形式與執行緒陣列相似;線上程池初始化時根據傳入的最大任務數開闢空間;當伺服器前方後請求到來後,分類並打包訊息成為任務,將任務放入任務佇列並通知空閒執行緒來取;不同之處在於任務佇列有明顯的先後順序,先進先出而執行緒陣列中的執行緒則是一個競爭關係去拿到互斥鎖爭取任務
這裡寫圖片描述

/*向執行緒池的任務佇列中新增一個任務*/
int 
threadpool_add_task(threadpool_t *pool, void *(*function)(void *arg), void *arg)
{
   pthread_mutex_lock(&(pool->lock));

   /*如果佇列滿了,呼叫wait阻塞*/
   while ((pool->queue_size == pool->queue_max_size) && (!pool->shutdown))
      pthread_cond_wait(&(pool->queue_not_full), &(pool->lock));

   /*如果執行緒池處於關閉狀態*/
   if (pool->shutdown)
   {
      pthread_mutex_unlock(&(pool->lock));
      return -1;
   }

   /*清空工作執行緒的回撥函式的引數arg*/
   if (pool->task_queue[pool->queue_rear].arg != NULL)
   {
      free(pool->task_queue[pool->queue_rear].arg);
      pool->task_queue[pool->queue_rear].arg = NULL;
   }

   /*新增任務到任務佇列*/
   pool->task_queue[pool->queue_rear].function = function;
   pool->task_queue[pool->queue_rear].arg = arg;
   pool->queue_rear = (pool->queue_rear + 1) % pool->queue_max_size;  /* 邏輯環  */
   pool->queue_size++;

   /*新增完任務後,佇列就不為空了,喚醒執行緒池中的一個執行緒*/
   pthread_cond_signal(&(pool->queue_not_empty));
   pthread_mutex_unlock(&(pool->lock));

   return 0;
}

四、管理者執行緒

作為執行緒池的管理者,該執行緒的主要功能包括:檢查執行緒池內執行緒的存活狀態,工作狀態;負責根據伺服器當前的請求狀態去動態的增加或刪除執行緒,保證執行緒池中的執行緒數量維持在一個合理高效的平衡上;
說到底,它就是一個單獨的執行緒,定時的去檢查,根據我們的一個維持平衡演算法去增刪執行緒;

/*管理執行緒*/
void *
admin_thread(void *threadpool)
{
   int i;
   threadpool_t *pool = (threadpool_t *)threadpool;
   while (!pool->shutdown)
   {
      printf("admin -----------------\n");
      sleep(DEFAULT_TIME);                             /*隔一段時間再管理*/
      pthread_mutex_lock(&(pool->lock));               /*加鎖*/ 
      int queue_size = pool->queue_size;               /*任務數*/
      int live_thr_num = pool->live_thr_num;           /*存活的執行緒數*/
      pthread_mutex_unlock(&(pool->lock));             /*解鎖*/

      pthread_mutex_lock(&(pool->thread_counter));
      int busy_thr_num = pool->busy_thr_num;           /*忙執行緒數*/  
      pthread_mutex_unlock(&(pool->thread_counter));

      printf("admin busy live -%d--%d-\n", busy_thr_num, live_thr_num);
      /*建立新執行緒 實際任務數量大於 最小正在等待的任務數量,存活執行緒數小於最大執行緒數*/
      if (queue_size >= MIN_WAIT_TASK_NUM && live_thr_num <= pool->max_thr_num)
      {
         printf("admin add-----------\n");
         pthread_mutex_lock(&(pool->lock));
         int add=0;

         /*一次增加 DEFAULT_THREAD_NUM 個執行緒*/
         for (i=0; i<pool->max_thr_num && add<DEFAULT_THREAD_NUM 
              && pool->live_thr_num < pool->max_thr_num; i++)
         {
            if (pool->threads[i] == 0 || !is_thread_alive(pool->threads[i]))
           {
              pthread_create(&(pool->threads[i]), NULL, threadpool_thread, (void *)pool);
              add++;
              pool->live_thr_num++;
              printf("new thread -----------------------\n");
           }
         }

         pthread_mutex_unlock(&(pool->lock));
      }

      /*銷燬多餘的執行緒 忙執行緒x2 都小於 存活執行緒,並且存活的大於最小執行緒數*/
      if ((busy_thr_num*2) < live_thr_num  &&  live_thr_num > pool->min_thr_num)
      {
         // printf("admin busy --%d--%d----\n", busy_thr_num, live_thr_num);
         /*一次銷燬DEFAULT_THREAD_NUM個執行緒*/
         pthread_mutex_lock(&(pool->lock));
         pool->wait_exit_thr_num = DEFAULT_THREAD_NUM;
         pthread_mutex_unlock(&(pool->lock));

         for (i=0; i<DEFAULT_THREAD_NUM; i++)
        {
           //通知正在處於空閒的執行緒,自殺
           pthread_cond_signal(&(pool->queue_not_empty));
           printf("admin cler --\n");
        }
      }

   }

   return NULL;


/*執行緒是否存活*/
int 
is_thread_alive(pthread_t tid)
{
   int kill_rc = pthread_kill(tid, 0);     //傳送0號訊號,測試是否存活
   if (kill_rc == ESRCH)  //執行緒不存在
   {
      return false;
   }
   return true;
}

五、釋放

/*釋放執行緒池*/
int 
threadpool_free(threadpool_t *pool)
{
   if (pool == NULL)
     return -1;
   if (pool->task_queue)
      free(pool->task_queue);
   if (pool->threads)
   {
      free(pool->threads);
      pthread_mutex_lock(&(pool->lock));               /*先鎖住再銷燬*/
      pthread_mutex_destroy(&(pool->lock));
      pthread_mutex_lock(&(pool->thread_counter));
      pthread_mutex_destroy(&(pool->thread_counter));
      pthread_cond_destroy(&(pool->queue_not_empty));
      pthread_cond_destroy(&(pool->queue_not_full));
   }
   free(pool);
   pool = NULL;

   return 0;
}
/*銷燬執行緒池*/
int 
threadpool_destroy(threadpool_t *pool)
{
   int i;
   if (pool == NULL)
   {
     return -1;
   }
   pool->shutdown = true;

   /*銷燬管理者執行緒*/
   pthread_join(pool->admin_tid, NULL);

   //通知所有執行緒去自殺(在自己領任務的過程中)
   for (i=0; i<pool->live_thr_num; i++)
   {
     pthread_cond_broadcast(&(pool->queue_not_empty));
   }

   /*等待執行緒結束 先是pthread_exit 然後等待其結束*/
   for (i=0; i<pool->live_thr_num; i++)
   {
     pthread_join(pool->threads[i], NULL);
   }

   threadpool_free(pool);
   return 0;
}

六、介面

   /* 執行緒池初始化,其管理者執行緒及工作執行緒都會啟動 */
    threadpool_t *thp = threadpool_create(10, 100, 100);
    printf("threadpool init ... ... \n");

   /* 接收到任務後新增 */
   threadpool_add_task(thp, do_work, (void *)p);

   // ... ...

   /* 銷燬 */
   threadpool_destroy(thp);

感謝你看完了這篇文章,希望你的問題得以解答