1. 程式人生 > >moduo網路庫的reactor模式(中)

moduo網路庫的reactor模式(中)

moduo網路庫的reactor模式基本構成為“non-blocking I/O + I/O multiplexing”,程式的基本結構是一個事件迴圈(event loop),以事件驅動(event-driven)和事件回撥(event callback)的方式實現業務邏輯。在moduo網路庫的reactor模式(上)理清了事件迴圈(event loop)的基本框架,在此基礎上此文加上事件驅動(event-driven)和事件回撥(event callback)結構,即reactor最核心的I/O多路複用(I/O mutiplexing)和事件分發(dispatching)機制:進行I/O多路複用並將拿到的I/O事件分發給各個檔案描述符(fd)的事件處理函式。


1、事件驅動與分發:I/O多路複用與事件分發

(1)Channel類:封裝檔案描述符(fd)實現事件分發

每個Channel物件都只屬於某一個EventLoop,因此只屬於某一個I/O執行緒。每個Channel物件只負責一個檔案描述符的I/O事件分發,Channel會把不同的I/O事件(讀、寫、錯誤等)分發為不同的回撥。Channel類就是對檔案描述符的封裝,建構函式Channel(EventLoop* loop, int fd)即將此Channel物件與唯一所屬的EventLoop以及檔案描述符(fd)綁定了起來。Channel類資料成員events_表示fd事件,用於更新I/O多路複用poll(2)。資料成員revents_表示現正要執行的fd事件,用於事件回撥。在目前程式中

1)Channel::enableReading()、Channel::enableWriting()等為設定檔案描述符(fd)事件的介面函式:

首先設定fd事件events_,然後執行update()將該Channel的新事件更新到I/O多路複用器poll(2)(update()是通過資料成員EventLoop* loop_,即自己所屬的EventLoop物件指標呼叫EventLoop::updateChannel(),再呼叫Poller::updateChannel()間接更新poll(2)中的事件。此處疑問:為什麼不直接在Channel中新增資料成員Poller指標直接更新事件到poll(2),而是要繞一圈間接更新事件?)

void enableReading() { events_ |= kReadEvent; update(); }
// void enableWriting() { events_ |= kWriteEvent; update(); }

void Channel::update()
{
  loop_->updateChannel(this);
}

2)Channel::setReadCallback(const EventCallback& cb)、Channel::setWriteCallback(const EventCallback& cb)、Channel::setErrorCallback(const EventCallback& cb)為設定fd對應事件的使用者回撥函式的介面函式。

void setReadCallback(const EventCallback& cb) { readCallback_ = cb; }
void setWriteCallback(const EventCallback& cb) { writeCallback_ = cb; }
void setErrorCallback(const EventCallback& cb) { errorCallback_ = cb; }

3)Channel::handleEvent()是Channel的核心,實現事件分發功能,它由EventLoop::loop()呼叫,它的功能是根據revents_的值分別呼叫不同的使用者呼叫。而revents_則是在Poller::poll()中得以更新的。

void Channel::handleEvent()
{
  if (revents_ & POLLNVAL)
    LOG_WARN << "Channel::handle_event() POLLNVAL";

  if (revents_ & (POLLERR | POLLNVAL))
    if (errorCallback_) 
      errorCallback_();

  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    if (readCallback_) 
      readCallback_();

  if (revents_ & POLLOUT) 
    if (writeCallback_) 
      writeCallback_();
}

//inside of function EventLoop::loop():
while (!quit_)
{
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
        (*it)->handleEvent();
    }
    doPendingFunctors();
}

(2)Poller類:封裝I/O多路複用poll(2)實現I/O多路複用

首先看系統呼叫poll(2)的函式原型為

#include <poll.h>
int poll(struct pollfd fd[], nfds_t nfds, int timeout);

struct pollfd的結構為

struct pollfd{

 int fd; // 檔案描述符

 short event;// 請求的事件

 short revent;// 返回的事件

}

1)為poll(2)提供資料準備:

