1. 程式人生 > >c/c++多執行緒程式設計與無鎖資料結構漫談

c/c++多執行緒程式設計與無鎖資料結構漫談

本文主要針對c/c++,系統主要針對linux。本文引述別人的資料均在引述段落加以宣告。

場景:

thread...1...2...3...:多執行緒遍歷

thread...a...b...c...:多執行緒插入刪除修改

眾所周知的stl是多執行緒不安全的。為何stl不提供執行緒安全的資料結構呢?這個問題我只能姑且猜測:可能stl追求效能的卓越性,再加上容器資料結構的執行緒安全確實太複雜了。

網上常見的執行緒安全的研究都是針對最simple的queue型別的容器。為何常見的show理論實力的部落格均是針對queue型別資料結構呢?想必是因為queue一般不涉及迭代遍歷,我感覺這個原因很靠譜。多執行緒對queue的操作一般都是多執行緒同時對queue進行pop和push。不涉及到一個或者多個執行緒只讀(query),另外一個或者多個執行緒寫操作(更新,刪除,插入)。後者的需求,實現起來很棘手。

那麼先說說queue型別資料結構是如何做lock-free操作的吧。

lock-free queue

CAS操作語句

lock-free queue都是基於CAS操作實現無鎖的。CAS是compare-and-swap的簡寫,意思是比較交換。CAS指令需要CPU和編譯器的支援,現在的CPU大多數是支援CAS指令的。如果是GCC編譯器,則需要GCC4.1.0或更新版本。CAS在GCC中的實現有兩個原子操作。大多數無鎖資料結構都用到了下面兩個函式的前者,其返回bool表明當前的原子操作是否成功,後者返回值是值型別。

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
/// @brief Compare And Swap
///        If the current value of *a_ptr is a_oldVal, then write a_newVal into *a_ptr
/// @return true if the comparison is successful and a_newVal was written
#define CAS(a_ptr, a_oldVal, a_newVal) __sync_bool_compare_and_swap(a_ptr, a_oldVal, a_newVal)

下面兩段引用參考這篇部落格的描述(十分建議讀者閱讀原部落格,可以幫助理解。但是,我強烈建議讀者別用這個程式碼商用,有bug,不適用map,list等),關於CAS佇列的進出操作。

