1. 程式人生 > >多執行緒讀寫資料方法之讀寫鎖方法與shared_ptr+互斥鎖方法的比較

多執行緒讀寫資料方法之讀寫鎖方法與shared_ptr+互斥鎖方法的比較

對共享資源進行多執行緒讀寫操作有很多方法,本文舉出兩種方法並進行對比。

一:讀寫鎖方法。執行緒進行讀操作時以讀的方式加鎖,執行緒進行寫操作時用寫的方式加鎖。

二:另外一種比較新奇的方法是使用shared_ptr+互斥鎖。shared_ptr是一種用引用計數實現的智慧指標,當計數為零時,它接管的物件會被銷燬。利用這一點可以與互斥鎖配合使用實現另外一種比讀寫鎖更為高效的方法。方法如下:

1:對於read端,在讀之前加互斥鎖,把引用計數加1,解除互斥鎖(臨界區非常小),讀完之後計數減1,這樣保證在讀期間其引用計數大於1,可以防止併發寫。

2:對於write端,對整個寫函式加互斥鎖,如果引用計數為1,這時可以安全修改共享物件,不必擔心有人讀它;當引用計數大於1時,生成一個新的資料物件(舊物件的深拷貝),並用一個shared_ptr進行保管,然後與原來的shared_ptr進行交換,對新物件進行寫操作。這時舊物件只被讀函式中的shared_ptr保管,當那些函式都執行完畢時,舊物件的引用計數成為零,舊物件被自動銷燬。

這種方法的好處就是在讀的時候可以去寫,從而會使操作時間有所縮短,缺點是寫時有可能會拷貝舊資料,這點倒不必太過擔心。下面的程式驗證了寫時拷貝資料發生的概率在1%左右,而且整體時間也會比讀寫鎖方法有所縮短。

實驗模擬了股票的購買和查詢過程,實驗採用四執行緒,兩個執行緒買股票(寫),兩個執行緒查詢股票(讀)。


CustomerData.h。shared_ptr+互斥鎖方法

#ifndef CUSTOMERDATA_H_INCLUDED
#define CUSTOMERDATA_H_INCLUDED

#include"MutexLock.h"
#include"MutexLockGuard.h"
#include<boost/shared_ptr.hpp>
#include<boost/noncopyable.hpp>
#include<vector>
#include<map>
#include<iostream>
using namespace std;

// One stock position: the stock's name and how many items of it are held.
// Plain aggregate with no constructors; members are left uninitialized
// unless the caller assigns them.
struct MyStock{
    string stockName;   // name identifying the stock
    int itemCnt;        // number of items held / bought
};

class CustomerData{
private:
    typedef vector<MyStock> StockList;
    typedef boost::shared_ptr<StockList> Stoptr;
    typedef map<string,StockList> Map;
    typedef boost::shared_ptr<Map> Maptr;
    Maptr data_;
    MutexLock mutex_;
    static int CopyTime;
    static int WriteTime;
    public:
    CustomerData():data_(new Map){}
    void traverse(){
        Maptr tmp;
        {
            MutexLockGuard lock(mutex_);
            tmp=data_;
            cout<<"Copy Time="<<CopyTime<<endl;
            cout<<"Write Time="<<WriteTime<<endl;
            cout<<(double)CopyTime*100/WriteTime<<"%"<<endl;
        }
        for(Map::iterator p=data_->begin();p!=data_->end();++p){
            cout<<"-----------------------------"<<endl;
            cout<<"Customer:"<<p->first<<endl;
            for(StockList::iterator q=p->second.begin();q!=p->second.end();++q)
                cout<<"\t"<<q->stockName<<":"<<q->itemCnt<<endl;
            cout<<"-----------------------------"<<endl;
        }
    }

