1. 程式人生 > >Linux多執行緒實踐(9) --簡單執行緒池的設計與實現

Linux多執行緒實踐(9) --簡單執行緒池的設計與實現

執行緒池的技術背景

   在面向物件程式設計中,建立和銷燬物件是很費時間的,因為建立一個物件要獲取記憶體資源或者其它更多資源。在Java中更是如此,虛擬機器將試圖跟蹤每一個物件,以便能夠在物件銷燬後進行垃圾回收。所以提高服務程式效率的一個手段就是儘可能減少建立和銷燬物件的次數,特別是一些很耗資源的物件建立和銷燬。如何利用已有物件來服務(不止一個不同的任務)就是一個需要解決的關鍵問題,其實這就是一些"池化資源"技術產生的原因。比如大家所熟悉的資料庫連線池正是遵循這一思想而產生的,本文將介紹的執行緒池技術同樣符合這一思想。

目前,一些著名的大公司都特別看好這項技術,並早已經在他們的產品中應用該技術。比如IBM的WebSphere,IONA的Orbix 2000在SUN的 Jini中,Microsoft的MTS(Microsoft Transaction Server 2.0),COM+等。

現在您是否也想在伺服器程式應用該項技術?

執行緒池技術如何提高伺服器程式的效能

我所提到伺服器程式是指能夠接受客戶請求並能處理請求的程式,而不只是指那些接受網路客戶請求的網路伺服器程式。

多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力。但如果對多執行緒應用不當,會增加對單個任務的處理時間。可以舉一個簡單的例子:

假設在一臺伺服器完成一項任務的時間為T

T1 建立執行緒的時間                                 

T2 線上程中執行任務的時間,包括執行緒間同步所需時間 

T3 執行緒銷燬的時間                                 

顯然T = T1+T2+T3。注意這是一個極度簡化的假設。

可以看出T1,T3是多執行緒本身的帶來的開銷,我們渴望減少T1,T3所用的時間,從而減少T的時間。但一些執行緒的使用者並沒有注意到這一點,所以在程式中頻繁的建立或銷燬執行緒,這導致T1和T3在T中佔有相當比例(在傳統的多執行緒伺服器模型中是這樣實現的:一旦有個請求到達,就建立一個新的執行緒,由該執行緒執行任務,任務執行完畢之後,執行緒就退出。這就是"即時建立,即時銷燬"的策略。儘管與建立程序相比,建立執行緒的時間已經大大的縮短,但是如果提交給執行緒的任務是執行時間較短,而且執行次數非常頻繁,那麼伺服器就將處於一個不停的建立執行緒和銷燬執行緒的狀態。這筆開銷是不可忽略的,尤其是執行緒執行的時間非常非常短的情況。)。顯然這是突出了執行緒的弱點(T1,T3),而不是優點(併發性)。

執行緒池技術正是關注如何縮短或調整T1,T3時間的技術,從而提高伺服器程式效能的。它把T1,T3分別安排在伺服器程式的啟動和結束的時間段或者一些空閒的時間段(在應用程式啟動之後,就馬上建立一定數量的執行緒,放入空閒的佇列中。這些執行緒都是處於阻塞狀態,這些執行緒只佔一點記憶體,不佔用CPU。當任務到來後,執行緒池將選擇一個空閒的執行緒,將任務傳入此執行緒中執行。當所有的執行緒都處在處理任務的時候,執行緒池將自動建立一定的數量的新執行緒,用於處理更多的任務。執行任務完成之後執行緒並不退出,而是繼續線上程池中等待下一次任務。當大部分執行緒處於阻塞狀態時,執行緒池將自動銷燬一部分的執行緒,回收系統資源),這樣在伺服器程式處理客戶請求時,不會有T1,T3的開銷了。

執行緒池不僅調整T1,T3產生的時間段,而且它還顯著減少了建立執行緒的數目。再看一個例子:

