1. 程式人生 > >Muduo網路庫原始碼分析(一) EventLoop事件迴圈(Poller和Channel)

Muduo網路庫原始碼分析(一) EventLoop事件迴圈(Poller和Channel)

從這一篇博文起,我們開始剖析Muduo網路庫的原始碼,主要結合《Linux多執行緒服務端程式設計》和網上的一些學習資料!

(一)TCP網路程式設計的本質:三個半事件

1. 連線的建立,包括服務端接受(accept) 新連線和客戶端成功發起(connect) 連線。TCP 連線一旦建立,客戶端和服務端是平等的,可以各自收發資料。

2. 連線的斷開,包括主動斷開(close 或shutdown) 和被動斷開(read(2) 返回0)。

3. 訊息到達,檔案描述符可讀。這是最為重要的一個事件,對它的處理方式決定了網路程式設計的風格(阻塞還是非阻塞,如何處理分包,應用層的緩衝如何設計等等)。

3.5 訊息傳送完畢,這算半個。對於低流量的服務,可以不必關心這個事件;另外,這裡“傳送完畢”是指將資料寫入作業系統的緩衝區,將由TCP 協議棧負責資料的傳送與重傳,不代表對方已經收到資料。

這其中,最主要的便是第三點: 訊息到達,檔案描述符可讀。下面我們來仔細分析(順便分析訊息傳送完畢):


(1)訊息到達,檔案可讀:

核心接收-> 網路庫可讀事件觸發--> 將資料從核心轉至應用緩衝區(並且回撥函式OnMessage根據協議判斷是否是完整的資料包,如果不是立即返回)-->如果完整就取出讀走、解包、處理、傳送(read decode compute encode write)

(2)訊息傳送完畢:

應用緩衝區-->核心緩衝區(可全填)--->觸發傳送完成的事件,回撥Onwrite。如果核心緩衝區不足以容納資料(高流量的服務),要把資料追加到應用層傳送緩衝區中核心資料傳送之後,觸發socket

可寫事件,應用層-->核心;當全傳送至核心時,又會回撥Onwrite(可繼續寫)

(二)事件迴圈類圖


EventLoop類:

EventLoop是對Reactor模式的封裝,由於Muduo的併發原型是 Multiple reactors + threadpool  (one loop per thread + threadpool),所以每個執行緒最多隻能有一個EventLoop物件。EventLoop物件構造的時候,會檢查當前執行緒是否已經建立了其他EventLoop物件,如果已建立,終止程式(LOG_FATAL),EventLoop類的建構函式會記錄本物件所屬執行緒(threadld_

),建立了EventLoop物件的執行緒稱為IO執行緒,其功能是執行事件迴圈(EventLooploop),啥也不幹==

下面是簡化版的EventLoop(內部的Poller尚未實現,只是一個框架)

EventLoop.h

#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H

#include <boost/noncopyable.hpp>
#include <muduo/base/CurrentThread.h>
#include <muduo/base/Thread.h>

namespace muduo
{
namespace net
{
/// Reactor, at most one per thread.
/// This is an interface class, so don't expose too much details.
class EventLoop : boost::noncopyable
{
 public:
  EventLoop();
  ~EventLoop();  // force out-line dtor, for scoped_ptr members.
  /// Loops forever.
  /// Must be called in the same thread as creation of the object.
  void loop();
  void assertInLoopThread()
  {
    if (!isInLoopThread())
    {
      abortNotInLoopThread();
    }
  }
  bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }

  static EventLoop* getEventLoopOfCurrentThread();

 private:
  void abortNotInLoopThread();
  
  bool looping_; /* atomic */
  const pid_t threadId_;		// 當前物件所屬執行緒ID
};

}
}
#endif  // MUDUO_NET_EVENTLOOP_H
EventLoop.c
#include <muduo/net/EventLoop.h>
#include <muduo/base/Logging.h>
#include <poll.h>
using namespace muduo;
using namespace muduo::net;

namespace
{
// 當前執行緒EventLoop物件指標
// 執行緒區域性儲存
__thread EventLoop* t_loopInThisThread = 0;
}

EventLoop* EventLoop::getEventLoopOfCurrentThread()
{
  return t_loopInThisThread;
}

EventLoop::EventLoop()
  : looping_(false),
    threadId_(CurrentThread::tid())
{
  LOG_TRACE << "EventLoop created " << this << " in thread " << threadId_;
  // 如果當前執行緒已經建立了EventLoop物件,終止(LOG_FATAL)
  if (t_loopInThisThread)
  {
    LOG_FATAL << "Another EventLoop " << t_loopInThisThread
              << " exists in this thread " << threadId_;
  }
  else
  {
    t_loopInThisThread = this;
  }
}

