【網路元件】客戶端TcpClient
本節主要研究主動發起連線Connector和客戶端TcpClient的主要實現;
整體類圖
主動發起連線Connector
(1)Connector只負責建立Socket連線,不負責建立TcpConnection,由TcpClient提供的NewConnectionCallback回撥,來建立TcpConnection;
(2)當使用connect對非阻塞性的socket描述符發起主動連線時,一般會返回EINPROGRESS(正在連線),此時我們應該關注描述符的可寫事件;當有可寫事件時,還要使用getsockopt來獲取SO_ERROR型別,再次確認一下是否已經連線;
(3)當主動連線不能成功時,Connector將採用退避時間的方式來再次發起連線。當然嘗試一定次數以後,如果仍然不能連線,那麼將會放棄連線;
客戶端TcpClient
(1)TcpClient只還有一個TcpConnection,也就是Connector主動連線成功後建立的TcpConnection;TcpClient本身含有Connector,Connector的生命週期由其控制;
(2)TcpClient含有一個EventLoop,負責TcpConnection事件的監聽;
(3)MessageCallback,ConnectionCallback為使用者提供的資料接收,連線建立和斷開的回撥函式;
接受新連線示意圖如下
接受資料示意圖如下
連線斷開示意圖如下
Connector
Connector宣告
說明幾點:class Event; class EventLoop; class Connector final { public: Connector(const Connector&) = delete; Connector& operator=(const Connector&) = delete; Connector(EventLoop* loop); void setNewConnectionCallback(const NewConnectionCallback& cb) { _newConnectionCallback = cb; } void connect(const InetAddress& serverAddr); private: void _bind(); void _retry(); void _handleWrite(); int _connectfd; EventLoop* _loop; std::unique_ptr<Event> _connectEvent; InetAddress _serverAddr; bool _connected; int _interval = 2; //C++ 11 int _currentTryCounts = 0; //C++ 11 static const int _maxTryCounts = 10; NewConnectionCallback _newConnectionCallback; };
(1)_interval為主動連線失敗後,退避的時間計數;_currentTryCounts為當前已嘗試的次數, _maxTryCounts為嘗試的最大次數;在C++11之前,是非static成員變數是不允許類內初始值的;
(2)當使用connect對非阻塞性的socket描述符發起主動連線時,一般會返回EINPROGRESS(正在連線),此時我們應該關注描述符的可寫事件,_handleWrite就是有寫事件時的回撥函式;當有可寫事件時,還要使用getsockopt來獲取SO_ERROR型別,再次確認一下是否已經連線;
(3)_serverAddr為發起連線的伺服器的地址;
建構函式
Connector::Connector(EventLoop* loop) :
_connectfd(sockets::createNonBlockingSocket()),
_loop(loop),
_connectEvent(new Event(_connectfd, _loop)),
_connected(false)
{
}
主動發起連線
void Connector::connect(const InetAddress& serverAddr)
{
_serverAddr = serverAddr;
int err = sockets::connect(_connectfd, _serverAddr.address());
if (err == 0)
{
if (_newConnectionCallback)
{
_newConnectionCallback(_connectfd, _serverAddr);
}
else
{
::close(_connectfd);
}
}
else
{
if (errno == EINPROGRESS)
{
_connectEvent->setWriteCallback(std::bind(&Connector::_handleWrite, this));
_connectEvent->enableWriting();
}
else
{
_retry();
}
}
}
說明幾點:
(1)如果err為0,說明第一次嘗試連線就成功了,此時就回調TcpClient提供建立TcpConnection的函式;否則如果errno為EINPROGRESS,那就要註冊寫事件要進行進一步的判斷;
(2)errno不為EINPROGRESS時,說明connect出錯了,那麼我們還要通過_retry繼續重新嘗試連線;
寫事件進一步判斷
void Connector::_handleWrite()
{
LOG_DEBUG << "Connector::_handleWrite";
if (_connectEvent->isWriting())
{
LOG_DEBUG << "Connector::_handleWrite isWriting";
_connectEvent->disableWriting();
_loop->removeEvent(_connectEvent.get());
int err = sockets::socketError(_connectfd);
if (err == 0)
{
if (_newConnectionCallback)
{
_newConnectionCallback(_connectfd, _serverAddr);
}
else
{
::close(_connectfd);
}
}
else
{
_retry();
}
}
}
說明幾點:
(1)socketError為getsockopt來獲取SO_ERROR型別的封裝,詳細介紹請看下文;
(2)err為0說明連線已經建立成功,此時就回調TcpClient提供建立TcpConnection的函式;當err仍然不為0時,我們還要通過_retry繼續重新嘗試連線;
重新嘗試連線
void Connector::_retry()
{
LOG_DEBUG << "in retry";
if (_currentTryCounts > _maxTryCounts)
{
::close(_connectfd);
return;
}
_loop->addSingleTimer(std::bind(&Connector::connect, this, _serverAddr), _interval);
_interval = _interval * 2;
++_currentTryCounts;
}
說明幾點:
(1)當已嘗試連線的次數_currentTryCounts大於最大的嘗試次數_maxTryCounts,就不再嘗試了,關閉連線;
(2)如果需要嘗試連線,就在_loop的註冊一個嘗試連線的定時函式;退避的時間_interval採用指數增長的方式;
TcpClient
TcpClient宣告
class Connector;
class TcpConnection;
class EventLoop;
class TcpClient final
{
public:
TcpClient(const TcpClient&) = delete;
TcpClient& operator=(const TcpClient&) = delete;
explicit TcpClient(EventLoop* loop, const InetAddress& serverAddr);
typedef std::shared_ptr<TcpConnection> TcpConnectionPtr;
void setMessageCallback(const MessageCallback& cb)
{
_messageCallback = cb;
}
void setConnectionCallback(const ConnectionCallback& cb)
{
_connectionCallback = cb;
}
void start();
private:
void _newConnection(int connfd, const InetAddress& peerAddr);
void _removeConnection(const TcpConnectionPtr& conn);
void _removeConnectionInLoop(const TcpConnectionPtr& conn);
InetAddress _serverAddr;
EventLoop* _loop;
std::unique_ptr<Connector> _connector;
TcpConnectionPtr _connection;
MessageCallback _messageCallback;
ConnectionCallback _connectionCallback;
};
說明幾點:
(1)MessageCallback為連線上有接收資料時呼叫的回撥(處理業務邏輯),這些資料存放在連線的應用層接收緩衝區中;ConnectionCallback在連線建立和斷開時,都會呼叫的回撥,主要是通知使用者進行相應的處理;
(2)在TcpClient中,只使用一個TcpConnectionPtr來儲存連線建立後的TcpConnection;
(3)TcpClient有一個EventLoop,由使用者來傳入,主要用於對TcpConnection的事件的監聽;
(4)std::unique_ptr<Connector> _connector是TcpClient的成員,用於主動發起連線,_newConnection為註冊給_connector的回撥,當有新連線時,會回撥該函式;
(5)_serverAddr為發起主動連線的伺服器地址;
建構函式
TcpClient::TcpClient(EventLoop* loop, const InetAddress& serverAddr) :
_serverAddr(serverAddr),
_loop(loop),
_connector(new Connector(loop))
{
_connector->setNewConnectionCallback(std::bind(&TcpClient::_newConnection, this, std::placeholders::_1, std::placeholders::_2));
}
發起主動連線
void TcpClient::start()
{
_loop->runInLoop(std::bind(&Connector::connect, _connector.get(), _serverAddr));
}
說明幾點
(1)為了保證執行緒安全性,發起主動連線的Connector的connect在TcpClient的loop中進行;
接受新連線
void TcpClient::_newConnection(int connfd, const InetAddress& peerAddr)
{
LOG_INFO << "TcpClient::_newConnection fd [" << connfd << "] from " << peerAddr.hostNameString();
InetAddress clientAddr;
sockets::getLockAddress(connfd, clientAddr.address());
TcpConnectionPtr conn(new TcpConnection(connfd, _loop, peerAddr, clientAddr));
_connection = conn;
conn->setMessageCallback(_messageCallback);
conn->setConnectionCallback(_connectionCallback);
conn->setCloseConnectionCallback(std::bind(&TcpClient::_removeConnection, this, std::placeholders::_1));
conn->connectEstablished();
}
說明幾點:
(1)該函式只要由Connector中connect後新連線的回撥函式;
(2) _conn放入 _connection中,這樣TcpClient也就持有了TcpConnection,如果TcpClient不刪除TcpConnection,那麼TcpConnection是不會析構的;
(3)setMessageCallback,setConnectionCallback,setCloseConnectionCallback分別為設定資訊接收時的使用者回撥,連線建立和斷開時的使用者回撥,TcpClient連線斷開回調;connectEstablished為讓IO執行緒開始接管TcpConnection上的事件監聽;
移除連線
void TcpClient::_removeConnection(const TcpConnectionPtr& conn) //thread safe
{
_loop->queueInLoop(std::bind(&TcpClient::_removeConnectionInLoop, this, conn));
}
void TcpClient::_removeConnectionInLoop(const TcpConnectionPtr& conn)
{
LOG_INFO << "TcpClient::_removeConnection fd [" << conn->connfd() << "]";
_connection.reset();
}
說明幾點:
(1)_removeConnection為上述TcpConnection通過setCloseConnectionCallback設定的TcpClient連線斷開回調,最後連線斷開時,為保證執行緒安全性,將會在TcpClient本身的EventLoop中進行真正的刪除;
SocketOps本節相關的函式
connect
int sockets::connect(int sockfd, const struct sockaddr_in& serverAddr)
{
int err = ::connect(sockfd, reinterpret_cast<const sockaddr*>(&serverAddr), sizeof serverAddr);
if (err < 0)
{
int savederrno = errno;
switch (savederrno)
{
case ETIMEDOUT:
case EINTR:
case ECONNREFUSED:
case EHOSTUNREACH:
case ENETUNREACH:
case EINPROGRESS:
errno = savederrno;
LOG_SYSERR << "connect system expected error: " << strError();
break;
case ENOTSOCK:
case EFAULT:
case EISCONN:
LOG_SYSERR << "connect system unexpected error: " << strError();
abort();
break;
default:
LOG_SYSERR << "connect system unknown error: " << strError();
abort();
break;
}
}
return err;
}
獲取SO_ERROR
int sockets::socketError(int sockfd)
{
int optval = 1;
socklen_t len = sizeof optval;
if (getsockopt(sockfd, SOL_SOCKET, SO_ERROR, &optval, &len) < 0)
{
LOG_SYSERR << "getsockopt system error: " << strError();
return errno;
}
else
{
return optval;
}
}