    int queryCnt(const string& customer,const string& stock){
        Maptr tmp;
        {
            MutexLockGuard lock(mutex_);
            tmp=data_;
        }
        Map::iterator p=tmp->find(customer);
        if(p==tmp->end())return -1;
        for(StockList::iterator q=p->second.begin();q!=p->second.end();++q)
            if(q->stockName==stock)return q->itemCnt;
        return -1;
    }

    void buyStock(const string& customer,const MyStock& stock){
        MutexLockGuard lock(mutex_);
        ++WriteTime;
        if(!data_.unique()){
            Maptr tmp(new Map(*data_));
            tmp.swap(data_);
            ++CopyTime;
        }
        assert(data_.unique());
        Map::iterator p=data_->find(customer);
        if(p==data_->end()){
            (*data_)[customer].push_back(stock);
            return;
        }else{
            for(StockList::iterator q=p->second.begin();q!=p->second.end();++q){
                if(q->stockName==stock.stockName){
                    q->itemCnt+=stock.itemCnt;
                    return;
                    }
            }
            (*data_)[customer].push_back(stock);
        }
    }

    void addCustomer(const string& customer){
        MutexLockGuard lock(mutex_);
        if(!data_.unique()){
            Maptr tmp(new Map(*data_));
            tmp.swap(data_);
        }
        assert(data_.unique());
        (*data_)[customer];
    }
};

int CustomerData::CopyTime=0;
int CustomerData::WriteTime=0;


#endif // CUSTOMERDATA_H_INCLUDED
CustomerDataReadWriteLock.h。讀寫鎖方法
#ifndef CUSTOMERDATAREADWRITELOCK_H_INCLUDED
#define CUSTOMERDATAREADWRITELOCK_H_INCLUDED


#include"ReadWriteLock.h"
#include<boost/shared_ptr.hpp>
#include<boost/weak_ptr.hpp>
#include<boost/noncopyable.hpp>
#include<vector>
#include<iostream>
#include"CustomerData.h"
using namespace std;


class CustomerDataReadWriteLock{
private:
    typedef vector<MyStock> StockList;
    typedef map<string,StockList> Map;
    typedef boost::shared_ptr<Map> Maptr;
    Maptr data_;
    ReadWriteLock mutex_;
    static int CopyTime;
    static int WriteTime;
    public:
    CustomerDataReadWriteLock():data_(new Map){}
    void traverse(){
        Maptr tmp;
        ReadWriteLockGuard lock(mutex_);
        lock.ReadLock();
        tmp=data_;
        cout<<"Copy Time="<<CopyTime<<endl;
        cout<<"Write Time="<<WriteTime<<endl;

        for(Map::iterator p=data_->begin();p!=data_->end();++p){
            cout<<"-----------------------------"<<endl;
            cout<<"Customer:"<<p->first<<endl;
            for(StockList::iterator q=p->second.begin();q!=p->second.end();++q)
                cout<<"\t"<<q->stockName<<":"<<q->itemCnt<<endl;
            cout<<"-----------------------------"<<endl;
        }
    }

    int queryCnt(const string& customer,const string& stock){
        ReadWriteLockGuard lock(mutex_);
        lock.ReadLock();

        Map::iterator p=data_->find(customer);
        if(p==data_->end())return -1;
        for(StockList::iterator q=p->second.begin();q!=p->second.end();++q)
            if(q->stockName==stock)return q->itemCnt;
        return -1;
    }

    void buyStock(const string& customer,const MyStock& stock){
        ReadWriteLockGuard lock(mutex_);
        lock.WriteLock();
        ++WriteTime;
        Map::iterator p=data_->find(customer);
        if(p==data_->end()){
            (*data_)[customer].push_back(stock);
            return;
        }else{
            for(StockList::iterator q=p->second.begin();q!=p->second.end();++q){
                if(q->stockName==stock.stockName){
                    q->itemCnt+=stock.itemCnt;
                    return;
                    }
            }
            (*data_)[customer].push_back(stock);
        }
    }

