1. 程式人生 > >線程池的理解與簡單實現

線程池的理解與簡單實現

進行 常見 內存分配 RoCE 這一 釋放資源 管理系 延長 ESS

由於服務器的硬件資源“充裕”,那麽提高服務器性能的一個很直接的方法就是以空間換時間,即“浪費”服務器的硬件資源,以換取其運行效率。這就是池的概念。

池是一組資源的集合,這組資源在服務器啟動之初就被創建並初始化,這稱為靜態資源分配。

當服務器進入正式運行階段,即開始處理客戶請求的時候,如果它需要相關的資源,就可以直接從池中獲取,無需動態分配。很顯然,直接從池中取得所需資源比動態分配資源的速度要快得多,因為分配系統資源的系統調用都是很耗時的。

當服務器處理完一個客戶連接後,可以把相關的資源放回池中,無需執行系統調用來釋放資源。從最終效果來看,池相當於服務器管理系統資源的應用設施,它避免了服務器對內核的頻繁訪問。提高了效率。

池可以分為很多種,常見的有進程池,線城池,內存池。


內存池

內存池是一種內存分配方式。通常我們直接使用new、malloc等系統調用申請分配內存,這樣做的缺點在於:由於所申請內存塊的大小不定,當頻繁使用時會造成大量的內存碎片並進而降低性能。

內存池則是在真正使用內存之前,先申請分配一定數量的、大小相等的內存塊留作備用。當有新的內存需求時,就從內存池中分出一部分內存塊,若內存塊不夠再繼續申請新的內存。這樣做的一個顯著優點是,使得內存分配效率得到提升。


進程池&&線程池

在面向對象程序編程中,對象的創建與析構都是一個較為復雜的過程,較費時間,所以為了提高程序的運行效率盡可能減少創建和銷毀對象的次數,特別是一些很耗資源的對象創建和銷毀。
所以我們可以創建一個進程池(線程池),預先放一些進程(線程)進去,要用的時候就直接調用,用完之後再把進程歸還給進程池,省下創建刪除進程的時間,不過當然就需要額外的開銷了。
利用線程池與進程池可以使管理進程與線程的工作交給系統管理,不需要程序員對裏面的線程、進程進行管理。

以進程池為例

進程池是由服務器預先創建的一組子進程,這些子進程的數目在 3~10 個之間(當然這只是典型情況)。線程池中的線程數量應該和CPU數量差不多。

進程池中的所有子進程都運行著相同的代碼,並具有相同的屬性,比如優先級、 PGID 等。

當有新的任務來到時,主進程將通過某種方式選擇進程池中的某一個子進程來為之服務。相比於動態創建子進程,選擇一個已經存在的子進程的代價顯得小得多。至於主進程選擇哪個子進程來為新任務服務,則有兩種方法:

  • 主進程使用某種算法來主動選擇子進程。最簡單、最常用的算法是隨機算法和Round Robin(輪流算法)。
  • 主進程和所有子進程通過一個共享的工作隊列來同步,子進程都睡眠在該工作隊列上。當有新的任務到來時,主進程將任務添加到工作隊列中。這將喚醒正在等待任務的子進程,不過只有一個子進程將獲得新任務的“接管權”,它可以從工作隊列中取出任務並執行之,而其他子進程將繼續睡眠在工作隊列上。

當選擇好子進程後,主進程還需要使用某種通知機制來告訴目標子進程有新任務需要處理,並傳遞必要的數據。最簡單的方式是,在父進程和子進程之間預先建立好一條管道,然後通過管道來實現所有的進程間通信。在父線程和子線程之間傳遞數據就要簡單得多,因為我們可以把這些數據定義為全局,那麽它們本身就是被所有線程共享的。


線程池的應用

線程池主要用於
1、需要大量的線程來完成任務,且完成任務的時間比較短。
WEB服務器完成網頁請求這樣的任務,使用線程池技術是非常合適的。
因為單個任務小,而任務數量巨大,一個熱門網站的點擊次數會很多。
但對於長時間的任務,比如一個Telnet連接請求,線程池的優點就不明顯了。因為Telnet會話時間比線程的創建時間大多了。

