1. 程式人生 > >【網路元件】客戶端TcpClient

【網路元件】客戶端TcpClient

      本節主要研究主動發起連線Connector和客戶端TcpClient的主要實現;

整體類圖


主動發起連線Connector

(1)Connector只負責建立Socket連線,不負責建立TcpConnection,由TcpClient提供的NewConnectionCallback回撥,來建立TcpConnection;

(2)當使用connect對非阻塞性的socket描述符發起主動連線時,一般會返回EINPROGRESS(正在連線),此時我們應該關注描述符的可寫事件;當有可寫事件時,還要使用getsockopt來獲取SO_ERROR型別,再次確認一下是否已經連線;

(3)當主動連線不能成功時,Connector將採用退避時間的方式來再次發起連線。當然嘗試一定次數以後,如果仍然不能連線,那麼將會放棄連線;

客戶端TcpClient

(1)TcpClient只還有一個TcpConnection,也就是Connector主動連線成功後建立的TcpConnection;TcpClient本身含有Connector,Connector的生命週期由其控制;

(2)TcpClient含有一個EventLoop,負責TcpConnection事件的監聽;

(3)MessageCallback,ConnectionCallback為使用者提供的資料接收,連線建立和斷開的回撥函式;

接受新連線示意圖如下


接受資料示意圖如下


連線斷開示意圖如下


Connector

Connector宣告

class Event;
class EventLoop;

class Connector final
{
public:
  Connector(const Connector&) = delete;
  Connector& operator=(const Connector&) = delete;

  Connector(EventLoop* loop);

  void setNewConnectionCallback(const NewConnectionCallback& cb)
  {
    _newConnectionCallback = cb;
  }

  void connect(const InetAddress& serverAddr);

private:
  void _bind();

  void _retry();

  void _handleWrite();

  int _connectfd;
  EventLoop* _loop;
  std::unique_ptr<Event>  _connectEvent;

  InetAddress _serverAddr;
  bool _connected;

  int _interval = 2;         //C++ 11
  int _currentTryCounts = 0; //C++ 11
  static const int _maxTryCounts = 10;

  NewConnectionCallback _newConnectionCallback;
};
說明幾點:

(1)_interval為主動連線失敗後,退避的時間計數;_currentTryCounts為當前已嘗試的次數, _maxTryCounts為嘗試的最大次數;在C++11之前,是非static成員變數是不允許類內初始值的;

(2)當使用connect對非阻塞性的socket描述符發起主動連線時,一般會返回EINPROGRESS(正在連線),此時我們應該關注描述符的可寫事件,_handleWrite就是有寫事件時的回撥函式;當有可寫事件時,還要使用getsockopt來獲取SO_ERROR型別,再次確認一下是否已經連線;

(3)_serverAddr為發起連線的伺服器的地址;

建構函式

Connector::Connector(EventLoop* loop) :
    _connectfd(sockets::createNonBlockingSocket()),
    _loop(loop),
    _connectEvent(new Event(_connectfd, _loop)),
    _connected(false)
{

}

主動發起連線

void Connector::connect(const InetAddress& serverAddr)
{
  _serverAddr = serverAddr;
  int err = sockets::connect(_connectfd, _serverAddr.address());
  if (err == 0)
    {
      if (_newConnectionCallback)
        {
          _newConnectionCallback(_connectfd, _serverAddr);
        }
      else
        {
          ::close(_connectfd);
        }
    }
  else
    {
      if (errno == EINPROGRESS)
        {
          _connectEvent->setWriteCallback(std::bind(&Connector::_handleWrite, this));
          _connectEvent->enableWriting();
        }
      else
        {
          _retry();
        }
    }
}

說明幾點:

(1)如果err為0,說明第一次嘗試連線就成功了,此時就回調TcpClient提供建立TcpConnection的函式;否則如果errno為EINPROGRESS,那就要註冊寫事件要進行進一步的判斷;

(2)errno不為EINPROGRESS時,說明connect出錯了,那麼我們還要通過_retry繼續重新嘗試連線;

寫事件進一步判斷

void Connector::_handleWrite()
{

  LOG_DEBUG << "Connector::_handleWrite";

  if (_connectEvent->isWriting())
    {
      LOG_DEBUG << "Connector::_handleWrite isWriting";
      _connectEvent->disableWriting();
      _loop->removeEvent(_connectEvent.get());

      int err = sockets::socketError(_connectfd);
      if (err == 0)
        {

          if (_newConnectionCallback)
            {
              _newConnectionCallback(_connectfd, _serverAddr);
            }
          else
            {
              ::close(_connectfd);
            }

        }
      else
        {
          _retry();
        }
    }
}

說明幾點:

(1)socketError為getsockopt來獲取SO_ERROR型別的封裝,詳細介紹請看下文;

(2)err為0說明連線已經建立成功,此時就回調TcpClient提供建立TcpConnection的函式;當err仍然不為0時,我們還要通過_retry繼續重新嘗試連線;

重新嘗試連線

void Connector::_retry()
{
  LOG_DEBUG << "in retry";
  if (_currentTryCounts > _maxTryCounts)
  {
    ::close(_connectfd);
    return;
  }
    
  _loop->addSingleTimer(std::bind(&Connector::connect, this, _serverAddr), _interval);

  _interval = _interval * 2;
  ++_currentTryCounts;
}
說明幾點:

(1)當已嘗試連線的次數_currentTryCounts大於最大的嘗試次數_maxTryCounts,就不再嘗試了,關閉連線;

(2)如果需要嘗試連線,就在_loop的註冊一個嘗試連線的定時函式;退避的時間_interval採用指數增長的方式;

