1. 程式人生 > >Boost.Asio C++ 網路程式設計之五:TCP回顯客戶端/服務端

Boost.Asio C++ 網路程式設計之五:TCP回顯客戶端/服務端

       回顯就是服務端將接收到的任何內容回發給客戶端顯示,然後關閉客戶端的連線。這個服務端可以處理任何數量的客戶端。每個客戶端連線之後傳送一個訊息,服務端接收到訊息後把它傳送回去。在那之後,服務端關閉連線。具體流程如下圖所示。


       對於TCP而言,我們需要一個額外的保證:每一個訊息以換行符結束(‘\n’)。編寫一個同步回顯服務端/客戶端非常簡單。下面我們分別實現同步客戶端,同步服務端,非同步客戶端和非同步服務端。

一.TCP同步客戶端

#ifdef WIN32
#define _WIN32_WINNT 0x0501
#include <stdio.h>
#endif

#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
using namespace boost::asio;
using boost::system::error_code;
io_service service;

size_t read_complete(char * buf, const error_code & err, size_t bytes) {
	if (err) return 0;
	bool found = std::find(buf, buf + bytes, '\n') < buf + bytes;
	// 一個一個字元的讀取,直到回車, 不快取
	return found ? 0 : 1;
}

ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"), 8001);
void sync_echo(std::string msg) {
	msg += "\n";
	ip::tcp::socket sock(service);
	sock.connect(ep);
	sock.write_some(buffer(msg));
	char buf[1024];
	int bytes = read(sock, buffer(buf), boost::bind(read_complete, buf, _1, _2));
	std::string copy(buf, bytes - 1);
	msg = msg.substr(0, msg.size() - 1);
	std::cout << "server echoed our " << msg << ": "
		<< (copy == msg ? "OK" : "FAIL") << std::endl;
	sock.close();
}

int main(int argc, char* argv[]) {
	// 連線多個客戶端
	char* messages[] = { "Can", "ge", "ge", "blog!", 0 };
	boost::thread_group threads;
	for (char ** message = messages; *message; ++message) {
		threads.create_thread(boost::bind(sync_echo, *message));
		boost::this_thread::sleep(boost::posix_time::millisec(100));
	}
	threads.join_all();

	system("pause");
}
       你會發現,在讀取時,我使用了自由函式(不屬於socket類,屬於名稱空間asio)read(),因為我想要讀‘\n’之前的所有內容。sock.read_some()方法滿足不了這個要求,因為它只會讀可用的,不一定是整個的訊息。
       read(stream, buffer [, completion])這個方法同步地從一個流中讀取資料。你可以選擇指定一個完成處理方法。完成處理方法會在每次read操作呼叫成功之後呼叫,然後告訴read操作是否完成(如果沒有完成,它會繼續讀取)。它的格式是:size_t completion(const boost::system::error_code& err, size_t bytes_transfered) 。當這個完成處理方法返回0時,我們認為read操作完成;如果它返回一個非0值,它表示了下一次sock.read_some操作需要從流中讀取的位元組數。

       read_complete一個個的讀取字元,直到回車,這是通過std::find方法控制的,std::find的行為大概如下。

template<class InputIterator, class T>
  InputIterator find (InputIterator first, InputIterator last, const T& val)
{
  while (first!=last) {
    if (*first==val) return first;
    ++first;
  }
  return last;
}
       結合到上面客戶端程式碼就是,如果沒有找到回車'\n',std::find始終返回buf+bytes,否則返回'\n'的地址,也就是buf+bytes-1,此時'\n'是已讀取內容的最後一個字元。
       注意:因為我們是同步的,所以不需要呼叫service.run()。

二.TCP同步服務端

#ifdef WIN32
#define _WIN32_WINNT 0x0501
#include <stdio.h>
#endif

#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
using namespace boost::asio;
using namespace boost::posix_time;
using boost::system::error_code;


io_service service;
size_t read_complete(char * buff, const error_code & err, size_t bytes) {
    if ( err) return 0;
    bool found = std::find(buff, buff + bytes, '\n') < buff + bytes;
    // we read one-by-one until we get to enter, no buffering
    return found ? 0 : 1;
}


void handle_connections() {
    ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(),8001));
    char buff[1024];
    while ( true) {
        ip::tcp::socket sock(service);
        acceptor.accept(sock);
        int bytes = read(sock, buffer(buff), 
                    boost::bind(read_complete,buff,_1,_2));
        std::string msg(buff, bytes);
        sock.write_some(buffer(msg));
        sock.close();
    }
}


int main(int argc, char* argv[]) {
    handle_connections();
}
       服務端的邏輯主要在handle_connections()。因為是單執行緒,它接受一個客戶端請求,讀取客戶端傳送的訊息,然後回發給客戶端,接著等待下一個連線。可以確定,當兩個客戶端同時連線時,第二個客戶端需要等待服務端處理完第一個客戶端的請求。

       還是要注意因為我們是同步的,所以不需要呼叫service.run()。

       下面是客戶端回顯的結果,當然要先啟動服務端。


三.TCP非同步客戶端

#ifdef WIN32
#define _WIN32_WINNT 0x0501
#include <stdio.h>
#endif

#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
using namespace boost::asio;
io_service service;

#define MEM_FN(x)       boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y)    boost::bind(&self_type::x, shared_from_this(),y)
#define MEM_FN2(x,y,z)  boost::bind(&self_type::x, shared_from_this(),y,z)

