1. 程式人生 > >【原始碼剖析】threadpool —— 基於 pthread 實現的簡單執行緒池

【原始碼剖析】threadpool —— 基於 pthread 實現的簡單執行緒池

部落格新地址:https://github.com/AngryHacker/articles/issues/1#issue-369867252

執行緒池介紹

執行緒池可以說是專案中經常會用到的元件,在這裡假設讀者都有一定的多執行緒基礎,如果沒有的話不妨在這裡進行了解:POSIX 多執行緒基礎

執行緒池是什麼?我的簡單理解是有一組預先派生的執行緒,然後有一個管理員來管理和排程這些執行緒,你只需不斷把需要完成的任務交給他,他就會排程執行緒的資源來幫你完成。

那麼管理員是怎麼做的呢?一種簡單的方式就是,管理員管理一個任務的佇列,如果收到新的任務,就把任務加到佇列尾。每個執行緒盯著佇列,如果佇列非空,就去佇列頭拿一個任務來處理(每個任務只能被一個執行緒拿到),處理完了就繼續去佇列取任務。如果沒有任務了,執行緒就休眠,直到任務佇列不為空。如果這個管理員更聰明一點,他可能會在沒有任務或任務少的時候減少執行緒的數量,任務處理不過來的時候增加執行緒的數量,這樣就實現了資源的動態管理。

那麼任務是什麼呢?以後臺伺服器為例,每一個使用者的請求就是一個任務,執行緒不斷的在請求佇列裡取出請求,完成後繼續處理下一個請求。

簡單圖示為:
threadpool

執行緒池有一個好處就是減少執行緒建立和銷燬的時間,在任務處理時間比較短的時候這個好處非常顯著,可以提升任務處理的效率。

執行緒池實現

這裡介紹的是執行緒池的一個簡單實現,在建立的時候預先派生指定數量的執行緒,然後去任務佇列取新增進來的任務進行處理就好。

作者說之後會新增更多特性,我們作為學習之後就以這個版本為準就好了。

專案主頁:threadpool

資料結構

主要有兩個自定義的資料結構

threadpool_task_t

用於儲存一個等待執行的任務。一個任務需要指明:要執行的對應函式及函式的引數。所以這裡的 struct 裡有函式指標和 void 指標。

/* One queued unit of work: the function to call and the argument to give it. */
typedef struct {
    void (*function)(void *);   /* routine that performs the task */
    void *argument;             /* argument to be passed to function */
} threadpool_task_t;
threadpool_t

一個執行緒池的結構。因為是 C 語言,所以這裡任務佇列是用陣列,並維護佇列頭和佇列尾來實現。

/* The thread pool: the task queue is a fixed array tracked by head and tail. */
struct threadpool_t {
  pthread_mutex_t lock;      /* mutex */
  pthread_cond_t notify;     /* condition variable */
  pthread_t *threads;        /* start of the worker-thread array */
  threadpool_task_t *queue;  /* start of the task-queue array */
  int thread_count;          /* number of threads */
  int queue_size;            /* length of the task queue */
  int head;                  /* current head of the task queue */
  int tail;                  /* current tail of the task queue */
  int count;                 /* number of tasks waiting to run */
  int shutdown;              /* whether the pool is shutting down */
  int started;               /* number of running threads */
};

函式

對外介面
  • threadpool_t *threadpool_create(int thread_count, int queue_size, int flags); 建立執行緒池,用 thread_count 指定派生執行緒數,queue_size 指定任務佇列長度,flags 為保留引數,未使用。
  • int threadpool_add(threadpool_t *pool, void (*routine)(void *),void *arg, int flags); 新增需要執行的任務。第二個引數為對應函式指標,第三個為對應函式引數。flags 未使用。
  • int threadpool_destroy(threadpool_t *pool, int flags); 銷燬存在的執行緒池。flags 可以指定是立刻結束還是平和結束。立刻結束指不管任務佇列是否為空,立刻結束。平和結束指等待任務佇列的任務全部執行完後再結束,在這個過程中不可以新增新的任務。
內部輔助函式
  • static void *threadpool_thread(void *threadpool); 執行緒池每個執行緒所執行的函式。
  • int threadpool_free(threadpool_t *pool); 釋放執行緒池所申請的記憶體資源。

