1. 程式人生 > >boost庫之tcp例項(非同步方式)

boost庫之tcp例項(非同步方式)

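// Server program (服務端). Build it separately from the client program further below; each one has its own main().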

// Size, in bytes, of the per-connection receive buffer (TcpConnection::m_recvBuf).
#define	TCP_RECV_DATA_PACKAGE_MAX_LENGTH			2048
// Size, in bytes, of the per-connection send staging buffer (TcpConnection::m_sendBuf).
#define	TCP_SEND_DATA_PACKAGE_MAX_LENGTH			2048

#include <algorithm>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <iostream>
#include <list>
#include <string>

#include <boost/array.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/shared_ptr.hpp>

using namespace boost::asio;

// Returns the current local time formatted by ctime(), including the
// trailing newline that ctime() appends.
std::string make_daytime_string()
{
	const std::time_t tNow = std::time(NULL);
	const char *pszStamp = std::ctime(&tNow);
	return std::string(pszStamp);
}

// Send-completion callback.
// TcpConnection::handle_write invokes it with the result of an asynchronous
// send: the error code, the number of bytes transferred, and the two user
// values that were passed to TcpConnection::sendData.
typedef void (CALLBACK *SendDataCallback)(const boost::system::error_code& error,std::size_t bytes_transferred,DWORD dwUserData1,DWORD dwUserData2);

// Receive-completion callback.
// TcpConnection::handle_read invokes it with the error code, a pointer to the
// connection's internal receive buffer, the number of bytes received, and the
// two user values that were passed to TcpConnection::recvDataByAsync.
// pData refers to storage owned by the connection, not to a private copy.
typedef void (CALLBACK *RecvDataCallback)(const boost::system::error_code& error,char *pData,int nDataSize,DWORD dwUserData1,DWORD dwUserData2);

//tcp connection
class TcpConnection
{
public:
	static TcpConnection * create(io_service& ioService)
	{
		return new TcpConnection(ioService);
	}

	virtual ~TcpConnection()
	{
		m_bDisconnect = true;
		m_socket.close();
	}

	ip::tcp::socket& socket()
	{
		return m_socket;
	}

	//傳送資料
	int sendData(char *pData,int nDataSize,SendDataCallback fnCallback,DWORD dwUserData1,DWORD dwUserData2)
	{
		if(fnCallback == NULL)
		{
			//同步
			if(!m_socket.is_open())
			{
				return 0;
			}
			std::size_t  nSendedSize = boost::asio::write(m_socket,boost::asio::buffer(pData,nDataSize));
			if(nDataSize == nSendedSize)
			{
				return 0;
			}
			else
			{
				return nSendedSize;
			}
		}
		else
		{
			//非同步
			if(!m_socket.is_open())
			{
				return 0;
			}
			memcpy(m_sendBuf.data(),pData,nDataSize);
			boost::asio::async_write(
				m_socket, 
				boost::asio::buffer(m_sendBuf.data(),nDataSize), 
				boost::bind(&TcpConnection::handle_write, this,
				boost::asio::placeholders::error, 
				boost::asio::placeholders::bytes_transferred,
				fnCallback,dwUserData1,dwUserData2));
		}
	
		return 0;
	}
	//接收資料(同步)
	int	recvDataBySync()
	{
		if(!m_socket.is_open())
		{
			return 0;
		}
		boost::system::error_code error;
		std::size_t nSize = m_socket.read_some(boost::asio::buffer(m_recvBuf),error);
		if(error != NULL)
		{
			//錯誤
			return 0;
		}

		return nSize;
	}
	//接收資料(非同步)
	int	recvDataByAsync(RecvDataCallback  fnCallback,DWORD dwUserData1,DWORD dwUserData2)
	{
		m_socket.async_read_some(boost::asio::buffer(m_recvBuf),
			boost::bind(&TcpConnection::handle_read, this, 
			boost::asio::placeholders::error, 
			boost::asio::placeholders::bytes_transferred,
			fnCallback,dwUserData1,dwUserData2));

		return 0;
	}

private:
	TcpConnection(io_service& ioService)
		: m_socket(ioService)
	{
		m_bDisconnect = false;
	}

	void handle_write(const boost::system::error_code& error,size_t bytes_transferred,SendDataCallback fnCallback,DWORD dwUserData1,DWORD dwUserData2)
	{
		if(fnCallback != NULL)
		{
			fnCallback(error,bytes_transferred,dwUserData1,dwUserData2);
		}
		if(error != NULL)
		{
			m_bDisconnect = true;
			if(m_socket.is_open())
			{
				m_socket.close();
			}
			if(!m_bDisconnect)
			{
				printf("close connect \n");
			}

			//傳送資料失敗
			return;
		}

		printf("write data!!!");
	}

	void handle_read(const boost::system::error_code& error,size_t bytes_transferred,RecvDataCallback  fnCallback,DWORD dwUserData1,DWORD dwUserData2)
	{
		if(fnCallback != NULL)
		{
			fnCallback(error,m_recvBuf.data(),bytes_transferred,dwUserData1,dwUserData2);
		}
if(error != NULL)
{
		if (error == boost::asio::error::eof || error == boost::asio::error::connection_reset)
		{
			//boost::asio::error::eof  --對端方關閉連線(正常關閉套接字)
			//boost::asio::error::connection_reset --對端方關閉連線(暴力關閉套接字)
			//對端方關閉連線
			if(m_socket.is_open())
			{
				m_socket.close();
			}
			printf("close connect \n");
                }
else
{
			if(m_socket.is_open())
			{
				m_socket.close();
			}
}
return;
}
		char szMsg[128] = {0};
		memcpy(szMsg,m_recvBuf.data(),bytes_transferred);
		printf("%s \n",szMsg);
	}

	bool	m_bDisconnect;											//是否斷開連線
	ip::tcp::socket m_socket;
	boost::array<char,TCP_RECV_DATA_PACKAGE_MAX_LENGTH> m_recvBuf;	//接收資料緩衝區
	boost::array<char,TCP_SEND_DATA_PACKAGE_MAX_LENGTH> m_sendBuf;	//傳送資料緩衝區
};

class TcpServer
{
public:
	TcpServer(io_service& ioService) : m_acceptor(ioService, ip::tcp::endpoint(ip::tcp::v4(), 10005))
	{
		m_funcConnectionHandler = NULL;
		m_nUserData = 0;
		m_bStop = false;
	}

	//新建連接回調函式
	typedef boost::function<void (TcpConnection * new_connection,int nUserData)>	CreateConnnectionCallbackHandler;

	//設定新建連接回調函式
	void setNewConnectionCallback(CreateConnnectionCallbackHandler fnHandler,int nUserData)
	{
		m_funcConnectionHandler = fnHandler;
		m_nUserData = nUserData;
	}

	//開始工作
	void startWork()
	{
		m_bStop = false;
		start_accept();
	}

	//停止工作
	void stopWork()
	{
		m_bStop = true;
		m_acceptor.close();
	}

private:
	void start_accept()
	{
		if(m_bStop)
		{
			return;
		}

		TcpConnection *new_connection = TcpConnection::create(m_acceptor.get_io_service());

		m_acceptor.async_accept(
			new_connection->socket(), 
			boost::bind(&TcpServer::handle_accept, 
			this, 
			new_connection, 
			boost::asio::placeholders::error));
	}

	void handle_accept(TcpConnection * new_connection,
		const boost::system::error_code& error)
	{
		if (!error)
		{
			if(m_funcConnectionHandler != NULL)
			{
				m_funcConnectionHandler(new_connection,m_nUserData);
			}

			new_connection->sendData("abcdefg",strlen("abcdefg"),NULL,0,0);
			start_accept();
		}
	}

	ip::tcp::acceptor m_acceptor;
	CreateConnnectionCallbackHandler m_funcConnectionHandler;
	int				  m_nUserData;
	bool			  m_bStop;
};

// All connections handed to NewConnectionCallbackProcess that have not been deleted yet.
std::list<TcpConnection *> g_listConnection;
// Connections marked as no longer usable; DeleteNoEffectConnection() deletes them.
std::list<TcpConnection *> g_listNoEffectConnection;

// Marks a connection as no longer usable by adding it to
// g_listNoEffectConnection. A NULL pointer is ignored, and a connection that
// is already in the list is not added twice (a duplicate would be deleted
// twice by DeleteNoEffectConnection).
void InsertNoEffectConnection(TcpConnection * pConnnection)
{
	if(pConnnection == NULL)
	{
		return;
	}

	const std::list<TcpConnection *>::iterator iterFound =
		std::find(g_listNoEffectConnection.begin(), g_listNoEffectConnection.end(), pConnnection);

	// Not listed yet: append the connection that was passed in.
	if(iterFound == g_listNoEffectConnection.end())
	{
		g_listNoEffectConnection.push_back(pConnnection);
	}
}

//刪除無效連線
void DeleteNoEffectConnection()
{
	std::list<TcpConnection *>::iterator iter;
	if(g_listNoEffectConnection.size() > 0)
	{
		TcpConnection * pTcpConnectionTmpObject = NULL;
		iter = g_listNoEffectConnection.begin();
		while(iter != g_listNoEffectConnection.end())
		{
			pTcpConnectionTmpObject = *iter;
			if(pTcpConnectionTmpObject != NULL)
			{
				g_listConnection.remove(pTcpConnectionTmpObject);
				delete pTcpConnectionTmpObject;
			}
			iter++;
		}

		g_listNoEffectConnection.clear();
	}
}

// New-connection callback: appends the accepted connection to
// g_listConnection. nUserData is not used.
void NewConnectionCallbackProcess(TcpConnection * new_connection,int nUserData)
{
	g_listConnection.insert(g_listConnection.end(), new_connection);
}

// Receive callback used by the server's main loop.
// dwUserData1 carries the TcpConnection pointer that main() passed to
// recvDataByAsync(). When the read reported an error the connection is queued
// for deletion: TcpConnection::handle_read closes the socket on every error,
// not only on eof, so any failed read means the connection is dead.
// operation_aborted is skipped because it reports a read cancelled by a local
// close, not a new failure of the peer.
// NOTE(review): the pointer travels through a DWORD; this presumably only
// works when pointers fit in a DWORD (32-bit build) -- confirm the target.
void WINAPI RecvDataCallbackProcess(const boost::system::error_code& error,char *pData,int nDataSize,DWORD dwUserData1,DWORD dwUserData2)
{
	if (!error)
	{
		// Successful read: nothing to do here.
		return;
	}
	if (error == boost::asio::error::operation_aborted)
	{
		return;
	}

	// The peer closed the connection or the read failed.
	TcpConnection * pTcpConnectionTmpObject = (TcpConnection *)dwUserData1;
	if(pTcpConnectionTmpObject != NULL)
	{
		InsertNoEffectConnection(pTcpConnectionTmpObject);
	}
}




int main(int argc, char* argv[])
{
	try
	{
		io_service ioService;
		TcpServer server(ioService);
		server.setNewConnectionCallback(NewConnectionCallbackProcess,0);
		server.startWork();

		TcpConnection * pTcpConnectionObject = NULL;
		std::list<TcpConnection *>::iterator iter,iterEnd;
		
		int n = 0;
		while(true)
		{
			//刪除無效連線
			DeleteNoEffectConnection();

			//遍歷
			iter = g_listConnection.begin();
			iterEnd = g_listConnection.end();
			for(iter; iter!=iterEnd; iter++)
			{
				pTcpConnectionObject = *iter;
				pTcpConnectionObject->sendData("111",3,NULL,0,0);
				pTcpConnectionObject->recvDataByAsync(RecvDataCallbackProcess,(int)pTcpConnectionObject,0);
			}

			ioService.poll();
			Sleep(200);
			n++;
			if(n > 1000)
			{
				break;
			}
		}
		
		// 只有io_service類的run()方法執行之後回撥物件才會被呼叫
		//ioService.run();
		//釋放空間
		iter = g_listConnection.begin();
		iterEnd = g_listConnection.end();
		for(iter; iter!=iterEnd; iter++)
		{
			pTcpConnectionObject = *iter;
			if(pTcpConnectionObject != NULL)
			{
				delete pTcpConnectionObject;
			}
		}
		g_listConnection.clear();
		server.stopWork();
	}
	catch (std::exception& e)
	{
		std::cout << e.what() << std::endl;
	}

	return 0;
}

// Client program (客戶端). Build it separately from the server program above; each one has its own main().

#include <iostream>
#include <boost/asio.hpp>
using namespace boost::asio;

int main(int argc, char* argv[])
{
	io_service ioService;
	boost::system::error_code error;
	try
	{
		//獲取ip(用直譯器的方法來解析域名)
		/*
		ip::tcp::resolver resolver(ioService);
		ip::tcp::resolver::query query("www.yahoo.com", "80");
		ip::tcp::resolver::iterator iter = resolver.resolve( query);
		ip::tcp::endpoint ep = *iter;
		std::cout << ep.address().to_string() << std::endl;
		*/

		ip::tcp::socket socket(ioService);
		ip::tcp::endpoint endpoint(boost::asio::ip::address_v4::from_string("127.0.0.1"), 10005);
		socket.connect(endpoint, error);

		//是否出錯
		if (error)
		{
			throw boost::system::system_error(error);
		}

		while(true)
		{
			boost::array<char,128> buf;
			size_t len = socket.read_some(boost::asio::buffer(buf), error);
			//服務端執行斷開.
                        if(error != NULL)
{
			if (error == boost::asio::error::eof) 
			{
				break; // 對端方關閉連線(正常關閉套接字)
			}
			else if (error == boost::asio::error::connection_reset)
			{
break;//對端方關閉連線(暴力關閉套接字)
							
}
                        else
{
throw boost::system::system_error(error); // Some other error.

}
}

			char szMsg[128] = {0};
			memcpy(szMsg,buf.data(),len);
			printf("%s\n",szMsg);

			//傳送資料
			socket.write_some(boost::asio::buffer(buf, len), error);
		}
	}
	catch (std::exception& e)
	{
		printf(e.what());
	}

	return 0;
}