假設一個伺服器一天要處理50000個請求,並且每個請求需要一個單獨的執行緒完成。我們比較利用執行緒池技術和不利於執行緒池技術的伺服器處理這些請求時所產生的執行緒總數。線上程池中,執行緒數一般是固定的,所以產生執行緒總數不會超過執行緒池中執行緒的數目或者上限(以下簡稱執行緒池尺寸),而如果伺服器不利用執行緒池來處理這些請求則執行緒總數為50000。一般執行緒池尺寸是遠小於50000。所以利用執行緒池的伺服器程式不會為了建立50000而在處理請求時浪費時間,從而提高效率

簡單執行緒池的實現

下面是一個簡單執行緒池的實現, 它所使用的方案如下:

1.程式啟動之前,初始化執行緒池,此時執行緒池中沒有任何執行緒, 需要呼叫addTask方法向執行緒池中新增任務;

2.如果此時執行緒池有空閒(處於等待)的執行緒, 就不會建立新的執行緒, 這樣就省去了T1, T3的時間;

3.如果此時執行緒池中沒有處於等待的執行緒(由於此時執行緒剛剛初始化, 此時執行緒池中肯定是沒有處於等待狀態的執行緒的)並且此時執行緒池中的執行緒數並沒有達到閾值, 才建立並啟動執行緒; 

4.如果此時執行緒池中的執行緒數已經達到閾值, 那就只能等待現在還執行任務的執行緒, 等到其執行完其當前正在執行任務, 然後才從任務佇列中將新任務取出然後執行;

執行緒池主要由兩個檔案組成, 一個threadpool.h標頭檔案和一個threadpool.cpp原始檔組成。原始碼中已有重要的註釋,就不加以分析了。

//ThreadPool設計
void *thread_routine(void *args);
class ThreadPool
{
    friend void *thread_routine(void *args);
private:
    //回撥函式型別
    typedef void *(*callback_t)(void *);
    //任務結構體
    struct task_t
    {
        callback_t run; //任務回撥函式
        void *args;     //任務函式引數
    };

public:
    ThreadPool(int _maxThreads = 36, unsigned int _waitSeconds = 2);
    ~ThreadPool();
    //新增任務介面
    void addTask(callback_t run, void *args);

private:
    void startTask();

private:
    Condition ready;                //任務準備就緒或執行緒池銷燬通知
    std::queue<task_t *> taskQueue; //任務佇列

    unsigned int maxThreads;        //執行緒池最多允許的執行緒數
    unsigned int counter;           //執行緒池當前執行緒數
    unsigned int idle;              //執行緒池空閒執行緒數
    unsigned int waitSeconds;       //執行緒可以等待的秒數
    bool         quit;              //執行緒池銷燬標誌
};
//建構函式
ThreadPool::ThreadPool(int _maxThreads, unsigned int _waitSeconds)
    : maxThreads(_maxThreads), counter(0), idle(0),
      waitSeconds(_waitSeconds), quit(false) {}