class talk_to_svr : public boost::enable_shared_from_this<talk_to_svr>
	, boost::noncopyable {
	typedef talk_to_svr self_type;
	talk_to_svr(const std::string & message)
		: sock_(service), started_(true), message_(message) {}
	void start(ip::tcp::endpoint ep) {
		sock_.async_connect(ep, MEM_FN1(on_connect, _1));
	}
public:
	typedef boost::system::error_code error_code;
	typedef boost::shared_ptr<talk_to_svr> ptr;

	static ptr start(ip::tcp::endpoint ep, const std::string & message) {
		ptr new_(new talk_to_svr(message));
		new_->start(ep);
		return new_;
	}
	void stop() {
		if (!started_) return;
		started_ = false;
		sock_.close();
	}
	bool started() { return started_; }
private:
	void on_connect(const error_code & err) {
		if (!err)      do_write(message_ + "\n");
		else            stop();
	}
	void on_read(const error_code & err, size_t bytes) {
		if (!err) {
			std::string copy(read_buffer_, bytes - 1);
			std::cout << "server echoed our " << message_ << ": "
				<< (copy == message_ ? "OK" : "FAIL") << std::endl;
		}
		stop();
	}

	void on_write(const error_code & err, size_t bytes) {
		do_read();
	}
	void do_read() {
		async_read(sock_, buffer(read_buffer_),
			MEM_FN2(read_complete, _1, _2), MEM_FN2(on_read, _1, _2));
	}
	void do_write(const std::string & msg) {
		if (!started()) return;
		std::copy(msg.begin(), msg.end(), write_buffer_);
		sock_.async_write_some(buffer(write_buffer_, msg.size()),
			MEM_FN2(on_write, _1, _2));
	}
	size_t read_complete(const boost::system::error_code & err, size_t bytes) {
		if (err) return 0;
		bool found = std::find(read_buffer_, read_buffer_ + bytes, '\n') < read_buffer_ + bytes;
		return found ? 0 : 1;
	}

private:
	ip::tcp::socket sock_;
	enum { max_msg = 1024 };
	char read_buffer_[max_msg];
	char write_buffer_[max_msg];
	bool started_;
	std::string message_;
};

int main(int argc, char* argv[]) {
	ip::tcp::endpoint ep(ip::address::from_string("127.0.0.1"), 8001);
	char* messages[] = { "Can", "ge", "ge", "blog", 0 };
	for (char ** message = messages; *message; ++message) {
		talk_to_svr::start(ep, *message);
		boost::this_thread::sleep(boost::posix_time::millisec(100));
	}
	service.run();
	system("pause");
}
四.TCP非同步服務端
#ifdef WIN32
#define _WIN32_WINNT 0x0501
#include <stdio.h>
#endif

#include <boost/bind.hpp>
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
using namespace boost::asio;
using namespace boost::posix_time;
io_service service;

#define MEM_FN(x)       boost::bind(&self_type::x, shared_from_this())
#define MEM_FN1(x,y)    boost::bind(&self_type::x, shared_from_this(),y)
#define MEM_FN2(x,y,z)  boost::bind(&self_type::x, shared_from_this(),y,z)


class talk_to_client : public boost::enable_shared_from_this<talk_to_client>, boost::noncopyable {
	typedef talk_to_client self_type;
	talk_to_client() : sock_(service), started_(false) {}
public:
	typedef boost::system::error_code error_code;
	typedef boost::shared_ptr<talk_to_client> ptr;

	void start() {
		started_ = true;
		do_read();
	}
	static ptr new_() {
		ptr new_(new talk_to_client);
		return new_;
	}
	void stop() {
		if (!started_) return;
		started_ = false;
		sock_.close();
	}
	ip::tcp::socket & sock() { return sock_; }
private:
	void on_read(const error_code & err, size_t bytes) {
		if (!err) {
			std::string msg(read_buffer_, bytes);
			// echo message back, and then stop
			do_write(msg + "\n");
		}
		stop();
	}

	void on_write(const error_code & err, size_t bytes) {
		do_read();
	}
	void do_read() {
		async_read(sock_, buffer(read_buffer_),
			MEM_FN2(read_complete, _1, _2), MEM_FN2(on_read, _1, _2));
	}
	void do_write(const std::string & msg) {
		std::copy(msg.begin(), msg.end(), write_buffer_);
		sock_.async_write_some(buffer(write_buffer_, msg.size()),
			MEM_FN2(on_write, _1, _2));
	}
	size_t read_complete(const boost::system::error_code & err, size_t bytes) {
		if (err) return 0;
		bool found = std::find(read_buffer_, read_buffer_ + bytes, '\n') < read_buffer_ + bytes;
		// we read one-by-one until we get to enter, no buffering
		return found ? 0 : 1;
	}
private:
	ip::tcp::socket sock_;
	enum { max_msg = 1024 };
	char read_buffer_[max_msg];
	char write_buffer_[max_msg];
	bool started_;
};

ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 8001));

void handle_accept(talk_to_client::ptr client, const boost::system::error_code & err) {
	client->start();
	talk_to_client::ptr new_client = talk_to_client::new_();
	acceptor.async_accept(new_client->sock(), boost::bind(handle_accept, new_client, _1));
}


int main(int argc, char* argv[]) {
	talk_to_client::ptr client = talk_to_client::new_();
	acceptor.async_accept(client->sock(), boost::bind(handle_accept, client, _1));
	service.run();
}
      TCP非同步客戶端和非同步服務端的關鍵是enable_shared_from_this模板類的使用,關於enable_shared_from_this詳見:C++11新特性之十一:enable_shared_from_this,C++11和boost的enable_shared_from_this功能和原理一樣。

       客戶端回顯結果和同步時的一樣,如下: