muduo原始碼分析:TcpConnection類
前言
前面學習了TcpServer的實現,TcpServer對每個連線都會新建一個TcpConnection(使用shared_ptr管理)。接下來學習一下TcpConnection的設計細節。
連線狀態
muduo對於一個連線的從生到死進行了狀態的定義,類似一個狀態機。
enum States { kDisconnected, kConnecting, kConnected, kDisconnecting };
分別代表:已經斷開、初始狀態、已連線、正在斷開
TcpConnection.h
class TcpConnection : boost::noncopyable, public boost::enable_shared_from_this<TcpConnection> { public: TcpConnection(EventLoop* loop, const string& name, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr); ~TcpConnection(); EventLoop* getLoop() const { return loop_; } const string& name() const { return name_; } const InetAddress& localAddress() const { return localAddr_; } const InetAddress& peerAddress() const { return peerAddr_; } bool connected() const { return state_ == kConnected; } bool disconnected() const { return state_ == kDisconnected; } // return true if success. bool getTcpInfo(struct tcp_info*) const; string getTcpInfoString() const; // void send(string&& message); //下面三個send()函式給連線傳送資料 void send(const void* message, int len); void send(const StringPiece& message); void send(Buffer* message); // this one will swap data void shutdown(); //關閉該連結寫端 void forceClose(); //強制關閉該連線 void forceCloseWithDelay(double seconds); void setTcpNoDelay(bool on); void startRead(); void stopRead(); bool isReading() const { return reading_; }; // NOT thread safe, may race with start/stopReadInLoop void setContext(const boost::any& context) { context_ = context; } const boost::any& getContext() const { return context_; } boost::any* getMutableContext() { return &context_; } //以下介面為設定連線對應的各種回撥: void setConnectionCallback(const ConnectionCallback& cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback& cb) { messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback& cb) { writeCompleteCallback_ = cb; } void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark) { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; } /// Advanced interface Buffer* inputBuffer() { return &inputBuffer_; } Buffer* outputBuffer() { return &outputBuffer_; } /// Internal use only. void setCloseCallback(const CloseCallback& cb) { closeCallback_ = cb; } void connectEstablished(); // should be called only once void connectDestroyed(); // should be called only once private: enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting }; void handleRead(Timestamp receiveTime); //處理讀事件 void handleWrite(); //處理寫事件 void handleClose(); //處理關閉事件 void handleError(); //處理錯誤事件 void sendInLoop(const StringPiece& message); void sendInLoop(const void* message, size_t len); void shutdownInLoop(); void forceCloseInLoop(); void setState(StateE s) { state_ = s; } const char* stateToString() const; void startReadInLoop(); void stopReadInLoop(); EventLoop* loop_; const string name_; StateE state_; // FIXME: use atomic variable bool reading_; // we don't expose those classes to client. boost::scoped_ptr<Socket> socket_; //連線對應的套接字 boost::scoped_ptr<Channel> channel_; //對應的事件分發器channel const InetAddress localAddr_; const InetAddress peerAddr_; //關注三個半事件 (這幾個回撥函式通過handle**那四個事件處理函式呼叫) ConnectionCallback connectionCallback_; //新連線建立回撥函式 MessageCallback messageCallback_; //訊息到達回撥函式 WriteCompleteCallback writeCompleteCallback_; //寫完畢回撥函式 HighWaterMarkCallback highWaterMarkCallback_; CloseCallback closeCallback_; //連線關閉回撥函式 size_t highWaterMark_; //輸入輸出緩衝區 Buffer inputBuffer_; Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer. boost::any context_; };
先理解上面的 loop_
, socket_
, channel_
好了,不明白請翻閱前幾篇文章。
TcpConnection::TcpConnection()
TcpConnection::TcpConnection(EventLoop* loop, //建構函式 const string& nameArg, int sockfd, const InetAddress& localAddr, const InetAddress& peerAddr) : loop_(CHECK_NOTNULL(loop)), name_(nameArg), state_(kConnecting), //初始狀態為kConnection reading_(true), socket_(new Socket(sockfd)),//RAII管理已連線套接字 channel_(new Channel(loop, sockfd)), //使用Channel管理套接字上的讀寫 localAddr_(localAddr), peerAddr_(peerAddr), highWaterMark_(64*1024*1024) { //設定事件分發器的各事件回撥 (將TcpConnection類的四個事件處理函式設定為事件分發器對應的回撥函式) channel_->setReadCallback( boost::bind(&TcpConnection::handleRead, this, _1)); channel_->setWriteCallback( boost::bind(&TcpConnection::handleWrite, this)); channel_->setCloseCallback( boost::bind(&TcpConnection::handleClose, this)); channel_->setErrorCallback( boost::bind(&TcpConnection::handleError, this)); LOG_DEBUG << "TcpConnection::ctor[" << name_ << "] at " << this << " fd=" << sockfd; socket_->setKeepAlive(true); }
建構函式在初始化列表中對socket、channel等進行了初始化,在函式體中設定了回撥函式。
TcpConnection::~TcpConnection()
TcpConnection::~TcpConnection() { LOG_DEBUG << "TcpConnection::dtor[" << name_ << "] at " << this << " fd=" << channel_->fd() << " state=" << stateToString(); assert(state_ == kDisconnected); }
TcpConnection::handleRead()
void TcpConnection::handleRead(Timestamp receiveTime) //讀事件處理,呼叫設定的messageCallback_函式
{
loop_->assertInLoopThread();
int savedErrno = 0;
ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
if (n > 0)
{
//呼叫回撥函式,使用shared_from_this()得到自身的shared_ptr,延長了該物件的生命期,保證了它的生命期長過messageCallback_函式,messageCallback_能安全的使用它。
messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
}
else if (n == 0)
{
handleClose();
}
else
{
errno = savedErrno;
LOG_SYSERR << "TcpConnection::handleRead";
handleError();
}
}
前面提到了,在已連線套接字可讀時,呼叫TcpConnection::handleRead
,進而呼叫使用者設定的回撥函式messageCallback_
TcpConnection::handleWrite()
void TcpConnection::handleWrite() //寫事件處理,呼叫設定的writeCompleteCallback函式
{
loop_->assertInLoopThread();
if (channel_->isWriting())
{
ssize_t n = sockets::write(channel_->fd(),
outputBuffer_.peek(),
outputBuffer_.readableBytes());
if (n > 0)
{
outputBuffer_.retrieve(n);
if (outputBuffer_.readableBytes() == 0)
{
channel_->disableWriting();
if (writeCompleteCallback_)
{
loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
}
if (state_ == kDisconnecting)
{
shutdownInLoop();
}
}
}
else
{
LOG_SYSERR << "TcpConnection::handleWrite";
// if (state_ == kDisconnecting)
// {
// shutdownInLoop();
// }
}
}
else
{
LOG_TRACE << "Connection fd = " << channel_->fd()
<< " is down, no more writing";
}
}
TcpConnection::handleClose()
void TcpConnection::handleClose() //連線關閉處理函式,呼叫設定的connectionCallback_和closeCallback_回撥
{
loop_->assertInLoopThread();
LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
assert(state_ == kConnected || state_ == kDisconnecting);
// we don't close fd, leave it to dtor, so we can find leaks easily.
setState(kDisconnected); //設定狀態為kDisconnected,表示已斷開
channel_->disableAll(); //移除註冊的事件,使用epoll時是EPOLL_CTL_DEL
TcpConnectionPtr guardThis(shared_from_this()); //延長本物件的生命週期,引用技術為2
//呼叫使用者回撥函式
connectionCallback_(guardThis); //引數為shared_ptr,保證了 connectionCallback_能安全的使用本物件
// 呼叫TcpServer::removeConnection
closeCallback_(guardThis);
}
連線斷開時,會呼叫TcpConnection::handleClose;接著呼叫使用者回撥connectionCallback_;最後呼叫closeCallback_,即TcpServer::removeConnection(TcpServer建立TcpConnection時設定的)
TcpServer::removeConnection()
void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
}
void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
<< "] - connection " << conn->name();
// 根據conn的name,從map容器中刪除,此時引用計數會減1。erase之前引用計數為2(由前面的shared_from_this()保證),所以執行完erase,引用計數變為1
size_t n = connections_.erase(conn->name());
assert(n == 1);
// 然後呼叫conn->connectDestroyed
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop(
boost::bind(&TcpConnection::connectDestroyed, conn)); // bind延長了conn的生命期,connectDestroyed完成後,TcpConnection被析構。
// FIXME wake up ?
}
TcpServer先將該conn從map容器中刪除,因為erase之前使用了shared_from_this,所以erase之前引用計數為2,那麼erase之後引用計數將變為1。
如果沒用shared_from_this,僅僅傳遞了一個裸指標過來,erase之後引用計數變為0,那麼該TcpConnection會被析構!這意味著TcpConnection的Channel也會被析構,可是你現在正在使用該Channel啊(結合上圖看),怎麼能在使用某個物件的時候把它析構呢,這是嚴重的錯誤。所以muduo使用shared_ptr管理TcpConnection,避免了上述問題。
最後queueInLoop就是將TcpConnection::connectDestroyed函式移動到EventLoop中執行,執行位置就是在Channel->handleEvent之後,此時可以安全的析構TcpConnection。(這麼做的原因見前面)
注意上面最後的boost::bind,它讓TcpConnection的生命期長到呼叫connectDestroyed的時刻。在connectDestroyed執行完之後,TcpConnection才被析構。
TcpConnection::connectDestroyed
void TcpConnection::connectDestroyed()
{
loop_->assertInLoopThread();
if (state_ == kConnected)
{
setState(kDisconnected);
channel_->disableAll();
connectionCallback_(shared_from_this());
}
// 將EventLoop.Poller中的該channel從容器中刪除
loop_->removeChannel(get_pointer(channel_));
}
TcpConnection::connectDestroyed
是該物件析構前呼叫的最後一個成員函式,它會通知使用者連線已經斷開。