1. 程式人生 > >基於Linux/C++簡單執行緒池的實現

基於Linux/C++簡單執行緒池的實現

我們知道Java語言對於多執行緒的支援十分豐富,JDK本身提供了很多效能優良的庫,包括ThreadPoolExecutor和ScheduleThreadPoolExecutor等。C++11中的STL也提供了std:thread(然而我還沒有看,這裡先佔個坑)還有很多第三方庫的實現。這裡我重複“造輪子”的目的還是為了深入理解C++和Linux執行緒基礎概念,主要以學習的目的。

首先,為什麼要使用執行緒池。因為執行緒的建立、和清理都是需要耗費系統資源的。我們知道Linux中執行緒實際上是由輕量級程序實現的,相對於純理論上的執行緒這個開銷還是有的。假設某個執行緒的建立、執行和銷燬的時間分別為T1、T2、T3,當T1+T3的時間相對於T2不可忽略時,執行緒池的就有必要引入了,尤其是處理數百萬級的高併發處理時。執行緒池提升了多執行緒程式的效能,因為執行緒池裡面的執行緒都是現成的而且能夠重複使用,我們不需要臨時建立大量執行緒,然後在任務結束時又銷燬大量執行緒。一個理想的執行緒池能夠合理地動態調節池內執行緒數量,既不會因為執行緒過少而導致大量任務堆積,也不會因為執行緒過多了而增加額外的系統開銷。

其實執行緒池的原理非常簡單,它就是一個非常典型的生產者消費者同步問題。根據剛才描述的執行緒池的功能,可以看出執行緒池至少有兩個主要動作,一個是主程式不定時地向執行緒池新增任務,另一個是執行緒池裡的執行緒領取任務去執行。且不論任務和執行任務是個什麼概念,但是一個任務肯定只能分配給一個執行緒執行。這樣就可以簡單猜想執行緒池的一種可能的架構了:主程式執行入隊操作,把任務新增到一個佇列裡面;池子裡的多個工作執行緒共同對這個佇列試圖執行出隊操作,這裡要保證同一時刻只有一個執行緒出隊成功,搶奪到這個任務,其他執行緒繼續共同試圖出隊搶奪下一個任務。所以在實現執行緒池之前,我們需要一個佇列。這裡的生產者就是主程式,生產任務(增加任務),消費者就是工作執行緒,消費任務(執行、減少任務)。因為這裡涉及到多個執行緒同時訪問一個佇列的問題,所以我們需要互斥鎖來保護佇列,同時還需要條件變數來處理主執行緒通知任務到達、工作執行緒搶奪任務的問題。

一般來說實現一個執行緒池主要包括以下4個組成部分:

  1. 執行緒管理器:用於建立並管理執行緒池。
  2. 工作執行緒:執行緒池中實際執行任務的執行緒。在初始化執行緒時會預先建立好固定數目的執行緒在池中,這些初始化的執行緒一般處於空閒狀態。
  3. 任務介面:每個任務必須實現的介面。當執行緒池的任務佇列中有可執行任務時,被空間的工作執行緒調去執行(執行緒的閒與忙的狀態是通過互斥量實現的),把任務抽象出來形成一個介面,可以做到執行緒池與具體的任務無關。
  4. 任務佇列:用來存放沒有處理的任務。提供一種緩衝機制。實現這種結構有很多方法,常用的有佇列和連結串列結構。

流程圖如下:

ool.h

#ifndef __THREAD_POOL_H
#define
__THREAD_POOL_H
#include <vector> #include <string> #include <pthread.h> using namespace std; /*執行任務的類:設定任務資料並執行*/ class CTask { protected: string m_strTaskName; //任務的名稱 void* m_ptrData; //要執行的任務的具體資料 public: CTask() = default; CTask(string &taskName): m_strTaskName(taskName), m_ptrData(NULL) {} virtualint Run() = 0; void setData(void* data); //設定任務資料 virtual ~CTask() {} }; /*執行緒池管理類*/ class CThreadPool { private: static vector<CTask*> m_vecTaskList; //任務列表 static bool shutdown; //執行緒退出標誌 int m_iThreadNum; //執行緒池中啟動的執行緒數 pthread_t *pthread_id; static pthread_mutex_t m_pthreadMutex; //執行緒同步鎖 static pthread_cond_t m_pthreadCond; //執行緒同步條件變數 protected: staticvoid* ThreadFunc(void *threadData); //新執行緒的執行緒回撥函式 staticint MoveToIdle(pthread_t tid); //執行緒執行結束後,把自己放入空閒執行緒中 staticint MoveToBusy(pthread_t tid); //移入到忙碌執行緒中去 int Create(); //建立執行緒池中的執行緒 public: CThreadPool(int threadNum); int AddTask(CTask *task); //把任務新增到任務佇列中 int StopAll(); //使執行緒池中的所有執行緒退出 int getTaskSize(); //獲取當前任務佇列中的任務數 }; #endif