EventLoop::~EventLoop()
{
  t_loopInThisThread = NULL;
}

// 事件迴圈,該函式不能跨執行緒呼叫
// 只能在建立該物件的執行緒中呼叫
void EventLoop::loop()
{
  assert(!looping_);
  // 斷言當前處於建立該物件的執行緒中
  assertInLoopThread();
  looping_ = true;
  LOG_TRACE << "EventLoop " << this << " start looping";

  ::poll(NULL, 0, 5*1000);

  LOG_TRACE << "EventLoop " << this << " stop looping";
  looping_ = false;
}

void EventLoop::abortNotInLoopThread()
{
  LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this
            << " was created in threadId_ = " << threadId_
            << ", current thread id = " <<  CurrentThread::tid();
}


Poller類:

時序圖:



Poller是個抽象類,具體可以是EPollPoller(預設) 或者PollPoller,需要去實現(唯一使用面向物件的一個類)

對於PollPoller來說,存在一個map,用來關聯fd和channel的,我們可以根據fd快速找到對應的channel。一個fd對應一個struct pollfd(pollfd.fd),一個fd 對應一個channel*;這個fd 可以是socket, eventfd, timerfd, signalfd。

Poller的作用是更新IO複用中的channel(IO事件),新增、刪除Channel。我們看一下PollPoller的實現:

PollPoller.h

#ifndef MUDUO_NET_POLLER_POLLPOLLER_H
#define MUDUO_NET_POLLER_POLLPOLLER_H

#include <muduo/net/Poller.h>
#include <map>
#include <vector>

struct pollfd;

namespace muduo
{
namespace net
{

/// IO Multiplexing with poll(2).
class PollPoller : public Poller
{
 public:

  PollPoller(EventLoop* loop);
  virtual ~PollPoller();

  virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels);
  virtual void updateChannel(Channel* channel);
  virtual void removeChannel(Channel* channel);

 private:
  void fillActiveChannels(int numEvents,
                          ChannelList* activeChannels) const;

  typedef std::vector<struct pollfd> PollFdList;
  typedef std::map<int, Channel*> ChannelMap;	// key是檔案描述符,value是Channel*
  PollFdList pollfds_;
  ChannelMap channels_;
};

}
}
#endif  // MUDUO_NET_POLLER_POLLPOLLER_H

PollPoller.c

#include <muduo/net/poller/PollPoller.h>
#include <muduo/base/Logging.h>
#include <muduo/base/Types.h>
#include <muduo/net/Channel.h>
#include <assert.h>
#include <poll.h>

using namespace muduo;
using namespace muduo::net;

PollPoller::PollPoller(EventLoop* loop)
  : Poller(loop)
{
}

PollPoller::~PollPoller()
{
}

Timestamp PollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
  // XXX pollfds_ shouldn't change
  int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
  Timestamp now(Timestamp::now());
  if (numEvents > 0)
  {
    LOG_TRACE << numEvents << " events happended";
    fillActiveChannels(numEvents, activeChannels);
  }
  else if (numEvents == 0)
  {
    LOG_TRACE << " nothing happended";
  }
  else
  {
    LOG_SYSERR << "PollPoller::poll()";
  }
  return now;
}

void PollPoller::fillActiveChannels(int numEvents,
                                    ChannelList* activeChannels) const
{
  for (PollFdList::const_iterator pfd = pollfds_.begin();
      pfd != pollfds_.end() && numEvents > 0; ++pfd)
  {
    if (pfd->revents > 0)
    {
      --numEvents;
      ChannelMap::const_iterator ch = channels_.find(pfd->fd);
      assert(ch != channels_.end());
      Channel* channel = ch->second;
      assert(channel->fd() == pfd->fd);
      channel->set_revents(pfd->revents);
      // pfd->revents = 0;
      activeChannels->push_back(channel);
    }
  }
}

