1. 程式人生 > >muduo原始碼分析:TcpConnection類

muduo原始碼分析:TcpConnection類

前言

前面學習了TcpServer的實現,TcpServer對每個連線都會新建一個TcpConnection(使用shared_ptr管理)。接下來學習一下TcpConnection的設計細節。

 

連線狀態

muduo對於一個連線的從生到死進行了狀態的定義,類似一個狀態機。

enum States { kDisconnected, kConnecting, kConnected, kDisconnecting };

分別代表:已經斷開、初始狀態、已連線、正在斷開

 

TcpConnection.h

class TcpConnection : boost::noncopyable,
                      public boost::enable_shared_from_this<TcpConnection>
{
 public:
  TcpConnection(EventLoop* loop,
                const string& name,
                int sockfd,
                const InetAddress& localAddr,
                const InetAddress& peerAddr);
  ~TcpConnection();

  EventLoop* getLoop() const { return loop_; }
  const string& name() const { return name_; }
  const InetAddress& localAddress() const { return localAddr_; }
  const InetAddress& peerAddress() const { return peerAddr_; }
  bool connected() const { return state_ == kConnected; }
  bool disconnected() const { return state_ == kDisconnected; }
  // return true if success.
  bool getTcpInfo(struct tcp_info*) const;
  string getTcpInfoString() const;

  // void send(string&& message);	//下面三個send()函式給連線傳送資料
  void send(const void* message, int len);
  void send(const StringPiece& message);
  void send(Buffer* message);  // this one will swap data
  
  void shutdown();				//關閉該連結寫端
  
  void forceClose();				//強制關閉該連線
  void forceCloseWithDelay(double seconds);
  
  void setTcpNoDelay(bool on);
  void startRead();
  void stopRead();
  bool isReading() const { return reading_; }; // NOT thread safe, may race with start/stopReadInLoop

  void setContext(const boost::any& context)
  { context_ = context; }

  const boost::any& getContext() const
  { return context_; }

  boost::any* getMutableContext()
  { return &context_; }

//以下介面為設定連線對應的各種回撥:
  void setConnectionCallback(const ConnectionCallback& cb)
  { connectionCallback_ = cb; }

  void setMessageCallback(const MessageCallback& cb)
  { messageCallback_ = cb; }

  void setWriteCompleteCallback(const WriteCompleteCallback& cb)
  { writeCompleteCallback_ = cb; }

  void setHighWaterMarkCallback(const HighWaterMarkCallback& cb, size_t highWaterMark)
  { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }

  /// Advanced interface
  Buffer* inputBuffer()
  { return &inputBuffer_; }

  Buffer* outputBuffer()
  { return &outputBuffer_; }

  /// Internal use only.
  void setCloseCallback(const CloseCallback& cb)
  { closeCallback_ = cb; }

  void connectEstablished();   // should be called only once

  void connectDestroyed();  // should be called only once

 private:
  enum StateE { kDisconnected, kConnecting, kConnected, kDisconnecting };
  void handleRead(Timestamp receiveTime);		//處理讀事件
  void handleWrite();					//處理寫事件
  void handleClose();					//處理關閉事件
  void handleError();					//處理錯誤事件

  void sendInLoop(const StringPiece& message);
  void sendInLoop(const void* message, size_t len);
  void shutdownInLoop();

  void forceCloseInLoop();
  void setState(StateE s) { state_ = s; }
  const char* stateToString() const;
  void startReadInLoop();
  void stopReadInLoop();

  EventLoop* loop_;
  const string name_;
  StateE state_;  // FIXME: use atomic variable
  bool reading_;
  // we don't expose those classes to client.
  boost::scoped_ptr<Socket> socket_;			//連線對應的套接字	
  boost::scoped_ptr<Channel> channel_;			//對應的事件分發器channel
  const InetAddress localAddr_;
  const InetAddress peerAddr_;

  //關注三個半事件  (這幾個回撥函式通過handle**那四個事件處理函式呼叫)
  ConnectionCallback connectionCallback_;		//新連線建立回撥函式
  MessageCallback messageCallback_;			//訊息到達回撥函式
  WriteCompleteCallback writeCompleteCallback_;	        //寫完畢回撥函式
  HighWaterMarkCallback highWaterMarkCallback_;
  CloseCallback closeCallback_;				//連線關閉回撥函式
  size_t highWaterMark_;

  //輸入輸出緩衝區
  Buffer inputBuffer_;	
  Buffer outputBuffer_; // FIXME: use list<Buffer> as output buffer.
  boost::any context_;

};

先理解上面的 loop_socket_, channel_好了,不明白請翻閱前幾篇文章。

 

TcpConnection::TcpConnection()

