1. 程式人生 > >muduo原始碼分析--執行緒池的實現

muduo原始碼分析--執行緒池的實現

muduo執行緒池主要有3個類的實現:EventLoop,EventLoopThreadPool,EventLoopThread,和Thread.
1.Thread為對一個執行緒的封裝:

class Thread : boost::noncopyable
{
 public:
  typedef boost::function<void ()> ThreadFunc;   //建立執行緒時,傳遞的函式

  explicit Thread(const ThreadFunc&, const string& name = string());
#ifdef __GXX_EXPERIMENTAL_CXX0X__
explicit Thread(ThreadFunc&&, const string& name = string()); #endif ~Thread(); void start(); //呼叫start,新建的執行緒開始執行ThreadFunc函式。 int join(); // return pthread_join() bool started() const { return started_; } // pthread_t pthreadId() const { return pthreadId_; } pid_t tid() const
{ return *tid_; } const string& name() const { return name_; } static int numCreated() { return numCreated_.get(); } private: void setDefaultName(); bool started_; bool joined_; pthread_t pthreadId_; boost::shared_ptr<pid_t> tid_; ThreadFunc func_; string
name_; }; } Thread::Thread(ThreadFunc&& func, const string& n) : started_(false), joined_(false), pthreadId_(0), tid_(new pid_t(0)), func_(std::move(func)), name_(n) { setDefaultName(); } Thread::~Thread() { if (started_ && !joined_) { pthread_detach(pthreadId_); } } void Thread::start() //建立一個執行緒 { assert(!started_); started_ = true; // FIXME: move(func_) detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_); if (pthread_create(&pthreadId_, NULL, &detail::startThread, data)) { started_ = false; delete data; // or no delete? LOG_SYSFATAL << "Failed in pthread_create"; } } int Thread::join() //建立該執行緒的執行緒可以呼叫join,等待執行緒退出 { assert(started_); assert(!joined_); joined_ = true; return pthread_join(pthreadId_, NULL); }

Thred執行緒類的使用:
Thread thread(func,”thread”); //例項化一個執行緒物件,其中func是一個函式指標
thread.start(); //建立一個執行緒,並開始執行

2,EventLoopThread類封裝一個EventLoop執行緒,即對Thread的進一步封裝

class EventLoopThread : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;//執行緒初始化回撥

  EventLoopThread(const ThreadInitCallback& cb = ThreadInitCallback(),
                  const string& name = string());
  ~EventLoopThread();
  EventLoop* startLoop();   //開始

 private:
  void threadFunc();

  EventLoop* loop_;
  bool exiting_;
  Thread thread_;
  MutexLock mutex_;
  Condition cond_;
  ThreadInitCallback callback_;
};
//EventLoopThread 建構函式會建立一個thread.
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
                                 const string& name)
  : loop_(NULL),
    exiting_(false),
    thread_(boost::bind(&EventLoopThread::threadFunc, this), name),
    mutex_(),
    cond_(mutex_),
    callback_(cb)
{
}

EventLoopThread::~EventLoopThread()
{
  exiting_ = true;
  if (loop_ != NULL) // not 100% race-free, eg. threadFunc could be running callback_.
  {
    // still a tiny chance to call destructed object, if threadFunc exits just now.
    // but when EventLoopThread destructs, usually programming is exiting anyway.
    loop_->quit();
    thread_.join();
  }
}
//呼叫該函式會呼叫thread.start()-》EventLoopThread::threadFunc()ThreadFunc()->例項化一個EventLoop-》return loop_;
EventLoop* EventLoopThread::startLoop()
{
  assert(!thread_.started());
  thread_.start();

  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      cond_.wait();
    }
  }

  return loop_;
}

void EventLoopThread::threadFunc()// thread_.start();會執行該函式,並且線上程內例項化一個新的EventLoop例項,並會停留在loop.loop();
{
  EventLoop loop;

  if (callback_)
  {
    callback_(&loop);
  }

  {
    MutexLockGuard lock(mutex_);
    loop_ = &loop;
    cond_.notify();
  }

  loop.loop();
  //assert(exiting_);
  loop_ = NULL;
}

EventLoopThread類的使用:
EventLoopThread eventloopthread;//例項化一個EventLoopThread,沒有ThreadInitCallback
EventLoop* ioloop = eventloopthread.startLoop();
//執行該執行緒的執行緒將會建立一個新的執行緒,這個新的執行緒有一個自己的EventLoop,並且會返回他的EventLoop物件的指標。即新執行緒的EventLoop物件是暴露給父執行緒的。

3,EventLoopThreadPool是管理EventLoopThread的池。

EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
  : baseLoop_(baseLoop),    //baseLoop為主執行緒擁有的EventLoop;
    name_(nameArg),        
    started_(false),
    numThreads_(0),        //該執行緒池需要建立的執行緒個數
    next_(0)
{
}

EventLoopThreadPool::~EventLoopThreadPool()
{
  // Don't delete loop, it's stack variable
}
//該函式會建立 numThreads_個EventLoopThread物件並執行各個執行緒,並在主執行緒儲存建立的EventLoopThread物件和EventLoopThread執行緒建立的EventLoop物件。如果建立的執行緒為0,則執行cb(baseLoop_);
void EventLoopThreadPool::start(const ThreadInitCallback& cb)  
{
  assert(!started_);
  baseLoop_->assertInLoopThread();
  started_ = true;
  for (int i = 0; i < numThreads_; ++i)
  {
    char buf[name_.size() + 32];
    snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
    EventLoopThread* t = new EventLoopThread(cb, buf);
    threads_.push_back(t);
    loops_.push_back(t->startLoop());
  }
  if (numThreads_ == 0 && cb)
  {
    cb(baseLoop_);
  }
}
//呼叫該函式會按照輪流的順序返回池裡的執行緒的EventLoop物件指標
EventLoop* EventLoopThreadPool::getNextLoop()
{
  baseLoop_->assertInLoopThread();
  assert(started_);
  EventLoop* loop = baseLoop_;

  if (!loops_.empty())
  {
    // round-robin
    loop = loops_[next_];
    ++next_;
    if (implicit_cast<size_t>(next_) >= loops_.size())
    {
      next_ = 0;
    }
  }
  return loop;
}
  void EventLoopThreadPool::setThreadNum(int numThreads) { numThreads_ = numThreads; }

EventLoopThreadPool類的使用:

EventLoop el;
EventLoopThreadPool eltp(el);
eltp.setThreadNum(5);
eltp.start();