2 thread_pool.cpp

#include "thread_pool.h"
#include <cstdio>

void CTask::setData(void* data) {
    m_ptrData = data;
}

//靜態成員初始化
vector<CTask*> CThreadPool::m_vecTaskList;
bool CThreadPool::shutdown = false;
pthread_mutex_t CThreadPool::m_pthreadMutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t CThreadPool::m_pthreadCond = PTHREAD_COND_INITIALIZER;

//執行緒管理類建構函式
CThreadPool::CThreadPool(int threadNum) {
    this->m_iThreadNum = threadNum;
    printf("I will create %d threads.\n", threadNum);
    Create();
}

//執行緒回撥函式
void* CThreadPool::ThreadFunc(void* threadData) {
    pthread_t tid = pthread_self();
    while(1)
    {
        pthread_mutex_lock(&m_pthreadMutex);
        //如果佇列為空,等待新任務進入任務佇列
        while(m_vecTaskList.size() == 0 && !shutdown)
            pthread_cond_wait(&m_pthreadCond, &m_pthreadMutex);
        
        //關閉執行緒
        if(shutdown)
        {
            pthread_mutex_unlock(&m_pthreadMutex);
            printf("[tid: %lu]\texit\n", pthread_self());
            pthread_exit(NULL);
        }
        
        printf("[tid: %lu]\trun: ", tid);
        vector<CTask*>::iterator iter = m_vecTaskList.begin();
        //取出一個任務並處理之
        CTask* task = *iter;
        if(iter != m_vecTaskList.end())
        {
            task = *iter;
            m_vecTaskList.erase(iter);
        }
        
        pthread_mutex_unlock(&m_pthreadMutex);
        
        task->Run();    //執行任務
        printf("[tid: %lu]\tidle\n", tid);
        
    }
    
    return (void*)0;
}

//往任務佇列裡新增任務併發出線程同步訊號
int CThreadPool::AddTask(CTask *task) { 
    pthread_mutex_lock(&m_pthreadMutex);    
    m_vecTaskList.push_back(task);  
    pthread_mutex_unlock(&m_pthreadMutex);  
    pthread_cond_signal(&m_pthreadCond);    
    
    return 0;
}

//建立執行緒
int CThreadPool::Create() { 
    pthread_id = new pthread_t[m_iThreadNum];
        for(int i = 0; i < m_iThreadNum; i++)
            pthread_create(&pthread_id[i], NULL, ThreadFunc, NULL);
        
    return 0;
}

//停止所有執行緒
int CThreadPool::StopAll() {    
    //避免重複呼叫
    if(shutdown)
        return -1;
    printf("Now I will end all threads!\n\n");
    
    //喚醒所有等待程序,執行緒池也要銷燬了
    shutdown = true;
    pthread_cond_broadcast(&m_pthreadCond);
    
    //清楚殭屍
    for(int i = 0; i < m_iThreadNum; i++)
        pthread_join(pthread_id[i], NULL);
    
    delete[] pthread_id;
    pthread_id = NULL;
    
    //銷燬互斥量和條件變數
    pthread_mutex_destroy(&m_pthreadMutex);
    pthread_cond_destroy(&m_pthreadCond);
    
    return 0;
}

//獲取當前佇列中的任務數
int CThreadPool::getTaskSize() {    
    return m_vecTaskList.size();
}

3 main.cpp

#include "thread_pool.h"
#include <cstdio>
#include <stdlib.h>
#include <unistd.h>

class CMyTask: public CTask {   
public:
    CMyTask() = default;    
    int Run() {
        printf("%s\n", (char*)m_ptrData);
        int x = rand()%4 + 1;
        sleep(x);   
        return 0;
    }
    ~CMyTask() {}
};

int main() {
    CMyTask taskObj;
    char szTmp[] = "hello!";
    taskObj.setData((void*)szTmp);
    CThreadPool threadpool(5);  //執行緒池大小為5
    
    for(int i = 0; i < 10; i++)
        threadpool.AddTask(&taskObj);
    
    while(1) {
        printf("There are still %d tasks need to handle\n", threadpool.getTaskSize());
        //任務佇列已沒有任務了
        if(threadpool.getTaskSize()==0) {
            //清除執行緒池
            if(threadpool.StopAll() == -1) {
                printf("Thread pool clear, exit.\n");
                exit(0);
            }
        }
        sleep(2);
        printf("2 seconds later...\n");
    }   
    return 0;
}

