1. 程式人生 > >Liun下執行緒池的原理及實現

Liun下執行緒池的原理及實現

一.基本概念介紹

程序

執行中的程式,程式執行的動態描述

執行緒

  1. 執行緒是輕量級程序,沒有獨立的地址空間,共享程序的資源
  2. 在linux下執行緒是CPU的基本排程單位。在巨集觀上執行緒是並行的,使用多執行緒可以有效提高cpu的使用率。
  3. 一個程式中有一個主執行緒,可以建立其他執行緒並行執行
  4. 一個程式有多個執行緒就有多個執行流程。相當於單進執行緒是一條路走下去,而多執行緒是有很多條路同時走。

為啥有了程序還需要執行緒

  1. 執行緒是輕量級的程序,系統維護執行緒需要的資源比程序少
  2. 執行緒的切換比程序切換更快
  3. 執行緒之間的通訊更為簡單,直接共享全域性變數

執行緒同步

1.互斥鎖
  多個執行緒共享一個資源時,會出現意想不到的結果。使用互斥鎖來解決執行緒之間的資源競爭問題。
  一個執行緒訪問資源必須先獲取鎖,訪問完資源再解鎖。如果沒有獲取到鎖就掛起執行緒(執行緒掛起不消耗CPU)
2. 條件變數
  條件變數是對互斥鎖的補充,它解決執行緒之間相互合作的問題。使用條件變數都搭配一個互斥鎖來使用:
加鎖
while(條件)
{
等待條件變數(掛起執行緒 釋放鎖)
}
解鎖
加鎖
操作
解鎖
傳送變數執行緒量量執行緒量的執行緒

二.執行緒池原理

1.什麼執行緒池
  執行緒池:負責建立和銷燬執行緒,讓程式設計師使用執行緒變得簡單。
2.什麼時候使用執行緒
 當需要建立大量的執行緒處理任務,並且任務執行的時間很短。這種情況下建立執行緒和銷燬執行緒的時間大於任務執行的時間程式效率低,在這種情況下可以使用執行緒池。
3.執行緒池的實現原理
 執行緒池內部維護著一個任務佇列。初始化時建立一堆執行緒執行,執行緒的入口函式實現從任務佇列中取出任務執行。任務執行完不用銷燬執行緒等待下一個任務來繼續執行。如果任務佇列中沒有任務就掛起執行緒,當向執行緒池新增任務時喚醒一個執行緒執行。
4.執行緒池結構體設計
_互斥鎖 操作任務和執行緒內部資料時必須是執行緒安全的
-條件變數 使用條件變數來實現工作執行緒的掛起和喚醒
-任務佇列頭指標 儲存任務
-執行緒id陣列指標 儲存執行緒id 方便控制執行緒
-執行緒執行狀態標識 啟動和停止執行緒
-執行緒銷燬標識 用來銷燬執行緒池
-最大執行緒數
-任務佇列個數
5.任務佇列設計
-任務佇列使用連結串列實現
6.對外提供介面
-執行緒初始化
-新增任務
-執行緒池的銷燬

三.執行緒池的實現

#include <stdio.h>
#include <stdlib.h>
#include<pthread.h>
#include<unistd.h>
#include<string.h>

/*================================================
 * 巨集定義
 * ===============================================*/
#define THREAD_DESTORY 0
#define SUCCESS  0
#define FAILURE  1

#define LOG(fmt, ...) printf(fmt, ##__VA_ARGS__)


/*================================================
 * 結構體定義
 * ===============================================*/

/*回撥介面*/
typedef void *(*THREAD_FUNC)(void*);

/*任務介面結構體*/
typedef struct task
{
    THREAD_FUNC func;  /*執行緒回撥函式*/
    void *arg;          /*回撥函式引數*/
    struct task *next;
}task;

/*執行緒池結構體定義*/
typedef struct threadpool
{
    pthread_cond_t task_queue_ready;  /*任務佇列條件變數*/
    pthread_mutex_t tpool_lock;     /*執行緒池互斥鎖*/
    task * tasks;                   /*任務連結串列頭*/
    pthread_t * t_ids;              /*執行緒id陣列指標*/
    int isdestory;                  /*執行緒池銷燬標誌*/
    int max_size;                   /*執行緒最大個數*/
    int tasks_size;                 /*任務佇列等待個數*/

}threadpool;


/*================================================
 * 全域性變數定義
 * ===============================================*/
threadpool *g_pool=NULL; /*執行緒池指標*/
int g_isrunning=0;                  /*執行緒池執行狀態  銷燬執行緒時用到*/

/*================================================
 * 函式定義
 * ===============================================*/


int tpool_init(int thread_count);       /*執行緒池初始化函式*/
void t_pool_destory();                  /*執行緒池銷燬函式*/
void routine();                         /*執行緒入口函式*/
int add_task(THREAD_FUNC func,void* arg); /*執行緒池新增任務函式*/
/*================================================
 * 函式實現
 * ===============================================*/



/*
 * 函式名:tpool_init
 * 函式功能:初始化執行緒池
 * 引數
 *      thread_count  執行緒池執行緒的個數
 * 返回值
 *      FAILURE 失敗
 *      SUCCESS 成功
 * */