    void addCustomer(const string& customer){
        ReadWriteLockGuard lock(mutex_);
        lock.WriteLock();
        (*data_)[customer];
    }
};

int CustomerDataReadWriteLock::CopyTime=0;
int CustomerDataReadWriteLock::WriteTime=0;


#endif // CUSTOMERDATAREADWRITELOCK_H_INCLUDED
MutexLock.h。互斥鎖
#ifndef MUTEXLOCK_H_INCLUDED
#define MUTEXLOCK_H_INCLUDED

#include<boost/noncopyable.hpp>
#include<pthread.h>
#include<sys/types.h>
#include<assert.h>
class MutexLock:private boost::noncopyable{
    private:
    pthread_mutex_t mutex_;
    public:
    MutexLock(){
        pthread_mutex_init(&mutex_,NULL);
    }
    ~MutexLock(){
        pthread_mutex_destroy(&mutex_);
    }
   void lock(){
        pthread_mutex_lock(&mutex_);
   }
   void unlock(){
        pthread_mutex_unlock(&mutex_);
   }
   pthread_mutex_t* getPthreadMutex(){
        return &mutex_;
   }
};

#endif // MUTEXLOCK_H_INCLUDED
MutexLockGuard.h。使用RAII方法使用MutexLock
#ifndef MUTEXLOCKGUARD_H_INCLUDED
#define MUTEXLOCKGUARD_H_INCLUDED

#include"MutexLock.h"
class MutexLockGuard:private boost::noncopyable{
    private:
    MutexLock& mutex_;
    public:
    explicit MutexLockGuard(MutexLock& mutex):mutex_(mutex){
        mutex_.lock();
    }
    ~MutexLockGuard(){
        mutex_.unlock();
    }
};

#endif // MUTEXLOCKGUARD_H_INCLUDED
Condition.h。條件變數
#ifndef CONDITION_H_INCLUDED
#define CONDITION_H_INCLUDED

#include"MutexLock.h"
#include<semaphore.h>
#include<sys/time.h>
// Non-copyable wrapper around a pthread condition variable that is always
// used together with the MutexLock passed to the constructor.
class Condition:private boost::noncopyable{
    private:
        MutexLock& mutex_;      // mutex released while waiting
        pthread_cond_t pcond_;
    public:
    explicit Condition(MutexLock& mutex):mutex_(mutex){
        pthread_cond_init(&pcond_,NULL);
    }
    ~Condition(){
        pthread_cond_destroy(&pcond_);
    }
    // Blocks until the condition is signalled. POSIX requires the caller to
    // hold mutex_ when calling this; the mutex is released while blocked and
    // re-acquired before returning. Wake-ups may be spurious, so callers
    // should re-check their predicate in a loop.
    // The wait is untimed, so no deadline is computed here.
    void wait(){
        pthread_cond_wait(&pcond_,mutex_.getPthreadMutex());
    }
    // Wakes at least one thread blocked in wait(), if any.
    void notify(){
        pthread_cond_signal(&pcond_);
    }
    // Wakes every thread currently blocked in wait().
    void notifyAll(){
        pthread_cond_broadcast(&pcond_);
    }
};


#endif // CONDITION_H_INCLUDED
CountDown.h。用條件變數與互斥鎖實現,用於主執行緒等待子執行緒結束。
#ifndef COUNTDOWN_H_INCLUDED
#define COUNTDOWN_H_INCLUDED

#include<iostream>
#include<boost/shared_ptr.hpp>
#include"MutexLock.h"
#include"MutexLockGuard.h"
#include"Condition.h"
#include<pthread.h>
using namespace std;

// Count-down latch built from a MutexLock and a Condition: it starts at n,
// each Done() decrements it, and wait() blocks until it is no longer
// positive.
class CountDown{
    private:
        MutexLock mutex_;   // protects num; declared before cond_, which refers to it
        Condition cond_;
        int num;            // remaining Done() calls before waiters are released
    public:
        CountDown(int n):cond_(mutex_),num(n){}