2、對性能要求苛刻的應用,比如要求服務器迅速響應客戶請求。

3、接受突發性的大量請求,但不至於使服務器因此產生大量線程的應用。


線程池&&進程池的好處

進程池進程池減少了創建,歸還的時間。提高了效率。


用C++模擬線程池

Linux系統下用C語言創建的一個線程池。線程池會維護一個任務鏈表(每個CThread_worker結構就是一個任務)。
pool_init()函數預先創建好max_thread_num個線程,每個線程執thread_routine ()函數。該函數中

1 while (pool->cur_queue_size == 0)
2 {
3       pthread_cond_wait (&(pool->queue_ready),&(pool->queue_lock));
4 }

表示如果任務鏈表中沒有任務,則該線程出於阻塞等待狀態。否則從隊列中取出任務並執行。
pool_add_worker()函數向線程池的任務鏈表中加入一個任務,加入後通過調用pthread_cond_signal (&(pool->queue_ready))喚醒一個出於阻塞狀態的線程(如果有的話)。
pool_destroy ()函數用於銷毀線程池,線程池任務鏈表中的任務不會再被執行,但是正在運行的線程會一直把任務運行完後再退出。

具體代碼:

#include <stdio.h>  
#include <stdlib.h>  
#include <unistd.h>  
#include <sys/types.h>  
#include <pthread.h>  
#include <assert.h>  

/* 
*線程池裏所有運行和等待的任務都是一個CThread_worker 
*由於所有任務都在鏈表裏,所以是一個鏈表結構 
*/  
typedef struct worker  
{  
    /*回調函數,任務運行時會調用此函數,註意也可聲明成其它形式*/  
    void *(*process) (void *arg);  
    void *arg;/*回調函數的參數*/  
    struct worker *next;    
} CThread_worker;  

/*線程池結構*/  
typedef struct  
{  
    pthread_mutex_t queue_lock;  
    pthread_cond_t queue_ready;  

    /*鏈表結構,線程池中所有等待任務*/  
    CThread_worker *queue_head;  

    /*是否銷毀線程池*/  
    int shutdown;  
    pthread_t *threadid;  
    /*線程池中允許的活動線程數目*/  
    int max_thread_num;  
    /*當前等待隊列的任務數目*/  
    int cur_queue_size;  

} CThread_pool;  


int pool_add_worker (void *(*process) (void *arg), void *arg);  
void *thread_routine (void *arg);  


static CThread_pool *pool = NULL;  
void pool_init (int max_thread_num)  
{  
    pool = (CThread_pool *) malloc (sizeof (CThread_pool));  

    pthread_mutex_init (&(pool->queue_lock), NULL);  
    pthread_cond_init (&(pool->queue_ready), NULL);  

    pool->queue_head = NULL;  

    pool->max_thread_num = max_thread_num;  
    pool->cur_queue_size = 0;  

    pool->shutdown = 0;  

    pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));  
    int i = 0;  
    for (i = 0; i < max_thread_num; i++)  
    {   
        pthread_create (&(pool->threadid[i]), NULL, thread_routine,NULL);  
    }  
}  



/*向線程池中加入任務*/  
int pool_add_worker (void *(*process) (void *arg), void *arg)  
{  
    /*構造一個新任務*/  
    CThread_worker *newworker = (CThread_worker *) malloc (sizeof (CThread_worker));  
    newworker->process = process;  
    newworker->arg = arg;  
    newworker->next = NULL;/*別忘置空*/  

    pthread_mutex_lock (&(pool->queue_lock));  
    /*將任務加入到等待隊列中*/  
    CThread_worker *member = pool->queue_head;  
    if (member != NULL)  
    {  
        while (member->next != NULL)  
            member = member->next;  
        member->next = newworker;  
    }  
    else  
    {  
        pool->queue_head = newworker;  
    }  

    assert (pool->queue_head != NULL);  

    pool->cur_queue_size++;  
    pthread_mutex_unlock (&(pool->queue_lock));  
    /*好了,等待隊列中有任務了,喚醒一個等待線程; 
    註意如果所有線程都在忙碌,這句沒有任何作用*/  
    pthread_cond_signal (&(pool->queue_ready));  
    return 0;  
}  