TcpClient

TcpClient宣告

class Connector;
class TcpConnection;
class EventLoop;

class TcpClient final
{
public:
  TcpClient(const TcpClient&) = delete;
  TcpClient& operator=(const TcpClient&) = delete;

  explicit TcpClient(EventLoop* loop, const InetAddress& serverAddr);

  typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;

  void setMessageCallback(const MessageCallback& cb)
  {
    _messageCallback = cb;
  }

  void setConnectionCallback(const ConnectionCallback& cb)
  {
    _connectionCallback = cb;
  }

  void start();

private:
  void _newConnection(int connfd, const InetAddress& peerAddr);

  void _removeConnection(const TcpConnectionPtr&  conn);
  void _removeConnectionInLoop(const TcpConnectionPtr& conn);

  InetAddress _serverAddr;

  EventLoop* _loop;

  std::unique_ptr<Connector> _connector;

  TcpConnectionPtr _connection;

  MessageCallback _messageCallback;
  ConnectionCallback _connectionCallback;
};
說明幾點:

(1)MessageCallback為連線上有接收資料時呼叫的回撥(處理業務邏輯),這些資料存放在連線的應用層接收緩衝區中;ConnectionCallback在連線建立和斷開時,都會呼叫的回撥,主要是通知使用者進行相應的處理;

(2)在TcpClient中,只使用一個TcpConnectionPtr來儲存連線建立後的TcpConnection;

(3)TcpClient有一個EventLoop,由使用者來傳入,主要用於對TcpConnection的事件的監聽;

(4)std::unique_ptr<Connector> _connector是TcpClient的成員,用於主動發起連線,_newConnection為註冊給_connector的回撥,當有新連線時,會回撥該函式;

(5)_serverAddr為發起主動連線的伺服器地址;

建構函式

TcpClient::TcpClient(EventLoop* loop, const InetAddress& serverAddr) :
    _serverAddr(serverAddr),
    _loop(loop),
    _connector(new Connector(loop))
{
  _connector->setNewConnectionCallback(std::bind(&TcpClient::_newConnection, this, std::placeholders::_1, std::placeholders::_2));
}

發起主動連線

void TcpClient::start()
{
  _loop->runInLoop(std::bind(&Connector::connect, _connector.get(), _serverAddr));
}
說明幾點

(1)為了保證執行緒安全性,發起主動連線的Connector的connect在TcpClient的loop中進行;

接受新連線

void TcpClient::_newConnection(int connfd, const InetAddress& peerAddr)
{
  LOG_INFO << "TcpClient::_newConnection fd [" << connfd << "] from " << peerAddr.hostNameString();

  InetAddress clientAddr;
  sockets::getLockAddress(connfd, clientAddr.address());
  TcpConnectionPtr conn(new TcpConnection(connfd, _loop, peerAddr, clientAddr));

  _connection = conn;

  conn->setMessageCallback(_messageCallback);
  conn->setConnectionCallback(_connectionCallback);
  conn->setCloseConnectionCallback(std::bind(&TcpClient::_removeConnection, this, std::placeholders::_1));
  conn->connectEstablished();
}

說明幾點:

(1)該函式只要由Connector中connect後新連線的回撥函式;

(2) _conn放入 _connection中,這樣TcpClient也就持有了TcpConnection,如果TcpClient不刪除TcpConnection,那麼TcpConnection是不會析構的;

(3)setMessageCallback,setConnectionCallback,setCloseConnectionCallback分別為設定資訊接收時的使用者回撥,連線建立和斷開時的使用者回撥,TcpClient連線斷開回調;connectEstablished為讓IO執行緒開始接管TcpConnection上的事件監聽;

移除連線

void TcpClient::_removeConnection(const TcpConnectionPtr& conn) //thread safe
{
  _loop->queueInLoop(std::bind(&TcpClient::_removeConnectionInLoop, this, conn));
}

void TcpClient::_removeConnectionInLoop(const TcpConnectionPtr& conn)
{
  LOG_INFO << "TcpClient::_removeConnection fd [" << conn->connfd() << "]";
  _connection.reset();
}

說明幾點:

(1)_removeConnection為上述TcpConnection通過setCloseConnectionCallback設定的TcpClient連線斷開回調,最後連線斷開時,為保證執行緒安全性,將會在TcpClient本身的EventLoop中進行真正的刪除;

SocketOps本節相關的函式

connect

int sockets::connect(int sockfd, const struct sockaddr_in& serverAddr)
{
  int err = ::connect(sockfd, reinterpret_cast<const sockaddr*>(&serverAddr), sizeof serverAddr);

  if (err < 0)
    {
      int savederrno = errno;
      switch (savederrno)
        {
        case ETIMEDOUT:
        case EINTR:
        case ECONNREFUSED:
        case EHOSTUNREACH:
        case ENETUNREACH:
        case EINPROGRESS:
          errno = savederrno;
          LOG_SYSERR << "connect system expected error: " << strError();
          break;

        case ENOTSOCK:
        case EFAULT:
        case EISCONN:
          LOG_SYSERR << "connect system unexpected error: " << strError();
          abort();
          break;

        default:
          LOG_SYSERR << "connect system unknown error: " << strError();
          abort();
          break;
        }
    }

  return err;
}

獲取SO_ERROR

int sockets::socketError(int sockfd)
{
  int optval = 1;
  socklen_t len = sizeof optval;

  if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &len) < 0)
    {
      LOG_SYSERR << "getsockopt system error: " << strError();
      return errno;
    }
  else
    {
      return optval;
    }
}