1. 程式人生 > >muduo網路庫學習筆記(五) 連結器Connector與監聽器Acceptor

muduo網路庫學習筆記(五) 連結器Connector與監聽器Acceptor

本篇繼續為前面封裝的EventLoop新增事件,到現在共給EventLoop添加了兩個fd,Timerfd,EventFd分別用於處理定時任務和通知事件.
今天新增的Acceptor會增加另一個fd,此fd是是一個socket,用於監聽套接字連線.同時封裝非組賽網路程式設計中的connect(2)的使用Connector.

Connector

在非阻塞網路程式設計中,發起連線的基本方式是呼叫connect(2),當socket變得可寫時表明連線建立完畢,其中要處理各種型別的錯誤,我們把它封裝為Connector class.
Connector 和 Acceptor 設計思路基本一致,只是Acceptor通過判斷套接字是否可讀來執行回撥,而Connector是判斷套接字是否可寫來執行回撥.
還有一點就是錯誤處理,socket可寫不一定就是連線建立好了 , 當連線建立出錯時,套介面描述符變成既可讀又可寫,這時我們可以通過呼叫getsockopt來得到套介面上待處理的錯誤(SO_ERROR).

其次非阻塞網路程式設計中connect(2)的sockfd是一次性的,一旦出錯(比如對方拒絕連線),就無法恢復,只能關閉重來。但Connector是可以反覆使用的, 因此每次嘗試連線都要使用新的socket檔案描述符和新的Channel物件。要注意的就是Channel的生命期管理了.

系統函式connect

   #include <sys/types.h>          /* See NOTES */
   #include <sys/socket.h>
   
   int connect(int sockfd, const struct sockaddr *addr,
               socklen_t addrlen);
               

sockfd 試圖製作的一個連線到被繫結到addr指定地址的套接字。
addraddrlen 服務端地址和長度.

retrun:
成功 返回0 , 失敗 返回 -1.

處理非阻塞connect的步驟:

第一步:建立非阻塞socket,返回套介面描述符;
第二步:connect(2)開始建立連線;
第三步:判斷連線是否成功建立:

A:如果connect返回0,表示連線建立成功, 如果錯誤為EINPROGRESS 表示連線正在進行,可以等待select()變的可寫,通過getsockopt()來來得到套介面上待處理的錯誤(SO_ERROR),連線是否建立成功.如果連線建立成功,這個錯誤值將是0,如果建立連線時遇到錯誤,則這個值是連線錯誤所對應的errno值(比如:ECONNREFUSED,ETIMEDOUT等).
B: EAGAIN、EADDRINUSE、EADDRNOTAVAIL、ECONNREFUSED、ENETUNREACH 像EAGAIN 這類表明本機臨時埠暫時用完的錯誤、可以嘗試重連。
C: EACCES、EPERM、EAFNOSUPPORT、EALREADY、EBADF、EFAULT、ENOTSOCK 其他真錯誤像無許可權,協議錯誤,等直接關閉套接字.

Connector正是按這個步驟處理的連線.
暴露的介面只有start()和stop()
start()執行上述connect的步驟.
stop()關閉套接字,刪除註冊的通道,停止進行連線.

class Connector
{
public:
  typedef std::function<void (int sockfd)> NewConnectionCallback;

  Connector(EventLoop* loop, const InetAddress& serverAddr);
  ~Connector();

  void setNewConnectionCallback(const NewConnectionCallback& cb)
  { m_newConnectionCallBack = cb; }

  void start();// can be called in any thread
  void stop(); // can be called in any thread

private:

  enum States { kDisconnected, kConnecting, kConnected };
  static const int kMaxRetryDelayMs = 30*1000;
  static const int kInitRetryDelayMs = 500;

  void connect();
  void connecting(int sockfd);

  void handleWrite();
  void handleError();

  void retry(int sockfd);
  int removeAndResetChannel();
  void resetChannel();

  void setState(States s) { m_state = s; }
  void startInLoop();
  void stopInLoop();

  EventLoop* p_loop;
  int m_retryDelayMs;
  InetAddress m_serverAddr;

  States m_state;

  std::unique_ptr<Channel> p_channel;
  NewConnectionCallback m_newConnectionCallBack;
};

Connetor時序圖

Connector::Connector(EventLoop* loop, const InetAddress& serverAddr)
  :p_loop(loop),
  m_serverAddr(serverAddr),
  m_state(kDisconnected),
  m_retryDelayMs(kInitRetryDelayMs)
{
  LOG_DEBUG << "ctor[" << this << "]";
}

Connector::~Connector()
{
  LOG_DEBUG << "dtor[" << this << "]";
  assert(!p_channel);
}

void Connector::start()
{

  p_loop->runInLoop(std::bind(&Connector::startInLoop, this));
}

void Connector::startInLoop()
{
  p_loop->assertInLoopThread();
  assert(m_state == kDisconnected);

  connect();
}

