
muduo Source Code Analysis: The TcpServer Class

The previous post studied the implementation of the Acceptor class. It is just a thin wrapper around Channel and Socket and is simple for users to work with, thanks to the underlying Reactor architecture. Next we look at how muduo handles connection establishment, which is the first of the "three and a half events" muduo talks about. As you might expect, the TcpServer class is likewise a wrapper around Acceptor and Poller.
 

Connection handling process

First, TcpServer registers a Channel with the Poller through the Acceptor. This Channel watches for readable events on the accept socket, and Acceptor::newConnectionCallback is set to TcpServer::newConnection:

    acceptor_->setNewConnectionCallback(boost::bind(&TcpServer::newConnection, this, _1, _2));	// register TcpServer::newConnection as the Acceptor's newConnectionCallback

Then, when a client connects, the Poller returns this Channel and Channel::handleEvent -> Acceptor::handleRead is invoked. The Acceptor accepts the connection and then calls the callback set above, Acceptor::newConnectionCallback, i.e. TcpServer::newConnection.

Next, for each connection, TcpServer creates a TcpConnection to manage it. TcpConnection is the most complex class of the library and is managed with a shared_ptr, because its lifetime is rather fuzzy; this point is analyzed in a later post.

Finally, TcpConnection::connectEstablished is called, which in turn invokes the user-supplied connectionCallback.

(The classes are wired together through callback functions.)
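
To make the hand-off concrete, here is roughly what Acceptor::handleRead (studied in the previous post) does when the accept socket becomes readable; this is a sketch and the exact code may differ slightly between muduo versions:

void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr;
  int connfd = acceptSocket_.accept(&peerAddr);   // accept the new connection
  if (connfd >= 0)
  {
    if (newConnectionCallback_)
    {
      newConnectionCallback_(connfd, peerAddr);   // here: TcpServer::newConnection
    }
    else
    {
      sockets::close(connfd);                     // no handler registered, close it
    }
  }
}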

 

TcpServer.h

class TcpServer : boost::noncopyable
{
 public:
  typedef boost::function<void(EventLoop*)> ThreadInitCallback;
  enum Option
  {
    kNoReusePort,
    kReusePort,
  };

  //TcpServer(EventLoop* loop, const InetAddress& listenAddr);
  TcpServer(EventLoop* loop,				// constructor
            const InetAddress& listenAddr,
            const string& nameArg,
            Option option = kNoReusePort);
  ~TcpServer();  // force out-line dtor, for scoped_ptr members.

  const string& ipPort() const { return ipPort_; }
  const string& name() const { return name_; }
  EventLoop* getLoop() const { return loop_; }


  void setThreadNum(int numThreads);			// set how many loop threads the server runs
  void setThreadInitCallback(const ThreadInitCallback& cb)	
  { threadInitCallback_ = cb; }
  /// valid after calling start()
  boost::shared_ptr<EventLoopThreadPool> threadPool()
  { return threadPool_; }

  void start();			// start the TcpServer

  void setConnectionCallback(const ConnectionCallback& cb)	// set the connection callback
  { connectionCallback_ = cb; }

  void setMessageCallback(const MessageCallback& cb)		// set the message callback
  { messageCallback_ = cb; }

  void setWriteCompleteCallback(const WriteCompleteCallback& cb)	// set the write-complete callback
  { writeCompleteCallback_ = cb; }

 private:

  void newConnection(int sockfd, const InetAddress& peerAddr);	// registered as Acceptor::newConnectionCallback
 
  void removeConnection(const TcpConnectionPtr& conn);

  void removeConnectionInLoop(const TcpConnectionPtr& conn);

  typedef std::map<string, TcpConnectionPtr> ConnectionMap;	// connection list kept in an associative map

  EventLoop* loop_;  // the acceptor loop
  const string ipPort_;	// ip:port of the listen address
  const string name_;	// server name
  boost::scoped_ptr<Acceptor> acceptor_;				// Acceptor used to accept new connections
  boost::shared_ptr<EventLoopThreadPool> threadPool_;
  ConnectionCallback connectionCallback_;				// connection callback
  MessageCallback messageCallback_;					// message callback
  WriteCompleteCallback writeCompleteCallback_;				// write-complete callback
  ThreadInitCallback threadInitCallback_;
  AtomicInt32 started_;			// started flag
  // always in loop thread
  int nextConnId_;			// next connection id
  ConnectionMap connections_;	        // connection list
};

Key members:

boost::scoped_ptr<Acceptor> acceptor_; the Acceptor analyzed in the previous post, used to accept incoming connections. It is used only inside TcpServer, so it is managed with a scoped_ptr.

EventLoop* loop_; the key class of the Reactor.

map<string, TcpConnectionPtr> connections_; the container that manages the TcpConnection objects. Strictly speaking, TcpServer manages each TcpConnection through a shared_ptr (TcpConnectionPtr), mainly because TcpConnection has a fuzzy lifetime. Users of the muduo network library also receive TcpConnectionPtr as a callback parameter. Each connection has a unique name, generated when the connection is created.
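
For reference, TcpConnectionPtr is just a shared_ptr alias; in the boost-based muduo it is declared (in Callbacks.h) roughly as:

typedef boost::shared_ptr<TcpConnection> TcpConnectionPtr;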
 


TcpServer::TcpServer()

TcpServer::TcpServer(EventLoop* loop,
                     const InetAddress& listenAddr,
                     const string& nameArg,
                     Option option)
  : loop_(CHECK_NOTNULL(loop)),
    ipPort_(listenAddr.toIpPort()),
    name_(nameArg),
    acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
    threadPool_(new EventLoopThreadPool(loop, name_)),
    connectionCallback_(defaultConnectionCallback),
    messageCallback_(defaultMessageCallback),
    nextConnId_(1)
{
  acceptor_->setNewConnectionCallback(boost::bind(&TcpServer::newConnection, this, _1, _2));	// register TcpServer::newConnection as the Acceptor's new-connection callback
}

 

TcpServer::~TcpServer()

TcpServer::~TcpServer()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpServer::~TcpServer [" << name_ << "] destructing";

  for (ConnectionMap::iterator it(connections_.begin());
      it != connections_.end(); ++it)
  {
    TcpConnectionPtr conn(it->second);   // keep the connection alive with a local shared_ptr
    it->second.reset();                  // release the map's reference
    conn->getLoop()->runInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));   // destroy it in its own loop
  }
}

 

TcpServer::newConnection

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)	// handler for new connections
{
  loop_->assertInLoopThread();
  EventLoop* ioLoop = threadPool_->getNextLoop();		// pick the next EventLoop from the thread pool (round-robin)
  char buf[64];
  snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_);    // generate a unique name
  ++nextConnId_;			// the next connection id
  string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toIpPort();
  InetAddress localAddr(sockets::getLocalAddr(sockfd));	// build the local address
  // FIXME poll with zero timeout to double confirm the new connection
  // FIXME use make_shared if necessary
  TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr));	// construct the new TcpConnection, handing it the EventLoop chosen above

  connections_[connName] = conn;		// register the TcpConnection in TcpServer's map

  // set the TcpConnection's callbacks for the "three and a half events", passing on the callbacks the user gave TcpServer
  conn->setConnectionCallback(connectionCallback_);
  conn->setMessageCallback(messageCallback_);
  conn->setWriteCompleteCallback(writeCompleteCallback_);
  conn->setCloseCallback(
      boost::bind(&TcpServer::removeConnection, this, _1)); // FIXME: unsafe

  // schedule conn->connectEstablished() in the connection's own loop
  ioLoop->runInLoop(boost::bind(&TcpConnection::connectEstablished, conn));
}

TcpServer::removeConnection()

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  // FIXME: unsafe
  loop_->runInLoop(boost::bind(&TcpServer::removeConnectionInLoop, this, conn));
}

void TcpServer::removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnectionInLoop [" << name_
           << "] - connection " << conn->name();
  size_t n = connections_.erase(conn->name());		// remove the TcpConnection from the map
  (void)n;
  assert(n == 1);
  EventLoop* ioLoop = conn->getLoop();
  ioLoop->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
}
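
Putting the pieces together, the teardown of a connection follows roughly this call chain (a sketch assuming muduo's standard close handling, where TcpConnection::handleClose invokes the close callback set above):

// peer closes the connection (read() returns 0)
//   -> TcpConnection::handleClose()                           runs in the connection's ioLoop
//   -> closeCallback_, i.e. TcpServer::removeConnection(conn)
//   -> loop_->runInLoop(removeConnectionInLoop)               hop to the acceptor loop
//   -> connections_.erase(conn->name())                       TcpServer drops its reference
//   -> ioLoop->queueInLoop(TcpConnection::connectDestroyed)   hop back for the final cleanup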

 

Usage example

#include <muduo/net/EventLoop.h>
#include <muduo/net/TcpServer.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/Buffer.h>
#include <muduo/base/Timestamp.h>

#include <iostream>
#include <string>

void onConnection(const muduo::net::TcpConnectionPtr& conn)
{
    if(conn->connected()) {
        std::cout << "New connection" << std::endl;
    } else {
        std::cout << "Connection closed" << std::endl;
    }
}

void onMessage(const muduo::net::TcpConnectionPtr& conn,
               muduo::net::Buffer* buffer,
               muduo::Timestamp receiveTime)    // muduo's MessageCallback also passes a receive timestamp
{
    const std::string readbuf = buffer->retrieveAllAsString();
    std::cout << "Receive: " << readbuf.size() << " bytes." << std::endl
              << "Content: " << readbuf << std::endl;
}

int main()
{
    muduo::net::EventLoop loop;
    muduo::net::InetAddress listenAddr(8090);                        // listen on port 8090
    muduo::net::TcpServer server(&loop, listenAddr, "DemoServer");   // the constructor also needs a name (arbitrary, used in logs)
    server.setConnectionCallback(onConnection);
    server.setMessageCallback(onMessage);
    server.start();
    loop.loop();
}
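
Assuming muduo and its boost dependency are installed in standard locations, an example like this typically builds with something along the lines of (file and binary names are arbitrary):

g++ demo_server.cc -o demo_server -lmuduo_net -lmuduo_base -lpthread

The library order matters because muduo_net depends on muduo_base.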

As you can see, TcpServer is quite convenient to use: set the appropriate callbacks and call start(). Behind the scenes TcpServer quietly does a lot of work: socket, bind, listen, epoll_wait, accept, and so on.
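
For completeness, here is roughly what start() does in the boost-based muduo (a sketch; check your version for the exact code): it starts the EventLoop thread pool and asks the Acceptor to begin listening, in the acceptor loop.

void TcpServer::start()
{
  if (started_.getAndSet(1) == 0)                 // make sure start() only runs once
  {
    threadPool_->start(threadInitCallback_);      // spin up the IO threads
    assert(!acceptor_->listenning());
    loop_->runInLoop(
        boost::bind(&Acceptor::listen, get_pointer(acceptor_)));  // listen() in the acceptor loop
  }
}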