執行緒池使用

編譯

參考專案根目錄下的 Makefile, 直接用 make 編譯。

測試用例

專案提供了三個測試用例(見 threadpool/test/),我們可以以此來學習執行緒池的用法並測試是否正常工作。這裡提供其中一個:

/* Worker-thread count and task-queue size handed to threadpool_create(). */
#define THREAD 32
#define QUEUE  256

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>

#include "threadpool.h"

/* tasks: tasks accepted by threadpool_add(); done: tasks that have finished. */
int tasks = 0, done = 0;
/* Mutex taken around updates of the counters above. */
pthread_mutex_t lock;

/*
 * Task body used by the test: pause for 10000 microseconds, then record
 * one more finished task.
 */
void dummy_task(void *arg)
{
    (void)arg;  /* the test passes NULL; nothing to read */

    usleep(10000);

    /* done is updated by every worker thread, so bump it under the lock */
    pthread_mutex_lock(&lock);
    done += 1;
    pthread_mutex_unlock(&lock);
}

/*
 * Test driver: fill the pool's queue with dummy tasks, wait until at least
 * half of them have completed, then destroy the pool with an immediate
 * shutdown and report how many tasks ran.
 *
 * Returns 0; failures are reported through assert().
 */
int main(int argc, char **argv)
{
    threadpool_t *pool;
    int finished;
    int rc;

    (void)argc;
    (void)argv;

    /* Initialize the mutex protecting the counters */
    pthread_mutex_init(&lock, NULL);

    /*
     * Create the pool OUTSIDE the assert: an assert() expression is removed
     * entirely when NDEBUG is defined, which would otherwise skip the call.
     */
    pool = threadpool_create(THREAD, QUEUE, 0);
    assert(pool != NULL);
    fprintf(stderr, "Pool started with %d threads and "
            "queue size of %d\n", THREAD, QUEUE);

    /* Keep adding tasks until threadpool_add() reports an error (queue full) */
    while(threadpool_add(pool, &dummy_task, NULL, 0) == 0) {
        pthread_mutex_lock(&lock);
        tasks++;
        pthread_mutex_unlock(&lock);
    }

    fprintf(stderr, "Added %d tasks\n", tasks);

    /*
     * Sleep until at least half of the tasks are done. done is written by
     * the workers under the lock, so take the same lock to read it.
     */
    for(;;) {
        pthread_mutex_lock(&lock);
        finished = done;
        pthread_mutex_unlock(&lock);

        if(finished >= (tasks / 2)) {
            break;
        }
        usleep(10000);
    }

    /* Destroy the pool now; flags == 0 selects immediate shutdown */
    rc = threadpool_destroy(pool, 0);
    assert(rc == 0);
    (void)rc;  /* keeps rc "used" when assert() compiles to nothing */

    fprintf(stderr, "Did %d tasks\n", done);

    return 0;
}

原始碼註釋

原始碼註釋一併放在 github, 點我。

threadpool.h

/*
 * Copyright (c) 2013, Mathias Brossard <[email protected]>.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_

#ifdef __cplusplus
/* When compiled as C++, give the declarations below C linkage (no name mangling) */
extern "C" {
#endif

/**
 * @file threadpool.h
 * @brief Threadpool Header File
 */

 /**
 * Upper limits that threadpool_create() accepts for the thread count and
 * the queue size.
 * Increase these constants at your own risk:
 * large values might slow down your system.
 */
#define MAX_THREADS 64
#define MAX_QUEUE 65536

/* Lets callers write threadpool_t instead of struct threadpool_t. */
typedef struct threadpool_t threadpool_t;

/* Error codes returned by the pool API (all negative; 0 means success). */
typedef enum {
    threadpool_invalid        = -1,  /* a required pointer argument was NULL */
    threadpool_lock_failure   = -2,  /* a mutex or condition-variable call failed */
    threadpool_queue_full     = -3,  /* the task queue has no free slot */
    threadpool_shutdown       = -4,  /* the pool is already shutting down */
    threadpool_thread_failure = -5   /* joining a worker thread failed */
} threadpool_error_t;

/* Flag values accepted by threadpool_destroy(). */
typedef enum {
    threadpool_graceful       = 1   /* process all pending tasks before shutting down */
} threadpool_destroy_flags_t;

/* The three public functions of the thread pool follow. */

/**
 * @function threadpool_create
 * @brief Creates a threadpool_t object with thread_count worker threads
 *        and a task queue that holds up to queue_size tasks.
 * @param thread_count Number of worker threads.
 * @param queue_size   Size of the queue.
 * @param flags        Unused parameter.
 * @return a newly created thread pool or NULL
 */
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags);

