1. 程式人生 > >muduo原始碼分析之實現TCP網路庫(連線的接收和關閉)

muduo原始碼分析之實現TCP網路庫(連線的接收和關閉)

EventLoopChannelPoller三個類中完成對一般描述符、事件迴圈(poll)的封裝。實現了Reactor的基本功能,接下來則需要將網路套接字描述符、I/O函式、等進行封裝。

1.傳統的TcpServer

在進行封裝之前需要明確我們需要封裝的內容有哪些?複習最簡單的TcpServer大概需要經歷如下步驟:

listenfd= socket(AF_INET,SOCK_STREAM,0);
strcut sockaddr_in servaddr;
bind(listenfd,(SA*)&servaddr,sizeof(servaddr));
listen(listenfd,LISTENQ);
struct
pollfd fds[1024]; fds[0].fd=listenfd; fds[0].events=POLLIN; while(1) { ret=poll(fds,fds.size(),timeout); //handle_event if(fds[0].revents&POLLIN) { //new connection connectedfd = accept(listenfd,clienAddr,sizeof(clienAddr)); //add connectedfd into POLL } }

這是一個比較典型的reactor TcpServer,poll呼叫的部分已經封裝,目前來看至少需要封裝的內容有:
1.網路套接字描述符fd
2.描述地址結構體strcut sockaddr_in


3.socket相關的系統呼叫

2.socket操作封裝

Endian.h

封裝了位元組序轉換函式(全域性函式,位於muduo::net::sockets名稱空間中)。

SocketsOps.h/ SocketsOps.cc

封裝了socket相關係統呼叫(全域性函式,位於muduo::net::sockets名稱空間中)。

Socket.h/Socket.cc(Socket類)

用RAII方法封裝socket file descriptor,包含操作:listenbindaccept這些操作將呼叫上述封裝的內容。

InetAddress.h/InetAddress.cc(InetAddress類)

網際地址sockaddr_in封裝

3.使用Acceptor類封裝監聽描述符

這個類用於處理accept呼叫,事實上是對監聽套接字Listenfd的封裝。

Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr)
  : loop_(loop),
    acceptSocket_(sockets::createNonblockingOrDie()),//create socket
    acceptChannel_(loop, acceptSocket_.fd()),//Channel construct
    listenning_(false)
{
  acceptSocket_.setReuseAddr(true);//不用等待Time_Wait狀態結束
  acceptSocket_.bindAddress(listenAddr);//bind
  acceptChannel_.setReadCallback(
      boost::bind(&Acceptor::handleRead, this));
}

從Acceptor的建構函式可以看出它包含了Socket物件,也就是說它本質上是一個描述符。
通過Channel型別的物件,設定了當Acceptor所管理的描述符可讀時,進行所執行的回撥handleRead
handleRead函式:

void Acceptor::handleRead()
{
  loop_->assertInLoopThread();
  InetAddress peerAddr(0);
  //FIXME loop until no more
  int connfd = acceptSocket_.accept(&peerAddr);//accpet
  if (connfd >= 0) {
    if (newConnectionCallback_) {
      newConnectionCallback_(connfd, peerAddr);//使用者回撥
    } else {
      sockets::close(connfd);
    }
  }
}

當listenfd可讀時,說明有新連線,這個時候呼叫accept建立已連線套接字,並且執行相應的使用者回撥。
到目前為止,完成了對監聽套接字的簡單封裝,這個封裝是不完全的,因為事實上,當有了TcpServer的概念後,監聽套接字對於使用者來說應該是不可見的。

不過在對TcpServer進行封裝之前,可以測試Acceptor的功能:

void newConnection(int sockfd, const muduo::InetAddress& peerAddr)
{
  printf("newConnection(): accepted a new connection from %s\n",
         peerAddr.toHostPort().c_str());
  ::write(sockfd, "How are you?\n", 13);
  muduo::sockets::close(sockfd);
}

int main()
{
  printf("main(): pid = %d\n", getpid());

  muduo::InetAddress listenAddr(9981);

  muduo::EventLoop loop;

  muduo::Acceptor acceptor(&loop, listenAddr);

  acceptor.setNewConnectionCallback(newConnection);

  acceptor.listen();

  loop.loop();
}
4.TcpServer接收新的連線

看了上述最後一個例子我們發現,監聽套接字的封裝暴露給了使用者。所以設計的TcpServer類就需要對Acceptor再封裝一層了。

TcpServer::TcpServer(EventLoop* loop, const InetAddress& listenAddr)
  : loop_(CHECK_NOTNULL(loop)),
    name_(listenAddr.toHostPort()),//
    acceptor_(new Acceptor(loop, listenAddr)),//Accept的封裝
    started_(false),
    nextConnId_(1)//記錄連線數,當有新連線的時候會自增
{
  acceptor_->setNewConnectionCallback(
      boost::bind(&TcpServer::newConnection, this, _1, _2));
}

我們看在之前的Acceptor中,只是在main中設定了使用者回撥,向已連線套接字write了一段文字,便close掉了,對已連線套接字並沒有處理。
現在在Acceptor的回撥中設定TcpServer::newConnection

