1. 程式人生 > >【Linux】生產者消費者程式設計實現-執行緒池+訊號量

【Linux】生產者消費者程式設計實現-執行緒池+訊號量

生產者消費者程式設計實現,採用了執行緒池以及訊號量技術。

執行緒的概念就不多說,首先說一下多執行緒的好處:多執行緒技術主要解決處理器單元內多個執行緒執行的問題,它可以顯著減少處理器單元的閒置時間,增加處理器單元的吞吐能力。

那麼為什麼又需要執行緒池呢?

我們知道應用程式建立一個物件,然後銷燬物件是很耗費資源的。建立執行緒,銷燬執行緒,也是如此。因此,我們就預先生成一些執行緒,等到我們使用的時候在進行排程,於是,一些"池化資源"技術就這樣的產生了。

一般一個簡單執行緒池至少包含下列組成部分。

1)執行緒池管理器(ThreadPoolManager:用於建立並管理執行緒池

2)工作執行緒(

WorkThread執行緒池中執行緒

3)任務介面(Task:每個任務必須實現的介面,以供工作執行緒排程任務的執行。

4)任務佇列:用於存放沒有處理的任務。提供一種緩衝機制。

圖示:


圖1 執行緒池圖解

生產者消費者模型C語言程式碼實現:

thread_pool_pv.h:

//執行緒池程式設計實現
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <stdio.h>
#include <stdlib.h>
#include <semaphore.h>//訊號量sem_t
#include <pthread.h>


//任務介面,執行緒呼叫的函式
typedef void* (*FUNC)(void *arg);

//任務資料結構
typedef struct thread_pool_job_s{
        FUNC function;//執行緒呼叫的函式
        void *arg;//函式引數
        struct thread_pool_job_s *pre;//指向上一個節點
        struct thread_pool_job_s *next;//指向下一個節點
}thread_pool_job;

//工作佇列
typedef struct thread_pool_job_queue_s{
        thread_pool_job *head;//佇列頭指標
        thread_pool_job *tail;//佇列尾指標
        int num;//任務數目
        sem_t *quene_sem;//訊號量
}thread_pool_job_queue;

//執行緒池(存放消費者程序)
typedef struct thread_pool_s{
        pthread_t *threads;//執行緒
        int threads_num;//執行緒數目
        thread_pool_job_queue *job_queue;//指向工作佇列的指標
}thread_pool;

//typedef struct thread_data_s{
//      pthread_mutex_t *mutex_t;//互斥量
//      thread_pool *tp_p;//指向執行緒池的指標
//}thread_data;

//初始化執行緒池
thread_pool* tp_init(int thread_num);

//初始化工作佇列
int tp_job_quene_init(thread_pool *tp);

//向工作佇列中新增一個元素
void tp_job_quene_add(thread_pool *tp,thread_pool_job *new_job);

//向執行緒池中新增一個工作項
int tp_add_work(thread_pool *tp,void *(*func_p)(void *),void *arg);

//取得工作佇列的最後個節點
thread_pool_job* tp_get_lastjob(thread_pool *tp);

//刪除工作佇列的最後個節點
int tp_delete__lastjob(thread_pool *tp);

//銷燬執行緒池
void tp_destroy(thread_pool *tp);

//消費者執行緒函式
void* tp_thread_func(thread_pool *tp);

//生產者執行緒執行函式
void* thread_func_producer(thread_pool *tp);

#endif



thread_pool_pv.c:
//執行緒池程式設計實現
#include "thread_pool.h"

//互斥量,用於對工作佇列的訪問
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

//標記執行緒池是否處於可用狀態
static int tp_alive = 1;

//初始化執行緒池
thread_pool* tp_init(int thread_num){
        thread_pool *tp;
        int i;

        if(thread_num < 1)
                thread_num = 1;

        tp = (thread_pool *)malloc(sizeof(thread_pool));
        //判斷記憶體分配是否成功
        if(NULL == tp){
                printf("ERROR:allocate memory for thread_pool failed\n");
                return NULL;
        }
        tp->threads_num = thread_num;
        //分配執行緒所佔記憶體空間
        tp->threads = (pthread_t*)malloc(thread_num * sizeof(pthread_t));
        //判斷記憶體分配是否成功
        if(NULL == tp->threads){
                printf("ERROR:allocate memory for threads in thread pool failed\n");
                return NULL;
        }
        
        if(tp_job_quene_init(tp))
                return NULL;
        
        tp->job_queue->quene_sem = (sem_t *)malloc(sizeof(sem_t));
        sem_init(tp->job_queue->quene_sem,0,0);//訊號量初始化

        //初始化執行緒
        for(i = 0;i < thread_num;++i){
                pthread_create(&(tp->threads[i]),NULL,(void *)tp_thread_func,(void *)tp);
        }

        return tp;
}