void Connector::stop()
{
  p_loop->queueInLoop(std::bind(&Connector::stopInLoop, this));
}

void Connector::stopInLoop()
{
  p_loop->assertInLoopThread();

  if(m_state == kConnecting)
  {
    int sockfd = removeAndResetChannel();
    sockets::close(sockfd);
    setState(kDisconnected);
  }
}

void Connector::connect()
{
  int sockfd = sockets::createNonblockingOrDie(m_serverAddr.family());
  int ret = sockets::connect(sockfd, m_serverAddr.getSockAddr());
  int savedErrno = (ret == 0) ? 0 : errno;

  if(ret != 0) LOG_TRACE << "connect error ("<< savedErrno << ") : " << strerror_tl(savedErrno);

  switch(savedErrno)
  {
    case 0:
    case EINPROGRESS:      //Operation now in progress
    case EINTR:            //Interrupted system call
    case EISCONN:          //Transport endpoint is already connected
      connecting(sockfd);
      break;

    case EAGAIN:
    case EADDRINUSE:
    case EADDRNOTAVAIL:
    case ECONNREFUSED:
    case ENETUNREACH:
      retry(sockfd);
      LOG_SYSERR << "reSave Error. " << savedErrno;
      break;

    case EACCES:
    case EPERM:
    case EAFNOSUPPORT:
    case EALREADY:
    case EBADF:
    case EFAULT:
    case ENOTSOCK:
      LOG_SYSERR << "connect error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      break;

    default:
      LOG_SYSERR << "Unexpected error in Connector::startInLoop " << savedErrno;
      sockets::close(sockfd);
      // connectErrorCallback_();
      break;
  }

}

void Connector::connecting(int sockfd)
{
  LOG_TRACE << "Connector::connecting] sockfd : " << sockfd;
  setState(kConnecting);
  assert(!p_channel);
  p_channel.reset(new Channel(p_loop, sockfd));
  p_channel->setWriteCallBack(std::bind(&Connector::handleWrite, this));
  //p_channel->setErrorCallback()

  //enableWriting if Channel Writeable ,Connect Success.
  p_channel->enableWriting();
}

void Connector::retry(int sockfd)
{
  sockets::close(sockfd);
  setState(kDisconnected);

  LOG_INFO << "Connector::retry - Retry connecting to " << m_serverAddr.toIpPort()
           << " in " << m_retryDelayMs << " milliseconds. ";

  p_loop->runAfter(m_retryDelayMs/1000.0, std::bind(&Connector::startInLoop, this));
  m_retryDelayMs = std::min(m_retryDelayMs * 2, kMaxRetryDelayMs);
}

int Connector::removeAndResetChannel()
{
  p_channel->disableAll();
  p_channel->remove();

  int sockfd = p_channel->fd();

  p_loop->queueInLoop(std::bind(&Connector::resetChannel, this));

  return sockfd;
}

void Connector::resetChannel()
{
  LOG_TRACE << "Connector::resetChannel()";
  p_channel.reset();
}

void Connector::handleWrite()
{
  LOG_TRACE << "Connector::handleWrite ";

  if(m_state == kConnecting)
  {
    int sockfd = removeAndResetChannel();
    int err = sockets::getSocketError(sockfd);

    if(err)
    {
      LOG_WARN << "Connector::handleWrite - SO_ERROR = "
               << err << " " << strerror_tl(err);
      retry(sockfd);
    }
    /*else if (sockets::isSelfConnect(sockfd))
    {

    }*/
    else
    {
      setState(kConnected);
      m_newConnectionCallBack(sockfd);
    }

  }
  else
  {
    assert(m_state == kDisconnected);
  }

}

void Connector::handleError()
{
  LOG_ERROR << "Connector::handleError States " << m_state;

  if(m_state == kConnecting)
  {
    int sockfd = removeAndResetChannel();
    int err = sockets::getSocketError(sockfd);
    LOG_TRACE << "SOCK_ERROR = " << err << " " << strerror_tl(err);
    retry(sockfd);
  }
}

Acceptor

相較於Connector更簡單,只要有socket可讀,即可確認連線建立.

系統函式accept

 #include <sys/types.h>          /* See NOTES */
#include <sys/socket.h>

       int accept(int sockfd, struct sockaddr addr, socklen_t addrlen);

       #define _GNU_SOURCE             /* See feature_test_macros(7) */
       #include <sys/socket.h>

       int accept4(int sockfd, struct sockaddr addr,
                   socklen_t 
addrlen, int flags);