void TcpServer::newConnection(int sockfd, const InetAddress& peerAddr)
{
    //sockfd 是已連線套接字
  loop_->assertInLoopThread();
  char buf[32];
  snprintf(buf, sizeof buf, "#%d", nextConnId_);
  ++nextConnId_;//連線數+1
  std::string connName = name_ + buf;

  LOG_INFO << "TcpServer::newConnection [" << name_
           << "] - new connection [" << connName
           << "] from " << peerAddr.toHostPort();
  //列印特定訊息

  InetAddress localAddr(sockets::getLocalAddr(sockfd));

  //TcpConnection類用來管理已連線套接字
  TcpConnectionPtr conn(
      new TcpConnection(loop_, //EventLoop 
      connName,//ConnectionName
      sockfd, //accepted fd
      localAddr,//
      peerAddr));//

  connections_[connName] = conn;//map
  conn->setConnectionCallback(connectionCallback_);//設定使用者回撥
  conn->setMessageCallback(messageCallback_);//設定使用者回撥
  conn->connectEstablished();//在這個函式中,會執行onConnection的使用者回撥
}

在這個回撥中,建立了一個關鍵的物件,TcpConnection,並用shared_ptr管理,該Class用來管理已連線套接字。

TcpConnection::TcpConnection(EventLoop* loop,
                             const std::string& nameArg,
                             int sockfd,
                             const InetAddress& localAddr,
                             const InetAddress& peerAddr)
  : loop_(CHECK_NOTNULL(loop)),//EventLoop
    name_(nameArg),//
    state_(kConnecting),
    socket_(new Socket(sockfd)),//已連線套接字
    channel_(new Channel(loop, sockfd)),//Channel
    localAddr_(localAddr),//
    peerAddr_(peerAddr)//
{
  LOG_DEBUG << "TcpConnection::ctor[" <<  name_ << "] at " << this
            << " fd=" << sockfd;
  channel_->setReadCallback(
      boost::bind(&TcpConnection::handleRead, this));//當已連線套接字可讀時
}

當已連線套接字可讀將回調TcpConnection::handleRead

void TcpConnection::handleRead()
{
  char buf[65536];
  ssize_t n = ::read(channel_->fd(), buf, sizeof buf);//read
  messageCallback_(shared_from_this(), buf, n);//onMessage使用者回撥
  // FIXME: close connection if n == 0
}

busy loop事件

對於執行s05/test8.cc它是一個discard服務,當客戶端關閉連線,將進入busy loop,原因在與當客戶端斷開連線時,客戶端傳送一個FIN段,服務端返回一個ACK,當服務端TCP收到FIN時,read就返回0(表示EOF),而此時這種狀態總是readable,將會不停觸發poll返回,就出現busy loop事件了。

5.TcpConnection關閉連線

關閉連線相對於建立連線要麻煩,因為需要考慮TcpConnection的生命週期。

監聽套接字可讀事件是POLLIN; 已連線套接字正常可讀是POLLIN; 正常可寫是POLLOUT; 對等方close/shutdown關閉連線,已連線套接字可讀是POLLIN | POLLHUP;

在TcpConnection 建構函式中再新增:

// 連線關閉,回撥TcpConnection::handleClose
channel_->setCloseCallback(
    boost::bind(&TcpConnection::handleClose, this));
// 發生錯誤,回撥TcpConnection::handleError
channel_->setErrorCallback(
    boost::bind(&TcpConnection::handleError, this));

在 TcpServer::newConnection() 中再新增:

void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr)
{
    .....
    conn->setCloseCallback(
        boost::bind(&TcpServer::removeConnection, this, _1));
}

在TcpConnection::handleRead() 中再新增:

void TcpConnection::handleRead()
{
  char buf[65536];
  ssize_t n = ::read(channel_->fd(), buf, sizeof buf);
  if (n > 0) {
    messageCallback_(shared_from_this(), buf, n);
  } else if (n == 0) {
    handleClose();
  } else {
    handleError();
  }
}

在來看看TcpConnection::handleClose()函式

void TcpConnection::handleClose()
{
  loop_->assertInLoopThread();
  LOG_TRACE << "TcpConnection::handleClose state = " << state_;
  assert(state_ == kConnected);
  // we don't close fd, leave it to dtor, so we can find leaks easily.
  channel_->disableAll();
  // must be the last line
  closeCallback_(shared_from_this());
}

回撥來回調去的有點暈,整理一下整個過程。
當已連線套接字發生可讀事件,poll返回,將呼叫呼叫Channel::handleEvent()處理活動通道,呼叫TcpConnection::handleRead(),::read() 返回0,進而呼叫TcpConnection::handleClose()

handleClose()函式中,呼叫TcpConnection的closeCallback_,這個回撥函式是在TcpServer裡面設定:

conn->setCloseCallback(
        boost::bind(&TcpServer::removeConnection, this, _1));

進而呼叫TcpServer::removeConnection

void TcpServer::removeConnection(const TcpConnectionPtr& conn)
{
  loop_->assertInLoopThread();
  LOG_INFO << "TcpServer::removeConnection [" << name_
           << "] - connection " << conn->name();
  size_t n = connections_.erase(conn->name());
  assert(n == 1); (void)n;
  loop_->queueInLoop(
      boost::bind(&TcpConnection::connectDestroyed, conn));
}

最後將呼叫TcpConnection::connectDestroy

void TcpConnection::connectDestroyed()
{
  loop_->assertInLoopThread();
  assert(state_ == kConnected);
  setState(kDisconnected);
  channel_->disableAll();
  connectionCallback_(shared_from_this());//回撥使用者onConnection Callback

  loop_->removeChannel(get_pointer(channel_));//Poll不再關注此通道
}
6.參考