1. 程式人生 > >muduo的reactor模式基本實現

muduo的reactor模式基本實現

這幾天一直在看muduo的Eventloop(事件迴圈)這一塊的原始碼,感覺裡面有好多東西例如:智慧指標的使用,將eventfd,timerfd等linux新效能運用進去,C++一些容器的合理使用,還有如何能在多執行緒情況下減少鎖的使用等都是我們應該學習的東西。

關於muduo實現的reactor模式,有三個關鍵的類

.事件分發器類Channel
.封裝了I/O複用的Poller
.定時器介面類TimerQueue

接下來就給大家先一一介紹這幾個類,然後才給大家介紹reactor的基本構成

1.事件分發類Channel

事件分發器Channel的資料成員如下

//定義事件型別變數
            static
const int kNoneEvent; //無事件 static const int kReadEvent; //可讀事件 static const int kWriteEvent; //可寫事件 EventLoop *loop_; //channel所屬的loop const int fd_; //channel負責的檔案描述符 int events_; //註冊的事件 int
revents_; //就緒的事件 int index_; //被poller使用的下標 bool logHup_; //是否生成某些日誌 boost::weak_ptr<void> tie_; // bool tied_; bool eventHandling_; bool addedToLoop_; ReadEventCallback readCallback_; //讀事件回撥
EventCallback writeCallback_; //寫事件回撥 EventCallback closeCallback_; //關閉事件回撥 ReadEventCallback errorCallback_; //錯誤事件回撥

其中EventCallback和ReadEventCallback的宣告如下

 typedef boost::function<void()> EventCallback;
            typedef boost::function<void(Timestamp)> ReadEventCallback;
        //處理事件
        void handleEvent(Timestamp receiveTime);

        //設定可讀事件回撥
        void setReadCallback(const ReadEventCallback &cb)
        {
            readCallback_ = cb;
        }

        //設定可寫事件回撥
        void setWriteCallback(const EventCallback &cb)
        {
            writeCallback_ = cb;
        }

        //設定關閉事件回撥
        void setCloseCallback(const EventCallback &cb)
        {
            closeCallback_ = cb;
        }

        //設定錯誤事件回撥
        void setErrorCallback(const EventCallback &cb)
        {
            errorCallback_ = cb;
        }

        void tie(const boost::shared_ptr<void>&);



        //返回註冊的事件
        int events()const
        {
            return events_;
        }

        //設定註冊事件
        void set_revents(int revt)
        {
            revents_ = revt;
        }

        //判斷是否註冊的事件
        bool isNoneEvent()const
        {
            return events_ == kNoneEvent;
        }

        //設定可讀事件
        void enableReading()
        {
            events_ |= kReadEvent;
            update();
        }

        //銷燬讀事件
        void disableReading()
        {
            events_ &= ~kReadEvent;
            update();
        }

        //註冊寫事件
        void enableWriting()
        {
            events_ |= kWriteEvent;
         update();   
        }

        //銷燬寫事件
        void disableWriting()
        {
            events_ &= ~kWriteEvent;
            update();
        }

        //銷燬所有事件
        void disableAll()
        {
            events_ = kNoneEvent;
            update();
        }

        //是否註冊可寫事件
        isWriting()const
        {
            return events_ & kWriteEvent;
        }
這裡寫程式碼片

Channel的主要功能為管理各種註冊給poller的套接字描述符及其上發生的事件,以及事件發生了所調的回撥函式

Channel的主要作用如下

1.首先我們給定Channel所屬的loop以及其要處理的fd
2.接著我們開始註冊fd上需要監聽的事件,如果是常用事件(讀寫等)的話,我們可以直接呼叫介面enable***來註冊對應fd上的事件,與之對應的是disable*用來銷燬特定的事件
3.在然後我們通過set***Callback來事件發生時的回撥

2.I/O複用類Poller

Poller類是個基類,它的定義如下