Poller封裝類首先是通過Poller::updateChannel(Channel* channel)將檔案描述符(fd)封裝類Channel儲存到資料成員std::map<int, Channel*> ChannelMap中(具體實現過程大致為:使用者呼叫Channel::update()----->EventLoop::updateChannel()----->Poller::updateChannel())。接著則可將各個Channel物件的資料成員更新給poll(2),並在每次poll(2)後更新各個Channel的revents_。

2)封裝I/O多路複用(I/O multiplexing):

Poller::poll(int timeoutMs, ChannelList* activeChannels)函式中首先進行I/O多路複用poll(2),所需引數從資料成員std::map<int, Channel*> ChannelMap中獲得,阻塞直到有fd事件發生或定時時間到時,返回事件發生數量numEvents。然後繼續執行內部函式Poller::fillActiveChannels(numEvents, activeChannels)將可發生fd事件對應的Channel反饋給外界,即在資料成員std::map<int, Channel*> ChannelMap中找出事件可發生的fd對應的Channel,存放到外部引數ChannelList* activeChannels中。

注意 2)中並未將I/O多路複用與事件分發合在一起,而是隻實現了I/O多路複用事件分發Channel::handleEvent()則是在EventLoop::loop()中實現。一方面是程式安全方面的考慮,另一方面則是為了方便替換為其他更高效的I/O多路複用機制,如epoll(4)。

至此,一個完整的Reactor模式基本框架就完成了。


2、測試:利用timerfd(2)

有了Reactor基本框架後,我們使用timerfd給EventLoop加上一個定時器功能,對這個框架進行測試。

#include <sys/timerfd.h>

int timerfd_create(int clockid, int flags);

int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);

int timerfd_gettime(int fd, struct itimerspec *curr_value);

傳統的Reactor通過控制poll(2)或select(2)的等待時間來實現定時,而現在linux有了timerfd,我們可以用和處理I/O事件相同的方式來處理定時。具體原因參考文章Muduo 網路程式設計示例之三:定時器

#include "EventLoopThread.hpp"
#include "EventLoop.hpp"
#include "Thread.hpp"
#include <sys/timerfd.h>
#include <unistd.h>
#include <string.h>
#include <memory>
#include <iostream>

using namespace std;

EventLoop* loop;

void timeout()
{
  cout<<"tid "<<CurrentThreadtid()<<": Timeout!"<<endl;
  loop->quit();
}

int main()
{
  EventLoopThread ELThread;
  loop = ELThread.startLoop();//thread2
  int timerfd=timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK|TFD_CLOEXEC);
  struct itimerspec howlong;
  bzero(&howlong, sizeof howlong);
  howlong.it_value.tv_sec=3;
  timerfd_settime(timerfd,0,&howlong,NULL);
  Channel channel(loop,timerfd);
  channel.setReadCallback(timeout);  
  channel.enableReading();  

  sleep(5);//ensure the main thread do not exit faster than thread2
  close(timerfd);
  return 0;
}
[email protected]:~/Documents/Reactor/s1.1$ g++ -std=c++11 -pthread -o test1 MutexLockGuard.hpp Condition.hpp Thread.hpp Thread.cpp Channel.hpp Channel.cpp EventLoop.hpp EventLoop.cpp Poller.hpp Poller.cpp EventLoopThread.hpp testEventLoopThread.cpp 
[email protected]:~/Documents/Reactor/s1.1$ /usr/bin/valgrind ./testTimerDemo 
==25681== Memcheck, a memory error detector
==25681== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==25681== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
==25681== Command: ./testTimerDemo
==25681== 
tid 25681: create a new thread
tid 25681: waiting
tid 25682: Thread::func_() started!
tid 25682: notified
tid 25681: received notification
tid 25682: start looping...
tid 25682: Timeout!
tid 25682: end looping...
tid 25682: Thread end!
==25681== 
==25681== HEAP SUMMARY:
==25681==     in use at exit: 0 bytes in 0 blocks
==25681==   total heap usage: 16 allocs, 16 frees, 74,552 bytes allocated
==25681== 
==25681== All heap blocks were freed -- no leaks are possible
==25681== 
==25681== For counts of detected and suppressed errors, rerun with: -v
==25681== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

