1. 程式人生 > >c語言執行緒池的簡單實現

c語言執行緒池的簡單實現

一、背景

在某種CPU密集型的應用場景中,處理計算任務耗時較多(如影象處理),考慮多核CPU的優勢,若能把計算分擔到多個執行緒中處理則能有效利用CPU;

但是,若過開啟過多的執行緒,執行緒建立銷燬、執行緒間切換所帶來的開銷也不容小覷;

二、相關知識

2.1 思路整理

對於這種場景,設計執行緒池對任務進行處理,即所有待處理的任務集中在佇列裡頭,N個執行緒輪流去取佇列進行計算;

2.2 佇列的實現

佇列使用之前的一篇文章實現的《鏈式佇列》,佇列資料為任務的回撥函式、任務的工作引數;

使用的介面如下:

佇列申請:queue_alloc
入列操作:queue_push
出列操作:queue_pop
佇列銷燬:queue_free

2.2 執行緒相關介面 互斥鎖初始化:pthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutexattr_t *attr);
上鎖:pthread_mutex_lock(pthread_mutex_t *mutex);
解鎖:pthread_mutex_unlock(pthread_mutex_t *mutex);
條件變數初始化:pthread_cond_init(pthread_cond_t *cond,const pthread_cond_t *attr);
執行緒掛起等待:pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
喚醒單個:pthread_cond_signal(pthread_cond_t *cond);
全部喚醒:pthread_cond_broadcast(pthread_cond_t *cond);

三、實現

結構體的定義,struct tpool 為執行緒池的管理結構,struct routine 為待執行的任務單元:

#define MAX_THREAD_NUM 16
#define MAX_TASKITEM 1024

typedef struct tpool
{
    u8 enable;
    
    queue_t *queue;
    
    pthread_attr_t attr;
    pthread_mutex_t mutex;
    pthread_cond_t cond;
    pthread_t tids[MAX_THREAD_NUM];
    
    u16 num;
} tpool_t;

struct routine {
    void *args;
    void (*callback)(void *);
};
執行緒池的初始化,預先啟動MAX_THREAD_NUM個子執行緒,每個子執行緒就緒等待:
tpool_t *tpool_alloc(u16 num)
{
    int ret = FAILURE;
    int ix = 0;
    
    tpool_t *phead = NULL;
    
    if ( num == 0 || num > MAX_THREAD_NUM ) {
        goto _E1;
    }
    
    phead = calloc(1, sizeof(tpool_t));
    if ( !phead ) {
        goto _E1;
    }
    
    phead->enable = 1; 
    phead->num = num;
    phead->queue = queue_alloc(MAX_TASKITEM);
    if ( !phead->queue ) {
        goto _E2;
    }
    
    ret  = pthread_attr_init(&phead->attr);
    ret |= pthread_mutex_init(&phead->mutex, NULL);
    ret |= pthread_cond_init(&phead->cond, NULL);
    if ( SUCCESS != ret ) {
        goto _E3;
    }
    
    ret = pthread_attr_setdetachstate(&phead->attr, PTHREAD_CREATE_DETACHED);
    if ( SUCCESS != ret ) {
        goto _E4;
    }
    
    for ( ix = 0; ix < num; ix++ ) {
        ret = pthread_create(&phead->tids[ix], NULL, __worker, phead);
        if ( SUCCESS != ret ) {
            goto _E4;
        }
    }
    
    ret = SUCCESS;
    goto _E1;
    
_E4:
    pthread_mutex_destroy(&phead->mutex);
    pthread_cond_destroy(&phead->cond);
    pthread_attr_destroy(&phead->attr);
_E3:
    queue_free(&phead->queue, free);
_E2:
    FREE_POINTER(phead);
_E1:
    return phead;
}

子執行緒的實現如下,一個是在佇列為空的時候掛起休眠,被喚醒後取佇列中的工作單元進行呼叫,直到佇列空再次進入休眠;

由於佇列為共享資源,所以多執行緒操作下需要使用鎖進行保護;

static int __worker_routine(tpool_t *phead)
{
    struct routine *prt = NULL;
    
    pthread_mutex_lock(&phead->mutex);
    if ( queue_isempty(phead->queue) ) {
        printf("Thread #%u go sleep!\n", (u32)pthread_self());
        pthread_cond_wait(&phead->cond, &phead->mutex);
        printf("Thread #%u wakeup!\n", (u32)pthread_self());
    }
        
    prt = (struct routine *)queue_pop(phead->queue);
    pthread_mutex_unlock(&phead->mutex);
    
    if ( prt ) {
        prt->callback(prt->args);
        return SUCCESS;
    }
    return FAILURE;
}