class Poller : boost::noncopyable
    {
        public:
            typedef std::vector<Channel *> ChannelList;

            Poller(EventLoop *loop);

            virtual ~Poller();

            //不許在I/O執行緒中呼叫,I/O複用的封裝
            virtual Timestamp poll(int timeoutMs,ChannelList *activeChannels) = 0;

            //跟新Channel
            virtual void updateChannel(Channel *channel) = 0;

            //移除Channel
            virtual void removeChannel(Channel *channel) = 0;

            //這個channel是否在map中存在
            virtual bool hasChannel(Channel *channel)const;

            //預設poller方式
            static Poller *newDefaultPoller(EventLoop *loop);

            void assertInLoopThread()const
            {
                ownerLoop_->assertInLoopThread();
            }

        protected:
            typedef std::map<int,Channel*> ChannelMap;
            ChannelMap Channels_;           //儲存事件分發器的map
        private:
            EventLoop *owerLoop_;           //屬於哪個loop
    };

需要注意的是我們的事件分發器channel集用關聯容器map來儲存,map的關鍵字為channel所管理的fd,這樣我們在更新已有的channel時時間複雜多會將為O(1)

接下來我們談談epoll對Poller的實現
EPollPoller類的定義為

class EPollPoller : public Poller
{
    public:
        EPollPoller(EventLoop *loop);
        virtual Timestamp poll(int timeoutMs,ChannelList *activeChannels);   //內部呼叫epoll_wait函式

        virtual void updateChannel(Channel *channel);

        virtual void removeChannel(Channel *Channel);       
    private:
        static const int kInitEventListSize = 16;           //epoll事件表的大小

        static const char *operatoionToString(int op);

        void fillActiveChannels(int numEvents,ChannelList *activeChannels)const; //將epoll返回的活躍事件填充到activeChannel

        void update(int operation,Channel *channel);    //對Channel的更改操作

        typedef std::vector<struct epoll_event> EventList;

        int epollfd_;       //epoll的事件表fd

        EventList events_;  //epoll事件陣列
};

Poller類的主要功能如下

.呼叫poll函式監聽註冊了事件的檔案描述符
.當poll返回時將發生事件的事件集裝入channel中
.可以控制channel中事件的增刪改

3.定時器TimerQueue

EventLoop直接呼叫的是定時器佇列類TimerQueue類,該類的資料定義如下

typedef std::pair<Timestamp,Timer *> Entry;     //定時器入口
            typedef sta::set<Entry> TimerList;              //定時器集合
            typedef std::pair<Timer *,int64_t> ActiveTimer; //到時的定時器
            typedef std::set<ActiveTimeri> ActiveTimerSet;  //活躍的定時器集合

EventLoop *loop_;       //定時器所在的I/O迴圈
            const int timerfd_;     //定時器描述符
            Channel timerfdChannel_;

            TimerList timers_;      //定時器集合

            //for cancel()
            ActiveTimerSet activeTimers_;
            bool callingExpiredTimers_;
            ActiveTimerSet cancelingTimers_;

由上圖我們發現我們用來儲存定時器集的容器使用了set,set針對定時器有它天然的優勢,首先set的特性是所儲存的元素為預設升序的,這樣當我們某個事件點取到期的定時器,就直接取該時間點之前的所有定時器就好,其次我們往set中新增定時器的效率也相對較好為0(logn)。但是用set有個問題,我們如何儲存倆個定時時間一樣的定時器呢?muduo的解決方案就是使用一個pair型別,pair為pair

增加一個定時器
 TimerId addTimer(const TimerCallback &cb,
                            Timestamp when,
                            double interval);

刪除一個定時器
            void cancel(TimerId timerId);

上述介面函式,都不是執行緒安全的,但是muduo中並沒有用加鎖來解決問題,而是通過EventLoop中runInloop函式來使,上述函式在主I/O執行緒中執行,這樣做的好處是,我們可以在上述函式實現中不使用鎖,程式在執行過程中就會減少由於鎖而引起的上下文切換,由於主I/O執行緒也不一定是一直忙,所以這種做法可能會在效率上有所提升

4.EventLoop類的實現

EventLoop類實現了reactor的基本模式
它的資料定義如下