// 執行緒入口函式
// 這其實就相當於一個消費者執行緒, 不斷的消費任務(執行任務)
void *thread_routine(void *args)
{
    //將子執行緒設定成為分離狀態, 這樣主執行緒就可以不用jion
    pthread_detach(pthread_self());
    printf("*thread 0x%lx is starting...\n", (unsigned long)pthread_self());
    ThreadPool *pool = (ThreadPool *)args;

    //等待任務的到來, 然後執行任務
    while (true)
    {
        bool timeout = false;

        pool->ready.lock();
        //當處於等待的時候, 則說明空閒的執行緒多了一個
        ++ pool->idle;

        //pool->ready中的條件變數有三個作用:
        // 1.等待任務佇列中有任務到來
        // 2.等待執行緒池銷燬通知
        // 3.確保當等待超時的時候, 能夠將執行緒銷燬(執行緒退出)
        while (pool->taskQueue.empty() && pool->quit == false)
        {
            printf("thread 0x%lx is waiting...\n", (unsigned long)pthread_self());
            //等待waitSeconds
            if (0 != pool->ready.timedwait(pool->waitSeconds))
            {
                //如果等待超時
                printf("thread 0x%lx is wait timeout ...\n", (unsigned long)pthread_self());
                timeout = true;
                //break出迴圈, 繼續向下執行, 會執行到下面第1個if處
                break;
            }
        }
        //條件成熟(當等待結束), 執行緒開始執行任務或者是執行緒銷燬, 則說明空閒執行緒又少了一個
        -- pool->idle;

        // 狀態3.如果等待超時(一般此時任務佇列已經空了)
        if (timeout == true && pool->taskQueue.empty())
        {
            -- pool->counter;
            //解鎖然後跳出迴圈, 直接銷燬執行緒(退出執行緒)
            pool->ready.unlock();
            break;
        }

        // 狀態2.如果是等待到了執行緒的銷燬通知, 且任務都執行完畢了
        if (pool->quit == true && pool->taskQueue.empty())
        {
            -- pool->counter;
            //如果沒有執行緒了, 則給執行緒池傳送通知
            //告訴執行緒池, 池中已經沒有執行緒了
            if (pool->counter == 0)
                pool->ready.signal();
            //解鎖然後跳出迴圈
            pool->ready.unlock();
            break;
        }

        // 狀態1.如果是有任務了, 則執行任務
        if (!(pool->taskQueue.empty()))
        {
            //從隊頭取出任務進行處理
            ThreadPool::task_t *t = pool->taskQueue.front();
            pool->taskQueue.pop();

            //執行任務需要一定的時間
            //解鎖以便於其他的生產者可以繼續生產任務, 其他的消費者也可以消費任務
            pool->ready.unlock();
            //處理任務
            t->run(t->args);
            delete t;
        }
    }

    //跳出迴圈之後, 列印退出資訊, 然後銷燬執行緒
    printf("thread 0x%lx is exiting...\n", (unsigned long)pthread_self());
    pthread_exit(NULL);
}
//addTask函式
//新增任務函式, 類似於一個生產者, 不斷的將任務生成, 掛接到任務佇列上, 等待消費者執行緒進行消費
void ThreadPool::addTask(callback_t run, void *args)
{
    /** 1. 生成任務並將任務新增到"任務佇列"隊尾 **/
    task_t *newTask = new task_t {run, args};

    ready.lock();   //注意需要使用互斥量保護共享變數
    taskQueue.push(newTask);

    /** 2. 讓執行緒開始執行任務 **/
    startTask();
    ready.unlock();//解鎖以使任務開始執行
}
//執行緒啟動函式
void ThreadPool::startTask()
{
    // 如果有等待執行緒, 則喚醒其中一個, 讓它來執行任務
    if (idle > 0)
        ready.signal();
    // 沒有等待執行緒, 而且當前先執行緒總數尚未達到閾值, 我們就需要建立一個新的執行緒
    else if (counter < maxThreads)
    {
        pthread_t tid;
        pthread_create(&tid, NULL, thread_routine, this);
        ++ counter;
    }
}
//解構函式
ThreadPool::~ThreadPool()
{
    //如果已經呼叫過了, 則直接返回
    if (quit == true)
        return;

    ready.lock();
    quit = true;
    if (counter > 0)
    {
        //對於處於等待狀態, 則給他們傳送通知,
        //這些處於等待狀態的執行緒, 則會接收到通知,
        //然後直接退出
        if (idle > 0)
            ready.broadcast();

        //對於正處於執行任務的執行緒, 他們接收不到這些通知,
        //則需要等待他們執行完任務
        while (counter > 0)
            ready.wait();
    }
    ready.unlock();
}

關於高階執行緒池的探討