static void *__worker(void *args)
{
    tpool_t *phead = (tpool_t *)args;
    
    if ( !args ) {
        return NULL;
    }
    
    while ( phead->enable ) {
        if ( SUCCESS != __worker_routine(phead) ) {
            printf("__worker_routine return, thread quit!\n");
            break;
        }
    }
    
    return NULL;
}
上述的工作子執行緒為消費者,還需要主執行緒去充當生產者,給工作執行緒投放工作任務;
int tpool_routine_add(tpool_t *phead, void (*callback)(void *), void *args)
{
    struct routine *prt = NULL;
    
    if ( !phead || !callback || !args ) {
        return FAILURE;
    }
    
    prt = (struct routine *)calloc(1, sizeof(struct routine));
    if ( !prt ) {
        return FAILURE;
    }
    prt->callback = callback;
    prt->args = args;
    
    
    pthread_mutex_lock(&phead->mutex);
    if ( SUCCESS != queue_push(phead->queue, NULL, prt) ) {
        FREE_POINTER(prt);
        pthread_mutex_unlock(&phead->mutex);
        return FAILURE;
    }
    pthread_cond_signal(&phead->cond);
    pthread_mutex_unlock(&phead->mutex);
    return SUCCESS;
}

執行緒池的銷燬,這個就是將執行緒使能位清空,然後等待所有子執行緒退出,然後銷燬相關成員;

注意該函式有可能阻塞;

int tpool_destory(tpool_t *phead)
{
    int ix = 0;
    
    if ( !phead ) {
        return FAILURE;
    }
    
    phead->enable = 0;
    
    pthread_mutex_lock(&phead->mutex);
    pthread_cond_broadcast(&phead->cond);
    pthread_mutex_unlock(&phead->mutex);
    
    for ( ix = 0; ix < phead->num; ix++ ) {
        pthread_join(phead->tids[ix], NULL);
    }
    
    pthread_mutex_destroy(&phead->mutex);
    pthread_cond_destroy(&phead->cond);
    pthread_attr_destroy(&phead->attr);
    
    return SUCCESS;
}

測試函式:

工作單元如下,使用休眠10秒模擬耗時操作:

struct item 
{
    int value;
};

void test_worker(void *args)
{
    struct item *pitem = (struct item *)args;
    
    if ( !args ) {
        printf("NULL\n");
        return;
    }
    
    printf("begin, %d\n", pitem->value);
    sleep(10);
    printf("end, %d\n", pitem->value);
    
    free(pitem);
}
int main()
{
    int ret = FAILURE;
    
    struct item *pitem = NULL;
    
    tpool_t *phead = NULL;
    
    ASSERT_FAIL(NULL, phead = tpool_alloc(10));
    
    sleep(2);
    printf("1\n");
    ASSERT_FAIL(NULL, pitem = (struct item *)calloc(1, sizeof(struct item)));
    pitem->value = 1;
    ASSERT(SUCCESS, ret = tpool_routine_add(phead, test_worker, pitem));
    
    printf("2\n");
    ASSERT_FAIL(NULL, pitem = (struct item *)calloc(1, sizeof(struct item)));
    pitem->value = 2;
    ASSERT(SUCCESS, ret = tpool_routine_add(phead, test_worker, pitem));
    
    printf("3\n");
    ASSERT_FAIL(NULL, pitem = (struct item *)calloc(1, sizeof(struct item)));
    pitem->value = 3;
    ASSERT(SUCCESS, ret = tpool_routine_add(phead, test_worker, pitem));
    sleep(2);
    
    printf("Close\n");
    ASSERT(SUCCESS, ret = tpool_destory(phead));
_E1:
    printf("Result: %s\n", ret ? "FAILURE" : "SUCCESS");
    return ret ? EXIT_FAILURE : EXIT_SUCCESS;
}

四、總結

測試結果如下:

結果表示,當工作子執行緒進行任務處理時(sleep10模擬),推送新的工作任務並不會打斷當前任務,而是由其他空閒執行緒被喚醒進行處理;

同時在發出退出訊號時,由於程式中實現的為等待當前任務結束後在退出,所以也有一定的延遲性;

參考文章:

[1] C語言實現簡單執行緒池,http://www.cnblogs.com/newth/archive/2012/05/09/2492459.html

[2] 互斥鎖和條件變數,http://www.cnblogs.com/zendu/p/4981480.html