int tpool_init(int thread_count)
{
    if(NULL != g_pool)
    {
        LOG("已經初始化過啦 \n");
        return FAILURE;
    }
    LOG("開始初始化\n");
    g_pool = (threadpool*)malloc(sizeof(struct threadpool));
    LOG("%d\n",sizeof(struct threadpool));
    if(NULL == g_pool)
    {
        LOG("記憶體分配失敗\n");
        return FAILURE;
    }
    memset(g_pool,'\0',sizeof(sizeof(struct threadpool)));

    g_isrunning=1;

    pthread_cond_init(&g_pool->task_queue_ready,NULL);
    pthread_mutex_init(&g_pool->tpool_lock,NULL);
    g_pool->isdestory=1;
    g_pool->max_size=thread_count;
    g_pool->tasks_size=0;
    g_pool->tasks=NULL;


    g_pool->t_ids = (pthread_t*)malloc(sizeof(pthread_t)*g_pool->max_size);

    if(NULL == g_pool->t_ids)
    {
        LOG("分配記憶體失敗\n");

        free(g_pool);
        return FAILURE;
    }

    int i=0,ret;
    for(;i<g_pool->max_size;i++)
    {
        ret = pthread_create(&g_pool->t_ids[i],NULL,(void*(*)(void*))routine,NULL);
        LOG("%d\n",g_pool->t_ids[i]);
        if(ret)
        {
            g_pool->t_ids[i]=0;
            t_pool_destory();
        }
    }
    return SUCCESS;
}


/*
 * 函式名:tpool_destory
 * 函式功能:執行緒池銷燬
 * 引數
 *
 * 返回值
 *
 * */
void t_pool_destory()
{
    LOG("t_pool_destory\n");
    if(NULL == g_pool)
    {
        return ;
    }
    pthread_mutex_lock(&g_pool->tpool_lock);
    if(THREAD_DESTORY == g_pool->isdestory)
    {
        LOG("t_pool_destory isdestory is 0\n");
        pthread_mutex_unlock(&g_pool->tpool_lock);
        return;
    }
    g_isrunning=0;
    g_pool->isdestory=THREAD_DESTORY;
    pthread_cond_broadcast(&g_pool->task_queue_ready);
    pthread_mutex_unlock(&g_pool->tpool_lock);
    free(g_pool->t_ids);

    task* t = NULL;
    while(NULL != g_pool->tasks)
    {
        t= g_pool->tasks;
        g_pool->tasks=g_pool->tasks->next;
        free(t);
    }

    /*釋放鎖和條件變數*/
    pthread_mutex_destroy(&g_pool->tpool_lock);
    pthread_cond_destroy(&g_pool->task_queue_ready);

    /*釋放執行緒池*/
    free(g_pool);
    g_pool=NULL;
}


/*
 * 函式名:routine
 * 函式功能:從任務列表中取出任務執行
 * 引數
 *
 * 返回值
 *
 * */
void routine()
{

    task* t=NULL;
    LOG("routine \n");
    if(NULL==g_pool)
    {
        LOG("routine g_pool is null \n");
        pthread_exit(NULL);
    }

    while(g_isrunning)
    {
        LOG("routine while中\n");
        pthread_mutex_lock(&g_pool->tpool_lock);
        /*任務佇列為空 執行緒池不銷燬執行緒睡眠*/
        while((0>=g_pool->tasks_size)&&(1==g_pool->isdestory))
        {
            LOG("routine 任務佇列為空 執行緒掛起\n");
            pthread_cond_wait(&g_pool->task_queue_ready,&g_pool->tpool_lock);
        }

        /*執行緒池銷燬就退出執行緒*/
        if(THREAD_DESTORY==g_pool->isdestory)
        {
            LOG("isdestory broadcast wake thread exit\n");
            pthread_mutex_unlock(&g_pool->tpool_lock);
            pthread_exit(NULL);
        }
        t=g_pool->tasks;
        g_pool->tasks = g_pool->tasks->next;

        g_pool->tasks_size--;
        pthread_mutex_unlock(&g_pool->tpool_lock);

        t->func(t->arg);

        free(t);
    }
    LOG("routine g_isrunning is 0 thread exit\n");
    pthread_exit(NULL);
}

/*
 * 函式名:add_task
 * 函式功能:給執行緒池提交任務
 * 引數
 *      func 任務回撥函式
 *      arg 回撥函式引數
 * 返回值
 *  0 成功
 *  其他 錯誤
 * */
int add_task(THREAD_FUNC func,void* arg)
{
    /*引數檢查*/
    if(NULL== g_pool)
    {
        LOG("add_task g_pool is null\n");
        return 1;
    }

    if(NULL==func)
    {
        LOG("add_task func is null\n");
        return 1;
    }

    pthread_mutex_lock(&g_pool->tpool_lock);

    /*建立一個任務節點*/
    task * t = (task*)malloc(sizeof(task));

    if(NULL==t)
    {
        LOG("add_task malloc is failure\n");
        pthread_mutex_unlock(&g_pool->tpool_lock);
        return 1;
    }
    t->arg=arg;
    t->func=func;
    t->next=NULL;
    task* w= g_pool->tasks;
    if(NULL!=w)
    {
        while(NULL!=w->next)
        {
            w=w->next;
        }
        w->next=t;
    }
    else
    {
        g_pool->tasks=t;
    }
    g_pool->tasks_size++;
    LOG("create task is success\n");
    pthread_cond_signal(&g_pool->task_queue_ready);
    pthread_mutex_unlock(&g_pool->tpool_lock);
}