1. 程式人生 > >Boost.Asio C++ Network Programming(Chapter 2)

Boost.Asio C++ Network Programming(Chapter 2)

第二章Boost.Asio基礎

在這一章節中,需要確切知道什麼時候使用Boost.Asio,並且深入的研究非同步程式設計,這會比同步程式設計更加有趣。

The Network API

本節講解在使用Boost.Asio寫一個網路應用程式時需要知道哪些內容。

Boost.Asio namespaces

Boost.Asio下的所有都在boost::asio名稱空間下,或者是子名稱空間下:

boost:asio:包含核心類和函式。重要的類io_servicestreambuf。我們也有任意的函式

(free functions),如read,read_at,read_until,他們的非同步同伴,和

writewrite的非同步同伴函式。

boost::asio::ip:包含網路部分。包含重要的類如:addressendpointtcpudpicmp還有任意函式(free functionsconnectasync_connect。注意在boost::asio::ip::tcp::socket名稱,socket只是typedefboost::asio::ip::tcp下的一個關鍵字。

boost::asio::error:此名稱空間包含在呼叫I/O時的一些錯誤程式碼。

boost::asio::ssl:此名稱空間包含SSL處理相關類。

boost::asio::local:此名稱空間包含了

POSIX具體類。

boost::asio::windows:此名稱空間包含了windows具體類。

IP Addresses

處理IP地址,Boost.Asio提供了ip::addressip::address_v4ip::address_v6類。

它提供了相當多重要的功能:

ip::address(v4_or_v6_address):轉換ip_v4ip_v6地址到ip::address

ip::address::from_string(str):IPv4地址(點分隔)或者IPv6地址(十六進位制計數法)建立一個地址。

ip::address::to_string():返回一個友好表示

IP地址。

ip::address_v4::broadcast([addr,mask]):這個函式建立一個廣播地址。

ip::address_v4::any():返回一個任意地址。

ip::address_v4::loopback(),ip_address_v6::lookback():返回環回地址。

ip::host_name():返回當前主機名稱字串。

大部分時候這麼使用ip::address_from_string:

ip::address addr=ip::address::from_string(“127.0.0.1”);

如果需要連線一個主機,下面的程式碼片段是無效的:

// 丟擲異常

ip::address addr=ip::address::from_string(“www.yahoo.com”);

Endpoints

Endpoint是一個地址和埠。每一個不同的socket類都有自己的端點類:如

ip::tcp::endpoint,ip::udp::endpoint,ip::ecmp::endpoint

如果你想連線本機80埠:

ip::tcp::endpoint ep(ip::address::from_string(“127.0.0.1”),80);

可以通過三種方式構造endpoint

endpoint():預設建構函式有時可用於UDP/ICMP sockets

endpoint(protocol,port):用於接受連線的伺服器端socket

endpoint(addr,port):通過地址和埠構造一個endpoint

例如:

ip::tcp::endpoint ep1;

ip::tcp::endpoint ep2(ip::tcp::v4(),80);

ip::tcp::endpoint ep3(ip::address::from_string(“127.0.0.1”),80);

如果你想連線一個主機(沒有IP地址):

//輸出87.248.122.122

io_service service;

ip::tcp::resolver resolver(service);

ip::tcp::resolver::query query(“www.yahoo.com”,80);

ip::tcp::resolver::iterator iter = resolver.resolve(query);

ip::tcp::endpoint ep=*iter;

std::cout << ep.address().to_string() << std::endl;

你需要更換tcp套接字型別。首先,建立一個需要查詢的名稱,然後使用

resolver()函式。如果成功,至少返回一條記錄。給定迭代器,可以只使用第一個或者遍歷列表。

給定一個endpoint,你可以獲取地址,埠,和ip協議(v4或者v6):

std::cout << ep.address().tostring() << “:”<<ep.port()

<< “/”<< ep.protocol()<<std::endl;

Sockets

Boost.Asio有三種類型的套接字類:ip::tcpip::udpip::icmp,當然也是可以擴充套件的。你可以建立自己的套接字,是複雜的。如果選擇這樣做,看看

boost/asio/ip/tcp.hpp,boost/asio/ip/udp.hpp,boost/asio/ip/icmp.hpp,他們都是內部

typedef關鍵字很小的類。

你會想到ip::tcpip::udpip::icmp類佔位符,他們提供了便捷訪問類和方法:

ip::tcp::socket,ip::tcp::acceptor,ip::tcp::endpoint,ip::tcp::resolver,ip::tcp::iostream

ip::udp::socket,ip::udp::endpoint,ip::udp::resolver

ip::icmp::socket,ip::icmp::endpoint,ip::icmp::resolver

套接字類建立一個響應的套接字,你給定io_service給建構函式:

io_service service;

ip::udp::socket sock(service);

sock.set_option(ip::udp::socket::reuse_address(true));

每個套接字名稱都是一個typedef關鍵字:

ip::tcp::socket = basic_stream_socket<tcp>

ip::udp::socket=basic_stream_socket<udp>

ip::icmp::socket=basic_raw_socket<icmp>

同步錯誤程式碼

所有的同步函式都有一個過載,丟擲異常或者返回錯誤程式碼。如下程式碼片段:

sync_func(arg1,arg2,...,argN);// throws

boost::asio::error_code ec;

sync_func(arg1,arg2,...,argN,ec);//返回錯誤程式碼

在本章的剩餘部分,你會看到更多的同步功能。為了保持簡單,省略了返回錯誤程式碼,但是他們是存在的。

套接字成員函式

函式被分成幾組。不是每個套接字都可以使用所有的函式。本節的最後會顯示函式屬於哪些類。

注意所有的非同步函式理解返回,同步函式只有在完成操作後才返回。

連線相關函式

這些函式是連線或者繫結套接字,斷開它,或者查詢連線是否有效或者無效:

assign(protocl,socket):指定一個原始(本地)套接字到一個套接字例項。用它處理遺留程式碼(本地套接字已經建立)。

open(protocol):開啟一個給定協議(v4或者v6)的套接字。主要用於UDP/ICMP套接字,或者服務端套接字。

bind(endpoint):繫結一個地址。

connect(endpoint):同步連線到給定的地址。

async_connect(endpoint):非同步連線到給定的地址。

io_open():如果套接字已經開啟返回true

close():關閉套接字。所有的非同步操作被取消,並且返回

error::operation_aborted錯誤程式碼。

shutdown(type_of_shutdown):從現在開始禁止傳送操作,接收操作,或者兩個同時禁止。

cancel():在當前套接字上取消所有的非同步操作。這個套接字的所有非同步操作都完成並且返回error::operation_aborted錯誤程式碼。

例項如下:

ip::tcp::endpoint ep(ip::address::from_string(“127.0.0.1”),80);

ip::tcp::socket sock(service);

sock.open(ip::tcp::v4());

socket.connect(ep);

sock.write_some(buffer(“GET /index.html\r\n”));

char buff[1024];

sock.read_some(buffer(buff,1024));

sock.shutdown(ip::tcp::socket::shutdown_receive);

sock.close();

讀寫函式

在套接字上執行輸入輸出功能。

非同步操作函式的,處理者的簽名是一樣的:

void handler(const boost::system::error_code &e,size_t bytes)

async_receive(buffer,[flags,]handler):在一個套接字上啟動非同步接受操作。

async_read_some(buffer,handler):async_receive是等價的。

async_receive_from(buffer,endpoint,[,flags],handler):在一個指定的端點上啟動一個非同步的接受操作。

async_send(buffer[,flags],handler):啟動一個非同步傳送資料操作。

async_write_come(buffer,handler):async_send是等價的。

async_send_to(buffer,endpoint,handler):在一個指定的端點上啟動傳送資料的非同步操作。

receive(buffer[,flags]):同步接受資料到給定的緩衝區。阻塞直到接收到資料,或者發生錯誤。

read_some(buffer):等價於receive

receive_from(buffer,endpoint[,flags]):在指定的端點,接受資料到給定的緩衝區,函式阻塞直到接收到資料,或者出現錯誤。

send(buffer[,flags]):同步傳送緩衝區資料。函式阻塞直到傳送成功,或者出現錯誤。

write_some(buffer):等價於send

send_to(buffer,endpoint[,flags]):同步傳送緩衝區資料到給定的端點。函式阻塞直到傳送成功或者出現錯誤。 

available():返回可以不阻塞的同步讀取多個位元組。

討論緩衝區。檢驗flagsflags預設值為0,也可以是一個組合:

ip::socket_type::socket::message_peek:這個標記僅僅檢視訊息。它返回一個訊息,但是下次讀取是將重讀這個訊息。

ip::socket_type::socket::message_out_of_band:這個標記處理out-of-band(OOB)資料。OOB資料是比普通資料更重要的資料。關於OOB的討論超出了本書的範圍。

ip::socket_type::socket::message_do_not_route:這個標記指定訊息不使用路由表傳送。

ip::socket_type:socket::message_end_of_record:這個標記指定資料記錄的結束標記。這個不能在windows下指定。

使用message_peek,如下面程式碼片段所示:

char buff[1024];

sock.receive(buffer(buff),ip::tcp::socket::message_peek);

memset(buff,1024,0);

// 重讀上一次讀取的

sock.receive(buffer(buff));

以下例子,引導讀者使用同步和非同步不同的套接字:

1.例子1同步讀寫一個tcp套接字。

ip::tcp::endpoint ep(ip::address::from_string(“127.0.0.1”),80);

ip::tcp::socket sock(service);

sock.connect(ep);

sock.write_some(buffer(“GET /index/html\r\n”));

std::cout<<”bytes available” << sock.available()<<std::endl;

char buff[512];

size_t read=sock.read_some(buffer(buff));

2.同步讀寫一個udp套接字。

ip::udp::socket sock(service);

sock.oopen(ip::udp::v4);

ip::udp::endpoint receiver_ep(“87.248.112.181”,80);

sock.send_to(buffer(“testing\n”),receive_ep);

char buff[512];

ip::udp::endpoint sender_ep;

sock.receive_from(buffer(buff),sender_ep);

注意:在上面的程式碼片段中,從UDP套接字接受資料使用的是receive_from,需要一個預設建構函式的endpoint

3.非同步讀寫一個udp套接字

using namespace boost::asio;

io_service service;

ip::udp::socket sock(service);

boost::asio::ip::udp::endpoint sender_ep;

char buff[512];

void on_read(const boost::sytem::error_code &err,std::size_t read_bytes){

std::cout << “read” << read_bytes << std::endl;

Sock.async_recieve_from(buffer(buff),sender_ep,on_read);

}

int main(int argc,char* argv[]){

ip::udp::endpoint ep(ip::address::from_string(“127.0.0.1”,8001);

sock.open(ep.protocol());

sock.set_option(boost::asio::ip::udp::socket::reuse_address(true));

sock.bind(ep);

sock.async_receive_from(buffer(buff,512),sender_ep,on_read);

serivce.run();

}

套接字控制

這些函式處理套接字高階選項

get_io_service():返回建構函式給定的io_servcie例項。

get_option(option):返回套接字選項。

set_option(option):設定套接字選項。

io_control(cmd):在套接字上執行I/O命令。

下面是可以設定或者獲取的套接字選項:

名稱

描述

類別

broadcast 

If true, it allows broadcasting messages 

bool 

debug 

If true, it enables socket-level debugging 

bool 

do_not_route 

If true, it prevents routing and use local interfaces only 

bool 

enable_ connection_ aborted 

If true, it reports connections that were aborted on accept() 

bool 

keep_alive 

If true, it sends keep-alives 

bool 

linger 

If true, socket lingers on close() if there's unsent data 

bool 

receive_buffer_ size 

This is a received buffer size for a socket 

int 

receive_low_ watemark 

This provides a minimum number of bytes to process for socket input 

int 

reuse_address 

If true, socket can be bound to an address already in use 

bool 

send_buffer_ size 

This sends buffer size for a socket 

int 

send_low_ watermark 

This provides a minimum number of bytes to send for socket output 

int 

ip::v6_only 

If true, it allows only IPv6 communication 

bool 

每個名稱代表一個套接字typedef型別 或者一個類,使用如下程式碼所示:

ip:tcp::endpoint ep(ip::address::from_string(“127.0.0.1”),80);

ip::tcp::socket sock(service);

sock.connet(ep);

// TCP套接字可複用地址

ip::tcp::socket::reuse_address ra(true);

sock.set_option(ra);

// 獲取套接字接受緩衝區大小

ip::tcp::socket::receive_buffer_size rbs;

sock.get_option(rbs);

std::cout << rbs.value() << std::endl;

// 設定接受緩衝區大小為8192

ip::tcp::socket::send_buffer_size sbs(8192);

sock.set_option(sbs);

注意:使用以上功能時,需要開啟套接字,否則會丟擲異常。

TCP 和 UDP ICMP

在前面說過,不是所有的套接字可以使用所有的函式。做了一個列表展示不同函式。一些函式沒在這裡出現,意味著可以應用於所有的套接字。

Name 

TCP 

UDP 

ICMP 

async_read_some 

Yes 

async_receive_from 

Yes 

Yes 

async_write_some 

Yes 

async_send_to 

Yes 

Yes 

read_some 

Yes 

receive_from 

Yes 

Yes 

write_some 

Yes 

send_to 

Yes 

Yes 

其它雜項功能

其它無關輸入輸出的功能函式如下:

local_endpoint():返回套接字的本地連線地址。

remote_endpoint():返回需要連線的遠端地址。

native_handle():返回原始套接字控制代碼。你只想在不使用boost.asio的情況下使用原始套接字。

no_blocking():如果套接字是非阻塞的返回true,否則返回false

native_no_blocking():如果套接字是非阻塞返回true,否則返回false。它是在本地呼叫原始套接字。通常不需要這個(no_blocking已經快取了結果);如果使用native_hander使用他。

at_mark():如果套接字是讀OOB資料返回true。很少需要這個。

其它

最後一點,套接字例項是無法複製的,複製建構函式和賦值過載(operator=)是無法呼叫的。

ip::tcp::socket s1(service),s2(service);

s1=s2;//編譯期錯誤

ip::tcp::socket s3(s1);// 編譯器錯誤

這是有道理的,每個例項擁有和管理資源(原始套接字本身)。如果允許複製構造,我們會結束兩個擁有相同原始套接字的例項,他們需要某種管理機制(一個例項擁有所有權,或者引用計數,或者其它方法)。Boost.Aso選擇不允許複製(如果你想使用副本,只能使用智慧指標)。

typedef boost::shared_ptr<ip::tcp::socket> socket_ptr;

socket_ptr sock1(new ip::tcp::socket(servcie));

socket_ptr sock2(sock1);

socket_ptr sock3;

sock3 = sock1;

套接字緩衝區

當從一個套接字讀取或者向一個套接字寫入資料時,需要一個緩衝區,存放輸入輸出資料。這個緩衝區儲存器必須脫離I/O操作;你要確保在I/O操作期間或者超出作用域不會被釋放。

在同步操作時很方便,當然傳送和接受結束後,buff還是存在。

char buff[512];

...

sock.receive(buffer(buff));

strcpy(buff,”ok\n”);

sock.send(buffer(buff));

如下程式碼片段所示,非同步操作就沒這麼簡單了:

// 錯誤的程式碼

void on_read(const boost::system::error_code &err,std::size_t read_bytes){

...

}

void func(){

char buff[512];

sock.async_receive(buffer(buff),on_read);

}

呼叫async_receive()函式之後,buff將離開他的作用域,因此他的記憶體空間會被釋放。當我們正在這個套接字上接受資料,我們將複製到一個不屬於我們的內容中;它可以被釋放,或者重新分配給其它程式碼需要的資料,因此,是錯誤的記憶體。

上述問題的幾種解決方案:

使用全域性緩衝區。

創佳一個緩衝區,在操作完成後釋放它。

一個連線物件管理著套接字,和其它額外的資料,如buffer(s)

第一個解決方案是不好的,因為我們直到全域性變數是非常不好的。此外,如果兩個處理器使用相同的緩衝區會發生什麼?

這裡是如何實現第二種解決方案:

void on_read(char *ptr,const boost::system::error_code &err, std::size_t read_bytes){

delete [] ptr;

}

...

char *buff=new char[512];

sock.async_receive(buffer(buf,512),bost::bind(on_read,buff,_1,_2));

或者你想讓緩衝區離開作用域時自動完成操作(銷燬?),使用智慧指標:

struct shared_buffer{

boost::shared_array<char> buff;

int size;

shared_buffer(size_t size):buff(new char[size]),size(size){

}

mutable_buffers_1 asio_buff() const{

return buffer(buff.get(),size);

}

};

// 當前on_read離開作用域時,boost::bind的物件將被釋放,

// 也將釋放shared_buffer

void on_read(shared_buffer,const boost::system::error_code &err,std::size_t read_bytes){}

...

shared_buffer buff(512);

sock.async_receive(buff.asio_buff(),boost::bind(on_read,buff,_1,_2));

shared_buffer用好內部類shared_array<>,一個shared_buffer的例項副本隨著

shared_array<>一直存在,當離開作用域時,shared_array<>將自動銷燬,這正式我們所需要的。

這正是你期待的,當呼叫完成處理程式時,Boost.Asio拷貝一個副本給完成處理程式。這個拷貝是boost::bind的一個函式引數(functor),他內部包含了一個我們給定的shared_buffer例項的拷貝。

第三個選項是使用連線物件管理套接字和相關資料。如緩衝區(buffers),通常是正確的,但是卻很複雜。將在本章最後討論。

緩衝區包裝類(函式)

在你看過的程式碼片段中,每當執行讀/寫操作時都需要一個緩衝區(buffer),呼叫buffer()函式包裝一個緩衝區物件。

char buff[512];

sock.async_receive(buffer(buff),on_read);

這主要包裝了一個任意緩衝區給一個類以便Boost.Asio遍歷緩衝區。可以使用如下的程式碼片段:

sock.async_receive(some_buffer,on_read);

some_buffer需要滿足一定的條件,即(ConstBufferSequence 或者

MutableBufferSequence (可以檢視Boost.Asio的文件)。自己建立滿足這樣條件的類是很複雜的,但是Boost.Asio已經提供了一些滿足這些條件的類。不要直接訪問他們,使用buffer()函式即可。

你可以使用buffer()的任意功能:

A char[] const array 

A void* pointer and size in characters 

An std::string string 

An POD[] const array (POD stands for plain old data, meaning, constructor and destructor do nothing) 

An std::vector array of any POD 

A boost::array array of any POD 

An std::array array of any POD 

如下程式碼所示:

struct pod_sample{int i;long l;char c;}

...

char b1[512];

void *b2=new char[512];

std::string b3;b3.resize(128);

pod_sample b4[16];

std::vector<pod_sample> b5;b5.resize(16);

boost::array<pod_sample,16> b6;

std::array<pod_sample,16) b7;

sock.async_send(buffer(b1),on_read);

sock.async_send(buffer(b2,512),on_read);

sock.async_send(buffer(b3),on_read);

sock.async_send(buffer(b4),on_read);

sock.async_send(buffer(b5),on_read);

sock.async_send(buffer(b6),on_read);

sock.async_send(buffer(b7),on_read);

所有的一切,都不不要自己建立類來滿足ConstBufferSequence 或者

MutableBufferSequence,你可以根據需要建立一個類,放入緩衝區,返回mutable_buffers_1例項,這就是前面我們講的shared_buffer類。

讀、寫、連線函式

Boost.Asio提供處理I/O的函式,我們分為4

The connect functions

這個函式連線到一個給定的端點上。

connect(socket,begin[,end][,condition]):函式嘗試同步連線到列表中的每個端點上。begin迭代器是socket_type::resolver::query呼叫的結果(你可能需要再次看看EndPoint章節)。指定的end迭代器是可選項,你可以忽略它。在每個嘗試連線前可以提供一個條件函式。函式簽名Iterator connect_condition(const boost::system::error_code & err, Iterator next);你可以選擇返回不同的迭代器,允許跳過某些端點。

async_connect(socket,begin[,end][,condition],handler):這個函式執行非同步連線,在最後呼叫完成處理程式,處理程式簽名:

void handler(const boost::system::error_code & err, Iterator iterator)。第二個引數傳遞給完成處理程式成功的端點,否則就是end迭代器。

例子如下程式碼所示:

using namespace boost::asio::ip;

tcp::resolver resolver(service);

tcp::resolver::iterator iter=resolver.resolve(tcp::resolver::query(“www.yahoo.com”,”80”));

tcp::socket sock(service);

connect(sock,iter);

一個主機可以解析為多個地址,因此connectasync_connect嘗試連線每個地址,直到有個成功。

The read/write functions

這些函式從一個從流(stream)中讀取,或者向流(stream)中寫入(可以是一個套接字,或者任何有流行為的類)。

async_read(stream,buffer[,completion],handler):非同步從流中讀取。完成後,完成處理程式(handler)被呼叫。完成處理程式函式簽名:

void handler(const boost::system::error_ code & err, size_t bytes)。你可以選擇指定completion函式,completion函式在讀取成功後呼叫,並且告訴Boost.Asio async_read完成了(如果不成功,繼續讀)。他的簽名:

size_t completion(const boost::system::error_code& err, size_t bytes_transfered). completion函式返回0,我們認為讀操作完成;如果返回非0值,表示下次呼叫async_read_some可以讀取的最大位元組數。

async_write(stream,buffer[,completion],handler):異步向流中寫入資料。引數和aysnc_read類似。

read(stream,buffer[,completion]):同步從流中讀取資料。引數和async_read類似。

write(stream,buffer[,completion]):同步向流中寫入資料。引數和async_read類似。

async_read(stream, stream_buffer [, completion], handler) 

async_write(strean, stream_buffer [, completion], handler) 

write(stream, stream_buffer [, completion]) 

read(stream, stream_buffer [, completion]) 

首先,注意套接字例項,第一個引數是流(stream)。包括但是不限於套接字。例如可以不是一個套接字,可以是windows檔案控制代碼。

每個讀/寫操作結束會觸發這些條件的一個或者多個:

提供的緩衝區滿了(讀)或者緩衝區的資料被寫入()

completion函式返回0(如果你提供這樣的函式)。

發生了錯誤。

下面的程式碼非同步從套接字中讀取,直到滿足’\n’:

io_service servcie;

ip::tcp::socket socket(service);

char buff[512];

int offset = 0;

size_t up_to_enter(const boost::system::error_code &,size_t bytes){

for(size_t i=0;i<bytes;i++){

if(buff[i+offset]==’\n’)

return 0;

return 1;

}

}

void on_read(const boost::system::error_code &,size_t){}

...

async_read(sock,buffer(buff),up_to_enter,on_read);

Boost.Asio提供了幾個completion的幫助函式。

transfer_at_least(n)

transfer_exactly(n)

transfer_all()

如下程式碼所示:

char buff[512];

void on_read(const boost::system::error_code&,size_t){}

// 讀取整整32個位元組。

async_read(sock,buffer(buff),transfer_exactly(32),on_read);

最後四個函式不使用常規的buffer,使用stream_buffer函式,Boost.Asio提供的

std::streambuf的繼承類。STL (streams)和流緩衝(stream buffers)是非常靈活的,如下面程式碼所示:

io_service servcie;

void on_read(stream &buf,const boost::system::error_code &,size_t){

std::istream in(&buf);

std::string line;

std::getline(in,line);

std::cout << “first line:”<< line<<std::endl;

}

int main(int argc,char *argv[]){

HANDLE file=::CreateFile(“readme.txt”,GENERIC_READ,0,0,

OPEN_ALWAYS,FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPEND,0);

windows::stream_hander h(service,file);

Streambuf buf;

async_read(h,buf,transfer_exactly(256),

boost::bind(on_read,boost::ref(buf),_1,_2));

service.run();

}

這裡,演示了async_read可以使用windows檔案控制代碼。首先讀取256個位元組資料,然後儲存在緩衝區。當讀操作完成,on_read被呼叫,建立一個std::istream處理緩衝區,讀取第一行,並且輸出到控制檯。

The read_until/async_read_until functions

這些函式在滿足條件之前一直讀:

async_read_until(stream,stream_buffer,delim,handler):開始一個非同步讀取操作。遇到delimeter讀操作將停止。delimeter可以是任意字元,std::string或者boost::regex。完成處理程式標籤:void handler(const boost::system::error_code & err, size_t bytes)

async_read_until(stream,stream_buffer,completion,handler):這個函式和前面一個相同,但是使用completion函式替代delimcompletion函式簽名:

pair<iterator,bool> completion(iterator begin, iterator end)

這裡iterator = is buffers_iterator<streambuf::const_buffers_type>。 你要記住的是,迭代器型別是 random-accessiterator。你可以掃描(beginend)範圍,以確定是否需要停止或者繼續。返回一個pair,第一個引數是函式結束字元一個迭代器(

the first member will be an iterator passed at the end of the last character consumed by the function );第二個引數如果是true讀操作將停止,否則繼續。

read_until(stream,stream_buffer,delim):同步讀取操作。引數和async_read_until類似。

read_until(stream,stream_buffer,comletion):同步讀取操作。引數和

async_read_until類似。

下面的列子將讀取ygie識別符號號:

typedef buffers_iterator<streambuf::const_buffers_type> iterator;

std::pair<iterator,bool> match_punct(iterator begin,iterator end){

while(begin!=end){

if(std::ispunct(*begin))

return std::make_pair(begin,true);

}

return std::make_pair(end,false);

}

void on_read(const boost::system::error_code &,size_t ){}

....

streambuf buf;

async_read_until(sock,buf,match_punct,on_read);

如果向讀取一個空格,修改最後一行:

async_read_until(sock,buff,’ ’,on_read);

The *_at functions

隨機讀寫一個流的操作。從指定偏移位置讀寫。

async_read_at(stream,offset,buffer[,completion],handler):在一個流的指定偏移位置開始一個非同步的讀操作。當操作完成時,會呼叫完成處理程式(handler)。完成處理程式(handler)函式標籤:void handler(const boost::system::error_code& err, size_t bytes) 。通常使用buffer()包裝函式或者streambuf 函式。如果提供了completion函式,每次讀取成功都會呼叫,並且告訴Boost.Asio async_read_at操作完成(如果不是,繼續讀)。completion函式簽名:

size_t completion(const boost::system::error_code& err, size_t bytes)。如果

completion函式返回0,我們認為讀操作完成;如果返回非0值,表示下一次呼叫

async_read_some_at的最大可以讀取的位元組數。

async_write_at(stream,offset,buffer[,completion],handler):啟動一個非同步寫入操作。引數和async_read_at類似。

read_at(stream,offset,buffer[,completion]):一個給定的流上,從偏移位置開始讀取。引數和async_read_at類似。

write_at(stream,offset,buffer[,completion]):在一個給定的流上,從偏移位置開始寫入。引數和async_read_at類似。

這些函式不適用與套接字。他們處理隨機讀取流;換句話說,流可以隨機訪問。套接字顯然不是這樣的(套接字是隻能向前的(forward-only))。

從檔案中的256偏移位置讀取128個位元組:

io_service service;

int main(int argc,char*argv){

HANDLE file = ::CreateFile(“readme.txt”,GENERIC_READ,0,0,

  OPEN_ALWAYS,FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0);

windows::random_access_handle h(service,file);

streambuf buf;

read_at(h,256,buf,transfer_exactly(128));

std::istream in(&buf);

std::string line;

std::getline(in,line);

std::cout << “first line:”<<line<< std::endl;

}

非同步程式設計

這個章節深入討論非同步程式設計時會遇到的問題。閱讀一次後,簡易回過頭來重讀一次,鞏固這些概念。

The need for going asynchronous 

正如之前所說,同步程式設計要比非同步程式設計容易。這是因為他是線性的思路(呼叫函式A直到結束,呼叫函式B直到結束,等等。是事件處理思想)。在後一種情況下,你可以說有五個事件,不知道他們的執行順序,甚至不知道他們都會執行。

非同步程式設計儘管是很難的,但你還是會喜歡他,伺服器需要處理大量的併發客戶端連線。大量的併發客戶端,非同步要比同步程式設計容易。

有一個需要處理1000個併發客戶端的伺服器程式,訊息在客戶端和伺服器來回傳送,結束符為’\n’。

同步程式碼,1個執行緒:

using namespace boost::asio;

struct client{

ip::tcp::socket sock;

char buff[1024];//每個訊息的最大長度

Int already_read;//已經讀了多少

};

std::vector<client> clients;

void handle_client(){

while(true){

for(int i=0;i<clients.size();++i)

if(clients[i].sock.available())

on_read(clients[i]));

}

}

void on_read(client *c){

int to_read = std::min(1024-c.already_read,c.sock.available());

c.sock.read_some(buffer(c.buff+c.already_read,to_read));

c.already_read += to_read;

if(std::find(c.buff,c.buff+c.already_read,’\n’)<c.buff+c.alread_read){

int pos=std::find(c.buff,c.buff+c.alread_read,’\n’)-c.buff;

std::string msg(c.buff,c.buff+pos);

std::copy(c.buff+pos,c.buff+1024,c.buff);

c.alread_read-=pos;

on_read_msg(c,msg);

}

}

void on_read_msg(client &c,const std::string &msg){

// 分析訊息,會寫

if(msg ==”request_login”)

c.sock.write(“request_ok\n”);

else if

...

}

任何伺服器都要避免的事情就是伺服器反應緩慢(網路基本應用程式)。在例子中

handler_clients()功能儘可能少。如果函式阻塞在任意一點,後續客戶端傳送到來的訊息需要等待阻塞結束才能繼續處理他們。

為了保持響應,當有資料時,我們只讀取一個套接字。

if(client[i].sock.available()) on_read(clients[i])。在on_read中,我們只讀取可用的;呼叫read_until(c.sock,buffer(...),’\n’)是一個糟糕的注意。因為這會阻塞知道我們讀取到完整的訊息(我們不知道他什麼時候會發生)。

這裡的瓶頸在on_read_msg()函式;只要執行,任何到來的訊息都放入。一個好的on_read_msg方法確保不會發生,但仍有可能發生(有時候向一個緩衝區寫入資料會阻塞,直到緩衝區滿了)。

同步程式碼:10個執行緒:

using namespace boost::asio;

struct client{

// ... 和前面一樣

bool set_reading(){

boost::mutex::scoped_lock lk(cs_);

if(is_reading_) return false;// 已經讀

else{is_reading_=true;return true;}

}

void unset_reading(){

boost::mutex::scoped_lock lk(cs_);

is_reading_ =false;

}

private:

boost::mutex cs_;

bool is_reading_;

};

std::vector<client> clients;

void handle_clients(){

for(int i=0;i<10;++i){

boost::thread(handle_clients_thread);

}

}

void hande_clients_thread(){

while(true)

for(int i=0;i<clients.size();++i)

if(clients[i].sock.available())

if(clients[i].set_reading()){

on_read(clients[i]);

clients[i].unset_reading();

}

}

void on_read(client &c){

// 和前面相同

}

void on_read_msg(client &c,const std::string &msg){

// 和前面相同

}

為了使用更多執行緒,我們需要同步,在set_reading()set_unreading()函式中。

set_reading()函式是非常重要的。在一步內”測試並且標記讀取“,如果有兩個步驟,”測試讀取“和”標記讀取“,可能有兩個執行緒對同一個客戶端測試成功,然後兩個執行緒呼叫同一個客戶端的on_read函式,破壞了資料並且應用程式可能會崩潰。

你會注意到,程式碼變得越來越複雜。

同步的程式碼第三種寫法,一個客戶端一個執行緒,隨著客戶端增加,也會變得沒有了(執行緒)。

讓我們開始非同步。我們不斷的非同步讀取。當客戶端傳送一個請求時,on_msg被呼叫,然後回覆,飯後等待下一次請求(做另一次非同步操作)。

非同步程式碼,10個執行緒:

using namespace boost::asio;

io_service service;

struct client{

ip::tcp::socket sock;

streambuf buff;

};

std::vector<client> clients;

void handle_clients(){

for(int i=0;i<clients.size();++i)

async_read_until(clients[i].sock,clienets[i].buff,’\n’,

boost::bind(on_read,clients[i],_1,_2));

for(int i=0;i<10;++i)

boost::thread(handle_clients_thread);

}

void handle_clients_thread(){

service.run();

}

void on_read(client &c,const error_code &err,size_t read_bytes){

std::istream in(&c.buff);

std::string msg;

std::getline(in,msg);

if(msg == “request_login”)

c.sock.async_write(“request_ok\n”,on_write);

else if...

...

// 現在,等待下一次客戶端讀取

async_read_until(c.sock,c.buff,’\n’,

boost::bind(on_read,c,_1,_2));

}

注意程式碼變得簡單了。客戶端結構體只有兩個成員,handle_clients()函式只是呼叫async_read_until,然後建立10個執行緒,每個執行緒呼叫service.run()。這些執行緒 處理和排程非同步讀寫操作。需要注意的是,on_read()將時刻準備下一個非同步讀取操作。

非同步run(),run_on(),poll(),poll_on()

為了實現迴圈監聽,io_service提供了4個方法,run()run_one()poll()poll_one()。大部分時候使用service.run()