1. 程式人生 > >n個執行緒併發去完成m個任務的C++11實現

n個執行緒併發去完成m個任務的C++11實現

有一個需求,有m個計算任務,每個計算任務都有結果,僅有n個執行緒,讓這n個執行緒去完成這m個任務,並將計算結果返回。

其中n<m,計算任務用vector<function<T> >來表示,返回結果假設統一為int,用一個vector<int>來記錄各個計算任務的結果。

每個任務的計算量不同,有的很長,有的很短,要求儘可能快的完成所有計算任務。

思路

大體的思路都知道,多執行緒併發執行多個任務。難點在於如何進行排程,如何進行任務分配。

錯誤的想法:

1、做完1個任務就等待分配。

2、用佇列。做完一個任務出佇列。

每個執行緒執行完一個任務就去呼叫join之類的方法然後去給其分配下一個新任務。這是一個死衚衕。

這種想法是想人為去設計一個演算法然後在不同時刻給不同執行緒分配新的任務。不合適。

行得通的想法:

1、做完1個任務看還有沒有可做的任務,有就拿過來繼續做。

2、用陣列,多個執行緒共享,當前未完成任務索引,多執行緒共享,各個執行緒區別在於取不同下標的任務完成得到計算結果後放到對應下標的結果陣列中。

n個執行緒均去任務佇列中取任務,然後執行,執行完當前任務後,將任務結果放到結果陣列中,再執行下一個待執行的任務。

也即任一個執行緒在執行完一個任務後並不直接退出join之類,而是判斷是否還有任務。如果還有任務,就執行,沒有就退出。

這樣就避免了被動分配任務(站線上程的角度,一個執行緒被分配一個任務)的問題,而讓執行緒主動去申請任務,直到所有任務均完成。

這樣下來,多個任務的執行函式邏輯都是一樣的。共同從陣列中取任務,將結果寫到共同的結果陣列中。由於取的是陣列中第幾個任務可以記錄下來,那將其放到結果陣列對應位置即可。

程式碼

標頭檔案

#ifndef CONCURRENCY_H
#define CONCURRENCY_H

#include <bits/stdc++.h>
using namespace std;

template<class T>
class Concurrency
{
    vector<function<T>> &m_tasks__;		///<reference to tasks vector
    vector<int> &m_results__;			///<reference to result vector

    atomic<int> m_aiCurJobIdx__;

    int work();

    static int payload();
public:
    Concurrency(vector<function<T> > &vt,vector<int> &vr,int num);

    static void test();
};

#endif // CONCURRENCY_H

原始檔

#include "Concurrency.h"
#include "common.h"

namespace _concurrency {
    static ostringstream logs;
    static mutex mut;
}

template<class T>
int Concurrency<T>::work()
{
    static atomic<int> id(0);
    int myid=id++;
    int idx;
    atomic<int> count(0);		//<if not atomic ++ op may be optimized after oss<<
    ostringstream oss;
    while((idx=m_aiCurJobIdx__++)<m_tasks__.size()){
        ++count;
        m_results__[idx]=m_tasks__[idx]();
//        oss<<"thread id:"<<myid<<" done job "<<idx<<endl;
    }
    oss<<"-------"<<myid<<" done "<<count.load(memory_order_acquire)<<" jobs"<<endl<<endl;
    lock_guard<mutex> lock(_concurrency::mut);    ///<RAII
    _concurrency::logs<<oss.str();
    return myid;
}

template<class T>
int Concurrency<T>::payload()
{
    static atomic<int> sai(0);

    int ret=sai++;
    int sleepTime=random()%1000;
    cout<<"sleep "<<sleepTime<<"ms,return val:"<<ret<<endl;
    this_thread::sleep_for(chrono::milliseconds(sleepTime));
    return ret;
}

template<class T>
Concurrency<T>::Concurrency(vector<function<T> > &vt, vector<int> &vr, int num):m_tasks__(vt),m_results__(vr),m_aiCurJobIdx__(0)
{
    vector<thread> vth;
    for(int i=0;i<num;++i){
        vth.emplace_back(thread(&Concurrency::work,this));
    }

    for(auto &t:vth){
        t.join();
    }

}

template<class T>
void Concurrency<T>::test()
{
    int nJobNum=100,nThreadNum=5;
    vector<function<T> > jobs(nJobNum,Concurrency::payload);
    vector<int> results(nJobNum,-1);
    Concurrency(jobs,results,nThreadNum);

    print_arr_raw(results,results.size());
    cout<<endl;
    cout<<_concurrency::logs.str().c_str()<<endl;

}

直接在外部呼叫test()方法即可看到結果。

注意:

1、由於模板方法實現放在了cpp檔案中,所以在外面使用的時候直接include原始檔。簡單粗暴的方法實現模板類頭原始檔分離。也有別的繁瑣一些的方法,比如在原始檔中預先宣告幾個特化模板例項,確保編譯器能生成對應特化的模板例項類。

2、多執行緒中原子操作。執行緒共享變數所依賴的區域性變數,也需要加上保護,否則會出現共享變數會取到一箇舊值的現象。比如

int count=0;
++count;
oss<<"-------"<<myid<<" done "<<count<<" jobs"<<endl<<endl;

count是區域性變數,oss是執行緒間共享變數,count不加保護,oss取到的仍有可能是舊值0。

3、優先用鎖,而不用mutex。因為mutex比較原始,不能自己釋放,必須顯式呼叫unlock才行,當unlock之前丟擲了異常,或者沒有unlock,那這個資源就鎖死了。用lock_guard,這是一種RAII(Resource Acquisition Is Initialization)的實體例項。在建構函式中加鎖,在解構函式中解鎖。

4、由於直接#inlcude原始檔。所以原始檔中的全域性變數,或者加static表示僅在檔案內作用域,保險的方法還是加上名稱空間,防止重名或者出現變數重複定義的錯誤。