        // Blocks the caller until the counter has dropped to zero (or below).
        void wait(){
            MutexLockGuard lock(mutex_);
            // Loop: condition-variable wake-ups may be spurious.
            while(num>0){
                cond_.wait();
            }
        }

        // Records that one unit of work has finished. When the counter
        // reaches zero every waiter is released; a broadcast is used so that
        // more than one thread blocked in wait() cannot be left behind.
        void Done(){
            MutexLockGuard lock(mutex_);
            --num;
            if(num==0)
                cond_.notifyAll();
        }
};
#endif // COUNTDOWN_H_INCLUDED

ReadWriteLock.h

#ifndef READWRITELOCK_H_INCLUDED
#define READWRITELOCK_H_INCLUDED

#include<boost/noncopyable.hpp>
#include<pthread.h>
class ReadWriteLock:private boost::noncopyable{
private:
    pthread_rwlock_t rwlock_;
    public:
        ReadWriteLock(){
            pthread_rwlock_init(&rwlock_,NULL);
        }
        ~ReadWriteLock(){
            pthread_rwlock_destroy(&rwlock_);
        }
        int ReadLock(){
            return pthread_rwlock_rdlock(&rwlock_);
        }
        int WriteLock(){
            return pthread_rwlock_wrlock(&rwlock_);
        }
        int Unlock(){
            return pthread_rwlock_unlock(&rwlock_);
        }
        pthread_rwlock_t* getMutex(){
            return &rwlock_;
        }
};

// Scoped helper for a ReadWriteLock. Construction does NOT lock; the caller
// picks the mode by calling ReadLock() or WriteLock() once. The destructor
// releases the lock only if one of those calls succeeded, because unlocking
// a read-write lock the thread does not hold is undefined under POSIX.
class ReadWriteLockGuard{
private:
    ReadWriteLock& rwlock_;
    bool locked_;   // true once ReadLock()/WriteLock() has returned 0

    // Not copyable: two guards releasing the same acquisition would unlock
    // twice. Declared private and left undefined.
    ReadWriteLockGuard(const ReadWriteLockGuard&);
    ReadWriteLockGuard& operator=(const ReadWriteLockGuard&);
    public:
    ReadWriteLockGuard(ReadWriteLock& rwlock):rwlock_(rwlock),locked_(false){}
    ~ReadWriteLockGuard(){
        if(locked_)
            rwlock_.Unlock();
    }
    // Acquires the lock in read mode. Returns the underlying lock's result
    // code (0 on success).
    int ReadLock(){
            const int rc=rwlock_.ReadLock();
            if(rc==0)
                locked_=true;
            return rc;
    }
    // Acquires the lock in write mode. Returns the underlying lock's result
    // code (0 on success).
    int WriteLock(){
            const int rc=rwlock_.WriteLock();
            if(rc==0)
                locked_=true;
            return rc;
    }
};


#endif // READWRITELOCK_H_INCLUDED


main.cpp用於測試兩種方法

#include<iostream>
#include<unistd.h>
#include<time.h>
#include<stdlib.h>
#include<pthread.h>
#include<boost/shared_ptr.hpp>

#include"MutexLock.h"
#include"MutexLockGuard.h"
#include"Condition.h"
#include"CustomerData.h"
#include"CountDown.h"
#include"CustomerDataReadWriteLock.h"
using namespace std;

// Store under test. Exactly one of the two declarations below is active;
// switch the comment to exercise the other implementation.
CustomerData customerData;//shared_ptr + mutex approach
//CustomerDataReadWriteLock customerData;//read-write lock approach
// Latch the worker threads count down on and main() waits on.
CountDown countdown(4);//four threads

