1. 程式人生 > >執行緒池介紹與示例

執行緒池介紹與示例

對於執行緒任務比較輕量,執行緒請求又比較多的情況,頻繁的建立和銷燬執行緒是非常消耗資源且低效的。這時候,就輪到執行緒池技術大顯身手了。

執行緒池技術可以提高資源的利用率,即使面對突發性的大量請求,也不會產生大量執行緒,造成伺服器崩潰。

一般一個簡單執行緒池至少包含下列組成部分:

  1. 執行緒池管理器(ThreadPoolManager):用於建立並管理執行緒池。
  2. 工作執行緒(WorkThread): 執行緒池中執行緒。
  3. 任務介面(Task):每個任務必須實現的介面,以供工作執行緒排程任務的執行。
  4. 任務佇列:用於存放沒有處理的任務。提供一種緩衝機制。

執行緒相關介面:

複製程式碼
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

typedef struct
ThreadWorker { void* (*func)(void*); void* data; struct ThreadWorker* next; }ThreadWorker; typedef struct ThreadPool { unsigned int maxThreadCount; // 最大執行緒數限制 ThreadWorker* workerList; // 執行緒任務列表 pthread_t* threadIdList; // 執行緒ID列表 pthread_mutex_t threadLock; //
互斥鎖 pthread_cond_t threadCond; // 條件鎖 bool shutDown; // 是否銷燬執行緒池 }ThreadPool; static ThreadPool* lpThreadPool = NULL; void* ThreadPoolRoutine(void* data); void ThreadPoolInit(unsigned int threadCount) { lpThreadPool = (ThreadPool*)malloc(sizeof(ThreadPool)); lpThreadPool->maxThreadCount = threadCount; lpThreadPool->workerList = NULL; lpThreadPool->threadIdList = (pthread_t*)malloc(sizeof
(pthread_t) * threadCount); for (unsigned int i = 0; i < threadCount; ++i) { pthread_create(&(lpThreadPool->threadIdList[i]), NULL, ThreadPoolRoutine, NULL); } pthread_mutex_init(&(lpThreadPool->threadLock), NULL); pthread_cond_init(&(lpThreadPool->threadCond), NULL); lpThreadPool->shutDown = false; } // 銷燬執行緒池,等待佇列中的任務不會再被執行,但是正在執行的執行緒會一直把任務執行完後再退出 void ThreadPoolDestory() { if (lpThreadPool->shutDown) { return; } lpThreadPool->shutDown = true; // 喚醒所有等待執行緒,執行緒池要銷燬了 pthread_cond_broadcast(&(lpThreadPool->threadCond)); for (int i = 0; i < lpThreadPool->maxThreadCount; ++i) { pthread_join(lpThreadPool->threadIdList[i], NULL); } free(lpThreadPool->threadIdList); ThreadWorker* temp = NULL; while (lpThreadPool->workerList) { temp = lpThreadPool->workerList; lpThreadPool->workerList = lpThreadPool->workerList->next; free(temp); } pthread_mutex_destroy(&(lpThreadPool->threadLock)); pthread_cond_destroy(&(lpThreadPool->threadCond)); free(lpThreadPool); lpThreadPool = NULL; } void ThreadPoolAddWork(void* (*func)(void*), void* data) { ThreadWorker* worker = (ThreadWorker*)malloc(sizeof(ThreadWorker)); worker->func = func; worker->data = data; worker->next = NULL; pthread_mutex_lock(&(lpThreadPool->threadLock)); ThreadWorker* lpWorkerList = lpThreadPool->workerList; if (lpWorkerList) { while (lpWorkerList->next) { lpWorkerList = lpWorkerList->next; } lpWorkerList->next = worker; } else { lpThreadPool->workerList = worker; } pthread_mutex_unlock(&(lpThreadPool->threadLock)); // 喚醒執行緒,注意如果所有執行緒都在忙碌,這句沒有任何作用 pthread_cond_signal(&(lpThreadPool->threadCond)); printf ("signal thread %p\n", pthread_self ()); } void* ThreadPoolRoutine(void* data) { printf ("starting thread %p\n", pthread_self ()); while(true) { pthread_mutex_lock(&(lpThreadPool->threadLock)); // 如果等待佇列為0並且不銷燬執行緒池,則處於阻塞狀態; 注意pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒後會加鎖 while(lpThreadPool->workerList == NULL && !lpThreadPool->shutDown) { printf ("thread %p is waiting\n", pthread_self ()); pthread_cond_wait(&(lpThreadPool->threadCond), &(lpThreadPool->threadLock)); } if (lpThreadPool->shutDown) { pthread_mutex_unlock (&(lpThreadPool->threadLock)); printf ("thread %p will exit\n", pthread_self ()); pthread_exit (NULL); } else { ThreadWorker* temp = lpThreadPool->workerList; lpThreadPool->workerList = temp->next; pthread_mutex_unlock(&(lpThreadPool->threadLock)); (*(temp->func))(temp->data); free(temp); temp = NULL; } } pthread_exit (NULL); } void* myTestFunc(void* data) { printf ("threadID is %p, working on task %d\n", pthread_self (),*(int *)data); sleep (1); return NULL; } int main() { ThreadPoolInit(5); const int count = 20; int* taskIndex = (int*)malloc(sizeof(int) * count); for (int i = 0; i < count; ++i) { taskIndex[i] = i; ThreadPoolAddWork(myTestFunc, &taskIndex[i]); } sleep(3); while(true) { if (!lpThreadPool->workerList) { ThreadPoolDestory(); break; } } free(taskIndex); return 0; }
複製程式碼

