1. 程式人生 > >muduo_base程式碼剖析之Thread/Mutex/MutexLockGuard /Condition/CountDownLatch

muduo_base程式碼剖析之Thread/Mutex/MutexLockGuard /Condition/CountDownLatch

(1) 執行緒:Thread、CurrentThread

Thread.h、Thread.cc程式碼解讀

在這裡插入圖片描述
功能介紹:

  1. 建立執行緒時,把執行緒函式當作形參傳遞進去:muduo::Thread t(std::bind(threadFunc2, 42), “thread for free function with argument”);
    說明:執行緒形參func_是void func()型別的,可以使用boost::bind進行適配
  2. 啟動執行緒,使執行緒回撥執行緒函式func_:t.start()
  3. 使執行緒不先退出:t.join

原始碼分析:Thread類

class Thread : noncopyable
{
 public:
  typedef std::function<void ()> ThreadFunc; //執行緒函式型別
  
  //建立執行緒時,形參:func是執行緒函式,當start()時,func()開始執行
  explicit Thread(ThreadFunc func, const string& n)
    : started_(false),joined_(false),pthreadId_(0),tid_(0),
    func_(std::move(func)), //執行緒函式
    name_
(n),latch_(1) { setDefaultName(); } ~Thread(); void start() //最後呼叫了傳進來的func()函式 {//呼叫順序:start()-->pthread_create()-->startThread()-->runInThread()-->func_() assert(!started_); started_ = true; // FIXME: move(func_) detail::ThreadData* data = new detail::ThreadData(
func_, name_, &tid_, &latch_); if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)) { started_ = false; delete data; // or no delete? LOG_SYSFATAL << "Failed in pthread_create"; } else { latch_.wait(); assert(tid_ > 0); } } int join(); bool started() const { return started_; } pid_t tid() const { return tid_; } const string& name() const { return name_; } static int numCreated() { return numCreated_.get(); } private: void setDefaultName(); bool started_; bool joined_; pthread_t pthreadId_; pid_t tid_; ThreadFunc func_; //執行緒函式 string name_; CountDownLatch latch_; static AtomicInt32 numCreated_; };

測試程式碼 ./test/Thread_test.c

#include <muduo/base/Thread.h>
#include <muduo/base/CurrentThread.h>

#include <string>
#include <stdio.h>
#include <unistd.h>

void threadFunc() 
{
  printf("tid=%d\n", muduo::CurrentThread::tid());
}

void threadFunc2(int x)
{
  printf("tid=%d, x=%d\n", muduo::CurrentThread::tid(), x);
}

class Foo
{
 public:
  explicit Foo(double x)
    : x_(x)
  {
  }

  void memberFunc()
  {
    printf("tid=%d, Foo::x_=%f\n", muduo::CurrentThread::tid(), x_);
  }

  void memberFunc2(const std::string& text)
  {
    printf("tid=%d, Foo::x_=%f, text=%s\n", muduo::CurrentThread::tid(), x_, text.c_str());
  }

 private:
  double x_;
};

int main()
{
  printf("pid=%d, tid=%d\n", ::getpid(), muduo::CurrentThread::tid());

  //threadFunc無參,無需適配
  muduo::Thread t1(threadFunc); 
  t1.start();
  printf("t1.tid=%d\n", t1.tid());
  t1.join();

  //threadFunc一個引數,需適配
  muduo::Thread t2(std::bind(threadFunc2, 42),
                   "thread for free function with argument");
  t2.start();
  printf("t2.tid=%d\n", t2.tid());
  t2.join();

  //Foo物件的成員函式memberFunc
  Foo foo(87.53);
  muduo::Thread t3(std::bind(&Foo::memberFunc, &foo),
                   "thread for member function without argument");
  t3.start();
  t3.join();
  //Foo物件的成員函式memberFunc2
  muduo::Thread t4(std::bind(&Foo::memberFunc2, std::ref(foo), std::string("Shuo Chen")));
  t4.start();
  t4.join();

  printf("number of created threads %d\n", muduo::Thread::numCreated());
}

CurrentThread.h

namespace CurrentThread
{
# __thread修飾的變數是執行緒區域性儲存的,每個執行緒都有一份
__thread int t_cachedTid = 0; # 執行緒真實pid(tid)的快取,是為
     #了提高獲取tid的效率,較少::syscall(SYS_gettid)系統呼叫的次數
__thread char t_tidString[32]; # tid的字串表示形式
__thread int t_tidStringLength = 6;
__thread const char* t_threadName = "unknown"; #每個執行緒的名稱

(2)互斥鎖:MutexLock、MutexLockGuard

MutexLock

功能:對Linux C下的互斥鎖Mutex進行了封裝,功能一致
在這裡插入圖片描述

class CAPABILITY("mutex") MutexLock : noncopyable
{
private:
  pthread_mutex_t mutex_;
  pid_t holder_; //當前擁有該鎖的真實的執行緒id
public:
  MutexLock()
    : holder_(0)
  {
    MCHECK(pthread_mutex_init(&mutex_, NULL));
  }

  ~MutexLock()
  {
    assert(holder_ == 0);
    MCHECK(pthread_mutex_destroy(&mutex_));
  }

  // must be called when locked, i.e. for assertion
  bool isLockedByThisThread() const // 是否當前執行緒擁有該鎖
  {
    return holder_ == CurrentThread::tid();
  }

  void assertLocked() const ASSERT_CAPABILITY(this)
  {
    assert(isLockedByThisThread());
  }

  // internal usage

  void lock() ACQUIRE() //加鎖
  {
    MCHECK(pthread_mutex_lock(&mutex_));
    assignHolder();
  }

  void unlock() RELEASE()
  {
    unassignHolder();
    MCHECK(pthread_mutex_unlock(&mutex_));
  }

  pthread_mutex_t* getPthreadMutex() /* non-const */
  {
    return &mutex_;
  }
... ...
};

MutexLockGuard

與MutexLock不同點:當MutexLockGuard型別的鎖物件被銷燬時,將呼叫解構函式,導致加的鎖將自動釋放
在這裡插入圖片描述

class SCOPED_CAPABILITY MutexLockGuard : noncopyable
{
 public:
  explicit MutexLockGuard(MutexLock& mutex) ACQUIRE(mutex)
    : mutex_(mutex) // 構造時,對mutex_加鎖
  {
    mutex_.lock();
  }
  ~MutexLockGuard() RELEASE() //析構時,對mutex_解鎖
  {
    mutex_.unlock();
  }
 private:
  MutexLock& mutex_;
};

// 巨集:不允許定義一個匿名的MutexLockGuard物件
#define MutexLockGuard(x) error "Missing guard object name"

示例程式碼

#include <muduo/base/Mutex.h>
#include <muduo/base/Thread.h>
#include <muduo/base/Timestamp.h>

#include <vector>
#include <stdio.h>

using namespace muduo;
using namespace std;

MutexLock g_mutex;
vector<int> g_vec;
const int kCount = 10*1000*1000;

void threadFunc()
{
  for (int i = 0; i < kCount; ++i)
  {
    MutexLockGuard lock(g_mutex); //建構函式中,加鎖
    g_vec.push_back(i); 
  } //函式退出,呼叫MutexLockGuard解構函式,解鎖(因此不需要手動的解鎖)
}
int main()
{
  const int kMaxThreads = 8;
  g_vec.reserve(kMaxThreads * kCount);

  //單個執行緒不加鎖
  Timestamp start(Timestamp::now());
  for (int i = 0; i < kCount; ++i)
  {
    g_vec.push_back(i);
  }

  printf("single thread without lock %f\n", timeDifference(Timestamp::now(), start));

  //單個執行緒加鎖
  start = Timestamp::now();
  threadFunc();
  printf("single thread with lock %f\n", timeDifference(Timestamp::now(), start));

  //分別建立1、2、3、4、5、6、7個執行緒,統計時間開銷
  for (int nthreads = 1; nthreads < kMaxThreads; ++nthreads)
  {
    std::vector<std::unique_ptr<Thread>> threads;
    g_vec.clear();
    start = Timestamp::now();
    for (int i = 0; i < nthreads; ++i)
    {
      threads.emplace_back(new Thread(&threadFunc));
      threads.back()->start();
    }
    for (int i = 0; i < nthreads; ++i)
    {
      threads[i]->join();
    }
    printf("%d thread(s) with lock %f\n", nthreads, timeDifference(Timestamp::now(), start));
  }
}

(3) 條件變數:Condition、CountDownLatch

Condition

主要功能:對Linux C下的條件變數Condition進行了封裝,功能一致

class Condition : noncopyable
{
 private:
  MutexLock& mutex_;  // 條件變數要配合互斥鎖一起使用
  pthread_cond_t pcond_;
 public:
  explicit Condition(MutexLock& mutex)
    : mutex_(mutex)
  {
    MCHECK(pthread_cond_init(&pcond_, NULL));
  }

  ~Condition()
  {
    MCHECK(pthread_cond_destroy(&pcond_));
  }

  void wait()
  {
    MutexLock::UnassignGuard ug(mutex_);
    MCHECK(pthread_cond_wait(&pcond_, mutex_.getPthreadMutex()));
  }

  // returns true if time out, false otherwise.
  bool waitForSeconds(double seconds) // 呼叫了pthread_cond_timedwait
  {
    struct timespec abstime;
    // FIXME: use CLOCK_MONOTONIC or CLOCK_MONOTONIC_RAW to prevent time rewind.
    clock_gettime(CLOCK_REALTIME, &abstime);

    const int64_t kNanoSecondsPerSecond = 1000000000;
    int64_t nanoseconds = static_cast<int64_t>(seconds * kNanoSecondsPerSecond);

    abstime.tv_sec += static_cast<time_t>((abstime.tv_nsec + nanoseconds) / kNanoSecondsPerSecond);
    abstime.tv_nsec = static_cast<long>((abstime.tv_nsec + nanoseconds) % kNanoSecondsPerSecond);

    MutexLock::UnassignGuard ug(mutex_);
    return ETIMEDOUT == pthread_cond_timedwait(&pcond_, mutex_.getPthreadMutex(), &abstime);
  }

  void notify()
  {
    MCHECK(pthread_cond_signal(&pcond_));
  }

  void notifyAll()
  {
    MCHECK(pthread_cond_broadcast(&pcond_));
  }
};

CountDownLatch

CountDownLatch類,對條件變數類進行了封裝

  1. 使用條件變數Condition,可以使得執行緒阻塞在一個條件上
  2. 使用CountDownLatch,可以使得執行緒阻塞在count_變數是否為0上。那麼,哪個執行緒將會阻塞在count!=0條件上呢?答案:呼叫wait()函式的執行緒
class CountDownLatch : noncopyable
{
private:
  mutable MutexLock mutex_;
  Condition condition_ ;
  int count_ ;
public:
  explicit CountDownLatch::CountDownLatch(int count)
  : mutex_(),condition_(mutex_),count_(count)
  {
  }

  //呼叫wait()函式的執行緒,當count_>0時將會阻塞在wait()函式處
  void wait() 
  {
    MutexLockGuard lock(mutex_);
    while (count_ > 0) 
    {
      condition_.wait(); 
    }
  }
  void countDown()
  {
    MutexLockGuard lock(mutex_);
    --count_;
    if (count_ == 0)
    {
      condition_.notifyAll();
    }
  }

  int getCount() const
  {
    MutexLockGuard lock(mutex_);
    return count_;
  }
};