// Size:  number of entries in stocks[] and stockCnt[].
// CSize: number of entries in customer[].
const int Size=5;
const int CSize=40;
// Customer names used as map keys by the worker threads.
string customer[CSize]={"Hiho","Earth","Floating","Muduo","Game",
                                                "Hiho1","Earth1","Floating1","Muduo1","Game1",
                                                "Hiho2","Earth2","Floating2","Muduo2","Game2",
                                                "Hiho3","Earth3","Floating3","Muduo3","Game3",
                                                "Hiho4","Earth4","Floating4","Muduo4","Game4",
                                                "Hiho5","Earth5","Floating5","Muduo5","Game5",
                                                "Hiho6","Earth6","Floating6","Muduo6","Game6",
                                                "Hiho7","Earth7","Floating7","Muduo7","Game7"};
// Stock names bought and queried by the worker threads.
string stocks[Size]={"Google","Baidu","Tencent","Alibaba","Nest"};
// Quantities used for the individual purchases in the writer threads.
int stockCnt[Size]={1,2,3,4,5};

// Writer thread entry point. Repeats `cnt` sweeps; in each sweep every
// customer buys every stock once per quantity in stockCnt[]. Prints a
// completion message and then counts the latch down. `arg` is unused.
void * threadFun1(void * arg) {
    (void)arg;
    const int cnt=200000;// number of write sweeps
    MyStock stock;
    for(int i=0;i<cnt;++i){
        //usleep(rand()%100);
        for(int j=0;j<CSize;++j){// customer
            for(int k=0;k<Size;++k){// stock name
                // The name does not change in the quantity loop; set it once.
                stock.stockName=stocks[k];
                for(int p=0;p<Size;++p){// quantity
                    stock.itemCnt=stockCnt[p];
                    customerData.buyStock(customer[j],stock);
                }
            }
        }
    }
    // Print before signalling: once Done() has been called main() may go on
    // to print the table and exit, so cout must not be touched afterwards.
    cout<<"One Write Done"<<endl;
    countdown.Done();
    return static_cast<void*>(0);
}

// Reader thread entry point. Repeats `rounds` sweeps; in each sweep every
// (customer, stock) pair is queried Size times and the result is discarded.
// Prints a completion message and then counts the latch down.
void * threadFun2(void * arg) {
    const int rounds=1000;// number of read sweeps
    int round=0;
    while(round<rounds){
        //usleep(rand()%10);
        for(int c=0;c<CSize;++c){// customer
            for(int s=0;s<Size;++s){// stock name
                for(int rep=0;rep<Size;++rep){// repeated query
                    customerData.queryCnt(customer[c],stocks[s]);
                }
            }
        }
        ++round;
    }
    cout<<"One Read Done"<<endl;
    countdown.Done();
    return static_cast<void*>(0);
}
// Test driver: registers every customer, starts two writer and two reader
// threads, waits for all of them to finish and prints the final table.
int main()
{
    //srand((int)time(0));
    for(int i=0;i<CSize;++i){
        customerData.addCustomer(customer[i]);
    }

    // Must match the count the global `countdown` latch was constructed with.
    const int kThreads=4;
    // Two writer threads followed by two reader threads.
    void* (*entries[kThreads])(void*)={threadFun1,threadFun1,threadFun2,threadFun2};
    pthread_t threads[kThreads];
    for(int i=0;i<kThreads;++i){
        const int rc=pthread_create(&threads[i],NULL,entries[i],NULL);
        if(rc!=0){
            // A missing worker would leave countdown.wait() blocked forever.
            // abort() rather than return, so static objects are not
            // destroyed underneath the workers that are already running.
            cerr<<"pthread_create failed, error "<<rc<<endl;
            abort();
        }
    }

    countdown.wait();
    // Done() is the workers' last action but not their exit; join so no
    // thread is still running when main() returns.
    for(int i=0;i<kThreads;++i){
        pthread_join(threads[i],NULL);
    }
    customerData.traverse();
   return 0;
}