typedef std::vector<Channel *> ChannelList; //事件分發器列表

            bool looping_;                  //是否在執行
            bool quit_;                     //是否退出事件迴圈
            bool eventHandling_;
            bool callingPendingFunctors_;
            int64_t iteration_;             //事件迴圈的次數
            const pid_t threadId_;          //執行loop的執行緒ID
            Timestamp pollReturnTime_;      //poll阻塞的時間
            boost::scoped_ptr<Poller> poller_;  //IO複用
            boost::scoped_ptr<TimerQueue> TimerQueue_;//定時器佇列
            int wakeupFd_;                            //喚醒套接字
            boost::scoped_ptr<Channel> wakeupChannel_; //封裝事件描述符 
            boost::any context_;

            ChannelList activeChannel_;             //以活躍的事件集
            Channel *currentActiveChannel_;         //當前處理的事件集

            MutexLock mutex_;                       //封裝的互斥鎖
            std::vector<Functor> pendingFunctors_;  //需要在主I/O執行緒執行的任務

EventLoop通過boost庫下的智慧指標scoped_ptr來管理Poller_,TimerQueue_,wakeupChannel_物件,這樣不容易發生記憶體顯露,其中變數pendingFunctors_為需要在I/O執行緒中執行的任務集,例如上面所講的定時器的增刪介面的執行,就會先放在此集合裡,然後有主I/O執行緒來執行,那麼主執行緒在呼叫loop函式之後會阻塞在poller函式中,此時我們應該如何喚醒I/O執行緒呢?muduo中採用了linux的新特性eventfd來喚醒I/O執行緒,他的具體用法在我的前幾篇部落格中有介紹

Eventloop的主要功能如下

.首先我們因該呼叫updateChannel來新增一些事件
.接著我們就可以呼叫loop函式來執行事件迴圈了,在執行事件迴圈的過程中,我們會阻塞在poller_poll呼叫處,當有事件發生時,Poller類就會把活躍的事件放在activeChannel集合中
.之後我們呼叫Channel中的handleEvent來處理事件發生時對應的回撥函式,處理完事件函式後還會處理必須有I/O執行緒來完成的doPendingFuncors函式

當然我們可以在中間的過程中註冊一些普通事件或通過run*類函式來註冊定時事件,我們也可以呼叫updateChannel和removeChannel來增刪該Channel

EventLoop的原始碼如下

EventLoop.h

#ifndef MUDUO_NET_EVENTLOOP_H
#define MUDUO_NET_EVENTLOOP_H

#include <vector>

#include <boost/any.hpp>
#include <boost/function.hpp>
#include <boost/noncopyable.hpp>
#include <boost/scoped_ptr.hpp>

#include <muduo/base/Mutex.h>
#include <muduo/base/CurrentThread.h>
#include <muduo/base/Timestamp.h>
#include <muduo/net/Callbacks.h>
#include <muduo/net/TimerId.h>

namespace muduo
{
namespace net
{
    class Channel;
    class Poller;
    Class TimerQueue;

    class EventLoop : boost::noncopyable
    {
        public:
            typedef boost::function<void()> Functor;

            EventLoop();
            ~EventLoop();

            //主迴圈
            void loop();

            //退出主迴圈
            void quit();

            //poll延遲的時間
            Timestamp pollReturnTime()const
            {
                return pollReturnTime_;
            }

            //迭代次數
            int64_t iteration()const
            {
                return iteration_;
            }

            //在主迴圈中執行
            void runInLoop(const Functor &cb);

            //插入主迴圈任務佇列
            void queueInLoop(const Functor &cb);

            //某個時間點執行定時回撥
            TimerId runAt(const Timestamp &time,const TimerCallback &cb);

            //某個時間點之後執行定時回撥
            TimerId runAfter(double delay,const TimerCallback & cb);

            //在每個時間間隔處理某個回撥事件
            TimerId runEvery(double interval,const TimerCallback &cb);

            //刪除某個定時器
            void cancel(TimerId timerId);

            //喚醒事件通知描述符
            void wakeup();

            //跟新某個事件分發器
            void updateChannel(Channel *channel);

            //移除某個事件分發器
            void removeChannel(Channel *channel);

            bool hasChannel(Channel *channel);

            //如果不在I/O執行緒中則退出程式
            void assertInLoopThread()
            {
                if(!isInLoopThread())
                {
                    abortNotInLoopThread();
                }
            }