4 Makefile

CC:= g++
TARGET:= threadpool
INCLUDE:= -I./
LIBS:= -lpthread
# C++語言編譯引數  
CXXFLAGS:= -std=c++11 -g -Wall -D_REENTRANT
# C預處理引數
# CPPFLAGS:=
OBJECTS :=thread_pool.o main.o
  
$(TARGET): $(OBJECTS)
    $(CC) -o $(TARGET) $(OBJECTS) $(LIBS)
  
# [email protected]表示所有目標集  
%.o:%.cpp   
    $(CC) -c $(CXXFLAGS) $(INCLUDE) $< -o [email protected]
  
.PHONY : clean
clean:   
    -rm -f $(OBJECTS) $(TARGET)

5 輸出結果

I will create 5 threads.
There are still 10 tasks need to handle
[tid: 140056759576320]  run: hello!
[tid: 140056751183616]  run: hello!
[tid: 140056742790912]  run: hello!
[tid: 140056734398208]  run: hello!
[tid: 140056767969024]  run: hello!
2 seconds later...
There are still 5 tasks need to handle
[tid: 140056742790912]  idle
[tid: 140056742790912]  run: hello!
[tid: 140056767969024]  idle
[tid: 140056767969024]  run: hello!
[tid: 140056751183616]  idle
[tid: 140056751183616]  run: hello!
[tid: 140056759576320]  idle
[tid: 140056759576320]  run: hello!
[tid: 140056751183616]  idle
[tid: 140056751183616]  run: hello!
[tid: 140056734398208]  idle
2 seconds later...
There are still 0 tasks need to handle
Now I will end all threads!
2 seconds later...
[tid: 140056734398208]  exit
[tid: 140056767969024]  idle
[tid: 140056767969024]  exit
[tid: 140056759576320]  idle
[tid: 140056759576320]  exit
[tid: 140056751183616]  idle
[tid: 140056751183616]  exit
[tid: 140056742790912]  idle
[tid: 140056742790912]  exit
2 seconds later...
There are still 0 tasks need to handle
Thread pool clear, exit.

擴充套件資料:

相關推薦

基於Linux/C++簡單執行實現

我們知道Java語言對於多執行緒的支援十分豐富,JDK本身提供了很多效能優良的庫,包括ThreadPoolExecutor和ScheduleThreadPoolExecutor等。C++11中的STL也提供了std:thread(然而我還沒有看,這裡先佔個坑)還有很多第三方庫

c++簡單執行實現

執行緒池,簡單來說就是有一堆已經建立好的執行緒(最大數目一定),初始時他們都處於空閒狀態,當有新的任務進來,從執行緒池中取出一個空閒的執行緒處理任務,然後當任務處理完成之後,該執行緒被重新放回到執行緒池中,供其他的任務使用,當執行緒池中的執行緒都在處理任務時,就沒有空閒執行緒供使用,此時,若有新的任務產生

基於Linux C執行軟體框架實現

之前寫過一篇基於C語言連結串列實現的工作任務註冊與執行,連結如下:https://blog.csdn.net/morixinguan/article/details/77986553後面使用它演變成為了另外一個框架,也就是多執行緒,當時的設計思路主要是為了服務測試程式。搞過R

Linux中epoll+執行實現高併發

伺服器併發模型通常可分為單執行緒和多執行緒模型,這裡的執行緒通常是指“I/O執行緒”,即負責I/O操作,協調分配任務的“管理執行緒”,而實際的請求和任務通常交由所謂“工作者執行緒”處理。通常多執行緒模型下,每個執行緒既是I/O執行緒又是工作者執行緒。所以這裡討論的是,單I/O執行緒+多工作者執行緒的模型,這也

c++11執行實現

咳咳。c++11 加入了執行緒庫,從此告別了標準庫不支援併發的歷史。然而 c++ 對於多執行緒的支援還是比較低階,稍微高階一點的用法都需要自己去實現,譬如執行緒池、訊號量等。執行緒池(thread pool)這個東西,在面試上多次被問到,一般的回答都是:“管理一個任務佇列

c# socket執行實現

伺服器端: PoolServer.cs類 using System; using System.Collections.Generic; using System.Text; using System.Net; using System.Net.Sockets; usi

c++11 執行實現以及示例