/**
 * @function threadpool_add
 * @brief add a new task in the queue of a thread pool
 * @param pool    Thread pool to which add the task.
 * @param routine Pointer to the function that will perform the task.
 * @param arg     Argument to be passed to the function.
 * @param flags   Unused parameter.
 * @return 0 if all goes well, negative values in case of error (@see
 * threadpool_error_t for codes).
 */
int threadpool_add(threadpool_t *pool, void (*routine)(void *),
                   void *arg, int flags);

/**
 * @function threadpool_destroy
 * @brief Stops and destroys a thread pool.
 * @param pool  Thread pool to destroy.
 * @param flags Flags for shutdown; selects how the pool is closed.
 * @return 0 if all goes well, negative values in case of error (@see
 * threadpool_error_t for codes).
 *
 * Known values for flags are 0 (default) and threadpool_graceful in
 * which case the thread pool doesn't accept any new tasks but
 * processes all pending tasks before shutdown.
 */
int threadpool_destroy(threadpool_t *pool, int flags);

#ifdef __cplusplus
}
#endif

#endif /* _THREADPOOL_H_ */

threadpool.c

/*
 * Copyright (c) 2013, Mathias Brossard <[email protected]>.
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are
 * met:
 *
 *  1. Redistributions of source code must retain the above copyright
 *     notice, this list of conditions and the following disclaimer.
 *
 *  2. Redistributions in binary form must reproduce the above copyright
 *     notice, this list of conditions and the following disclaimer in the
 *     documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * @file threadpool.c
 * @brief Threadpool implementation file
 */

#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

#include "threadpool.h"

/**
 * How the pool is being shut down. The value is stored in the pool's
 * shutdown field, which is 0 while the pool is running.
 */
typedef enum {
    immediate_shutdown = 1,  /* workers stop even if tasks are still queued */
    graceful_shutdown  = 2   /* workers stop once the queue is empty */
} threadpool_shutdown_t;

/**
 *  @struct threadpool_task
 *  @brief the work struct: one task of the thread pool
 *
 *  @var function Pointer to the function that will perform the task.
 *  @var argument Argument to be passed to the function.
 */

typedef struct {
    void (*function)(void *);
    void *argument;
} threadpool_task_t;

/**
 *  @struct threadpool
 *  @brief The threadpool struct
 *
 *  @var lock         Mutex used for the pool's internal bookkeeping.
 *  @var notify       Condition variable to notify worker threads.
 *  @var threads      Array containing worker threads ID (pointer to the
 *                    first element).
 *  @var thread_count Number of threads
 *  @var queue        Array containing the task queue.
 *  @var queue_size   Size of the task queue.
 *  @var head         Index of the first element (every task still in the
 *                    queue has not started running).
 *  @var tail         Index of the next element, i.e. the slot after the
 *                    last queued task.
 *  @var count        Number of pending tasks
 *  @var shutdown     Flag indicating if the pool is shutting down
 *  @var started      Number of started threads
 */
struct threadpool_t {
  pthread_mutex_t lock;
  pthread_cond_t notify;
  pthread_t *threads;
  threadpool_task_t *queue;
  int thread_count;
  int queue_size;
  int head;
  int tail;
  int count;
  int shutdown;
  int started;
};

/**
 * @function void *threadpool_thread(void *threadpool)
 * @brief the worker thread: the routine run by every thread of the pool
 * @param threadpool the pool which own the thread
 *
 * Declared static so that it is only visible inside this file.
 */
static void *threadpool_thread(void *threadpool);

/* Releases the memory and synchronization objects held by a pool. */
int threadpool_free(threadpool_t *pool);

