moduo網路庫的reactor模式(中)
moduo網路庫的reactor模式基本構成為“non-blocking I/O + I/O multiplexing”,程式的基本結構是一個事件迴圈(event loop),以事件驅動(event-driven)和事件回撥(event callback)的方式實現業務邏輯。在moduo網路庫的reactor模式(上)理清了事件迴圈(event loop)的基本框架,在此基礎上此文加上事件驅動(event-driven)和事件回撥(event callback)結構,即reactor最核心的I/O多路複用(I/O mutiplexing)和事件分發(dispatching)機制:進行I/O多路複用並將拿到的I/O事件分發給各個檔案描述符(fd)的事件處理函式。
1、事件驅動與分發:I/O多路複用與事件分發
(1)Channel類:封裝檔案描述符(fd)實現事件分發
每個Channel物件都只屬於某一個EventLoop,因此只屬於某一個I/O執行緒。每個Channel物件只負責一個檔案描述符的I/O事件分發,Channel會把不同的I/O事件(讀、寫、錯誤等)分發為不同的回撥。Channel類就是對檔案描述符的封裝,建構函式Channel(EventLoop* loop, int fd)即將此Channel物件與唯一所屬的EventLoop以及檔案描述符(fd)綁定了起來。Channel類資料成員events_表示fd事件,用於更新I/O多路複用poll(2)。資料成員revents_表示現正要執行的fd事件,用於事件回撥。在目前程式中
1)Channel::enableReading()、Channel::enableWriting()等為設定檔案描述符(fd)事件的介面函式:
首先設定fd事件events_,然後執行update()將該Channel的新事件更新到I/O多路複用器poll(2)(update()是通過資料成員EventLoop* loop_,即自己所屬的EventLoop物件指標呼叫EventLoop::updateChannel(),再呼叫Poller::updateChannel()間接更新poll(2)中的事件。此處疑問:為什麼不直接在Channel中新增資料成員Poller指標直接更新事件到poll(2),而是要繞一圈間接更新事件?)
void enableReading() { events_ |= kReadEvent; update(); }
// void enableWriting() { events_ |= kWriteEvent; update(); }
void Channel::update()
{
loop_->updateChannel(this);
}
2)Channel::setReadCallback(const EventCallback& cb)、Channel::setWriteCallback(const EventCallback& cb)、Channel::setErrorCallback(const EventCallback& cb)為設定fd對應事件的使用者回撥函式的介面函式。
void setReadCallback(const EventCallback& cb) { readCallback_ = cb; }
void setWriteCallback(const EventCallback& cb) { writeCallback_ = cb; }
void setErrorCallback(const EventCallback& cb) { errorCallback_ = cb; }
3)Channel::handleEvent()是Channel的核心,實現事件分發功能,它由EventLoop::loop()呼叫,它的功能是根據revents_的值分別呼叫不同的使用者呼叫。而revents_則是在Poller::poll()中得以更新的。
void Channel::handleEvent()
{
if (revents_ & POLLNVAL)
LOG_WARN << "Channel::handle_event() POLLNVAL";
if (revents_ & (POLLERR | POLLNVAL))
if (errorCallback_)
errorCallback_();
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
if (readCallback_)
readCallback_();
if (revents_ & POLLOUT)
if (writeCallback_)
writeCallback_();
}
//inside of function EventLoop::loop():
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
(*it)->handleEvent();
}
doPendingFunctors();
}
(2)Poller類:封裝I/O多路複用poll(2)實現I/O多路複用
首先看系統呼叫poll(2)的函式原型為
#include <poll.h>
int poll(struct pollfd fd[], nfds_t nfds, int timeout);
struct pollfd的結構為
struct pollfd{
int fd; // 檔案描述符
short event;// 請求的事件
short revent;// 返回的事件
}
1)為poll(2)提供資料準備:
Poller封裝類首先是通過Poller::updateChannel(Channel* channel)將檔案描述符(fd)封裝類Channel儲存到資料成員std::map<int, Channel*> ChannelMap中(具體實現過程大致為:使用者呼叫Channel::update()----->EventLoop::updateChannel()----->Poller::updateChannel())。接著則可將各個Channel物件的資料成員更新給poll(2),並在每次poll(2)後更新各個Channel的revents_。
2)封裝I/O多路複用(I/O multiplexing):
Poller::poll(int timeoutMs, ChannelList* activeChannels)函式中首先進行I/O多路複用poll(2),所需引數從資料成員std::map<int, Channel*> ChannelMap中獲得,阻塞直到有fd事件發生或定時時間到時,返回事件發生數量numEvents。然後繼續執行內部函式Poller::fillActiveChannels(numEvents, activeChannels)將可發生fd事件對應的Channel反饋給外界,即在資料成員std::map<int, Channel*> ChannelMap中找出事件可發生的fd對應的Channel,存放到外部引數ChannelList* activeChannels中。
注意 2)中並未將I/O多路複用與事件分發合在一起,而是隻實現了I/O多路複用,事件分發Channel::handleEvent()則是在EventLoop::loop()中實現。一方面是程式安全方面的考慮,另一方面則是為了方便替換為其他更高效的I/O多路複用機制,如epoll(4)。
至此,一個完整的Reactor模式基本框架就完成了。
2、測試:利用timerfd(2)
有了Reactor基本框架後,我們使用timerfd給EventLoop加上一個定時器功能,對這個框架進行測試。
#include <sys/timerfd.h>
int timerfd_create(int clockid, int flags);
int timerfd_settime(int fd, int flags, const struct itimerspec *new_value, struct itimerspec *old_value);
int timerfd_gettime(int fd, struct itimerspec *curr_value);
傳統的Reactor通過控制poll(2)或select(2)的等待時間來實現定時,而現在linux有了timerfd,我們可以用和處理I/O事件相同的方式來處理定時。具體原因參考文章Muduo 網路程式設計示例之三:定時器
#include "EventLoopThread.hpp"
#include "EventLoop.hpp"
#include "Thread.hpp"
#include <sys/timerfd.h>
#include <unistd.h>
#include <string.h>
#include <memory>
#include <iostream>
using namespace std;
EventLoop* loop;
void timeout()
{
cout<<"tid "<<CurrentThreadtid()<<": Timeout!"<<endl;
loop->quit();
}
int main()
{
EventLoopThread ELThread;
loop = ELThread.startLoop();//thread2
int timerfd=timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK|TFD_CLOEXEC);
struct itimerspec howlong;
bzero(&howlong, sizeof howlong);
howlong.it_value.tv_sec=3;
timerfd_settime(timerfd,0,&howlong,NULL);
Channel channel(loop,timerfd);
channel.setReadCallback(timeout);
channel.enableReading();
sleep(5);//ensure the main thread do not exit faster than thread2
close(timerfd);
return 0;
}
[email protected]:~/Documents/Reactor/s1.1$ g++ -std=c++11 -pthread -o test1 MutexLockGuard.hpp Condition.hpp Thread.hpp Thread.cpp Channel.hpp Channel.cpp EventLoop.hpp EventLoop.cpp Poller.hpp Poller.cpp EventLoopThread.hpp testEventLoopThread.cpp
[email protected]:~/Documents/Reactor/s1.1$ /usr/bin/valgrind ./testTimerDemo
==25681== Memcheck, a memory error detector
==25681== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==25681== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
==25681== Command: ./testTimerDemo
==25681==
tid 25681: create a new thread
tid 25681: waiting
tid 25682: Thread::func_() started!
tid 25682: notified
tid 25681: received notification
tid 25682: start looping...
tid 25682: Timeout!
tid 25682: end looping...
tid 25682: Thread end!
==25681==
==25681== HEAP SUMMARY:
==25681== in use at exit: 0 bytes in 0 blocks
==25681== total heap usage: 16 allocs, 16 frees, 74,552 bytes allocated
==25681==
==25681== All heap blocks were freed -- no leaks are possible
==25681==
==25681== For counts of detected and suppressed errors, rerun with: -v
==25681== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
muduo將timerfd封裝到channel,此流程和普通檔案描述符的處理過程一致,也是將timerfd放入到I/O多路複用,其實現思路類似下一小節。然後建立TimerQueue類實現定時器功能。此處省略。
3、使用者喚醒執行緒(擴充套件功能):利用eventfd(2)
由於I/O執行緒平時阻塞在事件迴圈EventLoop::loop()的poll(2)呼叫中,為了讓I/O執行緒能立刻執行使用者回撥,我們需要設法喚醒它。這裡用到了eventfd(2)。該函式返回一個檔案描述符,類似於其他的檔案描述符操作,可以對該描述符進行一系列的操作,如讀、寫、poll、select等,當然這裡我們僅僅考慮read、write。
#include <sys/eventfd.h>
int eventfd(unsigned int initval, int flags);
程式中,在EventLoop建構函式中則已完成構造eventfd儲存到EventLoop::wakeupFd_、封裝成Channel儲存到EventLoop::wakeupChannel_、更新eventfd的讀事件到I/O多路複用poll(2)中。當poll(2)響應eventfd的讀事件(使用者觸發)時回撥EventLoop::handleRead()函式(eventfd讀操作)讀取wakeup()(eventfd寫操作)時所寫資料。此過程即為喚醒。注意此流程和普通檔案描述符的處理過程一致,也是將喚醒操作也放入到I/O多路複用。
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = ::read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
}
}
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = ::write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
在目前程式中,EventLoop::runInLoop(const Functor& cb)為介面函式,供使用者呼叫,在EventLoop的I/O執行緒內執行某個使用者任務回撥。注意這個介面函式EventLoop::runInLoop(const Functor& cb)允許跨執行緒使用,這就帶來了執行緒安全性方面的問題,此處的解決辦法不是加鎖,而是把對回撥函式cb()的操作轉移到I/O執行緒來進行:
(1)如果使用者在當前I/O執行緒呼叫這個函式,回撥會同步進行(即執行cb();)。
(2)如果使用者在其他執行緒呼叫這個函式,cb會被加入佇列,I/O執行緒會被喚醒來呼叫這個回撥函式(即執行EventLoop::queueInLoop(cb);)。有了這個功能,我們就能輕易地線上程間調配任務,這樣可以在不用鎖的情況下保證執行緒安全性。
void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread())
cb();
else
queueInLoop(cb);
}
在上述(2)情況下,EventLoop::queueInLoop(cb)函式內部首先將回調函式指標cb儲存到函式佇列std::vector<Functor> pendingFunctors_中,然後執行wakeup()(eventfd寫操作)喚醒所屬I/O執行緒。執行緒被喚醒之後則會從I/O多路複用poll(2)中返回,執行後續的EventLoop::doPendingFunctors();遍歷函式佇列執行函式。
void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_)
wakeup();
}
//inside of EventLoop::loop()
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
(*it)->handleEvent();
}
doPendingFunctors();
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}
值得學習的地方是在EventLoop::doPendingFunctors()函式中,不是簡單地在臨界區內依次呼叫Functor,而是把回撥佇列swap()到區域性變數functors中,既減小了臨界區的長度,也避免了死鎖,同時也對 EventLoop::pendingFunctors_進行了清空。
3、程式測試
測試檔案test.cpp
#include "EventLoopThread.hpp"
#include "EventLoop.hpp"
#include "Thread.hpp"
#include <iostream>
#include <memory>
using namespace std;
void test()
{
cout<<"tid "<<CurrentThreadtid()<<": runInLoop..."<<endl;
}
int main()
{
cout<<"Main: pid: "<<getpid()<<" tid: "<<CurrentThreadtid()<<endl;//main thread
//sleep(1);
EventLoopThread ELThread1;
EventLoop* loop1 = ELThread1.startLoop();//thread 2
sleep(1);
loop1->runInLoop(test);
EventLoopThread ELThread2;
EventLoop* loop2 = ELThread2.startLoop();//thread 3
sleep(1);
loop2->runInLoop(test);
loop1->loop(); //test "one thread one loop"
loop2->loop(); //test "one thread one loop"
sleep(1);
//loop1->quit();
loop1->runInLoop(bind(&EventLoop::quit,loop1));
//loop2->quit();
loop2->runInLoop(bind(&EventLoop::quit,loop2));
sleep(1);
return 0;
}
[email protected]:~/Documents/Reactor/s1.1$ g++ -std=c++11 -pthread -o test MutexLockGuard.hpp Condition.hpp Thread.hpp Thread.cpp Channel.hpp Channel.cpp EventLoop.hpp EventLoop.cpp Poller.hpp Poller.cpp EventLoopThread.hpp testEventLoopThread.cpp
[email protected]:~/Documents/Reactor/s1.1$ which valgrind
/usr/bin/valgrind
[email protected]:~/Documents/Reactor/s1.1$ /usr/bin/valgrind ./test
==22825== Memcheck, a memory error detector
==22825== Copyright (C) 2002-2017, and GNU GPL'd, by Julian Seward et al.
==22825== Using Valgrind-3.13.0 and LibVEX; rerun with -h for copyright info
==22825== Command: ./test
==22825==
Main: pid: 22825 tid: 22825
tid 22825: create a new thread
tid 22825: waiting
tid 22826: Thread::func_() started!
tid 22826: notified
tid 22825: received notification
tid 22826: start looping...
tid 22825: create a new thread
tid 22826: runInLoop...
tid 22825: waiting
tid 22827: Thread::func_() started!
tid 22827: notified
tid 22825: received notification
tid 22827: start looping...
tid 22825: This EventLoop had been created!
tid 22825: This EventLoop had been created!
tid 22827: runInLoop...
tid 22826: end looping...
tid 22827: end looping...
tid 22827: Thread end!
tid 22826: Thread end!
==22825==
==22825== HEAP SUMMARY:
==22825== in use at exit: 0 bytes in 0 blocks
==22825== total heap usage: 34 allocs, 34 frees, 75,472 bytes allocated
==22825==
==22825== All heap blocks were freed -- no leaks are possible
==22825==
==22825== For counts of detected and suppressed errors, rerun with: -v
==22825== ERROR SUMMARY: 0 errors from 0 contexts (suppressed: 0 from 0)
介面函式EventLoopThread::startLoop()建立新執行緒開始執行EventLoop::loop()並返回所屬執行緒的EventLoop物件的指標loop*,供使用者呼叫。如使用者用loop*呼叫介面函式EventLoop::runInLoop(Functor cb),使用者可在EventLoop所屬I/O執行緒內呼叫回撥函式cb()。如使用者用loop*呼叫EventLoop::loop()會失敗,因為該Reactor模式遵循“one loop per thread”。
在 moduo網路庫的reactor模式(上),執行緒同步封裝類MutexLockGuard及Condition、執行緒封裝類Thread、事件迴圈封裝類EventLoop及EventLoopThread都已完成,實現了事件迴圈框架。本文中則再添上Channel類、Poller類以及擴充套件功能後的EventLoop類,實現事件驅動和分發功能。在上一章中,我是把類的宣告和實現都放在一起,導致在編譯時出現重複定義問題。所以需要養成良好的程式設計習慣,將類的宣告放在標頭檔案.h,而將類的實現放在.c檔案。全域性變數或函式也需要規範,將定義放在.c檔案而在.h檔案使用extern關鍵字宣告即可。
Channel類
#ifndef CHANNEL_H_
#define CHANNEL_H_
#include <functional>
#include <poll.h>
/// A selectable I/O channel.
///
/// This class doesn't own the file descriptor.
/// The file descriptor could be a socket,
/// an eventfd, a timerfd, or a signalfd
class EventLoop;
class Channel //: boost::noncopyable
{
public:
typedef std::function<void()> EventCallback;
Channel(EventLoop* loop, int fdArg);
void handleEvent();
void setReadCallback(const EventCallback& cb)
{ readCallback_ = cb; }
void setWriteCallback(const EventCallback& cb)
{ writeCallback_ = cb; }
void setErrorCallback(const EventCallback& cb)
{ errorCallback_ = cb; }
int fd() const { return fd_; }
int events() const { return events_; }
void set_revents(int revt) { revents_ = revt; }
bool isNoneEvent() const { return events_ == kNoneEvent; }
void enableReading() { events_ |= kReadEvent; update(); }
// void enableWriting() { events_ |= kWriteEvent; update(); }
// void disableWriting() { events_ &= ~kWriteEvent; update(); }
// void disableAll() { events_ = kNoneEvent; update(); }
// for Poller
int index() { return index_; }
void set_index(int idx) { index_ = idx; }
EventLoop* ownerLoop() { return loop_; }
private:
void update();
static const int kNoneEvent;
static const int kReadEvent;
static const int kWriteEvent;
EventLoop* loop_;
const int fd_;
int events_;
int revents_;
int index_; // used by Poller.
EventCallback readCallback_;
EventCallback writeCallback_;
EventCallback errorCallback_;
};
#endif
#include "Channel.hpp"
#include "EventLoop.hpp"
#include <poll.h>
const int Channel::kNoneEvent = 0;
const int Channel::kReadEvent = POLLIN | POLLPRI;
const int Channel::kWriteEvent = POLLOUT;
Channel::Channel(EventLoop* loop, int fdArg)
: loop_(loop),
fd_(fdArg),
events_(0),
revents_(0),
index_(-1)
{}
void Channel::update()
{
loop_->updateChannel(this);
}
void Channel::handleEvent()
{
if (revents_ & POLLNVAL) {
//LOG_WARN << "Channel::handle_event() POLLNVAL";
}
if (revents_ & (POLLERR | POLLNVAL))
if (errorCallback_)
errorCallback_();
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
if (readCallback_)
readCallback_();
if (revents_ & POLLOUT)
if (writeCallback_)
writeCallback_();
}
Poller類
#ifndef POLLER_H_
#define POLLER_H_
#include <map>
#include <vector>
/// IO Multiplexing with poll(2).
///
/// This class doesn't own the Channel objects.
struct pollfd;
class Channel;
class EventLoop;
class Poller //: boost::noncopyable
{
public:
typedef std::vector<Channel*> ChannelList;
Poller(EventLoop* loop);
~Poller();
/// Polls the I/O events.
/// Must be called in the loop thread.
void poll(int timeoutMs, ChannelList* activeChannels);
/// Changes the interested I/O events.
/// Must be called in the loop thread.
void updateChannel(Channel* channel);
void assertInLoopThread();
private:
void fillActiveChannels(int numEvents,
ChannelList* activeChannels) const;
typedef std::vector<struct pollfd> PollFdList;
typedef std::map<int, Channel*> ChannelMap;
EventLoop* ownerLoop_;
PollFdList pollfds_;
ChannelMap channels_;
};
#endif
#include "Poller.hpp"
#include "Channel.hpp"
#include "EventLoop.hpp"
#include <assert.h>
#include <poll.h>
Poller::Poller(EventLoop* loop) : ownerLoop_(loop) {}
Poller::~Poller() {}
void Poller::poll(int timeoutMs, ChannelList* activeChannels)
{
int numEvents = ::poll(&*pollfds_.begin(), pollfds_.size(), timeoutMs);
if (numEvents > 0) {
fillActiveChannels(numEvents, activeChannels);
} else if (numEvents == 0) {
} else {
}
}
void Poller::assertInLoopThread()
{
// ownerLoop_->assertInLoopThread();
}
void Poller::fillActiveChannels(int numEvents,
ChannelList* activeChannels) const
{
for (PollFdList::const_iterator pfd = pollfds_.begin();
pfd != pollfds_.end() && numEvents > 0; ++pfd)
{
if (pfd->revents > 0)
{
--numEvents;
ChannelMap::const_iterator ch = channels_.find(pfd->fd);
//assert(ch != channels_.end());
Channel* channel = ch->second;
//assert(channel->fd() == pfd->fd);
channel->set_revents(pfd->revents);
// pfd->revents = 0;
activeChannels->push_back(channel);
}
}
}
void Poller::updateChannel(Channel* channel)
{
//assertInLoopThread();
//LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
if (channel->index() < 0) {
// a new one, add to pollfds_
//assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;
pfd.fd = channel->fd();
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
pollfds_.push_back(pfd);
int idx = static_cast<int>(pollfds_.size())-1;
channel->set_index(idx);
channels_[pfd.fd] = channel;
} else {
// update existing one
//assert(channels_.find(channel->fd()) != channels_.end());
//assert(channels_[channel->fd()] == channel);
int idx = channel->index();
//assert(0 <= idx && idx < static_cast<int>(pollfds_.size()));
struct pollfd& pfd = pollfds_[idx];
//assert(pfd.fd == channel->fd() || pfd.fd == -1);
pfd.events = static_cast<short>(channel->events());
pfd.revents = 0;
if (channel->isNoneEvent()) {
// ignore this pollfd
pfd.fd = -1;
}
}
}
EventLoop類
#include <poll.h>
#include <unistd.h>
#include <functional>
#include <memory>
#include <vector>
#include <iostream>
#include <sys/syscall.h>
#include "Thread.hpp"
#include "Channel.hpp"
#include "MutexLockGuard.hpp"
#include "Poller.hpp"
class EventLoop{
public:
typedef std::function<void()> Functor;
EventLoop();
~EventLoop();
bool isInLoopThread() const { return threadId_==CurrentThreadtid(); }
void loop();
void quit();
void runInLoop(const Functor& cb);
/// Queues callback in the loop thread.
/// Runs after finish pooling.
/// Safe to call from other threads.
void queueInLoop(const Functor& cb);
// internal use only
void wakeup();
void updateChannel(Channel* channel);
private:
void handleRead(); // waked up
void doPendingFunctors();
typedef std::vector<Channel*> ChannelList;
bool looping_; /* atomic */
bool quit_; /* atomic */
bool callingPendingFunctors_; /* atomic */
const pid_t threadId_;
//Timestamp pollReturnTime_;
std::unique_ptr<Poller> poller_;
//std::unique_ptr<TimerQueue> timerQueue_;
int wakeupFd_;
// unlike in TimerQueue, which is an internal class,
// we don't expose Channel to client.
std::unique_ptr<Channel> wakeupChannel_;
ChannelList activeChannels_;
MutexLock mutex_;
std::vector<Functor> pendingFunctors_; // @GuardedBy mutex_
};
#endif
#include "EventLoop.hpp"
#include "Channel.hpp"
#include "Poller.hpp"
#include "Thread.hpp"
#include "MutexLockGuard.hpp"
#include <thread>
#include <poll.h>
#include <unistd.h>
#include <functional>
#include <memory>
#include <vector>
#include <iostream>
#include <sys/syscall.h>
#include <sys/eventfd.h>
const int kPollTimeMs = 10000;
static int createEventfd()
{
int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
if (evtfd < 0)
{
//LOG_SYSERR << "Failed in eventfd";
//abort();
}
return evtfd;
}
EventLoop::EventLoop()
: looping_(false),
quit_(false),
callingPendingFunctors_(false),
threadId_(CurrentThreadtid()),
poller_(new Poller(this)),
//timerQueue_(new TimerQueue(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_))
{
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
}
EventLoop::~EventLoop()
{
//assert(!looping_);
close(wakeupFd_);
}
void EventLoop::loop()
{
if( !isInLoopThread() ){
std::cout<<"tid "<<CurrentThreadtid()<<": This EventLoop had been created!"<<std::endl;
}else{
std::cout<<"tid "<<CurrentThreadtid()<<": start looping..."<<std::endl;
quit_=false;
while (!quit_)
{
activeChannels_.clear();
poller_->poll(kPollTimeMs, &activeChannels_);
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it){
(*it)->handleEvent();
}
doPendingFunctors();
}
std::cout<<"tid "<<CurrentThreadtid()<<": end looping..."<<std::endl;
}
}
void EventLoop::quit()
{
quit_ = true;
if (!isInLoopThread())
{
wakeup();
}
}
void EventLoop::runInLoop(const Functor& cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(cb);
}
}
void EventLoop::queueInLoop(const Functor& cb)
{
{
MutexLockGuard lock(mutex_);
pendingFunctors_.push_back(cb);
}
if (!isInLoopThread() || callingPendingFunctors_)
{
wakeup();
}
}
void EventLoop::updateChannel(Channel* channel)
{
//assert(channel->ownerLoop() == this);
//assertInLoopThread();
poller_->updateChannel(channel);
}
void EventLoop::wakeup()
{
uint64_t one = 1;
ssize_t n = write(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
//LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8";
}
}
void EventLoop::handleRead()
{
uint64_t one = 1;
ssize_t n = read(wakeupFd_, &one, sizeof one);
if (n != sizeof one)
{
//LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8";
}
}
void EventLoop::doPendingFunctors()
{
std::vector<Functor> functors;
callingPendingFunctors_ = true;
{
MutexLockGuard lock(mutex_);
functors.swap(pendingFunctors_);
}
for (size_t i = 0; i < functors.size(); ++i)
{
functors[i]();
}
callingPendingFunctors_ = false;
}
至於其他類,此處只是將在它們在上一章的程式從.h檔案分開到.h宣告和.c實現:
Thread類
#ifndef THREAD_H_
#define THREAD_H_
#include <thread>
#include <memory>
#include <functional>
#include <string>
#include <iostream>
#include <unistd.h>
#include <sys/syscall.h>
//global variable and function
extern __thread pid_t t_cachedTid;
extern pid_t gettid();
extern pid_t CurrentThreadtid();
class Thread
{
public:
//typedef void* (*ThreadFunc)(void*);
typedef std::function<void ()> ThreadFunc;
Thread(ThreadFunc func);
~Thread();
void start();
private:
ThreadFunc func_;
std::string name_;
pid_t tid_;
pthread_t tidp_;
};
#endif
#include "Thread.hpp"
#include <thread>
#include <memory>
#include <functional>
#include <string>
#include <unistd.h>
#include <iostream>
#include <sys/syscall.h>
namespace detail
{
struct ThreadData
{
typedef std::function<void ()> ThreadFunc;
ThreadFunc func_;
std::string name_;
pid_t tid_;
ThreadData(ThreadFunc func, const std::string& name, pid_t tid)
: func_(std::move(func)),
name_(name),
tid_(tid)
{ }
void runInThread()
{
tid_ = CurrentThreadtid();
std::cout<<"tid "<<CurrentThreadtid()<<": Thread::func_() started!"<<std::endl;
func_();
name_=std::string("finished");
}
};
void* startThread(void* obj)
{
ThreadData* data = static_cast<ThreadData*>(obj);
data->runInThread();
delete data;
std::cout<<"tid "<<CurrentThreadtid()<<": Thread end!"<<std::endl;
return NULL;
}
}
pid_t gettid()
{
return static_cast<pid_t>(syscall(SYS_gettid));
}
__thread pid_t t_cachedTid = 0;
pid_t CurrentThreadtid()
{
if (t_cachedTid == 0)
{
t_cachedTid = gettid();
}
return t_cachedTid;
}
Thread::Thread(ThreadFunc func) : func_(func), tidp_(0) {}
Thread::~Thread()
{
pthread_detach(tidp_);//let system itself recovers the resources or it will cause memory leak!
}
void Thread::start()
{
detail::ThreadData* data = new detail::ThreadData(func_, name_, tid_);
std::cout<<"tid "<<CurrentThreadtid()<<": create a new thread"<<std::endl;
if(pthread_create(&tidp_, NULL, &detail::startThread, data))
{
delete data;
std::cout<<"thread create error"<<std::endl;
}
}
MutexLockGuard類
#ifndef MUTEXLOCKGUARD_H_
#define MUTEXLOCKGUARD_H_
#include <pthread.h>
#include "Thread.hpp"
#include <iostream>
class MutexLock //: boost::noncopyable
{
private:
pthread_mutex_t mutex_;
MutexLock(const MutexLock&);
MutexLock& operator=(const MutexLock&);
public:
MutexLock()
{
pthread_mutex_init(&mutex_,NULL);
//std::cout<<"MutexLock create!"<<std::endl;
}
~MutexLock()
{
pthread_mutex_destroy(&mutex_);
//std::cout<<"MutexLock destroy!"<<std::endl;
}
void lock() { pthread_mutex_lock(&mutex_); }
void unlock() { pthread_mutex_unlock(&mutex_); }
pthread_mutex_t* getPthreadMutex() { return &mutex_; }
};
class MutexLockGuard //: boost::noncopyable
{
private:
MutexLock& mutex_; //???此處加&的目的是什麼
MutexLockGuard(const MutexLockGuard&);
MutexLockGuard& operator=(const MutexLockGuard&);
public:
explicit MutexLockGuard( MutexLock& mutex )
: mutex_(mutex)
{
mutex_.lock();
//std::cout<<"tid "<<CurrentThreadtid()<<": MutexLockGuard lock!"<<std::endl;
}
~MutexLockGuard()
{
mutex_.unlock();
//std::cout<<"tid "<<CurrentThreadtid()<<": MutexLockGuard unlock!"<<std::endl;
}
};
#define MutexLockGuard(x) static_assert(false, "missing mutex guard var name")
//C++0x中引入了static_assert這個關鍵字,用來做編譯期間的斷言,因此叫做靜態斷言。
//如果第一個引數常量表達式的值為false,會產生一條編譯錯誤,錯誤位置就是該static_assert語句所在行,第二個引數就是錯誤提示字串。
#endif // MUTEXLOCKGUARD_H_
Condition類
#ifndef CONDITION_H_
#define CONDITION_H_
#include "MutexLockGuard.hpp"
#include <pthread.h>
class Condition
{
public:
explicit Condition(MutexLock& mutex) : mutex_(mutex)
{
pthread_cond_init(&pcond_, NULL);
}
~Condition()
{
pthread_cond_destroy(&pcond_);
}
void wait()
{
pthread_cond_wait(&pcond_, mutex_.getPthreadMutex());
}
void notify()
{
pthread_cond_signal(&pcond_);
}
void notifyAll()
{
pthread_cond_broadcast(&pcond_);
}
private:
MutexLock& mutex_;
pthread_cond_t pcond_;
};
#endif
EventLoopThread類
#ifndef EVENT_LOOP_THREAD_H_
#define EVENT_LOOP_THREAD_H_
#include "EventLoop.hpp"
#include "Thread.hpp"
#include "MutexLockGuard.hpp"
#include "Condition.hpp"
#include <memory>
#include <iostream>
class EventLoopThread
{
public:
EventLoopThread()
: loop_(NULL), exiting_(false), thread_(std::bind(&EventLoopThread::ThreadFunc, this)), mutex_(), cond_(mutex_) {}
//~EventLoopThread();
EventLoop* startLoop();
private:
void ThreadFunc();
EventLoop* loop_;
bool exiting_;
Thread thread_;
MutexLock mutex_;
Condition cond_;
};
EventLoop* EventLoopThread::startLoop()
{
//assert(!thread_.started());
thread_.start();
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
std::cout<<"tid "<<CurrentThreadtid()<<": waiting"<<std::endl;
cond_.wait();
}
std::cout<<"tid "<<CurrentThreadtid()<<": received notification"<<std::endl;
}
return loop_;
}
void EventLoopThread::ThreadFunc()
{
EventLoop loop;
{
MutexLockGuard lock(mutex_);
loop_ = &loop;
cond_.notify();
std::cout<<"tid "<<CurrentThreadtid()<<": notified"<<std::endl;
}
loop.loop();
//assert(exiting_);
}
#endif