執行緒池的使用在工作中非常普遍,對於java,python而言執行緒池使用還是比較方便。去年年底由於工作需要,用c++搭建一套工程程式碼,需要使用執行緒池,但是C++中並沒有現有的執行緒池,為了快速開發,以及程式碼的穩定還是google在github上面找

【原始碼剖析】threadpool —— 基於 pthread 實現簡單執行

部落格新地址:https://github.com/AngryHacker/articles/issues/1#issue-369867252 執行緒池介紹 執行緒池可以說是專案中經常會用到的元件,在這裡假設讀者都有一定的多執行緒基礎,如果沒有的話不妨在這裡進行了解:POSIX

C++實現簡單執行

執行緒池 什麼是執行緒池 在使用執行緒的CS模型中,伺服器端每接收到一個客戶端請求,都會為客戶端建立執行緒資源,當有大量突發性請求時,伺服器來不及為每個客戶端建立執行緒。執行緒每次的建立與銷燬都會耗費伺服器大量資源與時間,可以在伺服器一開始就建立好一堆執行緒,等到客戶端請求來

簡單實現Linux C++多執行的互斥訪問

#include <stdlib.h> #include <string.h> #include <iostream> #include <unistd.h> #include <errno.h> #include <pthrea

c語言執行簡單實現

一、背景 在某種CPU密集型的應用場景中,處理計算任務耗時較多(如影象處理),考慮多核CPU的優勢,若能把計算分擔到多個執行緒中處理則能有效利用CPU; 但是,若過開啟過多的執行緒,執行緒建立銷燬、執行緒間切換所帶來的開銷也不容小覷; 二、相關知識 2.1 思路整理 對

Linux執行實踐(9) --簡單執行的設計與實現

執行緒池的技術背景   在面向物件程式設計中,建立和銷燬物件是很費時間的,因為建立一個物件要獲取記憶體資源或者其它更多資源。在Java中更是如此,虛擬機器將試圖跟蹤每一個物件,以便能夠在物件銷燬後進行垃圾回收。所以提高服務程式效率的一個手段就是儘可能減少建立和銷燬物件的次數,

典型C/S模式___執行實現

一、多執行緒和執行緒池的區別 執行緒池 多執行緒 同時啟動若干個執行緒,持續存在,時刻準備處理請求 來一個請求啟動一個執行緒 響應時間短 響應時間較長 二、執行緒池

簡單Linux C++多執行CLOCK(時鐘)類

剛剛加入CSDN部落格,初來乍到也不知道寫什麼,所以來分享一個自己以前學C++的時候寫的第一個類,一個關於時鐘的簡單的Linux多執行緒CLOCK(時鐘)類: /***********************************************

C++執行實現

最近讀了muduo的原始碼,看了一下其中執行緒池的是實現。其中互斥量、條件變數都是庫裡面自己封裝的,正好現在C++標準庫裡面有對應的類,所以就改造了一下,補充了部分註釋。同時總結了一下條件變數和鎖的使用。程式碼如下: ThreadPool.h #pragma once #incl

白話跨平臺C++執行實現

執行緒池在一個C++專案中是必不可少的。去看任何一個C++開發框架,絕大部分都會實現一個執行緒池。而如今C++11已經成熟,藉助C++標準庫中的執行緒庫std::thread,以及標準庫提供的多執行緒同步神器std::condition_variable(這個已

Socket+執行實現簡單http單檔案伺服器

import java.io.*; import java.net.*; import java.nio.charset.Charset; import java.nio.file.*; import java.util.concurrent.*; impor

C++簡易執行實現

文章目錄 1.執行緒池基本瞭解 2.執行緒池的應用場景 3.實現一個簡單的加法任務的執行緒池 1)首先我們建立一個任務類。 2)執行緒池類 3) 完整程式碼以及測試 1

c++多執行模式下的socket程式設計(執行實現

     socket 程式設計可以說是一個基本的技術掌握,而多個客戶端向服務端傳送請求又是一個非常常見的場景,因此多執行緒模式下的socket程式設計則顯得尤為常見與重要。     本文主要利用執行緒池的技術,來實現多執行緒的模式,執行緒池的優點就不多述了,相信大家都能理

一個用來“拉”任務的簡單執行 c#版

通常用到執行緒池時,會用到“生產者-消費者”模型。 如果專案中不好實現“生產者”這一角色,而是預先開好N條執行緒,然後讓執行緒自己去“拉”任務,“拉”到有任務就處理,然後再“拉”任務,這樣實現起來很簡單,但任務的源頭若是一直沒有任務,這N條執行緒依然是不停地在