void PollPoller::updateChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
  if (channel->index() < 0)
  {
	// index < 0說明是一個新的通道
    // a new one, add to pollfds_
    assert(channels_.find(channel->fd()) == channels_.end());
    struct pollfd pfd;
    pfd.fd = channel->fd();
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
    pollfds_.push_back(pfd);
    int idx = static_cast<int>(pollfds_.size())-1;
    channel->set_index(idx);
    channels_[pfd.fd] = channel;
  }
  else
  {
    // update existing one
    assert(channels_.find(channel->fd()) != channels_.end());
    assert(channels_[channel->fd()] == channel);
    int idx = channel->index();
    assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
    struct pollfd& pfd = pollfds_[idx];
    assert(pfd.fd == channel->fd() || pfd.fd == -channel->fd()-1);
    pfd.events = static_cast<short>(channel->events());
    pfd.revents = 0;
	// 將一個通道暫時更改為不關注事件,但不從Poller中移除該通道
    if (channel->isNoneEvent())
    {
      // ignore this pollfd
	  // 暫時忽略該檔案描述符的事件
	  // 這裡pfd.fd 可以直接設定為-1
      pfd.fd = -channel->fd()-1;	// 這樣子設定是為了removeChannel優化
    }
  }
}

void PollPoller::removeChannel(Channel* channel)
{
  Poller::assertInLoopThread();
  LOG_TRACE << "fd = " << channel->fd();
  assert(channels_.find(channel->fd()) != channels_.end());
  assert(channels_[channel->fd()] == channel);
  assert(channel->isNoneEvent());
  int idx = channel->index();
  assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
  const struct pollfd& pfd = pollfds_[idx]; (void)pfd;
  assert(pfd.fd == -channel->fd()-1 && pfd.events == channel->events());
  size_t n = channels_.erase(channel->fd());
  assert(n == 1); (void)n;
  if (implicit_cast<size_t>(idx) == pollfds_.size()-1)
  {
    pollfds_.pop_back();
  }
  else
  {
	// 這裡移除的演算法複雜度是O(1),將待刪除元素與最後一個元素交換再pop_back
    int channelAtEnd = pollfds_.back().fd;
    iter_swap(pollfds_.begin()+idx, pollfds_.end()-1);
    if (channelAtEnd < 0)
    {
      channelAtEnd = -channelAtEnd-1;
    }
    channels_[channelAtEnd]->set_index(idx);
    pollfds_.pop_back();
  }
}
程式碼中的幾個技巧都在註釋中標出。

Channel類:

Channel是selectable IO channel,負責註冊與響應IO 事件,它不擁有file descriptor。

Channel是Reactor結構中的“事件”,它自始至終都屬於一個EventLoop(一個EventLoop對應多個Channel,處理多個IO),負責一個檔案描述符的IO事件,它包含又檔案描述符fd_,但實際上它不擁有fd_,不用負責將其關閉。在Channel類中儲存這IO事件的型別以及對應的回撥函式,當IO事件發生時,最終會呼叫到Channel類中的回撥函式。Channel類一般不單獨使用,它常常包含在其他類中(Acceptor、Connector、EventLoop、TimerQueue、TcpConnection)使用。Channel類有EventLoop的指標 loop_,通過這個指標可以向EventLoop中添加當前Channel事件。事件型別用events_表示,不同事件型別對應不同回撥函式。

以下兩個都由Channel註冊:

Acceptor是被動連線的抽象--->關注監聽套接字的可讀事件,回撥handleRead

Connector對主動連線的抽象。

時序圖:


Channel.h

#ifndef MUDUO_NET_CHANNEL_H
#define MUDUO_NET_CHANNEL_H

#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/weak_ptr.hpp>
#include <muduo/base/Timestamp.h>

namespace muduo
{
namespace net
{

class EventLoop;

/// A selectable I/O channel.
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
class Channel : boost::noncopyable
{
 public:
  typedef boost::function<void()> EventCallback;
  typedef boost::function<void(Timestamp)> ReadEventCallback;

  Channel(EventLoop* loop, int fd);
  ~Channel();

  void handleEvent(Timestamp receiveTime);
  void setReadCallback(const ReadEventCallback& cb)
  { readCallback_ = cb; }
  void setWriteCallback(const EventCallback& cb)
  { writeCallback_ = cb; }
  void setCloseCallback(const EventCallback& cb)
  { closeCallback_ = cb; }
  void setErrorCallback(const EventCallback& cb)
  { errorCallback_ = cb; }

  /// Tie this channel to the owner object managed by shared_ptr,
  /// prevent the owner object being destroyed in handleEvent.
  void tie(const boost::shared_ptr<void>&);

  int fd() const { return fd_; }
  int events() const { return events_; }
  void set_revents(int revt) { revents_ = revt; } // used by pollers
  // int revents() const { return revents_; }
  bool isNoneEvent() const { return events_ == kNoneEvent; }