TcpConnection::TcpConnection(EventLoop* loop,			//建構函式
                             const string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(nameArg),
    state_(kConnecting),		//初始狀態為kConnection
    reading_(true),		
    socket_(new Socket(sockfd)),//RAII管理已連線套接字
    channel_(new Channel(loop, sockfd)),	//使用Channel管理套接字上的讀寫
    localAddr_(localAddr),
    peerAddr_(peerAddr),
    highWaterMark_(64*1024*1024)
{
	//設定事件分發器的各事件回撥  (將TcpConnection類的四個事件處理函式設定為事件分發器對應的回撥函式)
  channel_->setReadCallback(
      boost::bind(&TcpConnection::handleRead, this, _1));
  channel_->setWriteCallback(
      boost::bind(&TcpConnection::handleWrite, this));
  channel_->setCloseCallback(
      boost::bind(&TcpConnection::handleClose, this));
  channel_->setErrorCallback(
      boost::bind(&TcpConnection::handleError, this));
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  socket_->setKeepAlive(true);
}

建構函式在初始化列表中對socket、channel等進行了初始化,在函式體中設定了回撥函式。

 

TcpConnection::~TcpConnection()

TcpConnection::~TcpConnection()
{
  LOG_DEBUG << "TcpConnection::dtor[" <<  name_ << "] at " << this
            << " fd=" << channel_->fd()
            << " state=" << stateToString();
  assert(state_ == kDisconnected);
}

 

TcpConnection::handleRead()

void TcpConnection::handleRead(Timestamp receiveTime)		//讀事件處理,呼叫設定的messageCallback_函式
{
  loop_->assertInLoopThread();
  int savedErrno = 0;
  ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno);
  if (n > 0)
  {
	//呼叫回撥函式,使用shared_from_this()得到自身的shared_ptr,延長了該物件的生命期,保證了它的生命期長過messageCallback_函式,messageCallback_能安全的使用它。
    messageCallback_(shared_from_this(), &inputBuffer_, receiveTime);
  }
  else if (n == 0)
  {
    handleClose();
  }
  else
  {
    errno = savedErrno;
    LOG_SYSERR << "TcpConnection::handleRead";
    handleError();
  }
}

前面提到了,在已連線套接字可讀時,呼叫TcpConnection::handleRead,進而呼叫使用者設定的回撥函式messageCallback_


 

TcpConnection::handleWrite()

void TcpConnection::handleWrite()			//寫事件處理,呼叫設定的writeCompleteCallback函式
{
  loop_->assertInLoopThread();
  if (channel_->isWriting())
  {
    ssize_t n = sockets::write(channel_->fd(),
                               outputBuffer_.peek(),
                               outputBuffer_.readableBytes());
    if (n > 0)
    {
      outputBuffer_.retrieve(n);
      if (outputBuffer_.readableBytes() == 0)
      {
        channel_->disableWriting();
        if (writeCompleteCallback_)
        {
          loop_->queueInLoop(boost::bind(writeCompleteCallback_, shared_from_this()));
        }
        if (state_ == kDisconnecting)
        {
          shutdownInLoop();
        }
      }
    }
    else
    {
      LOG_SYSERR << "TcpConnection::handleWrite";
      // if (state_ == kDisconnecting)
      // {
      //   shutdownInLoop();
      // }
    }
  }
  else
  {
    LOG_TRACE << "Connection fd = " << channel_->fd()
              << " is down, no more writing";
  }
}

 

TcpConnection::handleClose()

void TcpConnection::handleClose()				//連線關閉處理函式,呼叫設定的connectionCallback_和closeCallback_回撥
{
  loop_->assertInLoopThread();
  LOG_TRACE << "fd = " << channel_->fd() << " state = " << stateToString();
  assert(state_ == kConnected || state_ == kDisconnecting);
  // we don't close fd, leave it to dtor, so we can find leaks easily.
  setState(kDisconnected);		//設定狀態為kDisconnected,表示已斷開
  channel_->disableAll();		//移除註冊的事件,使用epoll時是EPOLL_CTL_DEL

  TcpConnectionPtr guardThis(shared_from_this());	//延長本物件的生命週期,引用技術為2
  
  //呼叫使用者回撥函式
  connectionCallback_(guardThis);	//引數為shared_ptr,保證了 connectionCallback_能安全的使用本物件
  
  // 呼叫TcpServer::removeConnection
  closeCallback_(guardThis);
}

連線斷開時,會呼叫TcpConnection::handleClose;接著呼叫使用者回撥connectionCallback_;最後呼叫closeCallback_,即TcpServer::removeConnection(TcpServer建立TcpConnection時設定的)


 

TcpServer::removeConnection()

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();

  // 根據conn的name,從map容器中刪除,此時引用計數會減1。erase之前引用計數為2(由前面的shared_from_this()保證),所以執行完erase,引用計數變為1
  size_t n = connections_.erase(conn->name());
  assert(n == 1);

  // 然後呼叫conn->connectDestroyed
  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn)); // bind延長了conn的生命期,connectDestroyed完成後,TcpConnection被析構。
  // FIXME wake up ?
}

TcpServer先將該conn從map容器中刪除,因為erase之前使用了shared_from_this,所以erase之前引用計數為2,那麼erase之後引用計數將變為1。 
如果沒用shared_from_this,僅僅傳遞了一個裸指標過來,erase之後引用計數變為0,那麼該TcpConnection會被析構!這意味著TcpConnection的Channel也會被析構,可是你現在正在使用該Channel啊(結合上圖看),怎麼能在使用某個物件的時候把它析構呢,這是嚴重的錯誤。所以muduo使用shared_ptr管理TcpConnection,避免了上述問題。

最後queueInLoop就是將TcpConnection::connectDestroyed函式移動到EventLoop中執行,執行位置就是在Channel->handleEvent之後,此時可以安全的析構TcpConnection。(這麼做的原因見前面)

注意上面最後的boost::bind,它讓TcpConnection的生命期長到呼叫connectDestroyed的時刻。在connectDestroyed執行完之後,TcpConnection才被析構。

 

TcpConnection::connectDestroyed

void TcpConnection::connectDestroyed()
{
  loop_->assertInLoopThread();
  if (state_ == kConnected)
  {
    setState(kDisconnected);
    channel_->disableAll();

    connectionCallback_(shared_from_this());
  }

  // 將EventLoop.Poller中的該channel從容器中刪除
  loop_->removeChannel(get_pointer(channel_));
}

TcpConnection::connectDestroyed是該物件析構前呼叫的最後一個成員函式,它會通知使用者連線已經斷開。