/**
 * Creates a thread pool and starts its worker threads.
 *
 * @param thread_count Number of worker threads, 1..MAX_THREADS.
 * @param queue_size   Capacity of the task queue, 1..MAX_QUEUE.
 * @param flags        Unused.
 * @return the new pool, or NULL when an argument is out of range, a memory
 *         allocation fails, the mutex or condition variable cannot be
 *         initialized, or a worker thread cannot be started.
 */
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags)
{
    threadpool_t *pool;
    int i;

    (void)flags;

    if(thread_count <= 0 || thread_count > MAX_THREADS ||
       queue_size <= 0 || queue_size > MAX_QUEUE) {
        return NULL;
    }

    /* Allocate the pool object itself */
    pool = (threadpool_t *)malloc(sizeof(threadpool_t));
    if(pool == NULL) {
        return NULL;
    }

    /* Initialize */
    pool->thread_count = 0;
    pool->queue_size = queue_size;
    pool->head = pool->tail = pool->count = 0;
    pool->shutdown = pool->started = 0;

    /* Allocate thread and task queue */
    pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
    pool->queue = (threadpool_task_t *)malloc
        (sizeof(threadpool_task_t) * queue_size);
    if(pool->threads == NULL || pool->queue == NULL) {
        goto err_free;
    }

    /*
     * Initialize the mutex and the condition variable one at a time so the
     * cleanup below only destroys objects that were really initialized.
     */
    if(pthread_mutex_init(&(pool->lock), NULL) != 0) {
        goto err_free;
    }
    if(pthread_cond_init(&(pool->notify), NULL) != 0) {
        goto err_mutex;
    }

    /* Start worker threads */
    for(i = 0; i < thread_count; i++) {
        if(pthread_create(&(pool->threads[i]), NULL,
                          threadpool_thread, (void*)pool) != 0) {
            /* Stops and joins the workers started so far, then frees the pool */
            threadpool_destroy(pool, 0);
            return NULL;
        }
        pool->thread_count++;
        pool->started++;
    }

    return pool;

 err_mutex:
    pthread_mutex_destroy(&(pool->lock));
 err_free:
    /* free(NULL) is a no-op, so whichever allocation failed is harmless here */
    free(pool->queue);
    free(pool->threads);
    free(pool);
    return NULL;
}

/**
 * Appends one task to the pool's queue and wakes a waiting worker.
 *
 * @param pool     Pool that receives the task.
 * @param function Routine that performs the task.
 * @param argument Argument stored with the task for that routine.
 * @param flags    Unused.
 * @return 0 on success; threadpool_invalid when pool or function is NULL,
 *         threadpool_queue_full when no slot is free, threadpool_shutdown
 *         when the pool is shutting down, threadpool_lock_failure when a
 *         mutex or condition-variable call fails.
 */
