Boost.Asio C++ Network Programming(Chapter 2)
第二章Boost.Asio基礎
在這一章節中,需要確切知道什麼時候使用Boost.Asio,並且深入的研究非同步程式設計,這會比同步程式設計更加有趣。
The Network API
本節講解在使用Boost.Asio寫一個網路應用程式時需要知道哪些內容。
Boost.Asio namespaces
Boost.Asio下的所有都在boost::asio名稱空間下,或者是子名稱空間下:
boost:asio:包含核心類和函式。重要的類io_service和streambuf。我們也有任意的函式
(free functions),如read,read_at,read_until,他們的非同步同伴,和
boost::asio::ip:包含網路部分。包含重要的類如:address,endpoint,tcp,udp,icmp還有任意函式(free functions)connect和async_connect。注意在boost::asio::ip::tcp::socket名稱,socket只是typedefboost::asio::ip::tcp下的一個關鍵字。
boost::asio::error:此名稱空間包含在呼叫I/O時的一些錯誤程式碼。
boost::asio::ssl:此名稱空間包含SSL處理相關類。
boost::asio::local:此名稱空間包含了
boost::asio::windows:此名稱空間包含了windows具體類。
IP Addresses
處理IP地址,Boost.Asio提供了ip::address,ip::address_v4,ip::address_v6類。
它提供了相當多重要的功能:
ip::address(v4_or_v6_address):轉換ip_v4或ip_v6地址到ip::address。
ip::address::from_string(str):從IPv4地址(點分隔)或者IPv6地址(十六進位制計數法)建立一個地址。
ip::address::to_string():返回一個友好表示
ip::address_v4::broadcast([addr,mask]):這個函式建立一個廣播地址。
ip::address_v4::any():返回一個任意地址。
ip::address_v4::loopback(),ip_address_v6::lookback():返回環回地址。
ip::host_name():返回當前主機名稱字串。
大部分時候這麼使用ip::address_from_string:
ip::address addr=ip::address::from_string(“127.0.0.1”);
如果需要連線一個主機,下面的程式碼片段是無效的:
// 丟擲異常
ip::address addr=ip::address::from_string(“www.yahoo.com”);
Endpoints
Endpoint是一個地址和埠。每一個不同的socket類都有自己的端點類:如
ip::tcp::endpoint,ip::udp::endpoint,ip::ecmp::endpoint。
如果你想連線本機80埠:
ip::tcp::endpoint ep(ip::address::from_string(“127.0.0.1”),80);
可以通過三種方式構造endpoint:
endpoint():預設建構函式有時可用於UDP/ICMP sockets。
endpoint(protocol,port):用於接受連線的伺服器端socket。
endpoint(addr,port):通過地址和埠構造一個endpoint。
例如:
ip::tcp::endpoint ep1;
ip::tcp::endpoint ep2(ip::tcp::v4(),80);
ip::tcp::endpoint ep3(ip::address::from_string(“127.0.0.1”),80);
如果你想連線一個主機(沒有IP地址):
//輸出87.248.122.122
io_service service;
ip::tcp::resolver resolver(service);
ip::tcp::resolver::query query(“www.yahoo.com”,80);
ip::tcp::resolver::iterator iter = resolver.resolve(query);
ip::tcp::endpoint ep=*iter;
std::cout << ep.address().to_string() << std::endl;
你需要更換tcp套接字型別。首先,建立一個需要查詢的名稱,然後使用
resolver()函式。如果成功,至少返回一條記錄。給定迭代器,可以只使用第一個或者遍歷列表。
給定一個endpoint,你可以獲取地址,埠,和ip協議(v4或者v6):
std::cout << ep.address().tostring() << “:”<<ep.port()
<< “/”<< ep.protocol()<<std::endl;
Sockets
Boost.Asio有三種類型的套接字類:ip::tcp,ip::udp,ip::icmp,當然也是可以擴充套件的。你可以建立自己的套接字,是複雜的。如果選擇這樣做,看看
boost/asio/ip/tcp.hpp,boost/asio/ip/udp.hpp,boost/asio/ip/icmp.hpp,他們都是內部
typedef關鍵字很小的類。
你會想到ip::tcp,ip::udp,ip::icmp類佔位符,他們提供了便捷訪問類和方法:
ip::tcp::socket,ip::tcp::acceptor,ip::tcp::endpoint,ip::tcp::resolver,ip::tcp::iostream
ip::udp::socket,ip::udp::endpoint,ip::udp::resolver
ip::icmp::socket,ip::icmp::endpoint,ip::icmp::resolver
套接字類建立一個響應的套接字,你給定io_service給建構函式:
io_service service;
ip::udp::socket sock(service);
sock.set_option(ip::udp::socket::reuse_address(true));
每個套接字名稱都是一個typedef關鍵字:
ip::tcp::socket = basic_stream_socket<tcp>
ip::udp::socket=basic_stream_socket<udp>
ip::icmp::socket=basic_raw_socket<icmp>
同步錯誤程式碼
所有的同步函式都有一個過載,丟擲異常或者返回錯誤程式碼。如下程式碼片段:
sync_func(arg1,arg2,...,argN);// throws
boost::asio::error_code ec;
sync_func(arg1,arg2,...,argN,ec);//返回錯誤程式碼
在本章的剩餘部分,你會看到更多的同步功能。為了保持簡單,省略了返回錯誤程式碼,但是他們是存在的。
套接字成員函式
函式被分成幾組。不是每個套接字都可以使用所有的函式。本節的最後會顯示函式屬於哪些類。
注意所有的非同步函式理解返回,同步函式只有在完成操作後才返回。
連線相關函式
這些函式是連線或者繫結套接字,斷開它,或者查詢連線是否有效或者無效:
assign(protocl,socket):指定一個原始(本地)套接字到一個套接字例項。用它處理遺留程式碼(本地套接字已經建立)。
open(protocol):開啟一個給定協議(v4或者v6)的套接字。主要用於UDP/ICMP套接字,或者服務端套接字。
bind(endpoint):繫結一個地址。
connect(endpoint):同步連線到給定的地址。
async_connect(endpoint):非同步連線到給定的地址。
io_open():如果套接字已經開啟返回true。
close():關閉套接字。所有的非同步操作被取消,並且返回
error::operation_aborted錯誤程式碼。
shutdown(type_of_shutdown):從現在開始禁止傳送操作,接收操作,或者兩個同時禁止。
cancel():在當前套接字上取消所有的非同步操作。這個套接字的所有非同步操作都完成並且返回error::operation_aborted錯誤程式碼。
例項如下:
ip::tcp::endpoint ep(ip::address::from_string(“127.0.0.1”),80);
ip::tcp::socket sock(service);
sock.open(ip::tcp::v4());
socket.connect(ep);
sock.write_some(buffer(“GET /index.html\r\n”));
char buff[1024];
sock.read_some(buffer(buff,1024));
sock.shutdown(ip::tcp::socket::shutdown_receive);
sock.close();
讀寫函式
在套接字上執行輸入輸出功能。
非同步操作函式的,處理者的簽名是一樣的:
void handler(const boost::system::error_code &e,size_t bytes)。
async_receive(buffer,[flags,]handler):在一個套接字上啟動非同步接受操作。
async_read_some(buffer,handler):和async_receive是等價的。
async_receive_from(buffer,endpoint,[,flags],handler):在一個指定的端點上啟動一個非同步的接受操作。
async_send(buffer[,flags],handler):啟動一個非同步傳送資料操作。
async_write_come(buffer,handler):和async_send是等價的。
async_send_to(buffer,endpoint,handler):在一個指定的端點上啟動傳送資料的非同步操作。
receive(buffer[,flags]):同步接受資料到給定的緩衝區。阻塞直到接收到資料,或者發生錯誤。
read_some(buffer):等價於receive。
receive_from(buffer,endpoint[,flags]):在指定的端點,接受資料到給定的緩衝區,函式阻塞直到接收到資料,或者出現錯誤。
send(buffer[,flags]):同步傳送緩衝區資料。函式阻塞直到傳送成功,或者出現錯誤。
write_some(buffer):等價於send。
send_to(buffer,endpoint[,flags]):同步傳送緩衝區資料到給定的端點。函式阻塞直到傳送成功或者出現錯誤。
available():返回可以不阻塞的同步讀取多個位元組。
討論緩衝區。檢驗flags。flags預設值為0,也可以是一個組合:
ip::socket_type::socket::message_peek:這個標記僅僅檢視訊息。它返回一個訊息,但是下次讀取是將重讀這個訊息。
ip::socket_type::socket::message_out_of_band:這個標記處理out-of-band(OOB)資料。OOB資料是比普通資料更重要的資料。關於OOB的討論超出了本書的範圍。
ip::socket_type::socket::message_do_not_route:這個標記指定訊息不使用路由表傳送。
ip::socket_type:socket::message_end_of_record:這個標記指定資料記錄的結束標記。這個不能在windows下指定。
使用message_peek,如下面程式碼片段所示:
char buff[1024];
sock.receive(buffer(buff),ip::tcp::socket::message_peek);
memset(buff,1024,0);
// 重讀上一次讀取的
sock.receive(buffer(buff));
以下例子,引導讀者使用同步和非同步不同的套接字:
1.例子1同步讀寫一個tcp套接字。
ip::tcp::endpoint ep(ip::address::from_string(“127.0.0.1”),80);
ip::tcp::socket sock(service);
sock.connect(ep);
sock.write_some(buffer(“GET /index/html\r\n”));
std::cout<<”bytes available” << sock.available()<<std::endl;
char buff[512];
size_t read=sock.read_some(buffer(buff));
2.同步讀寫一個udp套接字。
ip::udp::socket sock(service);
sock.oopen(ip::udp::v4);
ip::udp::endpoint receiver_ep(“87.248.112.181”,80);
sock.send_to(buffer(“testing\n”),receive_ep);
char buff[512];
ip::udp::endpoint sender_ep;
sock.receive_from(buffer(buff),sender_ep);
注意:在上面的程式碼片段中,從UDP套接字接受資料使用的是receive_from,需要一個預設建構函式的endpoint。
3.非同步讀寫一個udp套接字
using namespace boost::asio;
io_service service;
ip::udp::socket sock(service);
boost::asio::ip::udp::endpoint sender_ep;
char buff[512];
void on_read(const boost::sytem::error_code &err,std::size_t read_bytes){
std::cout << “read” << read_bytes << std::endl;
Sock.async_recieve_from(buffer(buff),sender_ep,on_read);
}
int main(int argc,char* argv[]){
ip::udp::endpoint ep(ip::address::from_string(“127.0.0.1”,8001);
sock.open(ep.protocol());
sock.set_option(boost::asio::ip::udp::socket::reuse_address(true));
sock.bind(ep);
sock.async_receive_from(buffer(buff,512),sender_ep,on_read);
serivce.run();
}
套接字控制
這些函式處理套接字高階選項
get_io_service():返回建構函式給定的io_servcie例項。
get_option(option):返回套接字選項。
set_option(option):設定套接字選項。
io_control(cmd):在套接字上執行I/O命令。
下面是可以設定或者獲取的套接字選項:
名稱 |
描述 |
類別 |
broadcast |
If true, it allows broadcasting messages |
bool |
debug |
If true, it enables socket-level debugging |
bool |
do_not_route |
If true, it prevents routing and use local interfaces only |
bool |
enable_ connection_ aborted |
If true, it reports connections that were aborted on accept() |
bool |
keep_alive |
If true, it sends keep-alives |
bool |
linger |
If true, socket lingers on close() if there's unsent data |
bool |
receive_buffer_ size |
This is a received buffer size for a socket |
int |
receive_low_ watemark |
This provides a minimum number of bytes to process for socket input |
int |
reuse_address |
If true, socket can be bound to an address already in use |
bool |
send_buffer_ size |
This sends buffer size for a socket |
int |
send_low_ watermark |
This provides a minimum number of bytes to send for socket output |
int |
ip::v6_only |
If true, it allows only IPv6 communication |
bool |
每個名稱代表一個套接字typedef型別 或者一個類,使用如下程式碼所示:
ip:tcp::endpoint ep(ip::address::from_string(“127.0.0.1”),80);
ip::tcp::socket sock(service);
sock.connet(ep);
// TCP套接字可複用地址
ip::tcp::socket::reuse_address ra(true);
sock.set_option(ra);
// 獲取套接字接受緩衝區大小
ip::tcp::socket::receive_buffer_size rbs;
sock.get_option(rbs);
std::cout << rbs.value() << std::endl;
// 設定接受緩衝區大小為8192
ip::tcp::socket::send_buffer_size sbs(8192);
sock.set_option(sbs);
注意:使用以上功能時,需要開啟套接字,否則會丟擲異常。
TCP 和 UDP 和ICMP
在前面說過,不是所有的套接字可以使用所有的函式。做了一個列表展示不同函式。一些函式沒在這裡出現,意味著可以應用於所有的套接字。
Name |
TCP |
UDP |
ICMP |
async_read_some |
Yes |
- |
- |
async_receive_from |
- |
Yes |
Yes |
async_write_some |
Yes |
- |
- |
async_send_to |
- |
Yes |
Yes |
read_some |
Yes |
- |
- |
receive_from |
- |
Yes |
Yes |
write_some |
Yes |
- |
- |
send_to |
- |
Yes |
Yes |
其它雜項功能
其它無關輸入輸出的功能函式如下:
local_endpoint():返回套接字的本地連線地址。
remote_endpoint():返回需要連線的遠端地址。
native_handle():返回原始套接字控制代碼。你只想在不使用boost.asio的情況下使用原始套接字。
no_blocking():如果套接字是非阻塞的返回true,否則返回false。
native_no_blocking():如果套接字是非阻塞返回true,否則返回false。它是在本地呼叫原始套接字。通常不需要這個(no_blocking已經快取了結果);如果使用native_hander使用他。
at_mark():如果套接字是讀OOB資料返回true。很少需要這個。
其它
最後一點,套接字例項是無法複製的,複製建構函式和賦值過載(operator=)是無法呼叫的。
ip::tcp::socket s1(service),s2(service);
s1=s2;//編譯期錯誤
ip::tcp::socket s3(s1);// 編譯器錯誤
這是有道理的,每個例項擁有和管理資源(原始套接字本身)。如果允許複製構造,我們會結束兩個擁有相同原始套接字的例項,他們需要某種管理機制(一個例項擁有所有權,或者引用計數,或者其它方法)。Boost.Aso選擇不允許複製(如果你想使用副本,只能使用智慧指標)。
typedef boost::shared_ptr<ip::tcp::socket> socket_ptr;
socket_ptr sock1(new ip::tcp::socket(servcie));
socket_ptr sock2(sock1);
socket_ptr sock3;
sock3 = sock1;
套接字緩衝區
當從一個套接字讀取或者向一個套接字寫入資料時,需要一個緩衝區,存放輸入輸出資料。這個緩衝區儲存器必須脫離I/O操作;你要確保在I/O操作期間或者超出作用域不會被釋放。
在同步操作時很方便,當然傳送和接受結束後,buff還是存在。
char buff[512];
...
sock.receive(buffer(buff));
strcpy(buff,”ok\n”);
sock.send(buffer(buff));
如下程式碼片段所示,非同步操作就沒這麼簡單了:
// 錯誤的程式碼
void on_read(const boost::system::error_code &err,std::size_t read_bytes){
...
}
void func(){
char buff[512];
sock.async_receive(buffer(buff),on_read);
}
呼叫async_receive()函式之後,buff將離開他的作用域,因此他的記憶體空間會被釋放。當我們正在這個套接字上接受資料,我們將複製到一個不屬於我們的內容中;它可以被釋放,或者重新分配給其它程式碼需要的資料,因此,是錯誤的記憶體。
上述問題的幾種解決方案:
使用全域性緩衝區。
創佳一個緩衝區,在操作完成後釋放它。
一個連線物件管理著套接字,和其它額外的資料,如buffer(s)。
第一個解決方案是不好的,因為我們直到全域性變數是非常不好的。此外,如果兩個處理器使用相同的緩衝區會發生什麼?
這裡是如何實現第二種解決方案:
void on_read(char *ptr,const boost::system::error_code &err, std::size_t read_bytes){
delete [] ptr;
}
...
char *buff=new char[512];
sock.async_receive(buffer(buf,512),bost::bind(on_read,buff,_1,_2));
或者你想讓緩衝區離開作用域時自動完成操作(銷燬?),使用智慧指標:
struct shared_buffer{
boost::shared_array<char> buff;
int size;
shared_buffer(size_t size):buff(new char[size]),size(size){
}
mutable_buffers_1 asio_buff() const{
return buffer(buff.get(),size);
}
};
// 當前on_read離開作用域時,boost::bind的物件將被釋放,
// 也將釋放shared_buffer
void on_read(shared_buffer,const boost::system::error_code &err,std::size_t read_bytes){}
...
shared_buffer buff(512);
sock.async_receive(buff.asio_buff(),boost::bind(on_read,buff,_1,_2));
類shared_buffer用好內部類shared_array<>,一個shared_buffer的例項副本隨著
shared_array<>一直存在,當離開作用域時,shared_array<>將自動銷燬,這正式我們所需要的。
這正是你期待的,當呼叫完成處理程式時,Boost.Asio拷貝一個副本給完成處理程式。這個拷貝是boost::bind的一個函式引數(functor),他內部包含了一個我們給定的shared_buffer例項的拷貝。
第三個選項是使用連線物件管理套接字和相關資料。如緩衝區(buffers),通常是正確的,但是卻很複雜。將在本章最後討論。
緩衝區包裝類(函式)
在你看過的程式碼片段中,每當執行讀/寫操作時都需要一個緩衝區(buffer),呼叫buffer()函式包裝一個緩衝區物件。
char buff[512];
sock.async_receive(buffer(buff),on_read);
這主要包裝了一個任意緩衝區給一個類以便Boost.Asio遍歷緩衝區。可以使用如下的程式碼片段:
sock.async_receive(some_buffer,on_read);
some_buffer需要滿足一定的條件,即(ConstBufferSequence 或者
MutableBufferSequence (可以檢視Boost.Asio的文件)。自己建立滿足這樣條件的類是很複雜的,但是Boost.Asio已經提供了一些滿足這些條件的類。不要直接訪問他們,使用buffer()函式即可。
你可以使用buffer()的任意功能:
A char[] const array
A void* pointer and size in characters
An std::string string
An POD[] const array (POD stands for plain old data, meaning, constructor and destructor do nothing)
An std::vector array of any POD
A boost::array array of any POD
An std::array array of any POD
如下程式碼所示:
struct pod_sample{int i;long l;char c;}
...
char b1[512];
void *b2=new char[512];
std::string b3;b3.resize(128);
pod_sample b4[16];
std::vector<pod_sample> b5;b5.resize(16);
boost::array<pod_sample,16> b6;
std::array<pod_sample,16) b7;
sock.async_send(buffer(b1),on_read);
sock.async_send(buffer(b2,512),on_read);
sock.async_send(buffer(b3),on_read);
sock.async_send(buffer(b4),on_read);
sock.async_send(buffer(b5),on_read);
sock.async_send(buffer(b6),on_read);
sock.async_send(buffer(b7),on_read);
所有的一切,都不不要自己建立類來滿足ConstBufferSequence 或者
MutableBufferSequence,你可以根據需要建立一個類,放入緩衝區,返回mutable_buffers_1例項,這就是前面我們講的shared_buffer類。
讀、寫、連線函式
Boost.Asio提供處理I/O的函式,我們分為4組
The connect functions
這個函式連線到一個給定的端點上。
connect(socket,begin[,end][,condition]):函式嘗試同步連線到列表中的每個端點上。begin迭代器是socket_type::resolver::query呼叫的結果(你可能需要再次看看EndPoint章節)。指定的end迭代器是可選項,你可以忽略它。在每個嘗試連線前可以提供一個條件函式。函式簽名Iterator connect_condition(const boost::system::error_code & err, Iterator next);你可以選擇返回不同的迭代器,允許跳過某些端點。
async_connect(socket,begin[,end][,condition],handler):這個函式執行非同步連線,在最後呼叫完成處理程式,處理程式簽名:
void handler(const boost::system::error_code & err, Iterator iterator)。第二個引數傳遞給完成處理程式成功的端點,否則就是end迭代器。
例子如下程式碼所示:
using namespace boost::asio::ip;
tcp::resolver resolver(service);
tcp::resolver::iterator iter=resolver.resolve(tcp::resolver::query(“www.yahoo.com”,”80”));
tcp::socket sock(service);
connect(sock,iter);
一個主機可以解析為多個地址,因此connect和async_connect嘗試連線每個地址,直到有個成功。
The read/write functions
這些函式從一個從流(stream)中讀取,或者向流(stream)中寫入(可以是一個套接字,或者任何有流行為的類)。
async_read(stream,buffer[,completion],handler):非同步從流中讀取。完成後,完成處理程式(handler)被呼叫。完成處理程式函式簽名:
void handler(const boost::system::error_ code & err, size_t bytes)。你可以選擇指定completion函式,completion函式在讀取成功後呼叫,並且告訴Boost.Asio async_read完成了(如果不成功,繼續讀)。他的簽名:
size_t completion(const boost::system::error_code& err, size_t bytes_transfered). 當completion函式返回0,我們認為讀操作完成;如果返回非0值,表示下次呼叫async_read_some可以讀取的最大位元組數。
async_write(stream,buffer[,completion],handler):異步向流中寫入資料。引數和aysnc_read類似。
read(stream,buffer[,completion]):同步從流中讀取資料。引數和async_read類似。
write(stream,buffer[,completion]):同步向流中寫入資料。引數和async_read類似。
async_read(stream, stream_buffer [, completion], handler)
async_write(strean, stream_buffer [, completion], handler)
write(stream, stream_buffer [, completion])
read(stream, stream_buffer [, completion])
首先,注意套接字例項,第一個引數是流(stream)。包括但是不限於套接字。例如可以不是一個套接字,可以是windows檔案控制代碼。
每個讀/寫操作結束會觸發這些條件的一個或者多個:
提供的緩衝區滿了(讀)或者緩衝區的資料被寫入(寫)。
completion函式返回0(如果你提供這樣的函式)。
發生了錯誤。
下面的程式碼非同步從套接字中讀取,直到滿足’\n’:
io_service servcie;
ip::tcp::socket socket(service);
char buff[512];
int offset = 0;
size_t up_to_enter(const boost::system::error_code &,size_t bytes){
for(size_t i=0;i<bytes;i++){
if(buff[i+offset]==’\n’)
return 0;
return 1;
}
}
void on_read(const boost::system::error_code &,size_t){}
...
async_read(sock,buffer(buff),up_to_enter,on_read);
Boost.Asio提供了幾個completion的幫助函式。
transfer_at_least(n)
transfer_exactly(n)
transfer_all()
如下程式碼所示:
char buff[512];
void on_read(const boost::system::error_code&,size_t){}
// 讀取整整32個位元組。
async_read(sock,buffer(buff),transfer_exactly(32),on_read);
最後四個函式不使用常規的buffer,使用stream_buffer函式,Boost.Asio提供的
std::streambuf的繼承類。STL 流(streams)和流緩衝(stream buffers)是非常靈活的,如下面程式碼所示:
io_service servcie;
void on_read(stream &buf,const boost::system::error_code &,size_t){
std::istream in(&buf);
std::string line;
std::getline(in,line);
std::cout << “first line:”<< line<<std::endl;
}
int main(int argc,char *argv[]){
HANDLE file=::CreateFile(“readme.txt”,GENERIC_READ,0,0,
OPEN_ALWAYS,FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPEND,0);
windows::stream_hander h(service,file);
Streambuf buf;
async_read(h,buf,transfer_exactly(256),
boost::bind(on_read,boost::ref(buf),_1,_2));
service.run();
}
這裡,演示了async_read可以使用windows檔案控制代碼。首先讀取256個位元組資料,然後儲存在緩衝區。當讀操作完成,on_read被呼叫,建立一個std::istream處理緩衝區,讀取第一行,並且輸出到控制檯。
The read_until/async_read_until functions
這些函式在滿足條件之前一直讀:
async_read_until(stream,stream_buffer,delim,handler):開始一個非同步讀取操作。遇到delimeter讀操作將停止。delimeter可以是任意字元,std::string或者boost::regex。完成處理程式標籤:void handler(const boost::system::error_code & err, size_t bytes)。
async_read_until(stream,stream_buffer,completion,handler):這個函式和前面一個相同,但是使用completion函式替代delim。completion函式簽名:
pair<iterator,bool> completion(iterator begin, iterator end);
這裡iterator = is buffers_iterator<streambuf::const_buffers_type>。 你要記住的是,迭代器型別是 random-accessiterator。你可以掃描(begin,end)範圍,以確定是否需要停止或者繼續。返回一個pair,第一個引數是函式結束字元一個迭代器(
the first member will be an iterator passed at the end of the last character consumed by the function );第二個引數如果是true讀操作將停止,否則繼續。
read_until(stream,stream_buffer,delim):同步讀取操作。引數和async_read_until類似。
read_until(stream,stream_buffer,comletion):同步讀取操作。引數和
async_read_until類似。
下面的列子將讀取ygie識別符號號:
typedef buffers_iterator<streambuf::const_buffers_type> iterator;
std::pair<iterator,bool> match_punct(iterator begin,iterator end){
while(begin!=end){
if(std::ispunct(*begin))
return std::make_pair(begin,true);
}
return std::make_pair(end,false);
}
void on_read(const boost::system::error_code &,size_t ){}
....
streambuf buf;
async_read_until(sock,buf,match_punct,on_read);
如果向讀取一個空格,修改最後一行:
async_read_until(sock,buff,’ ’,on_read);
The *_at functions
隨機讀寫一個流的操作。從指定偏移位置讀寫。
async_read_at(stream,offset,buffer[,completion],handler):在一個流的指定偏移位置開始一個非同步的讀操作。當操作完成時,會呼叫完成處理程式(handler)。完成處理程式(handler)函式標籤:void handler(const boost::system::error_code& err, size_t bytes) 。通常使用buffer()包裝函式或者streambuf 函式。如果提供了completion函式,每次讀取成功都會呼叫,並且告訴Boost.Asio async_read_at操作完成(如果不是,繼續讀)。completion函式簽名:
size_t completion(const boost::system::error_code& err, size_t bytes)。如果
completion函式返回0,我們認為讀操作完成;如果返回非0值,表示下一次呼叫
async_read_some_at的最大可以讀取的位元組數。
async_write_at(stream,offset,buffer[,completion],handler):啟動一個非同步寫入操作。引數和async_read_at類似。
read_at(stream,offset,buffer[,completion]):一個給定的流上,從偏移位置開始讀取。引數和async_read_at類似。
write_at(stream,offset,buffer[,completion]):在一個給定的流上,從偏移位置開始寫入。引數和async_read_at類似。
這些函式不適用與套接字。他們處理隨機讀取流;換句話說,流可以隨機訪問。套接字顯然不是這樣的(套接字是隻能向前的(forward-only))。
從檔案中的256偏移位置讀取128個位元組:
io_service service;
int main(int argc,char*argv){
HANDLE file = ::CreateFile(“readme.txt”,GENERIC_READ,0,0,
OPEN_ALWAYS,FILE_ATTRIBUTE_NORMAL|FILE_FLAG_OVERLAPPED,0);
windows::random_access_handle h(service,file);
streambuf buf;
read_at(h,256,buf,transfer_exactly(128));
std::istream in(&buf);
std::string line;
std::getline(in,line);
std::cout << “first line:”<<line<< std::endl;
}
非同步程式設計
這個章節深入討論非同步程式設計時會遇到的問題。閱讀一次後,簡易回過頭來重讀一次,鞏固這些概念。
The need for going asynchronous
正如之前所說,同步程式設計要比非同步程式設計容易。這是因為他是線性的思路(呼叫函式A直到結束,呼叫函式B直到結束,等等。是事件處理思想)。在後一種情況下,你可以說有五個事件,不知道他們的執行順序,甚至不知道他們都會執行。
非同步程式設計儘管是很難的,但你還是會喜歡他,伺服器需要處理大量的併發客戶端連線。大量的併發客戶端,非同步要比同步程式設計容易。
有一個需要處理1000個併發客戶端的伺服器程式,訊息在客戶端和伺服器來回傳送,結束符為’\n’。
同步程式碼,1個執行緒:
using namespace boost::asio;
struct client{
ip::tcp::socket sock;
char buff[1024];//每個訊息的最大長度
Int already_read;//已經讀了多少
};
std::vector<client> clients;
void handle_client(){
while(true){
for(int i=0;i<clients.size();++i)
if(clients[i].sock.available())
on_read(clients[i]));
}
}
void on_read(client *c){
int to_read = std::min(1024-c.already_read,c.sock.available());
c.sock.read_some(buffer(c.buff+c.already_read,to_read));
c.already_read += to_read;
if(std::find(c.buff,c.buff+c.already_read,’\n’)<c.buff+c.alread_read){
int pos=std::find(c.buff,c.buff+c.alread_read,’\n’)-c.buff;
std::string msg(c.buff,c.buff+pos);
std::copy(c.buff+pos,c.buff+1024,c.buff);
c.alread_read-=pos;
on_read_msg(c,msg);
}
}
void on_read_msg(client &c,const std::string &msg){
// 分析訊息,會寫
if(msg ==”request_login”)
c.sock.write(“request_ok\n”);
else if
...
}
任何伺服器都要避免的事情就是伺服器反應緩慢(網路基本應用程式)。在例子中
handler_clients()功能儘可能少。如果函式阻塞在任意一點,後續客戶端傳送到來的訊息需要等待阻塞結束才能繼續處理他們。
為了保持響應,當有資料時,我們只讀取一個套接字。
if(client[i].sock.available()) on_read(clients[i])。在on_read中,我們只讀取可用的;呼叫read_until(c.sock,buffer(...),’\n’)是一個糟糕的注意。因為這會阻塞知道我們讀取到完整的訊息(我們不知道他什麼時候會發生)。
這裡的瓶頸在on_read_msg()函式;只要執行,任何到來的訊息都放入。一個好的on_read_msg方法確保不會發生,但仍有可能發生(有時候向一個緩衝區寫入資料會阻塞,直到緩衝區滿了)。
同步程式碼:10個執行緒:
using namespace boost::asio;
struct client{
// ... 和前面一樣
bool set_reading(){
boost::mutex::scoped_lock lk(cs_);
if(is_reading_) return false;// 已經讀
else{is_reading_=true;return true;}
}
void unset_reading(){
boost::mutex::scoped_lock lk(cs_);
is_reading_ =false;
}
private:
boost::mutex cs_;
bool is_reading_;
};
std::vector<client> clients;
void handle_clients(){
for(int i=0;i<10;++i){
boost::thread(handle_clients_thread);
}
}
void hande_clients_thread(){
while(true)
for(int i=0;i<clients.size();++i)
if(clients[i].sock.available())
if(clients[i].set_reading()){
on_read(clients[i]);
clients[i].unset_reading();
}
}
void on_read(client &c){
// 和前面相同
}
void on_read_msg(client &c,const std::string &msg){
// 和前面相同
}
為了使用更多執行緒,我們需要同步,在set_reading()和set_unreading()函式中。
set_reading()函式是非常重要的。在一步內”測試並且標記讀取“,如果有兩個步驟,”測試讀取“和”標記讀取“,可能有兩個執行緒對同一個客戶端測試成功,然後兩個執行緒呼叫同一個客戶端的on_read函式,破壞了資料並且應用程式可能會崩潰。
你會注意到,程式碼變得越來越複雜。
同步的程式碼第三種寫法,一個客戶端一個執行緒,隨著客戶端增加,也會變得沒有了(執行緒)。
讓我們開始非同步。我們不斷的非同步讀取。當客戶端傳送一個請求時,on_msg被呼叫,然後回覆,飯後等待下一次請求(做另一次非同步操作)。
非同步程式碼,10個執行緒:
using namespace boost::asio;
io_service service;
struct client{
ip::tcp::socket sock;
streambuf buff;
};
std::vector<client> clients;
void handle_clients(){
for(int i=0;i<clients.size();++i)
async_read_until(clients[i].sock,clienets[i].buff,’\n’,
boost::bind(on_read,clients[i],_1,_2));
for(int i=0;i<10;++i)
boost::thread(handle_clients_thread);
}
void handle_clients_thread(){
service.run();
}
void on_read(client &c,const error_code &err,size_t read_bytes){
std::istream in(&c.buff);
std::string msg;
std::getline(in,msg);
if(msg == “request_login”)
c.sock.async_write(“request_ok\n”,on_write);
else if...
...
// 現在,等待下一次客戶端讀取
async_read_until(c.sock,c.buff,’\n’,
boost::bind(on_read,c,_1,_2));
}
注意程式碼變得簡單了。客戶端結構體只有兩個成員,handle_clients()函式只是呼叫async_read_until,然後建立10個執行緒,每個執行緒呼叫service.run()。這些執行緒 處理和排程非同步讀寫操作。需要注意的是,on_read()將時刻準備下一個非同步讀取操作。
非同步run(),run_on(),poll(),poll_on()
為了實現迴圈監聽,io_service提供了4個方法,run(),run_one(),poll(),poll_one()。大部分時候使用service.run()