//初始化工作佇列
int tp_job_quene_init(thread_pool *tp){
        tp->job_queue = (thread_pool_job_queue *)malloc(sizeof(thread_pool_job_queue));

        if(NULL == tp->job_queue){
                return -1;
        }

        tp->job_queue->head = NULL;
        tp->job_queue->tail = NULL;
        tp->job_queue->num = 0;

        return 0;
}

//執行緒函式
void* tp_thread_func(thread_pool *tp){
        FUNC function;
        void *arg_buf;
        thread_pool_job *job_p;

        while(tp_alive){
                //執行緒阻塞,等待訊號量
                if(sem_wait(tp->job_queue->quene_sem)){
                        printf("thread waiting for semaphore....\n");
                        exit(1);
                }
                if(tp_alive){
                        pthread_mutex_lock(&mutex);
                        job_p = tp_get_lastjob(tp);
                        if(NULL == job_p){
                                pthread_mutex_unlock(&mutex);
                                continue;
                        }
                        function = job_p->function;
                        arg_buf = job_p->arg;
                        if(tp_delete__lastjob(tp))
                                return;
                        pthread_mutex_unlock(&mutex);
                        //執行指定的執行緒函式
                        printf("consumer...get a job from job quene and run it!\n");
                        function(arg_buf);
                        free(job_p);
                }
                else
                        return;
        }

        return;
}

//向工作佇列中新增一個元素
void tp_job_quene_add(thread_pool *tp,thread_pool_job *new_job){
        new_job->pre = NULL;
        new_job->next = NULL;
        thread_pool_job *old_head_job = tp->job_queue->head;

        if(NULL == old_head_job){
                tp->job_queue->head = new_job;
                tp->job_queue->tail = new_job;
        }
        else{
                old_head_job->pre = new_job;
                new_job->next = old_head_job;
                tp->job_queue->head = new_job;
        }

        ++(tp->job_queue->num);

        sem_post(tp->job_queue->quene_sem);
}

//取得工作佇列的最後一個節點
thread_pool_job* tp_get_lastjob(thread_pool *tp){
        return tp->job_queue->tail;
}

//刪除工作佇列的最後個節點
int tp_delete__lastjob(thread_pool *tp){
        if(NULL == tp)
                return -1;

        thread_pool_job *last_job = tp->job_queue->tail;
        if(0 == tp->job_queue->num){
                return -1;
        }
        else if(1 == tp->job_queue->num){
                tp->job_queue->head = NULL;
                tp->job_queue->tail = NULL;
        }
        else{
                last_job->pre->next = NULL;
                tp->job_queue->tail = last_job->pre;
        }

        //修改相關變數
        --(tp->job_queue->num);

        return 0;
}

//向執行緒池中新增一個工作項
int tp_add_work(thread_pool *tp,void *(*func_p)(void *),void *arg){
        thread_pool_job *new_job = (thread_pool_job *)malloc(sizeof(thread_pool_job));
        if(NULL == new_job){
                printf("ERROR:allocate memory for new job failed!\n");
                exit(1);
        }
        new_job->function = func_p;
        new_job->arg = arg;
        pthread_mutex_lock(&mutex);
        tp_job_quene_add(tp,new_job);
        pthread_mutex_unlock(&mutex);
}


//銷燬執行緒池
void tp_destroy(thread_pool *tp){
        int i;
        tp_alive = 0;
        
        //等待執行緒執行結束
        //sleep(10);
        for(i = 0;i < tp->threads_num;++i){
                pthread_join(tp->threads[i],NULL);
        }
        free(tp->threads);

        if(sem_destroy(tp->job_queue->quene_sem)){
                printf("ERROR:destroy semaphore failed!\n");
        }
        free(tp->job_queue->quene_sem);

        //刪除job佇列
        thread_pool_job *current_job = tp->job_queue->tail;
        while(tp->job_queue->num){
                tp->job_queue->tail = current_job->pre;
                free(current_job);
                current_job = tp->job_queue->tail;
                --(tp->job_queue->num);
        }
        tp->job_queue->head = NULL;
        tp->job_queue->tail = NULL;
}

//自定義執行緒執行函式
void* thread_func1(){
        printf("Task1 running...by Thread  :%u\n",(unsigned int)pthread_self());
}

//自定義執行緒執行函式
void* thread_func2(){
        printf("Task2 running...by Thread  :%u\n",(unsigned int)pthread_self());
}

//生產者執行緒執行函式
void* thread_func_producer(thread_pool *tp){
        while(1){
                printf("producer...add a job(job1) to job quene!\n");
                tp_add_work(tp,(void*)thread_func1,NULL);
                sleep(1);
                printf("producer...add a job(job2) to job quene!\n");
                tp_add_work(tp,(void*)thread_func2,NULL);
        }
}

int main(){
        thread_pool *tp = tp_init(5);
        int i;
        int arg = 7;
        pthread_t producer_thread_id;//生產者執行緒ID
        pthread_create(&producer_thread_id,NULL,(void *)thread_func_producer,(void *)tp);

        pthread_join(producer_thread_id,NULL);
        tp_destroy(tp);

        return 0;
}


執行結果: