1. 程式人生 > >Muduo庫的框架剖析及總結(三)

Muduo庫的框架剖析及總結(三)

有些人覺得我的部落格口水話很多,嗯,我個人考慮了一下,可能我寫東西確實沒有那麼高大上,口水話很多,但是我個人確實非常喜歡這種看著非常親切的風格,其實我覺得還好,只要你能讀懂我的部落格就行!!!有問題和意見可以提,有問題可以相互探討、留言,甚至在留言板罵我都行,對我個人也是一種鼓勵吧,好了還是太多廢話了!!!

我前面有說過把Muduo庫分成大致四個模組,那麼我們今天把Channel模組和Eventloop這兩個模組給搞定吧

Channel我們前面說過了它是一個事件處理器,看看它的物件

  boost::weak_ptr<void> tie_;
  bool tied_;
  bool
eventHandling_; bool addedToLoop_; //讀處理的回撥 ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_;

再看看它的建構函式:

Channel::Channel(EventLoop* loop, int fd__)
  : loop_(loop),
    fd_(fd__),
    events_(0),
    revents_(0),
    index_(-1
), logHup_(true), tied_(false), eventHandling_(false), addedToLoop_(false) { }

我們前面知道Acceptor和TcpConnection都用了Channel的setReadCallback,而這個函式呢就是相當於設定了Channel中的ReadBack回撥函式,而這個ReadBack函式的作用是在Epoll一旦觸發了就緒事件的回撥函式

  void setReadCallback(const ReadEventCallback& cb)
  { readCallback_ = cb; }

不過在這裡強調的Channel則內的主要函式則不是這個
我們這裡第一個應該強調的函式應該是:
Channel::handleEvent

//呼叫底下的處理事件
void Channel::handleEvent(Timestamp receiveTime)
{
  boost::shared_ptr<void> guard;
  if (tied_)
  {
    guard = tie_.lock();
    if (guard)
    {
      handleEventWithGuard(receiveTime);
    }
  }
  else
  {
    handleEventWithGuard(receiveTime);
  }
}

而它的底層呼叫的是:
Channel::handleEventWithGuard

//對各種事件的一個處理
void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  LOG_TRACE << reventsToString();
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  //讀事件處理
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  //寫事件處理
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}

其實從這裡我們能看出來這個函式其實是對當有就緒事件發生時判斷事件是可讀還是可寫時間,判斷完後呼叫一個可讀事件函式或者一個可寫事件函式,這是Channel中的最重要的一個函式,當然還有另一個重要的函式:void enableReading()
在前面的TcpConnection中的TcpConnection::connectEstablished()我們呼叫過它,還有前面Acceptor中的Acceptor::listen()中我們也呼叫過它,我們這裡就直接告訴它的功能了吧,就是起到一個註冊事件的功能,我們來看一下它的程式碼:

 void enableReading() { events_ |= kReadEvent; update(); }

裡面有個update():

void Channel::update()
{
  addedToLoop_ = true;
  loop_->updateChannel(this);
}

我們可以看到它呼叫了Eventloop中的updateChannel(),雖然我們還沒說到Eventloop,但是我們先走進去繼續往下看吧,等後面說完Eventloop再來把之前的坑給補上:

//新增事件
void EventLoop::updateChannel(Channel* channel)
{
  assert(channel->ownerLoop() == this);
  assertInLoopThread();
  poller_->updateChannel(channel);
}

我們看到它又呼叫了一個poller**底下的updateChannel()(我想吐槽為什麼封裝那麼多),我們現在這裡先不細說**poller,因為它是一個抽象類,它會供我們去選擇epoll或者poll,我後面並沒有分析poll的部分,而只是分析了epoll,直接進入epoll看就好咯:

//註冊事件
void EPollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  const int index = channel->index();
  if (index == kNew || index == kDeleted)
  {
    // a new one, add with EPOLL_CTL_ADD
    int fd = channel->fd();
    if (index == kNew)
    {
      assert(channels_.find(fd) == channels_.end());
      channels_[fd] = channel;
    }
    else // index == kDeleted
    {
      assert(channels_.find(fd) != channels_.end());
      assert(channels_[fd] == channel);
    }
    channel->set_index(kAdded);
    update(EPOLL_CTL_ADD, channel);
  }
  else
  {
    // update existing one with EPOLL_CTL_MOD/DEL
    int fd = channel->fd();
    (void)fd;
    assert(channels_.find(fd) != channels_.end());
    assert(channels_[fd] == channel);
    assert(index == kAdded);
    if (channel->isNoneEvent())
    {
      update(EPOLL_CTL_DEL, channel);
      channel->set_index(kDeleted);
    }
    else
    {
      update(EPOLL_CTL_MOD, channel);
    }
  }
}

最後還是呼叫了一個EPollPoller::update():

//註冊刪除事件核心
void EPollPoller::update(int operation, Channel* channel)
{
  struct epoll_event event;
  bzero(&event, sizeof event);
  event.events = channel->events();
  event.data.ptr = channel;
  int fd = channel->fd();
  //核心
  if (::epoll_ctl(epollfd_, operation, fd, &event) < 0)
  {
    if (operation == EPOLL_CTL_DEL)
    {
      LOG_SYSERR << "epoll_ctl op=" << operation << " fd=" << fd;
    }
    else
    {
      LOG_SYSFATAL << "epoll_ctl op=" << operation << " fd=" << fd;
    }
  }
}

當然,在這裡你就能看到它實際上呼叫了epoll_ctl,所以無論是註冊事件還是刪除事件其實底層都是呼叫了這個函式,啊,這裡說得有些深入Eventloop中了,待會說到這個地方,會把這裡的一些坑給補上,這裡我就要說明一點就是void enableReading()是一個註冊事件的函式,而它底層呼叫指向Eventloop中的updateChannel(),我們一直往下深入就會發現實際上它呼叫了epoll_ctl

/*****************************************************/
好吧Channel也說得差不多了,該說Eventloop這個傢伙了,所謂的Eventloop在這裡就類似於一個事件分流器,我們把我們所謂的一些事件封裝成功能類,比如TcpConnection和Acceptor,當然這些事件有它們的檔案描述符,OK!這個時候我們的Channel就相當於一個事件處理器,它把對於這個事件的一些方法收集在一起,當有就緒事件發生的時候,就會回撥這些方法,我們把我們事件的檔案描述符傳入到Channel中,再將Channel放入到Eventloop中,那麼剩下的事情就由Eventloop去處理吧

我們來看一下Eventloop這個類,它實際上是獨立於TcpServer而存在,它,當然,我們只能說我們在結構上可以暫時這麼說,因為當你建立一個Eventloop是獨立建立然後傳入指標進入TcpServer中的,而不是TcpServer建立好的,還是老規矩,老看一下它的成員變數:

bool looping_; /* atomic */
  bool quit_; /* atomic and shared between threads, okay on x86, I guess. */
  bool eventHandling_; /* atomic */
  bool callingPendingFunctors_; /* atomic */
  int64_t iteration_;
  const pid_t threadId_;
  Timestamp pollReturnTime_;//時間戳
  boost::scoped_ptr<Poller> poller_;//底層的poll或者epoll
  boost::scoped_ptr<TimerQueue> timerQueue_;//時間佇列

typedef std::vector<Channel*> ChannelList;
  ChannelList activeChannels_;//就緒事件
  Channel* currentActiveChannel_;//當前就緒事件

建構函式:

EventLoop::EventLoop()
  : looping_(false),
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    iteration_(0),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),//選擇poll或者epoll
    timerQueue_(new TimerQueue(this)),//epoll_wait的時間列表
    wakeupFd_(createEventfd()),//執行緒間通訊需要的fd
    wakeupChannel_(new Channel(this, wakeupFd_)),
    currentActiveChannel_(NULL)
{
    //...貌似應該和ioloop有關,跟主執行緒和執行緒之間的響應有關
  wakeupChannel_->setReadCallback(
      boost::bind(&EventLoop::handleRead, this));
  // we are always reading the wakeupfd
  wakeupChannel_->enableReading();
}

好吧,其實它的重點是這個:
poller_(Poller::newDefaultPoller(this))

Poller* Poller::newDefaultPoller(EventLoop* loop)
{
  if (::getenv("MUDUO_USE_POLL"))
  {
    return new PollPoller(loop);
  }
  else
  {
    return new EPollPoller(loop);
  }
}

之前我也說過,它是對I/O複用,poll和epoll的一個選擇,我所選擇的是epoll
epoll的建構函式:

EPollPoller::EPollPoller(EventLoop* loop)
  : Poller(loop),
  //構造一個epoll
    epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
    events_(kInitEventListSize)

我們能看到它呼叫了epoll_create,建立一個epoll事件佇列

我們再轉回Eventloop中:
我們要知道Eventloop最重要的一個函式就是loop(),我們應該分析的是這個函式:

void EventLoop::loop()
{
  assert(!looping_);
  assertInLoopThread();
  looping_ = true;
  quit_ = false;  // FIXME: what if someone calls quit() before loop() ?
  LOG_TRACE << "EventLoop " << this << " start looping";

//quit這個其實和執行緒有點關係,當然你要了解更多詳細的請去看陳碩的書= =!!
  while (!quit_)
  {
      //前面其實我們有看到這就是一個就是就緒事件列表哦
    activeChannels_.clear();//刪除事件列表所有元素
    pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//!!!最重要的函式(我們可以看到這裡它把activeChannels_傳了進去)
    ++iteration_;
    if (Logger::logLevel() <= Logger::TRACE)
    {
      printActiveChannels();
    }
    // TODO sort channel by priority
    eventHandling_ = true;
    for (ChannelList::iterator it = activeChannels_.begin();
    //把傳進去的activeChannels_的就緒事件提取出來
        it != activeChannels_.end(); ++it)
    {
      currentActiveChannel_ = *it;
      //根據就緒事件處理事件
      currentActiveChannel_->handleEvent(pollReturnTime_);
    }
    currentActiveChannel_ = NULL;
    eventHandling_ = false;
    doPendingFunctors();
  }

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

好吧poll這個函式比較重要,當然它是呼叫了EPollPoller::poll(),好吧來看看:

Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
//我們看到這裡呼叫了epoll_wait,這裡當然就是監聽就緒事件的檔案描述符了
  int numEvents = ::epoll_wait(epollfd_,
                               &*events_.begin(),
                               static_cast<int>(events_.size()),
                               timeoutMs);
  int savedErrno = errno;
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    //將就緒事件放入就緒佇列拉到Eventloop中統一處理
    fillActiveChannels(numEvents, activeChannels);
    if (implicit_cast<size_t>(numEvents) == events_.size())
    {
      events_.resize(events_.size()*2);
    }
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    // error happens, log uncommon ones
    if (savedErrno != EINTR)
    {
      errno = savedErrno;
      LOG_SYSERR << "EPollPoller::poll()";
    }
  }
  return now;
}

fillActiveChannels(numEvents, activeChannels)這個得看一下:

//將就緒事件放到activeChannels中
void EPollPoller::fillActiveChannels(int numEvents,
                                     ChannelList* activeChannels) const
{
  assert(implicit_cast<size_t>(numEvents) <= events_.size());
  for (int i = 0; i < numEvents; ++i)
  {
  //將就緒事件取出來放入到就緒事件列表中
    Channel* channel = static_cast<Channel*>(events_[i].data.ptr);
#ifndef NDEBUG
    int fd = channel->fd();
    ChannelMap::const_iterator it = channels_.find(fd);
    assert(it != channels_.end());
    assert(it->second == channel);
#endif
    channel->set_revents(events_[i].events);
    activeChannels->push_back(channel);
  }
}

其實可以看到這就是一個提取就緒事件的一個過程,我們把就緒事件放入到activeChannels中,在Eventloop中再把從epoll中提取到的事件提取出來 ,並呼叫了currentActiveChannel_的handleEvent,而我們在分析Channel的時候我們已經分析了handleEvent的作用,那麼實際上這裡就相當於有就緒事件然後返回呼叫它的回撥函數了,嗯,就是這麼一個過程!!!
/*****************************************************/

這篇是有點長!!!