/*銷毀線程池,等待隊列中的任務不會再被執行,但是正在運行的線程會一直 
把任務運行完後再退出*/  
int pool_destroy ()  
{  
    if (pool->shutdown)  
        return -1;/*防止兩次調用*/  
    pool->shutdown = 1;  

    /*喚醒所有等待線程,線程池要銷毀了*/  
    pthread_cond_broadcast (&(pool->queue_ready));  

    /*阻塞等待線程退出,否則就成僵屍了*/  
    int i;  
    for (i = 0; i < pool->max_thread_num; i++)  
        pthread_join (pool->threadid[i], NULL);  
    free (pool->threadid);  

    /*銷毀等待隊列*/  
    CThread_worker *head = NULL;  
    while (pool->queue_head != NULL)  
    {  
        head = pool->queue_head;  
        pool->queue_head = pool->queue_head->next;  
        free (head);  
    }  
    /*條件變量和互斥量也別忘了銷毀*/  
    pthread_mutex_destroy(&(pool->queue_lock));  
    pthread_cond_destroy(&(pool->queue_ready));  

    free (pool);  
    /*銷毀後指針置空是個好習慣*/  
    pool=NULL;  
    return 0;  
}  



void * thread_routine (void *arg)  
{  
    printf ("starting thread 0x%x\n", pthread_self ());  
    while (1)  
    {  
        pthread_mutex_lock (&(pool->queue_lock));  
        /*如果等待隊列為0並且不銷毀線程池,則處於阻塞狀態; 註意 
        pthread_cond_wait是一個原子操作,等待前會解鎖,喚醒後會加鎖*/  
        while (pool->cur_queue_size == 0 && !pool->shutdown)  
        {  
            printf ("thread 0x%x is waiting\n", pthread_self ());  
            pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));  
        }  

        /*線程池要銷毀了*/  
        if (pool->shutdown)  
        {  
            /*遇到break,continue,return等跳轉語句,千萬不要忘記先解鎖*/  
            pthread_mutex_unlock (&(pool->queue_lock));  
            printf ("thread 0x%x will exit\n", pthread_self ());  
            pthread_exit (NULL);  
        }  

        printf ("thread 0x%x is starting to work\n", pthread_self ());  

        /*assert是調試的好幫手*/  
        assert (pool->cur_queue_size != 0);  
        assert (pool->queue_head != NULL);  

        /*等待隊列長度減去1,並取出鏈表中的頭元素*/  
        pool->cur_queue_size--;  
        CThread_worker *worker = pool->queue_head;  
        pool->queue_head = worker->next;  
        pthread_mutex_unlock (&(pool->queue_lock));  

        /*調用回調函數,執行任務*/  
        (*(worker->process)) (worker->arg);  
        free (worker);  
        worker = NULL;  
    }  
    /*這一句應該是不可達的*/  
    pthread_exit (NULL);  
}  

//    下面是測試代碼  

void * myprocess (void *arg)  
{  
    printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);  
    sleep (1);/*休息一秒,延長任務的執行時間*/  
    return NULL;  
}  

int main (int argc, char **argv)  
{  
    pool_init (3);/*線程池中最多三個活動線程*/  

    /*連續向池中投入10個任務*/  
    int *workingnum = (int *) malloc (sizeof (int) * 10);  
    int i;  
    for (i = 0; i < 10; i++)  
    {  
        workingnum[i] = i;  
        pool_add_worker (myprocess, &workingnum[i]);  
    }  
    /*等待所有任務完成*/  
    sleep (5);  
    /*銷毀線程池*/  
    pool_destroy ();  

    free (workingnum);  
    return 0;  
}  

  

線程池的理解與簡單實現