muduo將timerfd封裝到channel,此流程和普通檔案描述符的處理過程一致,也是將timerfd放入到I/O多路複用,其實現思路類似下一小節。然後建立TimerQueue類實現定時器功能。此處省略。


3、使用者喚醒執行緒(擴充套件功能):利用eventfd(2)

由於I/O執行緒平時阻塞在事件迴圈EventLoop::loop()的poll(2)呼叫中,為了讓I/O執行緒能立刻執行使用者回撥,我們需要設法喚醒它。這裡用到了eventfd(2)。該函式返回一個檔案描述符,類似於其他的檔案描述符操作,可以對該描述符進行一系列的操作,如讀、寫、poll、select等,當然這裡我們僅僅考慮read、write。

#include <sys/eventfd.h>

 int eventfd(unsigned int initval, int flags);

程式中,在EventLoop建構函式中則已完成構造eventfd儲存到EventLoop::wakeupFd_、封裝成Channel儲存到EventLoop::wakeupChannel_、更新eventfd的讀事件到I/O多路複用poll(2)中。當poll(2)響應eventfd的讀事件(使用者觸發)時回撥EventLoop::handleRead()函式(eventfd讀操作)讀取wakeup()(eventfd寫操作)時所寫資料。此過程即為喚醒。注意此流程和普通檔案描述符的處理過程一致,也是將喚醒操作也放入到I/O多路複用。

void EventLoop::handleRead()
{
  uint64_t one = 1;
  ssize_t n = ::read(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
  }
}
void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = ::write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}

在目前程式中,EventLoop::runInLoop(const Functor& cb)為介面函式,供使用者呼叫,在EventLoop的I/O執行緒內執行某個使用者任務回撥。注意這個介面函式EventLoop::runInLoop(const Functor& cb)允許跨執行緒使用,這就帶來了執行緒安全性方面的問題,此處的解決辦法不是加鎖,而是把對回撥函式cb()的操作轉移到I/O執行緒來進行:

(1)如果使用者在當前I/O執行緒呼叫這個函式,回撥會同步進行(即執行cb();)。

(2)如果使用者在其他執行緒呼叫這個函式,cb會被加入佇列,I/O執行緒會被喚醒來呼叫這個回撥函式(即執行EventLoop::queueInLoop(cb);)。有了這個功能,我們就能輕易地線上程間調配任務,這樣可以在不用鎖的情況下保證執行緒安全性。

void EventLoop::runInLoop(const Functor& cb)
{
    if (isInLoopThread())
        cb();
    else
        queueInLoop(cb);
}

在上述(2)情況下,EventLoop::queueInLoop(cb)函式內部首先將回調函式指標cb儲存到函式佇列std::vector<Functor> pendingFunctors_中,然後執行wakeup()(eventfd寫操作)喚醒所屬I/O執行緒。執行緒被喚醒之後則會從I/O多路複用poll(2)中返回,執行後續的EventLoop::doPendingFunctors();遍歷函式佇列執行函式。

void EventLoop::queueInLoop(const Functor& cb)
{
    {
        MutexLockGuard lock(mutex_);
        pendingFunctors_.push_back(cb);
    }

    if (!isInLoopThread() || callingPendingFunctors_)
        wakeup();
}

//inside of EventLoop::loop()
while (!quit_)
{
    activeChannels_.clear();
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
    for (ChannelList::iterator it = activeChannels_.begin();
        it != activeChannels_.end(); ++it)
    {
        (*it)->handleEvent();
    }
    doPendingFunctors();
}

void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;

  {
    MutexLockGuard lock(mutex_);
    functors.swap(pendingFunctors_);
  }

  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();
  }
  callingPendingFunctors_ = false;
}

值得學習的地方是在EventLoop::doPendingFunctors()函式中,不是簡單地在臨界區內依次呼叫Functor,而是把回撥佇列swap()到區域性變數functors中,既減小了臨界區的長度,也避免了死鎖,同時也對 EventLoop::pendingFunctors_進行了清空。


3、程式測試

測試檔案test.cpp

#include "EventLoopThread.hpp"
#include "EventLoop.hpp"
#include "Thread.hpp"
#include <iostream>
#include <memory>

using namespace std;

void test()
{
  cout<<"tid "<<CurrentThreadtid()<<": runInLoop..."<<endl;
}

int main()
{
  cout<<"Main: pid: "<<getpid()<<" tid: "<<CurrentThreadtid()<<endl;//main thread
  //sleep(1);

  EventLoopThread ELThread1;
  EventLoop* loop1 = ELThread1.startLoop();//thread 2
  sleep(1);
  loop1->runInLoop(test);
  
  EventLoopThread ELThread2;
  EventLoop* loop2 = ELThread2.startLoop();//thread 3
  sleep(1);
  loop2->runInLoop(test);

  loop1->loop(); //test "one thread one loop"
  loop2->loop(); //test "one thread one loop"
  
  sleep(1);
  //loop1->quit();
  loop1->runInLoop(bind(&EventLoop::quit,loop1));
  //loop2->quit();
  loop2->runInLoop(bind(&EventLoop::quit,loop2));
  sleep(1);

  return 0;
}
[email protected]:~/Documents/Reactor/s1.1$ g++ -std=c++11 -pthread -o test MutexLockGuard.hpp Condition.hpp Thread.hpp Thread.cpp Channel.hpp Channel.cpp EventLoop.hpp EventLoop.cpp Poller.hpp Poller.cpp EventLoopThread.hpp testEventLoopThread.cpp 
[email protected]:~/Documents/Reactor/s1.1$ which valgrind
/usr/bin/valgrind
[email protected]:~/Documents/Reactor/s1.1$ /usr/bin/valgrind ./test
==22825== Memcheck, a memory error detector
==22825== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==22825== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
==22825== Command: ./test
==22825== 
Main: pid: 22825 tid: 22825
tid 22825: create a new thread
tid 22825: waiting
tid 22826: Thread::func_() started!
tid 22826: notified
tid 22825: received notification
tid 22826: start looping...
tid 22825: create a new thread
tid 22826: runInLoop...
tid 22825: waiting
tid 22827: Thread::func_() started!
tid 22827: notified
tid 22825: received notification
tid 22827: start looping...
tid 22825: This EventLoop had been created!
tid 22825: This EventLoop had been created!
tid 22827: runInLoop...
tid 22826: end looping...
tid 22827: end looping...
tid 22827: Thread end!
tid 22826: Thread end!
==22825== 
==22825== HEAP SUMMARY:
==22825==     in use at exit: 0 bytes in 0 blocks
==22825==   total heap usage: 34 allocs, 34 frees, 75,472 bytes allocated
==22825== 
==22825== All heap blocks were freed -- no leaks are possible
==22825== 
==22825== For counts of detected and suppressed errors, rerun with: -v
==22825== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)

介面函式EventLoopThread::startLoop()建立新執行緒開始執行EventLoop::loop()並返回所屬執行緒的EventLoop物件的指標loop*,供使用者呼叫。如使用者用loop*呼叫介面函式EventLoop::runInLoop(Functor cb),使用者可在EventLoop所屬I/O執行緒內呼叫回撥函式cb()。如使用者用loop*呼叫EventLoop::loop()會失敗,因為該Reactor模式遵循“one loop per thread”。

在 moduo網路庫的reactor模式(上),執行緒同步封裝類MutexLockGuard及Condition、執行緒封裝類Thread、事件迴圈封裝類EventLoop及EventLoopThread都已完成,實現了事件迴圈框架。本文中則再添上Channel類、Poller類以及擴充套件功能後的EventLoop類,實現事件驅動和分發功能。在上一章中,我是把類的宣告和實現都放在一起,導致在編譯時出現重複定義問題。所以需要養成良好的程式設計習慣,將類的宣告放在標頭檔案.h,而將類的實現放在.c檔案。全域性變數或函式也需要規範,將定義放在.c檔案而在.h檔案使用extern關鍵字宣告即可。

Channel類

#ifndef CHANNEL_H_
#define CHANNEL_H_

#include <functional>
#include <poll.h>

/// A selectable I/O channel.
///
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd

class EventLoop;

class Channel //: boost::noncopyable
{
 public:
  typedef std::function<void()> EventCallback;

  Channel(EventLoop* loop, int fdArg);

  void handleEvent();
  void setReadCallback(const EventCallback& cb)
  { readCallback_ = cb; }
  void setWriteCallback(const EventCallback& cb)
  { writeCallback_ = cb; }
  void setErrorCallback(const EventCallback& cb)
  { errorCallback_ = cb; }

  int fd() const { return fd_; }
  int events() const { return events_; }
  void set_revents(int revt) { revents_ = revt; }
  bool isNoneEvent() const { return events_ == kNoneEvent; }

  void enableReading() { events_ |= kReadEvent; update(); }
  // void enableWriting() { events_ |= kWriteEvent; update(); }
  // void disableWriting() { events_ &= ~kWriteEvent; update(); }
  // void disableAll() { events_ = kNoneEvent; update(); }

  // for Poller
  int index() { return index_; }
  void set_index(int idx) { index_ = idx; }

  EventLoop* ownerLoop() { return loop_; }

private:
  void update();

  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;

  EventLoop* loop_;
  const int  fd_;
  int        events_;
  int        revents_;
  int        index_; // used by Poller.

  EventCallback readCallback_;
  EventCallback writeCallback_;
  EventCallback errorCallback_;
};

#endif
#include "Channel.hpp"
#include "EventLoop.hpp"
#include <poll.h>

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

Channel::Channel(EventLoop* loop, int fdArg)
  : loop_(loop),
    fd_(fdArg),
    events_(0),
    revents_(0),
    index_(-1)
{}

void Channel::update()
{
  loop_->updateChannel(this);
}
 
void Channel::handleEvent()
{
  if (revents_ & POLLNVAL) {
    //LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
    if (errorCallback_) 
      errorCallback_();

  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
    if (readCallback_) 
      readCallback_();

  if (revents_ & POLLOUT)
    if (writeCallback_) 
      writeCallback_();
}

Poller類

#ifndef POLLER_H_
#define POLLER_H_

#include <map>
#include <vector>

/// IO Multiplexing with poll(2).
///
/// This class doesn't own the Channel objects.

struct pollfd;
class Channel;
class EventLoop;

class Poller //: boost::noncopyable
{
 public:
  typedef std::vector<Channel*> ChannelList;

  Poller(EventLoop* loop);
  ~Poller();

  /// Polls the I/O events.
  /// Must be called in the loop thread.
  void poll(int timeoutMs, ChannelList* activeChannels);

  /// Changes the interested I/O events.
  /// Must be called in the loop thread.
  void updateChannel(Channel* channel);

  void assertInLoopThread();

 private:
  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;

  typedef std::vector<struct pollfd> PollFdList;
  typedef std::map<int, Channel*> ChannelMap;

  EventLoop* ownerLoop_;
  PollFdList pollfds_;
  ChannelMap channels_;
};

#endif 
#include "Poller.hpp"
#include "Channel.hpp"
#include "EventLoop.hpp"
#include <assert.h>
#include <poll.h>

Poller::Poller(EventLoop* loop) : ownerLoop_(loop) {}
Poller::~Poller() {}

void Poller::poll(int timeoutMs, ChannelList* activeChannels)
{
  int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
  if (numEvents > 0) {
    fillActiveChannels(numEvents, activeChannels);
  } else if (numEvents == 0) {
  } else {
  }
}

void Poller::assertInLoopThread()
{ 
 // ownerLoop_->assertInLoopThread(); 
}

void Poller::fillActiveChannels(int numEvents,
                                ChannelList* activeChannels) const
{
  for (PollFdList::const_iterator pfd = pollfds_.begin();
      pfd != pollfds_.end() && numEvents > 0; ++pfd)
  {
    if (pfd->revents > 0)
    {
      --numEvents;
      ChannelMap::const_iterator ch = channels_.find(pfd->fd);
      //assert(ch != channels_.end());
      Channel* channel = ch->second;
      //assert(channel->fd() == pfd->fd);
      channel->set_revents(pfd->revents);
      // pfd->revents = 0;
      activeChannels->push_back(channel);
    }
  }
}

void Poller::updateChannel(Channel* channel)
{
  //assertInLoopThread();
  //LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  if (channel->index() < 0) {
    // a new one, add to pollfds_
    //assert(channels_.find(channel->fd()) == channels_.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    int idx = static_cast<int>(pollfds_.size())-1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  } else {
    // update existing one
    //assert(channels_.find(channel->fd()) != channels_.end());
    //assert(channels_[channel->fd()] == channel);
    int idx = channel->index();
    //assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    struct pollfd& pfd = pollfds_[idx];
    //assert(pfd.fd == channel->fd() || pfd.fd == -1);
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    if (channel->isNoneEvent()) {
      // ignore this pollfd
      pfd.fd = -1;
    }
  }
}