           // 檢測是否在I/O執行緒中
            bool isInLoopThread()const
            {
                return threadId_ == CurrentThread::tid();
            }

            //是否正在處理事件
            bool eventHandling()const { return eventHandling_;}

            void setContext(const boost::any &contex)
            {
                return context_;
            }

            const boost::any &getContext()const
            {
                return context_;
            }

            boost::any *getMutableContext()
            {
                return &context_;
            }

            //判斷當前執行緒是否為I/O執行緒
            static EventLoop *getEventLoopOfCurrentThread();

        private:
            //不在主I/O執行緒
            void abortNotInLoopThread();

            //將事件通知描述符裡的內容讀走,以便讓其繼續檢測事件通知
            void handleRead();

            //執行轉交給I/O的任務
            void doPendingFunctors();

            //將發生的事件寫入日誌
            void printActiveChannels()const;

            typedef std::vector<Channel *> ChannelList; //事件分發器列表

            bool looping_;                  //是否在執行
            bool quit_;                     //是否退出事件迴圈
            bool eventHandling_;
            bool callingPendingFunctors_;
            int64_t iteration_;             //事件迴圈的次數
            const pid_t threadId_;          //執行loop的執行緒ID
            Timestamp pollReturnTime_;      //poll阻塞的時間
            boost::scoped_ptr<Poller> poller_;  //IO複用
            boost::scoped_ptr<TimerQueue> TimerQueue_;//定時器佇列
            int wakeupFd_;                            //喚醒套接字
            boost::scoped_ptr<Channel> wakeupChannel_; //封裝事件描述符 
            boost::any context_;

            ChannelList activeChannel_;             //以活躍的事件集
            Channel *currentActiveChannel_;         //當前處理的事件集

            MutexLock mutex_;                       //封裝的互斥鎖
            std::vector<Functor> pendingFunctors_;  //需要在主I/O執行緒執行的任務

    };
}
}
#endif MUDUO_NET_ENENTLOOP_H

EventLoop.cc

#include <muduo/net/EventLoop.h>

#include <muduo/base/Logging.h>
#include <muduo/base/Mutex.h>
#include <muduo/net/Channel.h>
#include <muduo/net/Poller.h>
#include <muduo/net/SocketsOps.h>
#include <muduo/net/TimerQueue.h>

#include <boost/bind.hpp>

#include <signal.h>
#include <sys/eventfd.h>

using namespace muduo;
using namespace muduo::net;

namespace
{
    __thread EventLoop *t_loopInThisThread = 0;

    const int kPollTimeMs = 10000;

    int createEventfd()
    {
        int evtfd = ::eventfd(0,EFD_NONBLOCK | EFD_CFD_CLOEXEC);

        if(evtfd < 0)
        {
            LOG_SYSERR << "Failed in eventfd";
            abort();
        }
        return evtfd;
    }
}


EventLoop *EventLoop::getEventLoopOfCurrentThread()
{
    return t_loopInThisThread;
}

EventLoop::EventLoop()
    :looping_(false),
    quit_(false),
    eventHandling_(false),
    callingPendingFunctors_(false),
    iteration_(0),
    threadId_(CurrentThread::tid()),
    poller_(Poller::newDefaultPoller(this)),
    timerQueue_(new TimerQueue(this)),
    wakeupFd_(createEventfd()),
    wakeupChannel_(new Channel(this,wakeupFd_)),
    currentActiveChannel_(NULL)
{
    LOG_DEBUG << "EventLoop created " << this << "in thread" << threadId_;
    if(t_loopInThisThread)
    {
        LOG_FATAL << "Another EventLoop " << t_loopInThisThread << "exists in this thread" << threadId_;
    }

    else
    {
        t_loopInThisThread = this;
    }

    //註冊讀完成時的回撥函式
    wakeupChannel_->setReadCallback(boost::bind(&EventLoop::handleRead,this));

    //註冊可讀事件
    wakeupChannel_->enableReading();
}