int threadpool_add(threadpool_t *pool, void (*function)(void *),
                   void *argument, int flags)
{
    int status = 0;
    int slot;

    if(pool == NULL || function == NULL) {
        return threadpool_invalid;
    }

    /* The queue may only be touched while holding the pool mutex */
    if(pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    if(pool->count == pool->queue_size) {
        /* Are we full ? */
        status = threadpool_queue_full;
    } else if(pool->shutdown) {
        /* Are we shutting down ? */
        status = threadpool_shutdown;
    } else {
        /* Store the task in the tail slot, then advance tail with wrap-around */
        slot = pool->tail;
        pool->queue[slot].function = function;
        pool->queue[slot].argument = argument;
        pool->tail = (slot + 1 == pool->queue_size) ? 0 : slot + 1;
        pool->count += 1;

        /*
         * Signal that a task was added: if a worker is blocked on the empty
         * queue one of them is woken, otherwise nothing happens.
         */
        if(pthread_cond_signal(&(pool->notify)) != 0) {
            status = threadpool_lock_failure;
        }
    }

    /* Release the mutex; a failure here overrides any earlier status */
    if(pthread_mutex_unlock(&pool->lock) != 0) {
        status = threadpool_lock_failure;
    }

    return status;
}

/**
 * Stops the worker threads, waits for them and frees the pool.
 *
 * @param pool  Pool to destroy.
 * @param flags threadpool_graceful to let the workers drain the queue
 *              first; any other value requests an immediate shutdown.
 * @return 0 on success (the pool has been freed and must not be used
 *         again); threadpool_invalid when pool is NULL,
 *         threadpool_shutdown when a shutdown is already in progress,
 *         threadpool_lock_failure when a mutex or condition-variable call
 *         fails, threadpool_thread_failure when a worker cannot be joined.
 *         The pool is not freed when an error is returned.
 */
int threadpool_destroy(threadpool_t *pool, int flags)
{
    int i, err = 0;

    if(pool == NULL) {
        return threadpool_invalid;
    }

    if(pthread_mutex_lock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    /* Already shutting down: release the mutex before reporting it */
    if(pool->shutdown) {
        pthread_mutex_unlock(&(pool->lock));
        return threadpool_shutdown;
    }

    /* Record the requested shutdown mode */
    pool->shutdown = (flags & threadpool_graceful) ?
        graceful_shutdown : immediate_shutdown;

    /* Wake up all worker threads */
    if(pthread_cond_broadcast(&(pool->notify)) != 0) {
        /*
         * The mutex is still held, so no worker has observed the flag yet;
         * undo it so the pool stays usable, and never return with the
         * mutex locked.
         */
        pool->shutdown = 0;
        pthread_mutex_unlock(&(pool->lock));
        return threadpool_lock_failure;
    }

    /* The workers need the mutex to notice the shutdown flag */
    if(pthread_mutex_unlock(&(pool->lock)) != 0) {
        return threadpool_lock_failure;
    }

    /* Join all worker thread */
    for(i = 0; i < pool->thread_count; i++) {
        if(pthread_join(pool->threads[i], NULL) != 0) {
            err = threadpool_thread_failure;
        }
    }

    /* Only if everything went well do we deallocate the pool */
    if(!err) {
        threadpool_free(pool);
    }
    return err;
}

/**
 * Releases everything owned by a pool: the thread array, the task queue,
 * the mutex, the condition variable and the pool object itself.
 *
 * @param pool Pool to free.
 * @return 0 on success, -1 when pool is NULL or pool->started is still
 *         greater than zero (nothing is freed in that case).
 */
int threadpool_free(threadpool_t *pool)
{
    if(pool == NULL || pool->started > 0) {
        return -1;
    }

    /* Did we manage to allocate ? */
    if(pool->threads) {
        /*
         * Take and release the mutex once so that anything still holding it
         * has let go, then destroy it UNLOCKED: destroying a locked mutex
         * is undefined behavior.
         */
        pthread_mutex_lock(&(pool->lock));
        pthread_mutex_unlock(&(pool->lock));
        pthread_mutex_destroy(&(pool->lock));
        pthread_cond_destroy(&(pool->notify));
    }

    /* free(NULL) is a no-op, so the queue is released even if threads is NULL */
    free(pool->threads);
    free(pool->queue);
    free(pool);
    return 0;
}


static void *threadpool_thread(void *threadpool)
{
    threadpool_t *pool = (threadpool_t *)threadpool;
    threadpool_task_t task;

    for(;;) {
        /* Lock must be taken to wait on conditional variable */
        /* 取得互斥鎖資源 */
        pthread_mutex_lock(&(pool->lock));

        /* Wait on condition variable, check for spurious wakeups.
           When returning from pthread_cond_wait(), we own the lock. */
        /* 用 while 是為了在喚醒時重新檢查條件 */
        while((pool->count == 0) && (!pool->shutdown)) {
            /* 任務佇列為空,且執行緒池沒有關閉時阻塞在這裡 */
            pthread_cond_wait(&(pool->notify), &(pool->lock));
        }

        /* 關閉的處理 */
        if((pool->shutdown == immediate_shutdown) ||
           ((pool->shutdown == graceful_shutdown) &&
            (pool->count == 0))) {
            break;
        }

        /* Grab our task */
        /* 取得任務佇列的第一個任務 */
        task.function = pool->queue[pool->head].function;
        task.argument = pool->queue[pool->head].argument;
        /* 更新 head 和 count */
        pool->head += 1;
        pool->head = (pool->head == pool->queue_size)