1. 程式人生 > >模板類高效執行緒安全的實現Queue

模板類高效執行緒安全的實現Queue

保證同一時刻多個執行緒不會同時修改同一個共享資源,那麼這個程式是執行緒安全的,或者是序列化訪問資源的。可以使用mutex類來控制執行緒的併發問題。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 #include <iostream>
#include <boost/thread/thread.hpp>
#include <string>
 
// A simple queue class; don't do this, use std::queue
template<typename T>

class Queue {
public:
   Queue( ) {}
  ~Queue( ) {}
 
   void enqueue(const T& x)     {
      // Lock the mutex for this queue
      boost::mutex::scoped_lock lock(mutex_);
      list_.push_back(x);
      // A scoped_lock is automatically destroyed (and thus unlocked)
      // when it goes out of scope
    } 
 
   T dequeue( ) {
      boost::mutex::scoped_lock lock(mutex_);
 
      if (list_.empty( ))
         throw "empty!";     // This leaves the current scope, so the
      T tmp = list_.front( ); // lock is released
      list_.pop_front( );
      return(tmp);
   } // Again: when scope ends, mutex_ is unlocked
 
private:
   std::list<T> list_;
   boost::mutex mutex_;
};
 
Queue<std::string> queueOfStrings;
 
void sendSomething( ) {
   std::string s;
   for (int i = 0; i < 10; ++i)     {       queueOfStrings.enqueue("Cyrus");
   }
}
 
void recvSomething( ) {
   std::string s;
 
   for (int i = 0; i < 10; ++i) {
      try {s = queueOfStrings.dequeue( );}
      catch(...) {}
   }
}
 
int main( ) {
   boost::thread thr1(sendSomething);
   boost::thread thr2(recvSomething);
 
   thr1.join( );
   thr2.join( );
}

    mutex物件本身並不知道它代表什麼,它僅僅是被多個消費者執行緒使用的資源訪問的鎖定解鎖標誌。在某個時刻,只有一個執行緒可以鎖定這個mutex物件,這就阻止了同一時刻有多個執行緒併發訪問共享資源。一個mutex就是一個簡單的訊號機制。

    給mutex加解鎖有多種策略,最簡單的是使用scoped_lock類,它使用一個mutex引數來構造,並一直鎖定這個mutex直到物件被銷燬。如果這個正在被構造的mutex已經被別的執行緒鎖定的話,當前執行緒就會進入wait狀態,直到這個鎖被解開。

利用read_write_mutex對上述的進行改進:

mutex有一個美中不足,它不區分讀和寫。執行緒如果只是進行讀操作,mutex強制執行緒序列化訪問資源,效率低。而且這種操作不需要排他性訪問。基於這個原因,Boost執行緒庫提供了read_write_mutex。

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 #include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/thread/read_write_mutex.hpp>
#include <string>
 
template<typename T>
class Queue {
public:
   Queue( ) :  // Use a read/write mutex and give writers priority
      rwMutex_(boost::read_write_scheduling_policy::writer_priority){}
  ~Queue( ) {}
 
   void enqueue(const T& x) {
      // Use a r/w lock since enqueue updates the state
      boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);
      list_.push_back(x);
   } 
 
   T dequeue( ) {
      // Again, use a write lock
      boost::read_write_mutex::scoped_write_lock writeLock(rwMutex_);
 
      if (list_.empty( ))
         throw "empty!";
      T tmp = list_.front( );
      list_.pop_front( );
      return(tmp);
   }
 
   T getFront( ) {
      // This is a read-only operation, so you only need a read lock
      boost::read_write_mutex::scoped_read_lock readLock(rwMutex_);
      if (list_.empty( ))
         throw "empty!";
      return(list_.front( ));
   }
 
private:
   std::list<T> list_;
   boost::read_write_mutex rwMutex_;
};
 
Queue<std::string> queueOfStrings;
 
void sendSomething( ) {
   std::string s;
 
   for (int i = 0; i < 10; ++i) {
      queueOfStrings.enqueue("Cyrus");
   }
}
 
void checkTheFront( ) {
   std::string s;
 
   for (int i = 0; i < 10; ++i) {
      try {s = queueOfStrings.getFront( );}
      catch(...) {}
   }
}
 
int main( ) {
 
   boost::thread thr1(sendSomething);
   boost::thread_group grp;
 
   grp.create_thread(checkTheFront);
   grp.create_thread(checkTheFront);
   grp.create_thread(checkTheFront);
   grp.create_thread(checkTheFront);
 
   thr1.join( );
   grp.join_all( );
}

     注意Queue的建構函式中隊讀寫鎖rwMutex的初始化。同一時刻,可能有多個讀寫執行緒要鎖定一個read_write_mutex,而這些鎖的排程策略依賴於構造這個mutex時選定的排程策略。Boost庫中提供了四種排程策略:

1)reader_priority:等待讀鎖的執行緒優先於等待寫鎖的執行緒

2)writer_priority:等待寫鎖的執行緒優先於等待讀鎖的執行緒

3)alternating_single_read:在讀鎖和寫鎖之間交替

4)alternating_many_reads:在讀鎖和寫鎖之間交替,這個策略將在兩個寫鎖之間使得所有的在這個queue上掛起的讀鎖都被允許。

     選擇使用哪種策略要慎重,因為使用前兩種的話可能會導致某些鎖始終不能成功,出現餓死的現象。