1. 程式人生 > >muduo庫中TcpServer一次完整的工作流程

muduo庫中TcpServer一次完整的工作流程

函數 sep sock accep mes pin map all some

模擬單線程情況下muduo庫的工作情況

muduo的源代碼對於一個初學者來說還是有一些復雜的,其中有很多的回調函數以及交叉的組件,下面我將追蹤一次TCP連接過程中發生的事情,不會出現用戶態的源碼,都是庫內部的運行機制。下文筆者將描述一次連接發生的過程,將Channel到加入到loop循環為止。

監聽套接字加入loop循環的完整過程

  • 首先創建一個TcpServer對象,在的創建過程中,首先new出來自己的核心組件(Acceptor,loop,connectionMap,threadPool)之後TcpServer會向Acceptor註冊一個新連接到來時的Connection回調函數。loop是由用戶提供的,並且在最後向Acceptor註冊一個回調對象,用於處理:一個新的Client連接到來時該怎麽處理。
    TcpServer向Acceptor註冊的回調代碼主要作用是:當一個新的連接到來時,根據Acceptor創建的可連接描述符和客戶的地址,創建一個Connection對象,並且將這個對象加入到TcpServer的ConnectionMap中,由TcpServer來管理上述新建con對象。但是現在監聽套接字的事件分發對象Channel還沒有加入loop,就先不多提這個新的連接到到來時的處理過程。
    ```
    TcpServer::TcpServer(EventLoop* loop,const InetAddress& listenAddr,const string& nameArg,Option option)
    : loop_(CHECK_NOTNULL(loop)),
    ipPort_(listenAddr.toIpPort()),name_(nameArg),acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
    threadPool_(new EventLoopThreadPool(loop, name_)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)
    {//上面的loop是用戶提供的loop
    acceptor_->setNewConnectionCallback(
    boost::bind(&TcpServer::newConnection, this, _1, _2));//註冊給acceptor的回調
    }//將在Acceptor接受新連接的時候

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{//將本函數註冊個acceptor
loop_->assertInLoopThread();//斷言是否在IO線程
EventLoop* ioLoop = threadPool_->getNextLoop();//獲得線程池中的一個loop
char buf[64];//獲得線程池map中的string索引
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;

LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));//獲得本地的地址,用於構建Connection
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));//構建了一個connection
connections_[connName] = conn;//將新構建的con加入server的map中
conn->setConnectionCallback(connectionCallback_);//muduo默認的
conn->setMessageCallback(messageCallback_);//moduo默認的
conn->setWriteCompleteCallback(writeCompleteCallback_);//??
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));//在某個線程池的loop中加入這個con
}

- 下面接著講述在TcpServer的構造過程中發生的事情:創建Acceptor對象。TcpServer用unique_ptr持有唯一的指向Acceptor的指針。Acceptor的構造函數完成了一些常見的選項。最後的一個向Acceptor->Channel註冊一個回調函數,用於處理:listening可讀時(新的連接到來),該怎麽辦?答案是:當新的連接到來時,創建一個已連接描述符,然後調用TcpServe註冊給Acceptor的回調函數,用於處理新的連接。

Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
acceptChannel_(loop, acceptSocket_.fd()),
listenning_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport);
acceptSocket_.bindAddress(listenAddr);
acceptChannel_.setReadCallback(
boost::bind(&Acceptor::handleRead, this));//Channel設置回調,當sockfd可讀時掉用設置的回調
}

void Acceptor::handleRead()
{
loop_->assertInLoopThread();//判斷是否在IO線程
InetAddress peerAddr;//客戶的地址
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr);//獲得連接的描述符
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);//TcpServer註冊的,創建新的con,並且加入TcpServer的ConnectionMap中。
}
else
{
sockets::close(connfd);
}
}
else
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can‘t" in libev‘s doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}

- 在上述Acceptor對象的創建過程中,Acceptor會創建一個用於處理監聽套接字事件的Channel對象,以下Acceptor的Channel對象的創造過程,很常規的處理過程。

Channel::Channel(EventLoop* loop, int fd_)
: loop
(loop),
fd_(fd_),
events
(0),
revents_(0),
index_(-1),
logHup_(true),
tied_(false),
eventHandling_(false),
addedToLoop_(false)
{
}

- 到此,在muduo庫內部的初始化過程已經基本處理完畢,然後由用戶調用TcpServer的setThreadNum()和start()函數。在start()函數中會將打開Acceptor對象linten套接字。

void TcpServer::setThreadNum(int numThreads)
{//設置線程池的開始數目
assert(0 <= numThreads);
threadPool_->setThreadNum(numThreads);
}

void TcpServer::start()
{//TcpServer開始工作
if (started_.getAndSet(1) == 0)//獲得原子計數
{
threadPool_->start(threadInitCallback_);//線程池開始工作

assert(!acceptor_->listenning());//打開accepor的監聽狀態
loop_->runInLoop(
    boost::bind(&Acceptor::listen, get_pointer(acceptor_)));//打開acceptor的listening

}
}

- 打開Acceptor對象的listenfd的詳細過程。

void Acceptor::listen()
{
loop_->assertInLoopThread();//判斷是否在IO線程
listenning_ = true;//進入監聽模式
acceptSocket_.listen();
acceptChannel_.enableReading();//讓監聽字的channel關註可讀事件
}

- 接著使用了Channel對象中的的enableReading()函數,讓這個Channel對象關註可讀事件。關鍵在於更新過程,應該是這個流程中最重要的操作。

void enableReading() { events_ |= kReadEvent; update(); }//將關註的事件變為可讀,然後更新

- 使用了Channel的更新函數:update()

void Channel::update()
{
addedToLoop_ = true;//更新channel的狀態
loop_->updateChannel(this);//調用POLLER的更新功能
}

- EventLoop持有唯一的Poller,也就是說,這個Poller將負責最後的更新過程。如果是新的Channel對象,則在Poller的pollfd數組中增加席位;如果不是新的Channel對象,則更新它目前所發生的事件(將目前發生的事件設置為0)。

void EventLoop::updateChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);//判斷channel的LOOP是否是當前的LOOP
assertInLoopThread();//判斷是否在IO線程
poller_->updateChannel(channel);//使用POLLER來更新channel
}

- 緊接著使用了Poller的updateChannel函數

void PollPoller::updateChannel(Channel* channel)
{//將channel關註的事件與pollfd同步
Poller::assertInLoopThread();//如果不再loop線程直接退出
LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
if (channel->index() < 0)//獲得channel在map中的位置
{
// a new one, add to pollfds_
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;//新建一個pfd與channel相關聯
pfd.fd = channel->fd();
pfd.events = static_cast

- 至此,調用EventLoop的loop函數,進行loop循環,開始處理事件。

void EventLoop::loop()
{
assert(!looping_);//判斷是否在LOOPING
assertInLoopThread();//判斷這個函數在LOOP線程調用
looping_ = true;//進入LOOPING狀態
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
LOG_TRACE << "EventLoop " << this << " start looping";

while (!quit_)
{
activeChannels_.clear();//將活動線程隊列置空
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);//獲得活動文件描述符的數量,並且獲得活動的channel隊列
++iteration_;//增加Poll次數
if (Logger::logLevel() <= Logger::TRACE)
{
printActiveChannels();
}
// TODO sort channel by priority
eventHandling_ = true;//事件處理狀態
for (ChannelList::iterator it = activeChannels_.begin();
it != activeChannels_.end(); ++it)
{
currentActiveChannel_ = *it;//獲得當前活動的事件
currentActiveChannel_->handleEvent(pollReturnTime_);//處理事件,傳遞一個poll的阻塞時間
}
currentActiveChannel_ = NULL;//將當前活動事件置為空
eventHandling_ = false;//退出事件處理狀態
doPendingFunctors();//處理用戶在其他線程註冊給IO線程的事件
}

LOG_TRACE << "EventLoop " << this << " stop looping";
looping_ = false;//推出LOOPING狀態
}

一個監聽套接字已經進入循環,如果此時一個新的連接到來又會發生什麽事情呢?
###一個新連接到達時的處理過程。
- 此時在loop循環中的監聽套接字變得可讀,然後便調用一個可讀事件的處理對象。首先調用Acceptor註冊的handleRead對象,完成連接套接字的創建,其次在handleRead對象的內部調用TcpServer註冊給Acceptor的函數對象,用於將新建con對象加入TcpServer的ConnectionMap中去。 

void Channel::handleEvent(Timestamp receiveTime)
{
boost::shared_ptr

void Channel::handleEventWithGuard(Timestamp receiveTime)
{//真正的處理各種事件
eventHandling_ = true;//處理事件狀態
LOG_TRACE << reventsToString();
if ((revents_ & POLLHUP) && !(revents_ & POLLIN))
{
if (logHup_)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLHUP";
}
if (closeCallback_) closeCallback_();
}

if (revents_ & POLLNVAL)
{
LOG_WARN << "fd = " << fd_ << " Channel::handle_event() POLLNVAL";
}

if (revents_ & (POLLERR | POLLNVAL))
{
if (errorCallback_) errorCallback_();
}
if (revents_ & (POLLIN | POLLPRI | POLLRDHUP))
{
if (readCallback_) readCallback_(receiveTime);
}
if (revents_ & POLLOUT)
{
if (writeCallback_) writeCallback_();
}
eventHandling_ = false;
}

- 此時,監聽套接字處理的時可讀事件,調用之前由Acceptor註冊的handleRead回調函數

void Acceptor::handleRead()
{
loop_->assertInLoopThread();//判斷是否在IO線程
InetAddress peerAddr;//客戶的地址
//FIXME loop until no more
int connfd = acceptSocket_.accept(&peerAddr);//獲得連接的描述符
if (connfd >= 0)
{
// string hostport = peerAddr.toIpPort();
// LOG_TRACE << "Accepts of " << hostport;
if (newConnectionCallback_)
{
newConnectionCallback_(connfd, peerAddr);//這是個關鍵步驟,重點在於這個回調是誰註冊的
}
else
{
sockets::close(connfd);
}
}
else
{
LOG_SYSERR << "in Acceptor::handleRead";
// Read the section named "The special problem of
// accept()ing when you can‘t" in libev‘s doc.
// By Marc Lehmann, author of libev.
if (errno == EMFILE)
{
::close(idleFd_);
idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL);
::close(idleFd_);
idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
}
}
}

- 在上述函數中又調用,由TcpServer註冊給Acceptor的回調函數

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{//將本函數註冊個acceptor
loop_->assertInLoopThread();//斷言是否在IO線程
EventLoop* ioLoop = threadPool_->getNextLoop();//獲得線程池中的一個loop
char buf[64];//獲得線程池map中的string索引
snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);
++nextConnId_;
string connName = name_ + buf;

LOG_INFO << "TcpServer::newConnection [" << name_
<< "] - new connection [" << connName
<< "] from " << peerAddr.toIpPort();
InetAddress localAddr(sockets::getLocalAddr(sockfd));//獲得本地的地址,用於構建Connection
// FIXME poll with zero timeout to double confirm the new connection
// FIXME use make_shared if necessary
TcpConnectionPtr conn(new TcpConnection(ioLoop,
connName,
sockfd,
localAddr,
peerAddr));//構建了一個connection
connections_[connName] = conn;//將新構建的con加入server的map中
conn->setConnectionCallback(connectionCallback_);//muduo默認的
conn->setMessageCallback(messageCallback_);//moduo默認的
conn->setWriteCompleteCallback(writeCompleteCallback_);//??
conn->setCloseCallback(
boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe
ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));//在某個線程池的loop中加入這個con
}

- 上述對象的最後一行,是調用新建的TcpConnection對象的函數,用設置新建的con對象中的channel的關註事件。

void TcpConnection::connectEstablished()
{//建立連接
loop_->assertInLoopThread();//斷言是否在IO線程
assert(state_ == kConnecting);//正處於連接建立過程
setState(kConnected);
channel_->tie(shared_from_this());//使channel的tie的指向不為空
channel_->enableReading();//將connection設置為可讀的

connectionCallback_(shared_from_this());//用戶提供的回調函數,muduo有提供默認的
}

- 至此以後的過程與將listen->channel添加到loop中的過程一樣。

void enableReading() { events_ |= kReadEvent; update(); }//將關註的事件變為可讀,然後更新

- 使用了Channel的更新函數:update()

void Channel::update()
{
addedToLoop_ = true;//更新channel的狀態
loop_->updateChannel(this);//調用POLLER的更新功能
}

- 使用了EventLoop的updateChannel()功能

void EventLoop::updateChannel(Channel* channel)
{
assert(channel->ownerLoop() == this);//判斷channel的LOOP是否是當前的LOOP
assertInLoopThread();//判斷是否在IO線程
poller_->updateChannel(channel);//使用POLLER來更新channel
}

- 在poller中更新channel

void PollPoller::updateChannel(Channel* channel)
{//將channel關註的事件與pollfd同步
Poller::assertInLoopThread();//如果不再loop線程直接退出
LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events();
if (channel->index() < 0)//獲得channel在map中的位置
{
// a new one, add to pollfds_
assert(channels_.find(channel->fd()) == channels_.end());
struct pollfd pfd;//新建一個pfd與channel相關聯
pfd.fd = channel->fd();
pfd.events = static_cast

最後一個連接的channel加入loop循環,新的循環已經開始了。
##模擬單線程情況下muduo庫的工作情況
在上篇中,筆者追蹤了Connetfd(連接套接字)和Listenfd(監聽套接字)的Channel對象加入到loop循環的過程。其中包括了網絡連接過程中,muduo會創建的對象。本文將會追蹤Connetfd(連接套接字)和Listenfd(監聽套接字)從loop循環退出並且銷毀,一直到main函數終止的過程。
###連接套接字正常情況下完整的銷毀情況(read == 0)
由TcpConnection對象向自己所擁有的Channel對象註冊的可讀事件結束時,會出現` read == 0 `的情況,此時會直接調用TcpConnection對象的handleClose函數。因為在向Channel對象註冊可讀事件時,使用了如下的語句:

channel_->setReadCallback(&TcpConnection::handleRead,this);

**this**使得Channel對象可以直接在TcpConnection向它註冊的handleClose函數內部使用TcpConnetion的函數。

void TcpConnection::handleRead(Timestamp receiveTime)
{//都是向channel註冊的函數
loop_->assertInLoopThread();//斷言在loop線程
int savedErrno = 0;//在讀取數據之後調用用戶提供的回調函數
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{//這個應該時用戶提供的處理信息的回調函數
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{//讀到了0,直接關閉
handleClose();
}
else
{//如果有錯誤
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();//處理關閉
}
}
void TcpConnection::handleClose()
{//處理關閉事件
loop_->assertInLoopThread();//斷言是否在loop線程
LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
assert(state_ == kConnected || state_ == kDisconnecting);
// we don‘t close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected);//設置關閉狀態
channel_->disableAll();//不再關註任何事情

TcpConnectionPtr guardThis(shared_from_this());//獲得shared_ptr交由tcpsever處理
connectionCallback_(guardThis);//這他媽就是記錄一點日誌
// must be the last line
closeCallback_(guardThis);
}

在以上的handleClose代碼中,首先會設置TcpConnection對象的關閉狀態,其次讓自己Channel對象不再關註任何事情。
因為TcpConnection在創建時使用了如下語句:

class TcpConnection : boost::noncopyable, public boost::enable_shared_from_this

便可以使用shared_fron_this()獲得指向本TcpConnection對象的shared_ptr指針,然後在後續的過程中,對指向本對象的
shared_ptr進行操作,則可以安全的將本對象從其他依賴類中安全的移除。
繼續跟蹤上述的最後一句,且closeCallback是由TcpServer在創建TcpConnection對象時向它註冊的:

conn->setCloseCallback(boost::bind(&TcpServer::removeConnection, this, _1));

目的在於在TcpServer的TcpconnectionMap中移除指向指向這個TcpConnection對象的指針。

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
// FIXME: unsafe
loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));//註冊到loop線程中移除這個con
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();//斷言是否在IO線程
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();
size_t n = connections_.erase(conn->name());//刪除該con
(void)n;
assert(n == 1);
EventLoop* ioLoop = conn->getLoop();//獲得線程Loop
ioLoop->queueInLoop(
boost::bind(&TcpConnection::connectDestroyed, conn));//將線程銷毀動作添加到loop中去
}

目前的步驟還在於處理TcpConnection對象。

void TcpConnection::connectDestroyed()
{//銷毀連接
loop_->assertInLoopThread();//斷言是否在loop線程
if (state_ == kConnected)//如果此時處於連接狀態
{
setState(kDisconnected);//將狀態設置為不可連接狀態
channel_->disableAll();//channel不再關註任何事件

connectionCallback_(shared_from_this());//記錄作用,好坑的一個作用

}
channel_->remove();//在poller中移除channel
}

TcpConnection對象的聲明周期隨著將Channel對象移除出loop循環而結束。

void Channel::remove()
{//將channel從loop中移除
assert(isNoneEvent());//判斷此時的channel是否沒有事件發生
addedToLoop_ = false;//此時沒有loop擁有此channel
loop_->removeChannel(this);//調用POLLER的刪除功能
}

因為EventLoop對象中的poller對象也持有Channel對象的指針,所以需要將channel對象安全的從poller對象中移除。

void EventLoop::removeChannel(Channel* channel)
{//每次間接的調用的作用就是將需要改動的東西與當前調用的類撇清關系
assert(channel->ownerLoop() == this);
assertInLoopThread();//如果沒有在loop線程調用直接退出
if (eventHandling_)//判斷是否在事件處理狀態。判斷當前是否在處理這個將要刪除的事件以及活動的事件表中是否有這個事件
{
assert(currentActiveChannel_ == channel ||
std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end());
}
poller_->removeChannel(channel);//在POLLER中刪除這個事件分發表
}

以下時poller對象移除Channel對象的具體操作步驟。

void PollPoller::removeChannel(Channel* channel)
{
Poller::assertInLoopThread();//判斷是否在IO線程
LOG_TRACE << "fd = " << channel->fd();
assert(channels_.find(channel->fd()) != channels_.end());
assert(channels_[channel->fd()] == channel);
assert(channel->isNoneEvent());
int idx = channel->index();//獲得pfd位置的索引
assert(0 <= idx && idx < static_cast

以上便是一個連接的銷毀過程,現在依然讓人迷惑的時Channel對象到底被誰持有過?以及TcpConnection對象的生命周期到底在什麽時候結束?
###Channel與TcpConnection對象的創建與銷毀
####創建
下面,在讓我們進入上一篇文章,具體的看看Channel對象的生命期到底是個什麽樣子?
- 當新連接過來時,由TcpServer創建一個TcpConnection對象,這個對象中包括一個與此連接相關的Channel對象。
- 然後緊接著TcpServer使用創建的TcpConnection對象向Loop中註冊事件。此時的控制權回到TcpConnection對象手中。它操作自己的Channel對象更新EventLoop。
- 最後由EventLoop對象去操作自己的Poller更新Poller的Channel隊列。
在上述過程中,Channel對象的創建操作有這樣的順序:

TcpServer->TcpConnection->Channel->EventLoop->Poller

TcpConnection對象的創建過程相比於Channel簡單的多:

TcpServer->TcpConnection

在TcpServer中創建Connection對象,然後讓TcpConnection對象去操作自己的Channel對象,將Channel加入到EventLoop中去,最後由EventLoop操作自己的Poller收尾。總而言之,Channel對象在整個過程中只由Poller和TcpConnection對象持有,銷毀時也應該是如此過程。
####銷毀
由於Channel是TcpConnection對象的一部分,所以Channel的生命周期一定會比TcpConnection的短。
Channel與TcpConnection對象的銷毀基本與上述創建過程相同:

TcpConnection->TcpServer->Channel->EventLoop->Poller
```
隨著,Channel對象從Poller中移除,TcpConnection的生命周期也隨之結束。
TcpConnection對象在整個生命周期中只由TcpServer持有,但是TcpConnetion對象中的Channel又由Poller持有,Poller又是EventLoop的唯一成員,所以造成了如此麻煩的清理與創建過程。那如果能將Channel移出TcpConnection對象,那muduo的創建與清理工作會不會輕松很多?

muduo庫中TcpServer一次完整的工作流程