sockfd socket(2)建立的檔案描述符, 且已被bind(2)繫結本地地址,listen(2)使能監聽.
addr 用於填充遠端套接字地址, 如果不需要知道遠端地址,可以添NULL.
addrlen 用於填充遠端地址大小.
flags
如果flags為0  等同於 accept.

       SOCK_NONBLOCK  在新開啟的檔案描述符設定 O_NONBLOCK 標記。在 fcntl(2) 中儲存這個標記可以得到相同的效果。

       SOCK_CLOEXEC  在新開啟的檔案描述符裡設定 close-on-exec (FD_CLOEXEC) 標記。參看在open(2)裡關於 O_CLOEXEC標記的描述來了解這為什麼有用。

  int connfd = ::accept4(sockfd, (struct sockaddr *)(addr),
                         &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
 
flags 會對返回的fd  connfd  設定SOCK_NONBLOCK | SOCK_CLOEXEC 標記.

如果用於監聽的檔案描述符沒有設定nonblocking標誌,且監聽佇列上沒有掛起的連線, accept()會阻塞直到有新的連線到來. 如果此socket設定了nonblocking標記,accept() 會立即返回失敗並設定 error 為 EAGAIN or EWOULDBLOCK.

Socket的封裝

Socket類封裝一個套接字 fd 析構的時候close 管理套接字的生命期.

class Socket{
public:
  explicit Socket(int sockfd) : m_sockfd(sockfd) { }
  ~Socket();

  int fd() const { return m_sockfd; }
 
  void bindAddress(const InetAddress& localaddr);
  void listen();
  int accept(int sockfd, struct sockaddr_in6* addr);

  int accept(InetAddress* peeraddr);

private:
  const int m_sockfd;
};

Acceptor的封裝

Acceptor的資料成員包含Socket和Channel,Acceptor的Socket是服務端的監聽socket,Channel用於觀察此socket上的readable事件.並回調Acceptor:: handleRead(),handleRead()會呼叫accept(2)來接受新連線, 並回呼叫戶callback。

class Acceptor{
public:
  typedef std::function<void (int sockfd, const InetAddress&)> NewConnectionCallBack;

  Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport = true);
  ~Acceptor();

  void listen();
  bool listenning() const { return m_listenning; } // get listen status.

  void setNewConnectionCallBack(const NewConnectionCallBack& cb) { m_newConnectionCallBack = cb; }

private:
  void handleRead(); //處理新到的連線.

  EventLoop* p_loop;
  Socket m_acceptSocket;
  Channel m_acceptChannel;
  NewConnectionCallBack m_newConnectionCallBack;
  bool m_listenning;
  int m_idleFd;
};

Acceptor時序圖.

Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
  :p_loop(loop),
  m_acceptSocket(sockets::createNonblockingOrDie(listenAddr.family())),
  m_acceptChannel(loop, m_acceptSocket.fd()),
  m_listenning(false),
  m_idleFd(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
  assert(m_idleFd >= 0);
  m_acceptSocket.setReuseAddr(true);
  m_acceptSocket.setReuseAddr(reuseport);
  m_acceptSocket.bindAddress(listenAddr);
  m_acceptChannel.setReadCallBack(
    std::bind(&Acceptor::handleRead, this));
}

Acceptor::~Acceptor()
{
  m_acceptChannel.disableAll();
  m_acceptChannel.remove();
  ::close(m_idleFd);
}

void Acceptor::listen()
{
  p_loop->assertInLoopThread();
  m_listenning = true;
  m_acceptSocket.listen();
  m_acceptChannel.enableReading();
}

void Acceptor::handleRead()
{
  p_loop->assertInLoopThread();
  InetAddress peerAddr;
  int connfd = m_acceptSocket.accept(&peerAddr);
  if(connfd >= 0)
  {
    if(m_newConnectionCallBack)
    {
      m_newConnectionCallBack(connfd, peerAddr);
    }
    else
    {
      sockets::close(connfd);
    }
  }
  else
  {
    LOG_SYSERR << "in Acceptor::handleRead";
    if(errno == EMFILE)
    {
      ::close(m_idleFd);
      m_idleFd = ::accept(m_acceptSocket.fd(), NULL, NULL);
      ::close(m_idleFd);
      m_idleFd = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
    }
  }

簡單測試程式

Acceptor

void newConnetion(int sockfd, const InetAddress& peeraddr)
{
  LOG_DEBUG << "newConnetion() : accepted a new connection from";
  ::sockets::close(sockfd);
}

int main()
{
  InetAddress listenAddr(8888);
  EventLoop loop;
  Acceptor acceptor(&loop, listenAddr);
  acceptor.setNewConnectionCallBack(newConnetion);
  acceptor.listen();

  loop.loop();

}

Connctor

EventLoop* g_loop;

void newConnetion(int sockfd)
{
  LOG_DEBUG << "newConnetion() : Connected a new connection.";
  sockets::close(sockfd);
  g_loop->quit();
}

int main()
{
  EventLoop loop;
  g_loop = &loop;

  InetAddress serverAddr("127.0.0.1", 8888);
  Connector client(&loop, serverAddr);
  client.setNewConnectionCallback(newConnetion);
  client.start();

  loop.loop();

}

執行日誌

作者 —— 艾露米婭娜 

出處:http://www.cnblogs.com/ailumiyana/ 

鄭州治療婦科費用

鄭州做包皮手術

鄭州專業婦科醫院

鄭州專業婦科