EventLoop類

#include <poll.h>
#include <unistd.h>
#include <functional>
#include <memory>
#include <vector>
#include <iostream>
#include <sys/syscall.h>
#include "Thread.hpp"
#include "Channel.hpp"
#include "MutexLockGuard.hpp"
#include "Poller.hpp"

class EventLoop{
public:
    typedef std::function<void()> Functor;
    EventLoop(); 
    ~EventLoop(); 
    
    bool isInLoopThread() const { return threadId_==CurrentThreadtid(); }
    void loop();
    void quit();
    void runInLoop(const Functor& cb);
  /// Queues callback in the loop thread.
  /// Runs after finish pooling.
  /// Safe to call from other threads.
    void queueInLoop(const Functor& cb);
  // internal use only
    void wakeup();
    void updateChannel(Channel* channel);

private:
    void handleRead();  // waked up
    void doPendingFunctors();
    typedef std::vector<Channel*> ChannelList;
    bool looping_; /* atomic */
    bool quit_; /* atomic */
    bool callingPendingFunctors_; /* atomic */
    const pid_t threadId_;
    //Timestamp pollReturnTime_;
    std::unique_ptr<Poller> poller_;
    //std::unique_ptr<TimerQueue> timerQueue_;
    int wakeupFd_;
    // unlike in TimerQueue, which is an internal class,
    // we don't expose Channel to client.
    std::unique_ptr<Channel> wakeupChannel_;
    ChannelList activeChannels_;
    MutexLock mutex_;
    std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_
};

#endif
#include "EventLoop.hpp"
#include "Channel.hpp"
#include "Poller.hpp"
#include "Thread.hpp"
#include "MutexLockGuard.hpp"

#include <thread>
#include <poll.h>
#include <unistd.h>
#include <functional>
#include <memory>
#include <vector>
#include <iostream>
#include <sys/syscall.h>
#include <sys/eventfd.h>

const int kPollTimeMs = 10000;

static int createEventfd()
{
  int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
  if (evtfd < 0)
  {
    //LOG_SYSERR << "Failed in eventfd";
    //abort();
  }
  return evtfd;
}

EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    callingPendingFunctors_(false),
    threadId_(CurrentThreadtid()),
    poller_(new Poller(this)),
    //timerQueue_(new TimerQueue(this)),
    wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this, wakeupFd_))
{
  wakeupChannel_->setReadCallback(
      std::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
  wakeupChannel_->enableReading();
}

EventLoop::~EventLoop()
{
  //assert(!looping_);
  close(wakeupFd_);
}

void EventLoop::loop()
{
  if( !isInLoopThread() ){
      std::cout<<"tid "<<CurrentThreadtid()<<": This EventLoop had been created!"<<std::endl;
  }else{
    std::cout<<"tid "<<CurrentThreadtid()<<": start looping..."<<std::endl;
    quit_=false; 
    while (!quit_)
    {
       activeChannels_.clear();
       poller_->poll(kPollTimeMs, &activeChannels_);
       for (ChannelList::iterator it = activeChannels_.begin();
           it != activeChannels_.end(); ++it){
         (*it)->handleEvent();
       }
       doPendingFunctors();
    }
    std::cout<<"tid "<<CurrentThreadtid()<<": end looping..."<<std::endl;
  }
}

void EventLoop::quit()
{
  quit_ = true;
  if (!isInLoopThread())
  {
    wakeup();
  }
}

void EventLoop::runInLoop(const Functor& cb)
{
  if (isInLoopThread())
  {
    cb();
  }
  else
  {
    queueInLoop(cb);
  }
}

void EventLoop::queueInLoop(const Functor& cb)
{
  {
  MutexLockGuard lock(mutex_);
  pendingFunctors_.push_back(cb);
  }

  if (!isInLoopThread() || callingPendingFunctors_)
  {
    wakeup();
  }
}

void EventLoop::updateChannel(Channel* channel)
{
  //assert(channel->ownerLoop() == this);
  //assertInLoopThread();
  poller_->updateChannel(channel);
}

void EventLoop::wakeup()
{
  uint64_t one = 1;
  ssize_t n = write(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    //LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
  }
}

void EventLoop::handleRead()
{
  uint64_t one = 1;
  ssize_t n = read(wakeupFd_, &one, sizeof one);
  if (n != sizeof one)
  {
    //LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
  }
}

void EventLoop::doPendingFunctors()
{
  std::vector<Functor> functors;
  callingPendingFunctors_ = true;

  {
  MutexLockGuard lock(mutex_);
  functors.swap(pendingFunctors_);
  }

  for (size_t i = 0; i < functors.size(); ++i)
  {
    functors[i]();
  }
  callingPendingFunctors_ = false;
}

至於其他類,此處只是將在它們在上一章的程式從.h檔案分開到.h宣告和.c實現:

Thread類

#ifndef THREAD_H_
#define THREAD_H_

#include <thread>
#include <memory>
#include <functional>
#include <string>
#include <iostream>
#include <unistd.h>
#include <sys/syscall.h>

//global variable and function
extern __thread pid_t t_cachedTid;
extern pid_t gettid();
extern pid_t CurrentThreadtid();

class Thread
{
public:
    //typedef void* (*ThreadFunc)(void*);
    typedef std::function<void ()> ThreadFunc;
    Thread(ThreadFunc func); 
    ~Thread();
    void start();

private:
    ThreadFunc func_;
    std::string name_;
    pid_t tid_;
    pthread_t tidp_;
};

#endif
#include "Thread.hpp"

#include <thread>
#include <memory>
#include <functional>
#include <string>
#include <unistd.h>
#include <iostream>
#include <sys/syscall.h>

namespace detail
{

struct ThreadData
{
    typedef std::function<void ()> ThreadFunc;
    ThreadFunc func_;
    std::string name_;
    pid_t tid_;

    ThreadData(ThreadFunc func, const std::string& name, pid_t tid)
      : func_(std::move(func)),
        name_(name),
        tid_(tid)
    { }

    void runInThread()
    {
        tid_ = CurrentThreadtid();
        std::cout<<"tid "<<CurrentThreadtid()<<": Thread::func_() started!"<<std::endl;
        func_();
        name_=std::string("finished");
    }
};

void* startThread(void* obj)
{
  ThreadData* data = static_cast<ThreadData*>(obj);
  data->runInThread();
  delete data;
  std::cout<<"tid "<<CurrentThreadtid()<<": Thread end!"<<std::endl;
  return NULL;
}
 
}
pid_t gettid()
{
  return static_cast<pid_t>(syscall(SYS_gettid));
}

__thread pid_t t_cachedTid = 0;

pid_t CurrentThreadtid()
{
  if (t_cachedTid == 0)
  {
    t_cachedTid = gettid();
  }
  return t_cachedTid;
}


Thread::Thread(ThreadFunc func) : func_(func), tidp_(0) {}
Thread::~Thread()
{
    pthread_detach(tidp_);//let system itself recovers the resources or it will cause memory leak! 
}
    
void Thread::start()
{
   detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_);
   std::cout<<"tid "<<CurrentThreadtid()<<": create a new thread"<<std::endl;
   if(pthread_create(&tidp_, NULL, &detail::startThread, data))
   {
       delete data;
       std::cout<<"thread create error"<<std::endl;
   }
}

MutexLockGuard類

#ifndef MUTEXLOCKGUARD_H_
#define MUTEXLOCKGUARD_H_

#include <pthread.h>
#include "Thread.hpp"
#include <iostream>

class MutexLock //: boost::noncopyable
{
private:
    pthread_mutex_t mutex_;
    MutexLock(const MutexLock&);
    MutexLock& operator=(const MutexLock&);

public:
    MutexLock()
    {
        pthread_mutex_init(&mutex_,NULL);
        //std::cout<<"MutexLock create!"<<std::endl;
    }
    ~MutexLock()
    {
        pthread_mutex_destroy(&mutex_);
        //std::cout<<"MutexLock destroy!"<<std::endl;
    }
    void lock() { pthread_mutex_lock(&mutex_); }
    void unlock() { pthread_mutex_unlock(&mutex_); }
    pthread_mutex_t* getPthreadMutex() { return &mutex_; }
};

class MutexLockGuard //: boost::noncopyable
{
private:
    MutexLock& mutex_; //???此處加&的目的是什麼
    MutexLockGuard(const MutexLockGuard&);
    MutexLockGuard& operator=(const MutexLockGuard&);

public:
    explicit MutexLockGuard( MutexLock& mutex )
      : mutex_(mutex)
    {
        mutex_.lock();
        //std::cout<<"tid "<<CurrentThreadtid()<<": MutexLockGuard lock!"<<std::endl;
    }
    ~MutexLockGuard()
    {
        mutex_.unlock();
        //std::cout<<"tid "<<CurrentThreadtid()<<": MutexLockGuard unlock!"<<std::endl;
    }
};

#define MutexLockGuard(x) static_assert(false, "missing mutex guard var name")
//C++0x中引入了static_assert這個關鍵字,用來做編譯期間的斷言,因此叫做靜態斷言。
//如果第一個引數常量表達式的值為false,會產生一條編譯錯誤,錯誤位置就是該static_assert語句所在行,第二個引數就是錯誤提示字串。

#endif // MUTEXLOCKGUARD_H_

Condition類

#ifndef CONDITION_H_
#define CONDITION_H_

#include "MutexLockGuard.hpp"
#include <pthread.h>

class Condition
{
  public:
    explicit Condition(MutexLock& mutex) : mutex_(mutex)
    {
      pthread_cond_init(&pcond_, NULL);
    }

   ~Condition()
   {
      pthread_cond_destroy(&pcond_);
   }

   void wait()
   {
      pthread_cond_wait(&pcond_, mutex_.getPthreadMutex());
   }

   void notify()
   {
      pthread_cond_signal(&pcond_);
   }

   void notifyAll()
   {
      pthread_cond_broadcast(&pcond_);
   }

  private:
    MutexLock& mutex_;
    pthread_cond_t pcond_;
};

#endif 

EventLoopThread類

#ifndef EVENT_LOOP_THREAD_H_
#define EVENT_LOOP_THREAD_H_

#include "EventLoop.hpp"
#include "Thread.hpp"
#include "MutexLockGuard.hpp"
#include "Condition.hpp"
#include <memory>
#include <iostream>

class EventLoopThread
{
public:
  EventLoopThread() 
    : loop_(NULL), exiting_(false), thread_(std::bind(&EventLoopThread::ThreadFunc, this)), mutex_(), cond_(mutex_) {}
  //~EventLoopThread();
  EventLoop* startLoop();
  
private:
  void ThreadFunc();

  EventLoop* loop_; 
  bool exiting_;
  Thread thread_; 
  MutexLock mutex_;
  Condition cond_;
};

EventLoop* EventLoopThread::startLoop()
{
  //assert(!thread_.started());
  thread_.start();
  
  {
    MutexLockGuard lock(mutex_);
    while (loop_ == NULL)
    {
      std::cout<<"tid "<<CurrentThreadtid()<<": waiting"<<std::endl;
      cond_.wait();
    }
    std::cout<<"tid "<<CurrentThreadtid()<<": received notification"<<std::endl;
  }
  return loop_;
}

void EventLoopThread::ThreadFunc()
{
    EventLoop loop;

    {
      MutexLockGuard lock(mutex_);
      loop_ = &loop;
      cond_.notify();
      std::cout<<"tid "<<CurrentThreadtid()<<": notified"<<std::endl;
    }

    loop.loop();
    //assert(exiting_);
}

#endif

參考資料

https://github.com/chenshuo/muduo