【muduo庫學習】實現最簡單的reactor模式
阿新 • • 發佈:2019-02-15
《linux多執行緒服務端程式設計 使用muduo c++網路庫》的第8章是從0開始講述一個網路庫的實現,比較適合初學者入門。
在本書的第8章中是實現了以下幾個類:eventloop類,poller類,channel類
首先分析channel類的定義:
fd_是channel類的私有變數,一個channel物件是跟檔案控制代碼fd直接掛鉤的。channel類的建構函式是需要loop 和 fd兩個引數,因此每一個關注的fd都會建立一個專門的channel物件來對其負責。#ifndef MUDUO_NET_CHANNEL_H #define MUDUO_NET_CHANNEL_H #include <boost/function.hpp> #include <boost/noncopyable.hpp> namespace muduo { class EventLoop; /// /// A selectable I/O channel. /// /// This class doesn't own the file descriptor. /// The file descriptor could be a socket, /// an eventfd, a timerfd, or a signalfd class Channel : boost::noncopyable { public: typedef boost::function<void()> EventCallback; Channel(EventLoop* loop, int fd); void handleEvent(); void setReadCallback(const EventCallback& cb) { readCallback_ = cb; } void setWriteCallback(const EventCallback& cb) { writeCallback_ = cb; } void setErrorCallback(const EventCallback& cb) { errorCallback_ = cb; } int fd() const { return fd_; } int events() const { return events_; } void set_revents(int revt) { revents_ = revt; } bool isNoneEvent() const { return events_ == kNoneEvent; } void enableReading() { events_ |= kReadEvent; update(); } // void enableWriting() { events_ |= kWriteEvent; update(); } // void disableWriting() { events_ &= ~kWriteEvent; update(); } // void disableAll() { events_ = kNoneEvent; update(); } // for Poller int index() { return index_; } void set_index(int idx) { index_ = idx; } EventLoop* ownerLoop() { return loop_; } private: void update(); static const int kNoneEvent; static const int kReadEvent; static const int kWriteEvent; EventLoop* loop_; const int fd_; int events_; int revents_; int index_; // used by Poller. EventCallback readCallback_; EventCallback writeCallback_; EventCallback errorCallback_; }; } #endif // MUDUO_NET_CHANNEL_H
接下來看看poller類的定義:
class Poller : boost::noncopyable { public: typedef std::vector<Channel*> ChannelList; Poller(EventLoop* loop); ~Poller(); /// Polls the I/O events. /// Must be called in the loop thread. Timestamp poll(int timeoutMs, ChannelList* activeChannels); /// Changes the interested I/O events. /// Must be called in the loop thread. void updateChannel(Channel* channel); void assertInLoopThread() { ownerLoop_->assertInLoopThread(); } private: void fillActiveChannels(int numEvents, ChannelList* activeChannels) const; typedef std::vector<struct pollfd> PollFdList; typedef std::map<int, Channel*> ChannelMap; EventLoop* ownerLoop_; PollFdList pollfds_; ChannelMap channels_; }; } #endif // MUDUO_NET_POLLER_H
poller類的實現
#include "Poller.h" #include "Channel.h" #include "logging/Logging.h" #include <assert.h> #include <poll.h> using namespace muduo; Poller::Poller(EventLoop* loop) : ownerLoop_(loop) { } Poller::~Poller() { } Timestamp Poller::poll(int timeoutMs, ChannelList* activeChannels) { // XXX pollfds_ shouldn't change int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs); Timestamp now(Timestamp::now()); if (numEvents > 0) { LOG_TRACE << numEvents << " events happended"; fillActiveChannels(numEvents, activeChannels); } else if (numEvents == 0) { LOG_TRACE << " nothing happended"; } else { LOG_SYSERR << "Poller::poll()"; } return now; } void Poller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const { for (PollFdList::const_iterator pfd = pollfds_.begin(); pfd != pollfds_.end() && numEvents > 0; ++pfd) { if (pfd->revents > 0) { --numEvents; ChannelMap::const_iterator ch = channels_.find(pfd->fd); assert(ch != channels_.end()); Channel* channel = ch->second; assert(channel->fd() == pfd->fd); channel->set_revents(pfd->revents); // pfd->revents = 0; activeChannels->push_back(channel); } } } void Poller::updateChannel(Channel* channel) { assertInLoopThread(); LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events(); if (channel->index() < 0) { // a new one, add to pollfds_ assert(channels_.find(channel->fd()) == channels_.end()); struct pollfd pfd; pfd.fd = channel->fd(); pfd.events = static_cast<short>(channel->events()); pfd.revents = 0; pollfds_.push_back(pfd); int idx = static_cast<int>(pollfds_.size())-1; channel->set_index(idx); channels_[pfd.fd] = channel; } else { // update existing one assert(channels_.find(channel->fd()) != channels_.end()); assert(channels_[channel->fd()] == channel); int idx = channel->index(); assert(0 <= idx && idx < static_cast<int>(pollfds_.size())); struct pollfd& pfd = pollfds_[idx]; assert(pfd.fd == channel->fd() || pfd.fd == -1); pfd.events = static_cast<short>(channel->events()); pfd.revents = 0; if (channel->isNoneEvent()) { // ignore this pollfd pfd.fd = -1; } } }
poller物件是有pollfdlist,channels這樣的私有成員。在poller::updatechannel中,會把channel物件中的fd抽取出來,組成pollfd結構體,然後新增到pollfds_陣列中。並且會對pollfdlist,channels等私有成員進行相應的修改,所以poller::update是將channel和poll聯絡起來的關鍵函式。
而在poller::poll中則會呼叫poll系統呼叫來完成關心事件的IO多路複用,並且將當前有活動的事件新增到activechannels中。
最後在eventloop::loop()中會呼叫相應事件的處理方法:
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
while (!quit_)
{
activeChannels_.clear();
poller_->poll(kPollTimeMs, &activeChannels_);
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
(*it)->handleEvent();
}
}
LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;
}
通過以上幾個簡單的類,即可實現最簡單的reactor模式。
再來看簡單的測試程式:
#include "Channel.h"
#include "EventLoop.h"
#include <stdio.h>
#include <sys/timerfd.h>
muduo::EventLoop* g_loop;
void timeout()
{
printf("Timeout!\n");
g_loop->quit();
}
int main()
{
muduo::EventLoop loop;
g_loop = &loop;
int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
muduo::Channel channel(&loop, timerfd);
channel.setReadCallback(timeout);
channel.enableReading();
struct itimerspec howlong;
bzero(&howlong, sizeof howlong);
howlong.it_value.tv_sec = 5;
::timerfd_settime(timerfd, 0, &howlong, NULL);
loop.loop();
::close(timerfd);
}
上述程式中是將timerfd交由eventloop管理,並且編寫其handler函式timeout()。eventloop會完成事件的檢測和分發。