EventLoop::~EventLoop()
{
    LOG_DEBUG << "EventLoop" <<this<<"of thread" << threadId_
              <<"destructs in thread" << CurrentThread::tid();
    wakeupChannel_->disableAll();
    wakeupChannel_->remove();
    ::close(wakeupFd_);
    t_loopInThisThread = NULL;
}

void EventLoop::loop()
{
    assert(!looping_);
    assertInLoopThread();
    looping_ = true;
    quit_ = false;

    LOG_TRACE << "EventLoop" << this << "start looping";

    while(!quit_)
    {
        activeChannels_.clear();
        pollReturnTime_ = poller_->poll(kPollTimeMs,&activeChannels_);
        //計算執行多少次I/O返回
        if(Logger::LogLevel() <= Logger::TRACE)
        {
            printActiveChannels();
        }

        eventHandling_ = true;

        //處理就緒事件
        for(ChannelList::iterator it = activeChannels_.begin();
                it != activeChannels_.end(); ++it)
        {
            currentActiveChannel_ = *it;
            currentActiveChannel_->handleEvent(pollReturnTime_);
        }

        currentActiveChannel_ = NULL;
        eventHandling_ = false;
        //處理一些其他任務
        doPendingFunctors();
    }

    LOG_TRACE << "EventLoop" << this << "stop looping";
    looping_ = false;
}

void EventLoop::quit()
{
    quit_ = true;

    if(!isInLoopThread())
    {
        wakeup();
    }
}

void EventLoop::runInLoop(const Functor &cb)
{
    if(isInLoopThread())
    {
        cb();
    }
    else
    {
        queueInLoop(cb);
    }
}


void EventLoop::queueInLoop(const Functor &cb)
{
    {
        MutexLockGuard lock(mutex_);
        pendingFunctors_.push_back(cb);
    }

    if(!isInLoopThread() || callingPendingFunctors_)
    {
        wakeup();
    }
}


TimerId EventLoop::runAt(const Timestamp &time,const TimerCallback &cb)
{
    return timerQueue_->addTimer(cb,time,0.0);
}

TimerId EventLoop::runAfter(double delay,const TimerCallback &cb)
{
    Timestamp time(addTime(Timestamp::now(),delay));
    return runAt(time,cb);
}

TimerId EventLoop::runEvery(double interval,const TimerCallback &cb)
{
    Timestamp time(addTime(Timestamp::now()),interval);
    return timerQueue_->addTimer(cb,time,interval);
}


void  EventLoop::cancel(TimerId timerId)
{
    return timerQueue_->cancel(timerId);
}


void EventLoop::updateChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    poller_->updateChannel(channel);
}


void EventLoop::removeChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    if(eventHandling_)
    {
        assert(currentActiveChannel_ == channel ||
                std::find(activeChannels_.begin(),activeChannels_.end(),channel) == activeChannels_.end());
    }

    poller_->removeChannel(channel);
}

bool EventLoop::hasChannel(Channel *channel)
{
    assert(channel->ownerLoop() == this);
    assertInLoopThread();
    return poller_->hasChannel(channel);
}

void EventLoop::abortNotInLoopThread()
{
    LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop" << this 
              << "was created in threadId_ = " << threadId_
              << ",current thread id = " << CurrentThread::tid();
}

void EventLoop::wakeup()
{
    uint64_t one = 1;
    ssize_t n = sockets::write(wakeupFd_,&one,sizeof one);
    if(n != sizeof one)
    {
        LOG_ERROR << "EventLoop::wakeup() writes" << n << "bytes instead of 8";
    }
}

void EventLoop::handleRead()
{
    uint64_t one = 1;
    ssize_t n = sockets::read(wakeupFd_,&one,sizeof one);
    if(n != sizeof one)
    {
        LOG_ERROR << "EventLoop::handleRead() reads" << n << "bytes instead of 8";
    }
}

void EventLoop::doPendingFunctors()
{
    std::vector<Functor> Functors;
    callingPendingFunctors_ = true;

    {
        MutexLockGuard lock(mutex_);
        Functors.swap(pendingFunctors_);    //提高效率且防止死鎖
    }

    for(ssize_t i = 0;i < Functors.size();++i)
    {
        Functors[i]();
    }
    callingPendingFunctors_ = false;
}