【原始碼剖析】threadpool —— 基於 pthread 實現的簡單執行緒池
部落格新地址:https://github.com/AngryHacker/articles/issues/1#issue-369867252
執行緒池介紹
執行緒池可以說是專案中經常會用到的元件,在這裡假設讀者都有一定的多執行緒基礎,如果沒有的話不妨在這裡進行了解:POSIX 多執行緒基礎。
執行緒池是什麼?我的簡單理解是有一組預先派生的執行緒,然後有一個管理員來管理和排程這些執行緒,你只需不斷把需要完成的任務交給他,他就會排程執行緒的資源來幫你完成。
那麼管理員是怎麼做的呢?一種簡單的方式就是,管理員管理一個任務的佇列,如果收到新的任務,就把任務加到佇列尾。每個執行緒盯著佇列,如果佇列非空,就去佇列頭拿一個任務來處理(每個任務只能被一個執行緒拿到),處理完了就繼續去佇列取任務。如果沒有任務了,執行緒就休眠,直到任務佇列不為空。如果這個管理員更聰明一點,他可能會在沒有任務或任務少的時候減少執行緒的數量,任務處理不過來的時候增加執行緒的數量,這樣就實現了資源的動態管理。
那麼任務是什麼呢?以後臺伺服器為例,每一個使用者的請求就是一個任務,執行緒不斷的在請求佇列裡取出請求,完成後繼續處理下一個請求。
簡單圖示為:
執行緒池有一個好處就是減少執行緒建立和銷燬的時間,在任務處理時間比較短的時候這個好處非常顯著,可以提升任務處理的效率。
執行緒池實現
這裡介紹的是執行緒池的一個簡單實現,在建立的時候預先派生指定數量的執行緒,然後去任務佇列取新增進來的任務進行處理就好。
作者說之後會新增更多特性,我們作為學習之後就以這個版本為準就好了。
專案主頁:threadpool
資料結構
主要有兩個自定義的資料結構
threadpool_task_t
用於儲存一個等待執行的任務。一個任務需要指明:要執行的對應函式及函式的引數。所以這裡的 struct 裡有函式指標和 void 指標。
typedef struct {
void (*function)(void *);
void *argument;
} threadpool_task_t;
thread_pool_t
一個執行緒池的結構。因為是 C 語言,所以這裡任務佇列是用陣列,並維護佇列頭和佇列尾來實現。
struct threadpool_t {
pthread_mutex_t lock; /* 互斥鎖 */
pthread_cond_t notify; /* 條件變數 */
pthread_t *threads; /* 執行緒陣列的起始指標 */
threadpool_task_t *queue; /* 任務佇列陣列的起始指標 */
int thread_count; /* 執行緒數量 */
int queue_size; /* 任務佇列長度 */
int head; /* 當前任務佇列頭 */
int tail; /* 當前任務佇列尾 */
int count; /* 當前待執行的任務數 */
int shutdown; /* 執行緒池當前狀態是否關閉 */
int started; /* 正在執行的執行緒數 */
};
函式
對外介面
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags);
建立執行緒池,用 thread_count 指定派生執行緒數,queue_size 指定任務佇列長度,flags 為保留引數,未使用。int threadpool_add(threadpool_t *pool, void (*routine)(void *),void *arg, int flags);
新增需要執行的任務。第二個引數為對應函式指標,第三個為對應函式引數。flags 未使用。int threadpool_destroy(threadpool_t *pool, int flags);
銷燬存在的執行緒池。flags 可以指定是立刻結束還是平和結束。立刻結束指不管任務佇列是否為空,立刻結束。平和結束指等待任務佇列的任務全部執行完後再結束,在這個過程中不可以新增新的任務。
內部輔助函式
static void *threadpool_thread(void *threadpool);
執行緒池每個執行緒所執行的函式。int threadpool_free(threadpool_t *pool);
釋放執行緒池所申請的記憶體資源。
執行緒池使用
編譯
參考專案根目錄下的 Makefile, 直接用 make
編譯。
測試用例
專案提供了三個測試用例(見 threadpool/test/
),我們可以以此來學習執行緒池的用法並測試是否正常工作。這裡提供其中一個:
#define THREAD 32
#define QUEUE 256
#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include <assert.h>
#include "threadpool.h"
int tasks = 0, done = 0;
pthread_mutex_t lock;
void dummy_task(void *arg) {
usleep(10000);
pthread_mutex_lock(&lock);
/* 記錄成功完成的任務數 */
done++;
pthread_mutex_unlock(&lock);
}
int main(int argc, char **argv)
{
threadpool_t *pool;
/* 初始化互斥鎖 */
pthread_mutex_init(&lock, NULL);
/* 斷言執行緒池建立成功 */
assert((pool = threadpool_create(THREAD, QUEUE, 0)) != NULL);
fprintf(stderr, "Pool started with %d threads and "
"queue size of %d\n", THREAD, QUEUE);
/* 只要任務佇列還沒滿,就一直新增 */
while(threadpool_add(pool, &dummy_task, NULL, 0) == 0) {
pthread_mutex_lock(&lock);
tasks++;
pthread_mutex_unlock(&lock);
}
fprintf(stderr, "Added %d tasks\n", tasks);
/* 不斷檢查任務數是否完成一半以上,沒有則繼續休眠 */
while((tasks / 2) > done) {
usleep(10000);
}
/* 這時候銷燬執行緒池,0 代表 immediate_shutdown */
assert(threadpool_destroy(pool, 0) == 0);
fprintf(stderr, "Did %d tasks\n", done);
return 0;
}
原始碼註釋
原始碼註釋一併放在 github, 點我。
threadpool.h
/*
* Copyright (c) 2013, Mathias Brossard <[email protected]>.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef _THREADPOOL_H_
#define _THREADPOOL_H_
#ifdef __cplusplus
/* 對於 C++ 編譯器,指定用 C 的語法編譯 */
extern "C" {
#endif
/**
* @file threadpool.h
* @brief Threadpool Header File
*/
/**
* Increase this constants at your own risk
* Large values might slow down your system
*/
#define MAX_THREADS 64
#define MAX_QUEUE 65536
/* 簡化變數定義 */
typedef struct threadpool_t threadpool_t;
/* 定義錯誤碼 */
typedef enum {
threadpool_invalid = -1,
threadpool_lock_failure = -2,
threadpool_queue_full = -3,
threadpool_shutdown = -4,
threadpool_thread_failure = -5
} threadpool_error_t;
typedef enum {
threadpool_graceful = 1
} threadpool_destroy_flags_t;
/* 以下是執行緒池三個對外 API */
/**
* @function threadpool_create
* @brief Creates a threadpool_t object.
* @param thread_count Number of worker threads.
* @param queue_size Size of the queue.
* @param flags Unused parameter.
* @return a newly created thread pool or NULL
*/
/**
* 建立執行緒池,有 thread_count 個執行緒,容納 queue_size 個的任務佇列,flags 引數沒有使用
*/
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags);
/**
* @function threadpool_add
* @brief add a new task in the queue of a thread pool
* @param pool Thread pool to which add the task.
* @param function Pointer to the function that will perform the task.
* @param argument Argument to be passed to the function.
* @param flags Unused parameter.
* @return 0 if all goes well, negative values in case of error (@see
* threadpool_error_t for codes).
*/
/**
* 新增任務到執行緒池, pool 為執行緒池指標,routine 為函式指標, arg 為函式引數, flags 未使用
*/
int threadpool_add(threadpool_t *pool, void (*routine)(void *),
void *arg, int flags);
/**
* @function threadpool_destroy
* @brief Stops and destroys a thread pool.
* @param pool Thread pool to destroy.
* @param flags Flags for shutdown
*
* Known values for flags are 0 (default) and threadpool_graceful in
* which case the thread pool doesn't accept any new tasks but
* processes all pending tasks before shutdown.
*/
/**
* 銷燬執行緒池,flags 可以用來指定關閉的方式
*/
int threadpool_destroy(threadpool_t *pool, int flags);
#ifdef __cplusplus
}
#endif
#endif /* _THREADPOOL_H_ */
threadpool.c
/*
* Copyright (c) 2013, Mathias Brossard <[email protected]>.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
*
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/**
* @file threadpool.c
* @brief Threadpool implementation file
*/
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include "threadpool.h"
/**
* 執行緒池關閉的方式
*/
typedef enum {
immediate_shutdown = 1,
graceful_shutdown = 2
} threadpool_shutdown_t;
/**
* @struct threadpool_task
* @brief the work struct
*
* @var function Pointer to the function that will perform the task.
* @var argument Argument to be passed to the function.
*/
/**
* 執行緒池一個任務的定義
*/
typedef struct {
void (*function)(void *);
void *argument;
} threadpool_task_t;
/**
* @struct threadpool
* @brief The threadpool struct
*
* @var notify Condition variable to notify worker threads.
* @var threads Array containing worker threads ID.
* @var thread_count Number of threads
* @var queue Array containing the task queue.
* @var queue_size Size of the task queue.
* @var head Index of the first element.
* @var tail Index of the next element.
* @var count Number of pending tasks
* @var shutdown Flag indicating if the pool is shutting down
* @var started Number of started threads
*/
/**
* 執行緒池的結構定義
* @var lock 用於內部工作的互斥鎖
* @var notify 執行緒間通知的條件變數
* @var threads 執行緒陣列,這裡用指標來表示,陣列名 = 首元素指標
* @var thread_count 執行緒數量
* @var queue 儲存任務的陣列,即任務佇列
* @var queue_size 任務佇列大小
* @var head 任務佇列中首個任務位置(注:任務佇列中所有任務都是未開始執行的)
* @var tail 任務佇列中最後一個任務的下一個位置(注:佇列以陣列儲存,head 和 tail 指示佇列位置)
* @var count 任務佇列裡的任務數量,即等待執行的任務數
* @var shutdown 表示執行緒池是否關閉
* @var started 開始的執行緒數
*/
struct threadpool_t {
pthread_mutex_t lock;
pthread_cond_t notify;
pthread_t *threads;
threadpool_task_t *queue;
int thread_count;
int queue_size;
int head;
int tail;
int count;
int shutdown;
int started;
};
/**
* @function void *threadpool_thread(void *threadpool)
* @brief the worker thread
* @param threadpool the pool which own the thread
*/
/**
* 執行緒池裡每個執行緒在跑的函式
* 宣告 static 應該只為了使函式只在本檔案內有效
*/
static void *threadpool_thread(void *threadpool);
int threadpool_free(threadpool_t *pool);
threadpool_t *threadpool_create(int thread_count, int queue_size, int flags)
{
if(thread_count <= 0 || thread_count > MAX_THREADS || queue_size <= 0 || queue_size > MAX_QUEUE) {
return NULL;
}
threadpool_t *pool;
int i;
/* 申請記憶體建立記憶體池物件 */
if((pool = (threadpool_t *)malloc(sizeof(threadpool_t))) == NULL) {
goto err;
}
/* Initialize */
pool->thread_count = 0;
pool->queue_size = queue_size;
pool->head = pool->tail = pool->count = 0;
pool->shutdown = pool->started = 0;
/* Allocate thread and task queue */
/* 申請執行緒陣列和任務佇列所需的記憶體 */
pool->threads = (pthread_t *)malloc(sizeof(pthread_t) * thread_count);
pool->queue = (threadpool_task_t *)malloc
(sizeof(threadpool_task_t) * queue_size);
/* Initialize mutex and conditional variable first */
/* 初始化互斥鎖和條件變數 */
if((pthread_mutex_init(&(pool->lock), NULL) != 0) ||
(pthread_cond_init(&(pool->notify), NULL) != 0) ||
(pool->threads == NULL) ||
(pool->queue == NULL)) {
goto err;
}
/* Start worker threads */
/* 建立指定數量的執行緒開始執行 */
for(i = 0; i < thread_count; i++) {
if(pthread_create(&(pool->threads[i]), NULL,
threadpool_thread, (void*)pool) != 0) {
threadpool_destroy(pool, 0);
return NULL;
}
pool->thread_count++;
pool->started++;
}
return pool;
err:
if(pool) {
threadpool_free(pool);
}
return NULL;
}
int threadpool_add(threadpool_t *pool, void (*function)(void *),
void *argument, int flags)
{
int err = 0;
int next;
if(pool == NULL || function == NULL) {
return threadpool_invalid;
}
/* 必須先取得互斥鎖所有權 */
if(pthread_mutex_lock(&(pool->lock)) != 0) {
return threadpool_lock_failure;
}
/* 計算下一個可以儲存 task 的位置 */
next = pool->tail + 1;
next = (next == pool->queue_size) ? 0 : next;
do {
/* Are we full ? */
/* 檢查是否任務佇列滿 */
if(pool->count == pool->queue_size) {
err = threadpool_queue_full;
break;
}
/* Are we shutting down ? */
/* 檢查當前執行緒池狀態是否關閉 */
if(pool->shutdown) {
err = threadpool_shutdown;
break;
}
/* Add task to queue */
/* 在 tail 的位置放置函式指標和引數,新增到任務佇列 */
pool->queue[pool->tail].function = function;
pool->queue[pool->tail].argument = argument;
/* 更新 tail 和 count */
pool->tail = next;
pool->count += 1;
/* pthread_cond_broadcast */
/*
* 發出 signal,表示有 task 被新增進來了
* 如果由因為任務佇列空阻塞的執行緒,此時會有一個被喚醒
* 如果沒有則什麼都不做
*/
if(pthread_cond_signal(&(pool->notify)) != 0) {
err = threadpool_lock_failure;
break;
}
/*
* 這裡用的是 do { ... } while(0) 結構
* 保證過程最多被執行一次,但在中間方便因為異常而跳出執行塊
*/
} while(0);
/* 釋放互斥鎖資源 */
if(pthread_mutex_unlock(&pool->lock) != 0) {
err = threadpool_lock_failure;
}
return err;
}
int threadpool_destroy(threadpool_t *pool, int flags)
{
int i, err = 0;
if(pool == NULL) {
return threadpool_invalid;
}
/* 取得互斥鎖資源 */
if(pthread_mutex_lock(&(pool->lock)) != 0) {
return threadpool_lock_failure;
}
do {
/* Already shutting down */
/* 判斷是否已在其他地方關閉 */
if(pool->shutdown) {
err = threadpool_shutdown;
break;
}
/* 獲取指定的關閉方式 */
pool->shutdown = (flags & threadpool_graceful) ?
graceful_shutdown : immediate_shutdown;
/* Wake up all worker threads */
/* 喚醒所有因條件變數阻塞的執行緒,並釋放互斥鎖 */
if((pthread_cond_broadcast(&(pool->notify)) != 0) ||
(pthread_mutex_unlock(&(pool->lock)) != 0)) {
err = threadpool_lock_failure;
break;
}
/* Join all worker thread */
/* 等待所有執行緒結束 */
for(i = 0; i < pool->thread_count; i++) {
if(pthread_join(pool->threads[i], NULL) != 0) {
err = threadpool_thread_failure;
}
}
/* 同樣是 do{...} while(0) 結構*/
} while(0);
/* Only if everything went well do we deallocate the pool */
if(!err) {
/* 釋放記憶體資源 */
threadpool_free(pool);
}
return err;
}
int threadpool_free(threadpool_t *pool)
{
if(pool == NULL || pool->started > 0) {
return -1;
}
/* Did we manage to allocate ? */
/* 釋放執行緒 任務佇列 互斥鎖 條件變數 執行緒池所佔記憶體資源 */
if(pool->threads) {
free(pool->threads);
free(pool->queue);
/* Because we allocate pool->threads after initializing the
mutex and condition variable, we're sure they're
initialized. Let's lock the mutex just in case. */
pthread_mutex_lock(&(pool->lock));
pthread_mutex_destroy(&(pool->lock));
pthread_cond_destroy(&(pool->notify));
}
free(pool);
return 0;
}
static void *threadpool_thread(void *threadpool)
{
threadpool_t *pool = (threadpool_t *)threadpool;
threadpool_task_t task;
for(;;) {
/* Lock must be taken to wait on conditional variable */
/* 取得互斥鎖資源 */
pthread_mutex_lock(&(pool->lock));
/* Wait on condition variable, check for spurious wakeups.
When returning from pthread_cond_wait(), we own the lock. */
/* 用 while 是為了在喚醒時重新檢查條件 */
while((pool->count == 0) && (!pool->shutdown)) {
/* 任務佇列為空,且執行緒池沒有關閉時阻塞在這裡 */
pthread_cond_wait(&(pool->notify), &(pool->lock));
}
/* 關閉的處理 */
if((pool->shutdown == immediate_shutdown) ||
((pool->shutdown == graceful_shutdown) &&
(pool->count == 0))) {
break;
}
/* Grab our task */
/* 取得任務佇列的第一個任務 */
task.function = pool->queue[pool->head].function;
task.argument = pool->queue[pool->head].argument;
/* 更新 head 和 count */
pool->head += 1;
pool->head = (pool->head == pool->queue_size)