1. 程式人生 > >Linux C++ 執行緒池

Linux C++ 執行緒池

標頭檔案定義:

話不多說,先上程式碼:
注意:本文程式碼來源於https://github.com/progschj/ThreadPool.git
不需要註釋,想要乾淨程式碼的可以直接上GitHub去找資源

標頭檔案 執行緒池類程式碼
/*
 * Date : 2018/7/24
 * Author : wqw
 * Content : ThreadPool 執行緒池實現
 * Origin : https://github.com/progschj/ThreadPool.git
 * File : ThreadPool.h
 */
 
#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

class ThreadPool{
public:
    ThreadPool(size_t);
    // auto 自動推導型別, Args... 不定形參列表, -> xxx 返回值型別
    // 新增任務
    template <class F, class... Args>
    auto enqueue(F&& f, Args&&... args)
            -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();

private:
    std::vector<std::thread> workers;   // 執行任務的執行緒容器
    // std::function<> 函式包裝, 任務佇列
    std::queue<std::function<void()>> tasks;
    std::mutex queue_mutex;             // 互斥鎖
    std::condition_variable condition;  // 條件鎖
    bool stop;                          // 判斷是否停止執行緒池
};

// 建構函式 開啟一定數量的執行緒
inline ThreadPool::ThreadPool(size_t threads) : stop(false) {
    for (size_t i = 0; i < threads; i++) {
        // 新增工作執行緒到workers
        workers.emplace_back(
            // lambda 匿名函式
            [this] {
                // 讓執行緒持續工作,
                for (;;) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);   // 互斥鎖開啟,直到生命週期結束
                        // 若任務列表 tasks 為空,則暫時阻塞執行緒
                        this->condition.wait(lock,
                                [this]{ return this->stop || !this->tasks.empty(); });
                        if (this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());  // 將tasks最前面的以右值形式賦值給task
                        this->tasks.pop();  // 將tasks剛剛賦值給task的元素彈出
                    }
                    task();  // 讓執行緒執行函式
                }
            }
        );
    }
}

// 新增工作
// 返回值為該工作(即將要執行的任務函式)的返回值
template <class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
    -> std::future<typename std::result_of<F(Args...)>::type> {
    // 自定義返回值型別 類似於typedef的用法
    using return_type = typename std::result_of<F(Args...)>::type;
    // forward() 保證完美轉發,即引數左值右值型別不變
    // std::packaged_task與std::promise相似 用於非同步呼叫,常與std::future共同使用
    // std::bind 包裝器 對函式F<Args>進行包裝
    // make_shared 智慧指標 製作一個智慧指標指向包裝過的F<Args>
    auto task = std::make_shared<std::packaged_task<return_type()>> (
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
            );
    // std::future 併發程式設計 繫結task任務
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(this->queue_mutex);
        // 判斷ThreadPool是否停止
        if (stop)
            throw std::runtime_error("enqueue on stopped ThreadPool!\n");
        // 新增任務到任務列表
        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one(); // 喚醒一個任務
    return res;
}

// 解構函式
inline ThreadPool::~ThreadPool() {
    {
        std::unique_lock<std::mutex> lock(this->queue_mutex);
        stop = true;
    }
    condition.notify_all();     // 喚醒所有任務
    for (std::thread &worker: workers)
        worker.join();          // 執行緒自動加上互斥鎖執行, detach則是併發執行
}

#endif

呼叫:

// example 測試程式碼
/*
 * Date : 2018/7/24
 * Author : wqw
 * Content : ThreadPool 執行緒池實現類
 * Origin : https://github.com/progschj/ThreadPool.git
 * File : main.cpp
 */

#include <iostream>
#include <vector>
#include <chrono>

#include "Thread_pool.h"


int main()
{

    ThreadPool pool(4);
    std::vector< std::future<int> > results;

    for(int i = 0; i < 8; ++i) {
        results.emplace_back(
                pool.enqueue([i] {
                    std::cout << "hello " << i << std::endl;
                    std::this_thread::sleep_for(std::chrono::seconds(1));
                    std::cout << "world " << i << std::endl;
                    return i*i;
                })
        );
    }

    for(auto && result: results)
        std::cout << result.get() << ' ';
    std::cout << std::endl;

    return 0;
}

執行緒池的概念:

        執行緒池是一個類似於銀行辦理業務的排隊系統一樣的東西。先開一定數量的執行緒(如:銀行業務視窗),若執行緒沒任何任務則阻塞全部執行緒,但不關閉(如:銀行暫時沒人辦理業務,但是工作人員總不能下班吧,還是要待機)。若需要執行的任務很多,則我們需要建立一個佇列,讓佇列裡的任務先等候,等到有執行緒空閒時則從佇列中取任務(如:銀行人很多就要排隊一個一個來)。

該程式碼的執行緒池的思路:這個執行緒池類主要由三大部分(三個成員函式)組成:

建構函式 ThreadPool():開啟一定數量的執行緒,每開啟一個執行緒,讓該執行緒進入死迴圈,對接下來的操作,先利用互斥鎖先加鎖,再利用條件鎖判斷執行緒任務列表是否為空,若為空即阻塞該執行緒(就是沒人辦理業務時進入待機狀態)。接下來,從任務列表中取任務,然後解開互斥鎖,讓其執行完後再重複以上操作。對於開啟的每一個執行緒都是如此。

新增任務函式 enqueue():先獲取任務,加互斥鎖,將該任務加入任務列表,解鎖,然後喚醒一個任務,讓其進行等待。

解構函式 ~ThreadPool():喚醒所有的工作,讓執行緒一個個的執行。

 

 

1. auto 自動推導型別

2. ... 可變模板引數 這篇文章有關其概念使用方法及其引數展開 https://www.cnblogs.com/qicosmos/p/4325949.html

3. function,bind 等是函式包裝器,這兩個有類似之處都是對函式進行包裝

使用方法部分簡介可以參考該文章 https://blog.csdn.net/liukang325/article/details/53668046

4. mutex 鎖類 詳情自行查閱api

5. [] { ... } lambda 匿名函式表示式 此文章有詳細的介紹 https://www.cnblogs.com/DswCnblog/p/5629165.html

6. &&,move(),forward()  這裡的&&不是指 且 的意思,是右值引用。這裡涉及一個左值右值的問題,move()和forward() ,就是圍繞左值引用,右值引用而設立的專門的函式。是一個很麻煩但是又挺重要的知識點。

這個文章寫的比較通俗易懂 http://www.cnblogs.com/qicosmos/p/4283455.html#3995775

這篇文章是更深入的講解 http://www.cnblogs.com/catch/p/3507883.html (這篇我都看到迷迷糊糊^.^)

7. future, promise, package_task 這幾個是關於執行緒方面的輔助函式,future與(promise,package)配合使用,用於非同步讀取,追蹤,正在的執行緒中某些變數。這篇文章有其概念的簡介及聯絡 https://blog.csdn.net/jiange_zh/article/details/51602938

8. make_share 用於製作智慧指標 share_ptr C++ primer 6 有介紹。該智慧指標是會自動釋放記憶體故比較安全。

該文章有其簡單概述及使用 https://blog.csdn.net/yagerfgcs/article/details/72886630