簡單執行緒池存在一些問題,比如如果有大量的客戶要求伺服器為其服務,但由於執行緒池的工作執行緒是有限的,伺服器只能為部分客戶服務,其它客戶提交的任務,只能在任務佇列中等待處理。一些系統設計人員可能會不滿這種狀況,因為他們對伺服器程式的響應時間要求比較嚴格,所以在系統設計時可能會懷疑執行緒池技術的可行性,但是執行緒池有相應的解決方案。調整優化執行緒池尺寸是高階執行緒池要解決的一個問題。主要有下列解決方案:

方案一:動態增加工作執行緒

在一些高階執行緒池中一般提供一個可以動態改變的工作執行緒數目的功能,以適應突發性的請求。一旦請求變少了將逐步減少執行緒池中工作執行緒的數目。當然執行緒增加可以採用一種超前方式,即批量增加一批工作執行緒,而不是來一個請求才建立建立一個執行緒。批量建立是更加有效的方式。該方案還有應該限制執行緒池中工作執行緒數目的上限和下限。否則這種靈活的方式也就變成一種錯誤的方式或者災難,因為頻繁的建立執行緒或者短時間內產生大量的執行緒將會背離使用執行緒池原始初衷--減少建立執行緒的次數。

舉例:Jini中的TaskManager,就是一個精巧執行緒池管理器,它是動態增加工作執行緒的。SQL Server採用單程序(Single Process)多執行緒(Multi-Thread)的系統結構,1024個數量的執行緒池,動態執行緒分配,理論上限32767。

方案二:優化工作執行緒數目

如果不想線上程池應用複雜的策略來保證工作執行緒數滿足應用的要求,你就要根據統計學的原理來統計客戶的請求數目,比如高峰時段平均一秒鐘內有多少任務要求處理,並根據系統的承受能力及客戶的忍受能力來平衡估計一個合理的執行緒池尺寸。執行緒池的尺寸確實很難確定,所以有時乾脆用經驗值。

舉例:在MTS中執行緒池的尺寸固定為100。

方案三:一個伺服器提供多個執行緒池

   在一些複雜的系統結構會採用這個方案。這樣可以根據不同任務或者任務優先順序來採用不同執行緒池處理。

舉例:COM+用到了多個執行緒池。

這三種方案各有優缺點。在不同應用中可能採用不同的方案或者乾脆組合這三種方案來解決實際問題。

執行緒池技術適用範圍及應注意的問題

下面是我總結的一些執行緒池應用範圍,可能是不全面的。

執行緒池的應用範圍:

(1)需要大量的執行緒來完成任務,且完成任務的時間比較短。 WEB伺服器完成網頁請求這樣的任務,使用執行緒池技術是非常合適的。因為單個任務小,而任務數量巨大,你可以想象一個熱門網站的點選次數。 但對於長時間的任務,比如一個Telnet連線請求,執行緒池的優點就不明顯了。因為Telnet會話時間比執行緒的建立時間大多了。

(2)對效能要求苛刻的應用,比如要求伺服器迅速相應客戶請求。

   (3)接受突發性的大量請求,但不至於使伺服器因此產生大量執行緒的應用。突發性大量客戶請求,在沒有執行緒池情況下,將產生大量執行緒,雖然理論上大部分作業系統執行緒數目最大值不是問題,短時間內產生大量執行緒可能使記憶體到達極限,並出現"OutOfMemory"的錯誤。

結束語

本文只是簡單介紹執行緒池技術。可以看出執行緒池技術對於伺服器程式的效能改善是顯著的。執行緒池技術在伺服器領域有著廣泛的應用前景。希望這項技術能夠應用到您的多執行緒服務程式中。

注:這是網上一篇部落格的改造: 將Java版本的執行緒池改造成了基於Linux 的C++版本, 原文連結為:http://www.ibm.com/developerworks/cn/java/l-threadPool/, 如果讀者的興趣所在為Java, 請移步於此, 向您鄭重推薦, 這是一篇非常好的文章, 謝謝!