Boos::asio伺服器開發之連線管理
boost.asio相信很多人聽說過,作為一個跨平臺的通訊庫,它的效能是很出色的,然而它卻談不上好用,裡面有很多地方稍不注意就會出錯,要正確的用好asio還是需要花一番精力去學習和實踐的,本文將通過介紹如何寫一個簡單的通訊程式來告訴讀者如何使用asio,希望對asio的初學者有所幫助。由於只是介紹其基本用法,作為例子的簡單示例並不考慮很多的業務邏輯和異常處理,只是介紹基本用法,讓初學者入門。
使用asio容易出錯的一個主要原因是因為它是基於proactor模式實現的,asio有很多非同步操作介面,這些非同步介面稍不注意就會出現莫名奇妙的錯誤,所以要用好asio的第一步是理解其非同步操思想。
非同步操作思想
使用者發起非同步事件,asio將這些非同步事件投遞到一個佇列中,使用者發起的操作就返回了,io_service::run會處理非同步事件佇列中的所有的非同步事件,它會將這些事件交給作業系統處理,作業系統處理完成之後會丟到asio的事件完成的佇列中,io_service發現有完成佇列中有完成事件了,就會通知使用者處理完成事件。 所以使用者要發起一個非同步操作需要做三件事:
- 呼叫asio非同步操作介面,發起非同步操作;如:async_connect、async_read、async_write,這些非同步介面需要一個回撥函式入參,這個回撥函式在事件完成時,由io_service觸發。
- 呼叫io_service::run處理非同步事件;發起一個非同步操作,必須要保證io_service::run,因為io_service通過一個迴圈去處理這些非同步操作事件的,如果沒有事件就會退出,所以要保證非同步事件發起之後,io_service::run還在執行。要保證一直run的一個簡單辦法就是使用io_service::work,它可以保證io_service一直run。
- 處理非同步操作完成事件;在呼叫非同步介面時會傳入一個回撥函式,這個回撥函式就是處理操作完成事件的,比如讀完成了,使用者需要對這些資料進行業務邏輯的處理。
下圖描述了一個非同步操作的過程:
asio的的核心是io_service, 理解了asio非同步介面的機制就容易找出使用asio過程中出現的問題了,在這裡把一些常見的問題列出來,並分析原因和提出解決方法。
- 問題1:為什麼我發起了非同步操作,如連線或者寫,對方都沒有反應,好像沒有收到連線請求或者沒有收到資料? 答案:一個很可能的原因是io_service在非同步操作發起之後沒有run,解決辦法是保持io_service的run。
- 問題2:為什麼傳送資料會報錯? 答案:一個可能的原因是傳送的資料失效了,非同步傳送要求傳送的資料在回撥完成之前都有效,非同步操作只是將非同步事件控制代碼投遞到io_service佇列中就返回了,並不是阻塞的,不注意這一點,如果是臨時變數的資料,除了作用域就失效了,導致非同步事件還沒完成時資料就失效了。解決辦法,保證傳送資料在事件完成之前一直有效。
- 問題3:為什麼監聽socket時,會報“函式不正確”的異常? 答案:因為監聽時,也要保證這個socket一直有效,如果是一個臨時變數socket,在呼叫非同步監聽後超出作用域就失效了,解決辦法,將監聽的socket儲存起來,使它的生命週期和acceptor一樣長。
- 問題4:為什麼連續呼叫非同步操作時會報錯? 答案:因為非同步操作必須保證當前非同步操作完成之後再發起下一次非同步操作。解決辦法:在非同步完成事件處理完成之後再發起新的非同步操作即可。
- 問題5:為什麼對方半天收不到資料,過了半天才一下子收到之前傳送的資料? 答案:因為socket是流資料,一次傳送多少資料不是外界能控制的,這也是所謂的粘包問題。解決辦法,可以在接收時指定至少收多少的條件,或者做tcp分包處理。
說了這麼多,還是來看看例子吧,一個簡單的通訊程式:服務端監聽某個埠,允許多個客戶端連線上來,伺服器將客戶端發來的資料打印出來。 先看看服務端的需求,需求很簡單,第一,要求能接收多個客戶端;第二,要求把收到的資料打印出來。
要求能接收多個客戶端是第一個要解決的問題,非同步接收需要用到acceptor::async_accept,它接收一個socket和一個完成事件的回撥函式。前面的問題3中提到監聽的這個socket不能是臨時變數,我們要把它儲存起來,最好是統一管理起來。可以考慮用一個map去管理它們,每次一個新連線過來時,伺服器自動分配一個連線號給這個連線,以方便管理。然而,socket是不允許拷貝的,所以不能直接將socket放入容器中,還需要外面包裝一層才可以。
第二個問題是列印來自客戶端的資料,既然要列印就需要非同步讀資料了。非同步讀是有socket完成,這個socket還要完成讀寫功能,為了簡化使用者操作,我將socket封裝到一個讀寫事件處理器中,這個事件處理器只具備具備讀和寫的功能。伺服器每次監聽的時候我都會建立一個新的事件處理器並放到一個map中,客戶端成功連線後就由這個事件處理器去處理各種讀寫事件了。 根據問題1,非同步讀寫時要保證資料的有效性,這裡我將一個固定大小的緩衝區作為讀緩衝區。為了簡單起見我使用同步傳送,非同步接收。
服務端程式碼:
#ifndef MESSAGE_H
#define MESSAGE_H
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
class Message
{
public:
enum { header_length = 4 };
enum { max_body_length = 512 };
Message()
: body_length_(0)
{
}
const char* data() const
{
return data_;
}
char* data()
{
return data_;
}
size_t length() const
{
return header_length + body_length_;
}
const char* body() const
{
return data_ + header_length;
}
char* body()
{
return data_ + header_length;
}
size_t body_length() const
{
return body_length_;
}
void body_length(size_t new_length)
{
body_length_ = new_length;
if (body_length_ > max_body_length)
body_length_ = max_body_length;
}
bool decode_header()
{
char header[header_length + 1] = "";
strncat(header, data_, header_length);
body_length_ = atoi(header);
if (body_length_ > max_body_length)
{
body_length_ = 0;
return false;
}
return true;
}
void encode_header()
{
char header[header_length + 1] = "";
sprintf(header, "%4d", body_length_);
memcpy(data_, header, header_length);
}
private:
char data_[header_length + max_body_length];
size_t body_length_;
};
#endif
#ifndef RW_HANDLER_H
#define RW_HANDLER_H
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
using namespace boost::asio::ip;
using namespace boost::asio;
using namespace std;
const int MAX_IP_PACK_SIZE = 65536;
const int HEAD_LEN = 4;
class RWHandler
{
public:
RWHandler(io_service& ios) : m_sock(ios)
{
}
~RWHandler()
{
}
void HandleRead()
{
//三種情況下會返回:1.緩衝區滿;2.transfer_at_least為真(收到特定數量位元組即返回);3.有錯誤發生
async_read(m_sock, buffer(m_buff), transfer_at_least(HEAD_LEN), [this](const boost::system::error_code& ec, size_t size)
{
if (ec != nullptr)
{
HandleError(ec);
return;
}
cout << m_buff.data() + HEAD_LEN << endl;
HandleRead();
});
}
void HandleWrite(char* data, int len)
{
boost::system::error_code ec;
write(m_sock, buffer(data, len), ec);
if (ec != nullptr)
HandleError(ec);
}
tcp::socket& GetSocket()
{
return m_sock;
}
void CloseSocket()
{
boost::system::error_code ec;
m_sock.shutdown(tcp::socket::shutdown_send, ec);
m_sock.close(ec);
}
void SetConnId(int connId)
{
m_connId = connId;
}
int GetConnId() const
{
return m_connId;
}
template<typename F>
void SetCallBackError(F f)
{
m_callbackError = f;
}
private:
void HandleError(const boost::system::error_code& ec)
{
CloseSocket();
cout << ec.message() << endl;
if (m_callbackError)
m_callbackError(m_connId);
}
private:
tcp::socket m_sock;
std::array<char, MAX_IP_PACK_SIZE> m_buff;
int m_connId;
std::function<void(int)> m_callbackError;
};
#endif
#ifndef SERVER_H
#define SERVER_H
#include "Message.h"
#include "RWHandler.h"
#include <boost/asio/buffer.hpp>
#include <boost/unordered_map.hpp>
const int MaxConnectionNum = 65536;
const int MaxRecvSize = 65536;
class Server
{
public:
Server(io_service& ios, short port) : m_ios(ios), m_acceptor(ios, tcp::endpoint(tcp::v4(), port)), m_cnnIdPool(MaxConnectionNum)
{
int current = 0;
std::generate_n(m_cnnIdPool.begin(), MaxConnectionNum, [¤t]{return ++current; });
}
~Server()
{
}
void Accept()
{
cout << "Start Listening " << endl;
std::shared_ptr<RWHandler> handler = CreateHandler();
m_acceptor.async_accept(handler->GetSocket(), [this, handler](const boost::system::error_code& error)
{
if (error)
{
cout << error.value() << " " << error.message() << endl;
HandleAcpError(handler, error);
}
m_handlers.insert(std::make_pair(handler->GetConnId(), handler));
cout << "current connect count: " << m_handlers.size() << endl;
handler->HandleRead();
Accept();
});
}
private:
void HandleAcpError(std::shared_ptr <RWHandler> eventHanlder, const boost::system::error_code& error)
{
cout << "Error,error reason:" << error.value() << error.message() << endl;
//關閉socket,移除讀事件處理器
eventHanlder->CloseSocket();
StopAccept();
}
void StopAccept()
{
boost::system::error_code ec;
m_acceptor.cancel(ec);
m_acceptor.close(ec);
m_ios.stop();
}
std::shared_ptr<RWHandler> CreateHandler()
{
int connId = m_cnnIdPool.front();
m_cnnIdPool.pop_front();
std::shared_ptr<RWHandler> handler = std::make_shared<RWHandler>(m_ios);
handler->SetConnId(connId);
handler->SetCallBackError([this](int connId)
{
RecyclConnid(connId);
});
return handler;
}
void RecyclConnid(int connId)
{
auto it = m_handlers.find(connId);
if (it != m_handlers.end())
m_handlers.erase(it);
cout << "current connect count: " << m_handlers.size() << endl;
m_cnnIdPool.push_back(connId);
}
private:
io_service& m_ios;
tcp::acceptor m_acceptor;
boost::unordered_map<int, std::shared_ptr<RWHandler>> m_handlers;
list<int> m_cnnIdPool;
};
#endif
//#include "Message.h"
//#include "RWHandler.h"
#include "Server.h"
int main(int argc, char* argv[])
{
boost::asio::io_service ios;
//boost::asio::io_service::work work(ios);
//std::thread thd([&ios]{ios.run(); });
Server server(ios, 8100);
server.Accept();
ios.run();
//thd.join();
return 0;
}