EnQueue(x) //進佇列
{
    //準備新加入的結點資料
    q = new record();
    q->value = x;
    q->next = NULL;
 
    do {
        p = tail; //取連結串列尾指標的快照
    } while( CAS(p->next, NULL, q) != TRUE); //如果沒有把結點鏈在尾指標上,再試
 
    CAS(tail, p, q); //置尾結點
}
我們可以看到,程式中的那個 do- while 的 Re-Try-Loop。就是說,很有可能我在準備在佇列尾加入結點時,別的執行緒已經加成功了,於是tail指標就變了,於是我的CAS返回了false,於是程式再試,直到試成功為止。這個很像我們的搶電話熱線的不停重播的情況。
你會看到,為什麼我們的“置尾結點”的操作(第12行)不判斷是否成功,因為:
1、如果有一個執行緒T1,它的while中的CAS如果成功的話,那麼其它所有的 隨後執行緒的CAS都會失敗,然後就會再迴圈,
2、此時,如果T1 執行緒還沒有更新tail指標,其它的執行緒繼續失敗,因為tail->next不是NULL了。
3、直到T1執行緒更新完tail指標,於是其它的執行緒中的某個執行緒就可以得到新的tail指標,繼續往下走了。
這裡有一個潛在的問題——如果T1執行緒在用CAS更新tail指標的之前,執行緒停掉或是掛掉了,那麼其它執行緒就進入死迴圈了。下面是改良版的EnQueue()
EnQueue(x) //進佇列改良版
{
    q = new record();
    q->value = x;
    q->next = NULL;
 
    p = tail;
    oldp = p
    do {
        while (p->next != NULL)
            p = p->next;
    } while( CAS(p.next, NULL, q) != TRUE); //如果沒有把結點鏈在尾上,再試
 
    CAS(tail, oldp, q); //置尾結點
}
我們讓每個執行緒,自己fetch 指標 p 到連結串列尾。但是這樣的fetch會很影響效能。而通實際情況看下來,99.9%的情況不會有執行緒停轉的情況,所以,更好的做法是,你可以接合上述的這兩個版本,如果retry的次數超了一個值的話(比如說3次),那麼,就自己fetch指標。
我們解決了EnQueue,我們再來看看DeQueue的程式碼:
DeQueue() //出佇列
{
    do{
        p = head;
        if (p->next == NULL){
            return ERR_EMPTY_QUEUE;
        }
    while( CAS(head, p, p->next) != TRUE );
    return p->next->value;
}

關於通用無鎖資料結構

如果讀者是用c語言實現的自定義連結串列等結構,那無需看本節關於通用無鎖資料結構的描述,因為本節內容是C++相關。

方案一:stl+鎖

stl/boost+鎖是最常規的方案之一。如果需求滿足(一個寫執行緒,多個讀執行緒),可以考慮boost::shared_mutex。

方案二:TBB

TBB庫貌似和很多其他Intel的庫一樣,不出名。TBB是Threading Building Blocks@Intel 的縮寫。

TBB的併發容器通過下面的方法做到高度並行操作:
細粒度鎖(Fine-grained locking):使用細粒度鎖,容器上的多執行緒操作只有同時存取同一位置時才會鎖定,如果是同時存取不同位置,可以並行處理。
免鎖演算法(Lock-free algorithms):使用免鎖演算法,不同執行緒的評估並校正與其它執行緒之間的相互影響。
和std::map一樣,concurrent_hash_map也是一個std::pair<const Key,T>的容器。為了避免出現競爭,我們不能直接存放散列表裡的單元資料,而是使用accessor或const_accessor。
accessor是std::pair的智慧指標,它負責對散列表中各單元的更新,只要它指向了一個單元,其它嘗試對這個單元的操作就會被鎖定直到accessor完成。const_accessor類似,不過它是隻讀的,多個const_accessor可以指向同一單元,這在頻繁讀取和少量更新的情形下能極大地提高併發性。

以下程式碼為TBB的sample裡面concurrent_hash_map使用範例

/*
    Copyright 2005-2014 Intel Corporation.  All Rights Reserved.

    This file is part of Threading Building Blocks. Threading Building Blocks is free software;
    you can redistribute it and/or modify it under the terms of the GNU General Public License
    version 2  as  published  by  the  Free Software Foundation.  Threading Building Blocks is
    distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
    implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
    See  the GNU General Public License for more details.   You should have received a copy of
    the  GNU General Public License along with Threading Building Blocks; if not, write to the
    Free Software Foundation, Inc.,  51 Franklin St,  Fifth Floor,  Boston,  MA 02110-1301 USA

    As a special exception,  you may use this file  as part of a free software library without
    restriction.  Specifically,  if other files instantiate templates  or use macros or inline
    functions from this file, or you compile this file and link it with other files to produce
    an executable,  this file does not by itself cause the resulting executable to be covered
    by the GNU General Public License. This exception does not however invalidate any other
    reasons why the executable file might be covered by the GNU General Public License.
*/

// Workaround for ICC 11.0 not finding __sync_fetch_and_add_4 on some of the Linux platforms.
#if __linux__ && defined(__INTEL_COMPILER)
#define __sync_fetch_and_add(ptr,addend) _InterlockedExchangeAdd(const_cast<void*>(reinterpret_cast<volatile void*>(ptr)), addend)
#endif
#include <string>
#include <cstring>
#include <cctype>
#include <cstdlib>
#include <cstdio>
#include "tbb/concurrent_hash_map.h"
#include "tbb/blocked_range.h"
#include "tbb/parallel_for.h"
#include "tbb/tick_count.h"
#include "tbb/task_scheduler_init.h"
#include "tbb/tbb_allocator.h"
#include "../../common/utility/utility.h"


//! String type with scalable allocator.
/** On platforms with non-scalable default memory allocators, the example scales 
    better if the string allocator is changed to tbb::tbb_allocator<char>. */
typedef std::basic_string<char,std::char_traits<char>,tbb::tbb_allocator<char> > MyString;

using namespace tbb;
using namespace std;

//! Set to true to counts.
static bool verbose = false;
static bool silent = false;
//! Problem size
long N = 1000000;
const int size_factor = 2;

//! A concurrent hash table that maps strings to ints.
typedef concurrent_hash_map<MyString,int> StringTable;

//! Function object for counting occurrences of strings.
struct Tally {
    StringTable& table;
    Tally( StringTable& table_ ) : table(table_) {}
    void operator()( const blocked_range<MyString*> range ) const {
        for( MyString* p=range.begin(); p!=range.end(); ++p ) {
            StringTable::accessor a;
            table.insert( a, *p );
            a->second += 1;
        }
    }
};

static MyString* Data;

static void CountOccurrences(int nthreads) {
    StringTable table;

    tick_count t0 = tick_count::now();
    parallel_for( blocked_range<MyString*>( Data, Data+N, 1000 ), Tally(table) );
    tick_count t1 = tick_count::now();

    int n = 0;
    for( StringTable::iterator i=table.begin(); i!=table.end(); ++i ) {
        if( verbose && nthreads )
            printf("%s %d\n",i->first.c_str(),i->second);
        n += i->second;
    }

    if ( !silent ) printf("total = %d  unique = %u  time = %g\n", n, unsigned(table.size()), (t1-t0).seconds());
}

/// Generator of random words

struct Sound {
    const char *chars;
    int rates[3];// begining, middle, ending
};
Sound Vowels[] = {
    {"e", {445,6220,1762}}, {"a", {704,5262,514}}, {"i", {402,5224,162}}, {"o", {248,3726,191}},
    {"u", {155,1669,23}}, {"y", {4,400,989}}, {"io", {5,512,18}}, {"ia", {1,329,111}},
    {"ea", {21,370,16}}, {"ou", {32,298,4}}, {"ie", {0,177,140}}, {"ee", {2,183,57}},
    {"ai", {17,206,7}}, {"oo", {1,215,7}}, {"au", {40,111,2}}, {"ua", {0,102,4}},
    {"ui", {0,104,1}}, {"ei", {6,94,3}}, {"ue", {0,67,28}}, {"ay", {1,42,52}},
    {"ey", {1,14,80}}, {"oa", {5,84,3}}, {"oi", {2,81,1}}, {"eo", {1,71,5}},
    {"iou", {0,61,0}}, {"oe", {2,46,9}}, {"eu", {12,43,0}}, {"iu", {0,45,0}},
    {"ya", {12,19,5}}, {"ae", {7,18,10}}, {"oy", {0,10,13}}, {"ye", {8,7,7}},
    {"ion", {0,0,20}}, {"ing", {0,0,20}}, {"ium", {0,0,10}}, {"er", {0,0,20}}
};
Sound Consonants[] = {
    {"r", {483,1414,1110}}, {"n", {312,1548,1114}}, {"t", {363,1653,251}}, {"l", {424,1341,489}},
    {"c", {734,735,260}}, {"m", {732,785,161}}, {"d", {558,612,389}}, {"s", {574,570,405}},
    {"p", {519,361,98}}, {"b", {528,356,30}}, {"v", {197,598,16}}, {"ss", {3,191,567}},
    {"g", {285,430,42}}, {"st", {142,323,180}}, {"h", {470,89,30}}, {"nt", {0,350,231}},
    {"ng", {0,117,442}}, {"f", {319,194,19}}, {"ll", {1,414,83}}, {"w", {249,131,64}},
    {"k", {154,179,47}}, {"nd", {0,279,92}}, {"bl", {62,235,0}}, {"z", {35,223,16}},
    {"sh", {112,69,79}}, {"ch", {139,95,25}}, {"th", {70,143,39}}, {"tt", {0,219,19}},
    {"tr", {131,104,0}}, {"pr", {186,41,0}}, {"nc", {0,223,2}}, {"j", {184,32,1}},
    {"nn", {0,188,20}}, {"rt", {0,148,51}}, {"ct", {0,160,29}}, {"rr", {0,182,3}},
    {"gr", {98,87,0}}, {"ck", {0,92,86}}, {"rd", {0,81,88}}, {"x", {8,102,48}},
    {"ph", {47,101,10}}, {"br", {115,43,0}}, {"cr", {92,60,0}}, {"rm", {0,131,18}},
    {"ns", {0,124,18}}, {"sp", {81,55,4}}, {"sm", {25,29,85}}, {"sc", {53,83,1}},
    {"rn", {0,100,30}}, {"cl", {78,42,0}}, {"mm", {0,116,0}}, {"pp", {0,114,2}},
    {"mp", {0,99,14}}, {"rs", {0,96,16}}, /*{"q", {52,57,1}},*/ {"rl", {0,97,7}},
    {"rg", {0,81,15}}, {"pl", {56,39,0}}, {"sn", {32,62,1}}, {"str", {38,56,0}},
    {"dr", {47,44,0}}, {"fl", {77,13,1}}, {"fr", {77,11,0}}, {"ld", {0,47,38}},
    {"ff", {0,62,20}}, {"lt", {0,61,19}}, {"rb", {0,75,4}}, {"mb", {0,72,7}},
    {"rc", {0,76,1}}, {"gg", {0,74,1}}, {"pt", {1,56,10}}, {"bb", {0,64,1}},
    {"sl", {48,17,0}}, {"dd", {0,59,2}}, {"gn", {3,50,4}}, {"rk", {0,30,28}},
    {"nk", {0,35,20}}, {"gl", {40,14,0}}, {"wh", {45,6,0}}, {"ntr", {0,50,0}},
    {"rv", {0,47,1}}, {"ght", {0,19,29}}, {"sk", {23,17,5}}, {"nf", {0,46,0}},
    {"cc", {0,45,0}}, {"ln", {0,41,0}}, {"sw", {36,4,0}}, {"rp", {0,36,4}},
    {"dn", {0,38,0}}, {"ps", {14,19,5}}, {"nv", {0,38,0}}, {"tch", {0,21,16}},
    {"nch", {0,26,11}}, {"lv", {0,35,0}}, {"wn", {0,14,21}}, {"rf", {0,32,3}},
    {"lm", {0,30,5}}, {"dg", {0,34,0}}, {"ft", {0,18,15}}, {"scr", {23,10,0}},
    {"rch", {0,24,6}}, {"rth", {0,23,7}}, {"rh", {13,15,0}}, {"mpl", {0,29,0}},
    {"cs", {0,1,27}}, {"gh", {4,10,13}}, {"ls", {0,23,3}}, {"ndr", {0,25,0}},
    {"tl", {0,23,1}}, {"ngl", {0,25,0}}, {"lk", {0,15,9}}, {"rw", {0,23,0}},
    {"lb", {0,23,1}}, {"tw", {15,8,0}}, /*{"sq", {15,8,0}},*/ {"chr", {18,4,0}},
    {"dl", {0,23,0}}, {"ctr", {0,22,0}}, {"nst", {0,21,0}}, {"lc", {0,22,0}},
    {"sch", {16,4,0}}, {"ths", {0,1,20}}, {"nl", {0,21,0}}, {"lf", {0,15,6}},
    {"ssn", {0,20,0}}, {"xt", {0,18,1}}, {"xp", {0,20,0}}, {"rst", {0,15,5}},
    {"nh", {0,19,0}}, {"wr", {14,5,0}}
};
const int VowelsNumber = sizeof(Vowels)/sizeof(Sound);
const int ConsonantsNumber = sizeof(Consonants)/sizeof(Sound);
int VowelsRatesSum[3] = {0,0,0}, ConsonantsRatesSum[3] = {0,0,0};

int CountRateSum(Sound sounds[], const int num, const int part)
{
    int sum = 0;
    for(int i = 0; i < num; i++)
        sum += sounds[i].rates[part];
    return sum;
}

const char *GetLetters(int type, const int part)
{
    Sound *sounds; int rate, i = 0;
    if(type & 1)
        sounds = Vowels, rate = rand() % VowelsRatesSum[part];
    else
        sounds = Consonants, rate = rand() % ConsonantsRatesSum[part];
    do {
        rate -= sounds[i++].rates[part];
    } while(rate > 0);
    return sounds[--i].chars;
}

static void CreateData() {
    for(int i = 0; i < 3; i++) {
        ConsonantsRatesSum[i] = CountRateSum(Consonants, ConsonantsNumber, i);
        VowelsRatesSum[i] = CountRateSum(Vowels, VowelsNumber, i);
    }
    for( int i=0; i<N; ++i ) {
        int type = rand();
        Data[i] = GetLetters(type++, 0);
        for( int j = 0; j < type%size_factor; ++j )
            Data[i] += GetLetters(type++, 1);
        Data[i] += GetLetters(type, 2);
    }
    MyString planet = Data[12]; planet[0] = toupper(planet[0]);
    MyString helloworld = Data[0]; helloworld[0] = toupper(helloworld[0]);
    helloworld += ", "+Data[1]+" "+Data[2]+" "+Data[3]+" "+Data[4]+" "+Data[5];
    if ( !silent ) printf("Message from planet '%s': %s!\nAnalyzing whole text...\n", planet.c_str(), helloworld.c_str());
}

int main( int argc, char* argv[] ) {
    try {
        tbb::tick_count mainStartTime = tbb::tick_count::now();
        srand(2);

        //! Working threads count
        // The 1st argument is the function to obtain 'auto' value; the 2nd is the default value
        // The example interprets 0 threads as "run serially, then fully subscribed"
        utility::thread_number_range threads(tbb::task_scheduler_init::default_num_threads,0);

        utility::parse_cli_arguments(argc,argv,
            utility::cli_argument_pack()
            //"-h" option for displaying help is present implicitly
            .positional_arg(threads,"n-of-threads",utility::thread_number_range_desc)
            .positional_arg(N,"n-of-strings","number of strings")
            .arg(verbose,"verbose","verbose mode")
            .arg(silent,"silent","no output except elapsed time")
            );

        if ( silent ) verbose = false;

        Data = new MyString[N];
        CreateData();

        if ( threads.first ) {
            for(int p = threads.first;  p <= threads.last; p = threads.step(p)) {
                if ( !silent ) printf("threads = %d  ", p );
                task_scheduler_init init( p );
                CountOccurrences( p );
            }
        } else { // Number of threads wasn't set explicitly. Run serial and parallel version
            { // serial run
                if ( !silent ) printf("serial run   ");
                task_scheduler_init init_serial(1);
                CountOccurrences(1);
            }
            { // parallel run (number of threads is selected automatically)
                if ( !silent ) printf("parallel run ");
                task_scheduler_init init_parallel;
                CountOccurrences(0);
            }
        }

        delete[] Data;

        utility::report_elapsed_time((tbb::tick_count::now() - mainStartTime).seconds());

        return 0;
    } catch(std::exception& e) {
        std::cerr<<"error occurred. error text is :\"" <<e.what()<<"\"\n";
    }
}