  void enableReading() { events_ |= kReadEvent; update(); }
  // void disableReading() { events_ &= ~kReadEvent; update(); }
  void enableWriting() { events_ |= kWriteEvent; update(); }
  void disableWriting() { events_ &= ~kWriteEvent; update(); }
  void disableAll() { events_ = kNoneEvent; update(); }
  bool isWriting() const { return events_ & kWriteEvent; }

  // for Poller
  int index() { return index_; }
  void set_index(int idx) { index_ = idx; }

  // for debug
  string reventsToString() const;

  void doNotLogHup() { logHup_ = false; }

  EventLoop* ownerLoop() { return loop_; }
  void remove();

 private:
  void update();
  void handleEventWithGuard(Timestamp receiveTime);

  static const int kNoneEvent;
  static const int kReadEvent;
  static const int kWriteEvent;

  EventLoop* loop_;			// 所屬EventLoop
  const int  fd_;			// 檔案描述符,但不負責關閉該檔案描述符
  int        events_;		// 關注的事件
  int        revents_;		// poll/epoll返回的事件
  int        index_;		// used by Poller.表示在poll的事件陣列中的序號
  bool       logHup_;		// for POLLHUP

  boost::weak_ptr<void> tie_;
  bool tied_;
  bool eventHandling_;		// 是否處於處理事件中
  ReadEventCallback readCallback_;
  EventCallback writeCallback_;
  EventCallback closeCallback_;
  EventCallback errorCallback_;
};

}
}
#endif  // MUDUO_NET_CHANNEL_H
Channel.c
#include <muduo/base/Logging.h>
#include <muduo/net/Channel.h>
#include <muduo/net/EventLoop.h>
#include <sstream>
#include <poll.h>

using namespace muduo;
using namespace muduo::net;

const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;

Channel::Channel(EventLoop* loop, int fd__)
  : loop_(loop),
    fd_(fd__),
    events_(0),
    revents_(0),
    index_(-1),
    logHup_(true),
    tied_(false),
    eventHandling_(false)
{
}

Channel::~Channel()
{
  assert(!eventHandling_);
}

void Channel::tie(const boost::shared_ptr<void>& obj)
{
  tie_ = obj;
  tied_ = true;
}

void Channel::update()
{
  loop_->updateChannel(this);
}

// 呼叫這個函式之前確保呼叫disableAll
void Channel::remove()
{
  assert(isNoneEvent());
  loop_->removeChannel(this);
}

void Channel::handleEvent(Timestamp receiveTime)
{
  boost::shared_ptr<void> guard;
  if (tied_)
  {
    guard = tie_.lock();
    if (guard)
    {
      handleEventWithGuard(receiveTime);
    }
  }
  else
  {
    handleEventWithGuard(receiveTime);
  }
}

void Channel::handleEventWithGuard(Timestamp receiveTime)
{
  eventHandling_ = true;
  if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
  {
    if (logHup_)
    {
      LOG_WARN << "Channel::handle_event() POLLHUP";
    }
    if (closeCallback_) closeCallback_();
  }

  if (revents_ & POLLNVAL)
  {
    LOG_WARN << "Channel::handle_event() POLLNVAL";
  }

  if (revents_ & (POLLERR | POLLNVAL))
  {
    if (errorCallback_) errorCallback_();
  }
  if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
  {
    if (readCallback_) readCallback_(receiveTime);
  }
  if (revents_ & POLLOUT)
  {
    if (writeCallback_) writeCallback_();
  }
  eventHandling_ = false;
}

string Channel::reventsToString() const
{
  std::ostringstream oss;
  oss << fd_ << ": ";
  if (revents_ & POLLIN)
    oss << "IN ";
  if (revents_ & POLLPRI)
    oss << "PRI ";
  if (revents_ & POLLOUT)
    oss << "OUT ";
  if (revents_ & POLLHUP)
    oss << "HUP ";
  if (revents_ & POLLRDHUP)
    oss << "RDHUP ";
  if (revents_ & POLLERR)
    oss << "ERR ";
  if (revents_ & POLLNVAL)
    oss << "NVAL ";

  return oss.str().c_str();
}
這三個類之間的關係不難理解,其實本質就是一個Poll/Epoll,只不過進行了更高的抽象後劃分出來的這些類,重點理解部落格開頭的那張類圖即可。

參考:

《Muduo使用手冊》

《Linux多執行緒服務端程式設計》