任務介面是為所有任務提供統一的介面,以便工作執行緒處理。如上例中的myTestFunc.

pthread_cond_wait:

pthread_cond_wait所做的第一件事就是同時對互斥物件解鎖(於是其它執行緒可以修改已連結列表),並等待條件 mycond 發生(這樣當 pthread_cond_wait接收到另一個執行緒的“訊號”時,它將甦醒)。現在互斥物件已被解鎖,其它執行緒可以訪問和修改已連結列表,可能還會新增項。 【要求解鎖並阻塞是一個原子操作

pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) 函式返回時,互斥鎖 mutex 將處於鎖定狀態。因此之後如果需要對臨界區資料進行重新訪問,則沒有必要對 mutex 就行重新加鎖。但是,隨之而來的問題是,每次條件等待以後需要加入一步手動的解鎖操作。

簡單的說,pthread_cond_wait執行時會自動解鎖互斥物件,返回時會自動加鎖互斥物件。

pthread_cond_timedwait:

int pthread_cond_timedwait(pthread_cond_t *restrict cond, 
              pthread_mutex_t *restrict mutex, 
              const struct timespec *restrict abstime);

引數 abstime 在這裡用來表示和超時時間相關的一個引數,但是需要注意的是它所表示的是一個絕對時間,而不是一個時間間隔數值,只有當系統的當前時間達到或者超過 abstime 所表示的時間時,才會觸發超時事件。

假設我們指定相對的超時時間引數如 dwMilliseconds (單位毫秒)來呼叫和超時相關的函式,這樣就需要將 dwMilliseconds 轉化為 Linux 下的絕對時間引數 abstime 使用。常用的轉換方法如下所示:

複製程式碼
/* get the current time */ 
    struct timeval now; 
    gettimeofday(&now, NULL); 
    
    /* add the offset to get timeout value */ 
    abstime ->tv_nsec = now.tv_usec * 1000 + (dwMilliseconds % 1000) * 1000000; 
    abstime ->tv_sec = now.tv_sec + dwMilliseconds / 1000;
複製程式碼

正確銷燬執行緒並釋放資源:

Linux man page :“When a joinable thread terminates, its memory resources (thread descriptor and stack) are not deallocated until another thread performs pthread_join on it. Therefore, pthread_join must be called once for each joinable thread created to avoid memory leaks.”

Linux 平臺預設情況下,雖然各個執行緒之間是相互獨立的,一個執行緒的終止不會去通知或影響其他的執行緒。但是已經終止的執行緒的資源並不會隨著執行緒的終止而得到釋放,我們需要呼叫 pthread_join() 來獲得另一個執行緒的終止狀態並且釋放該執行緒所佔的資源。

int pthread_join(pthread_t th, void **thread_return);

呼叫該函式的執行緒將掛起,等待 th 所表示的執行緒的結束。 thread_return 是指向執行緒 th 返回值的指標。需要注意的是 th 所表示的執行緒必須是 joinable 的,即處於非 detached(遊離)狀態;並且只可以有唯一的一個執行緒對 th 呼叫 pthread_join() 。如果 th 處於 detached 狀態,那麼對 th 的 pthread_join() 呼叫將返回錯誤。

如果你壓根兒不關心一個執行緒的結束狀態,那麼也可以將一個執行緒設定為 detached 狀態,從而來讓作業系統在該執行緒結束時來回收它所佔的資源。將一個執行緒設定為 detached 狀態可以通過兩種方式來實現。一種是呼叫 pthread_detach() 函式,可以將執行緒 th 設定為 detached 狀態。

另一種方法是在建立執行緒時就將它設定為 detached 狀態,首先初始化一個執行緒屬性變數,然後將其設定為 detached 狀態,最後將它作為引數傳入執行緒建立函式 pthread_create(),這樣所創建出來的執行緒就直接處於 detached 狀態。

總之為了在使用 pthread 時避免執行緒的資源線上程結束時不能得到正確釋放,從而避免產生潛在的記憶體洩漏問題,在對待執行緒結束時,要確保該執行緒處於 detached 狀態,否著就需要呼叫 pthread_join() 函式來對其進行資源回收。

執行緒池的改進思考:

1. 適當動態增減執行緒池中的執行緒數量。

2. 是否可以為執行緒增加優先順序的概念。