1. 程式人生 > >Boost.Asio基本原理(CSDN也有Markdown了,好開森)

Boost.Asio基本原理(CSDN也有Markdown了,好開森)

Boost.Asio基本原理

這一章涵蓋了使用Boost.Asio時必須知道的一些事情。我們也將深入研究比同步程式設計更復雜、更有樂趣的非同步程式設計。

網路API

這一部分包含了當使用Boost.Asio編寫網路應用程式時必須知道的事情。

Boost.Asio名稱空間

Boost.Asio的所有內容都包含在boost::asio名稱空間或者其子名稱空間內。

  • boost::asio:這是核心類和函式所在的地方。重要的類有io_service和streambuf。類似read, read_at, read_until方法,它們的非同步方法,它們的寫方法和非同步寫方法等自由函式也在這裡。
  • boost::asio::ip:這是網路通訊部分所在的地方。重要的類有*address, endpoint, tcp,
    udp和icmp*,重要的自由函式有connectasync_connect。要注意的是在boost::asio::ip::tcp::socket中間,socket只是boost::asio::ip::tcp類中間的一個typedef關鍵字。
  • boost::asio::error:這個名稱空間包含了呼叫I/O例程時返回的錯誤碼
  • boost::asio::ssl:包含了SSL處理類的名稱空間
  • boost::asio::local:這個名稱空間包含了POSIX特性的類
  • boost::asio::windows
    :這個名稱空間包含了Windows特性的類

IP地址

對於IP地址的處理,Boost.Asio提供了ip::address , ip::address_v4ip::address_v6類。
它們提供了相當多的函式。下面列出了最重要的幾個:

  • ip::address(v4_or_v6_address):這個函式把一個v4或者v6的地址轉換成ip::address
  • ip::address:from_string(str):這個函式根據一個IPv4地址(用.隔開的)或者一個IPv6地址(十六進位制表示)建立一個地址。
  • ip::address::to_string() :這個函式返回這個地址的字串。
  • ip::address_v4::broadcast([addr, mask]):這個函式建立了一個廣播地址
    ip::address_v4::any():這個函式返回一個能表示任意地址的地址。
  • ip::address_v4::loopback(), ip_address_v6::loopback():這個函式返回環路地址(為v4/v6協議)
  • ip::host_name():這個函式用string資料型別返回當前的主機名。

大多數情況你會選擇用ip::address::from_string

ip::address addr = ip::address::from_string("127.0.0.1");

如果你想通過一個主機名進行連線,下面的程式碼片段是無用的:

// 丟擲異常
ip::address addr = ip::address::from_string("www.yahoo.com");

端點

端點是使用某個埠連線到的一個地址。不同型別的socket有它自己的endpoint類,比如ip::tcp::endpoint、ip::udp::endpointip::icmp::endpoint

如果想連線到本機的80埠,你可以這樣做:

ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);

有三種方式來讓你建立一個端點:

  • endpoint():這是預設建構函式,某些時候可以用來建立UDP/ICMP socket。
  • endpoint(protocol, port):這個方法通常用來建立可以接受新連線的伺服器端socket。
  • endpoint(addr, port):這個方法建立了一個連線到某個地址和埠的端點。

例子如下:

ip::tcp::endpoint ep1;
ip::tcp::endpoint ep2(ip::tcp::v4(), 80);
ip::tcp::endpoint ep3( ip::address::from_string("127.0.0.1), 80);

如果你想連線到一個主機(不是IP地址),你需要這樣做:

// 輸出 "87.248.122.122"
io_service service;
ip::tcp::resolver resolver(service);
ip::tcp::resolver::query query("www.yahoo.com", "80");
ip::tcp::resolver::iterator iter = resolver.resolve( query);
ip::tcp::endpoint ep = *iter;
std::cout << ep.address().to_string() << std::endl;

你可以用你需要的socket型別來替換tcp。首先,為你想要查詢的名字建立一個查詢器,然後用resolve()函式解析它。如果成功,它至少會返回一個入口。你可以利用返回的迭代器,使用第一個入口或者遍歷整個列表來拿到全部的入口。

給定一個端點,可以獲得他的地址,埠和IP協議(v4或者v6):

std::cout << ep.address().to_string() << ":" << ep.port()
<< "/" << ep.protocol() << std::endl;

套接字

Boost.Asio有三種類型的套接字類:ip::tcp, ip::udpip::icmp。當然它也是可擴充套件的,你可以建立自己的socket類,儘管這相當複雜。如果你選擇這樣做,參照一下boost/asio/ip/tcp.hpp, boost/asio/ip/udp.hppboost/asio/ip/icmp.hpp。它們都是含有內部typedef關鍵字的超小類。

你可以把ip::tcp, ip::udp, ip::icmp類當作佔位符;它們可以讓你便捷地訪問其他類/函式,如下所示:

  • ip::tcp::socket, ip::tcp::acceptor, ip::tcp::endpoint,ip::tcp::resolver, ip::tcp::iostream
  • ip::udp::socket, ip::udp::endpoint, ip::udp::resolver
  • ip::icmp::socket, ip::icmp::endpoint, ip::icmp::resolver

socket類建立一個相應的socket。而且總是在構造的時候傳入io_service例項:

io_service service;
ip::udp::socket sock(service)
sock.set_option(ip::udp::socket::reuse_address(true));

每一個socket的名字都是一個typedef關鍵字

  • ip::tcp::socket = basic_stream_socket
  • ip::udp::socket = basic_datagram_socket
  • ip::icmp::socket = basic_raw_socket

同步錯誤碼

所有的同步函式都有丟擲異常或者返回錯誤碼的過載,比如下面的程式碼片段:

sync_func( arg1, arg2 ... argN); // 丟擲異常
boost::system::error_code ec;
sync_func( arg1 arg2, ..., argN, ec); // 返回錯誤碼

在這一章剩下的部分,你會見到大量的同步函式。簡單起見,我省略了有返回錯誤碼的過載,但是不可否認它們確實是存在的。

socket成員方法

這些方法被分成了幾組。並不是所有的方法都可以在各個型別的套接字裡使用。這個部分的結尾將有一個列表來展示各個方法分別屬於哪個socket類。

注意所有的非同步方法都立刻返回,而它們相對的同步實現需要操作完成之後才能返回。

連線相關的函式

這些方法是用來連線或繫結socket、斷開socket字連線以及查詢連線是活動還是非活動的:

  • assign(protocol,socket):這個函式分配了一個原生的socket給這個socket例項。當處理老(舊)程式時會使用它(也就是說,原生socket已經被建立了)
  • open(protocol):這個函式用給定的IP協議(v4或者v6)開啟一個socket。你主要在UDP/ICMP socket,或者服務端socket上使用。
  • bind(endpoint):這個函式繫結到一個地址
  • connect(endpoint):這個函式用同步的方式連線到一個地址
  • async_connect(endpoint):這個函式用非同步的方式連線到一個地址
  • is_open():如果套接字已經開啟,這個函式返回true
  • close():這個函式用來關閉套接字。呼叫時這個套接字上任何的非同步操作都會被立即關閉,同時返回error::operation_aborted錯誤碼。
  • shutdown(type_of_shutdown):這個函式立即使send或者receive操作失效,或者兩者都失效。
  • cancel():這個函式取消套接字上所有的非同步操作。這個套接字上任何的非同步操作都會立即結束,然後返回error::operation_aborted錯誤碼。

例子如下:

ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.open(ip::tcp::v4()); n
sock.connect(ep);
sock.write_some(buffer("GET /index.html\r\n"));
char buff[1024]; sock.read_some(buffer(buff,1024));
sock.shutdown(ip::tcp::socket::shutdown_receive);
sock.close();

讀寫函式

這些是在套接字上執行I/O操作的函式。

對於非同步函式來說,處理程式的格式void handler(const boost::system::error_code& e, size_t bytes)都是一樣的

  • async_receive(buffer, [flags,] handler):這個函式啟動從套接字非同步接收資料的操作。
  • async_read_some(buffer,handler):這個函式和async_receive(buffer, handler)功能一樣。
  • async_receive_from(buffer, endpoint[, flags], handler):這個函式啟動從一個指定端點非同步接收資料的操作。
  • async_send(buffer [, flags], handler):這個函式啟動了一個非同步傳送緩衝區資料的操作。
  • async_write_some(buffer, handler):這個函式和a*sync_send(buffer, handler)*功能一致。
  • async_send_to(buffer, endpoint, handler):這個函式啟動了一個非同步send緩衝區資料到指定端點的操作。
  • receive(buffer [, flags]):這個函式非同步地從所給的緩衝區讀取資料。在讀完所有資料或者錯誤出現之前,這個函式都是阻塞的。
  • read_some(buffer):這個函式的功能和receive(buffer)是一致的。
    • receive_from(buffer, endpoint [, flags])*:這個函式非同步地從一個指定的端點獲取資料並寫入到給定的緩衝區。在讀完所有資料或者錯誤出現之前,這個函式都是阻塞的。
  • send(buffer [, flags]):這個函式同步地傳送緩衝區的資料。在所有資料傳送成功或者出現錯誤之前,這個函式都是阻塞的。
  • write_some(buffer):這個函式和send(buffer)的功能一致。
  • send_to(buffer, endpoint [, flags]):這個函式同步地把緩衝區資料傳送到一個指定的端點。在所有資料傳送成功或者出現錯誤之前,這個函式都是阻塞的。
  • available():這個函式返回有多少位元組的資料可以無阻塞地進行同步讀取。

稍後我們將討論緩衝區。讓我們先來了解一下標記。標記的預設值是0,但是也可以是以下幾種:

  • ip::socket_type::socket::message_peek:這個標記只監測並返回某個訊息,但是下一次讀訊息的呼叫會重新讀取這個訊息。
  • ip::socket_type::socket::message_out_of_band:這個標記處理帶外(OOB)資料,OOB資料是被標記為比正常資料更重要的資料。關於OOB的討論在這本書的內容之外。
  • ip::socket_type::socket::message_do_not_route:這個標記指定資料不使用路由表來發送。
  • ip::socket_type::socket::message_end_of_record:這個標記指定的資料標識了記錄的結束。在Windows下不支援。

你最常用的可能是message_peek,使用方法請參照下面的程式碼片段:

char buff[1024];
sock.receive(buffer(buff), ip::tcp::socket::message_peek );
memset(buff,1024, 0);
// 重新讀取之前已經讀取過的內容
sock.receive(buffer(buff) );

下面的是一些教你如何同步或非同步地從不同型別的套接字上讀取資料的例子:

  • 例1是在一個TCP套接字上進行同步讀寫:
ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.connect(ep);
sock.write_some(buffer("GET /index.html\r\n"));
std::cout << "bytes available " << sock.available() << std::endl;
char buff[512];
size_t read = sock.read_some(buffer(buff));
  • 例2是在一個UDP套接字上進行同步讀寫:
ip::udp::socket sock(service);
sock.open(ip::udp::v4());
ip::udp::endpoint receiver_ep("87.248.112.181", 80);
sock.send_to(buffer("testing\n"), receiver_ep);
char buff[512];
ip::udp::endpoint sender_ep;
sock.receive_from(buffer(buff), sender_ep);

[?注意:就像上述程式碼片段所展示的那樣,使用receive_from從一個UDP套接字讀取資料時,你需要構造一個預設的端點]

  • 例3是從一個UDP服務套接字中非同步讀取資料:
using namespace boost::asio;
io_service service;
ip::udp::socket sock(service);
boost::asio::ip::udp::endpoint sender_ep;
char buff[512];
void on_read(const boost::system::error_code & err, std::size_t read_bytes) {
    std::cout << "read " << read_bytes << std::endl;
    sock.async_receive_from(buffer(buff), sender_ep, on_read);
}
int main(int argc, char* argv[]) {
    ip::udp::endpoint ep(ip::address::from_string("127.0.0.1"),
8001);
    sock.open(ep.protocol());
    sock.set_option(boost::asio::ip::udp::socket::reuse_address(true));
    sock.bind(ep);
    sock.async_receive_from(buffer(buff,512), sender_ep, on_read);
    service.run();
}

套接字控制:

這些函式用來處理套接字的高階選項:

  • get_io_service():這個函式返回建構函式中傳入的io_service例項
  • get_option(option):這個函式返回一個套接字的屬性
  • set_option(option):這個函式設定一個套接字的屬性
  • io_control(cmd):這個函式在套接字上執行一個I/O指令

這些是你可以獲取/設定的套接字選項:

名字 描述 型別
broadcast 如果為true,允許廣播訊息 bool
debug 如果為true,啟用套接字級別的除錯 bool
do_not_route 如果為true,則阻止路由選擇只使用本地介面 bool
enable_connection_aborted 如果為true,記錄在accept()時中斷的連線 bool
keep_alive 如果為true,會發送心跳 bool
linger 如果為true,套接字會在有未傳送資料的情況下掛起close() bool
receive_buffer_size 套接字接收緩衝區大小 int
receive_low_watemark 規定套接字輸入處理的最小位元組數 int
reuse_address 如果為true,套接字能繫結到一個已用的地址 bool
send_buffer_size 套接字傳送緩衝區大小 int
send_low_watermark 規定套接字資料傳送的最小位元組數 int
ip::v6_only 如果為true,則只允許IPv6的連線 bool

每個名字代表了一個內部套接字typedef或者類。下面是對它們的使用:

ip::tcp::endpoint ep( ip::address::from_string("127.0.0.1"), 80);
ip::tcp::socket sock(service);
sock.connect(ep);
// TCP套接字可以重用地址
ip::tcp::socket::reuse_address ra(true);
sock.set_option(ra);
// 獲取套接字讀取的資料
ip::tcp::socket::receive_buffer_size rbs;
sock.get_option(rbs);
std::cout << rbs.value() << std::endl;
// 把套接字的緩衝區大小設定為8192
ip::tcp::socket::send_buffer_size sbs(8192);
sock.set_option(sbs);

[?在上述特性工作之前,套接字要被開啟。否則,會丟擲異常]

TCP VS UDP VS ICMP

就像我之前所說,不是所有的成員方法在所有的套接字類中都可用。我做了一個包含成員函式不同點的列表。如果一個成員函式沒有出現在這,說明它在所有的套接字類都是可用的。

名字 TCP UDP ICMP
async_read_some - -
async_receive_from -
async_write_some - -
async_send_to -
read_some - -
receive_from -
write_some - -
send_to -

其他方法

其他與連線和I/O無關的函式如下:

  • local_endpoint():這個方法返回套接字本地連線的地址。
  • remote_endpoint():這個方法返回套接字連線到的遠端地址。
  • native_handle():這個方法返回原始套接字的處理程式。你只有在呼叫一個Boost.Asio不支援的原始方法時才需要用到它。
  • non_blocking():如果套接字是非阻塞的,這個方法返回true,否則false。
  • native_non_blocking():如果套接字是非阻塞的,這個方法返回true,否則返回false。但是,它是基於原生的套接字來呼叫本地的api。所以通常來說,你不需要呼叫這個方法(non_blocking()已經快取了這個結果);你只有在直接呼叫native_handle()這個方法的時候才需要用到這個方法。
  • at_mark():如果套接字要讀的是一段OOB資料,這個方法返回true。這個方法你很少會用到。

其他需要考慮的事情

最後要注意的一點,套接字例項不能被拷貝,因為拷貝構造方法和=操作符是不可訪問的。

ip::tcp::socket s1(service), s2(service);
s1 = s2; // 編譯時報錯
ip::tcp::socket s3(s1); // 編譯時報錯

這是非常有意義的,因為每一個例項都擁有並管理著一個資源(原生套接字本身)。如果我們允許拷貝構造,結果是我們會有兩個例項擁有同樣的原生套接字;這樣我們就需要去處理所有者的問題(讓一個例項擁有所有權?或者使用引用計數?還是其他的方法)Boost.Asio選擇不允許拷貝(如果你想要建立一個備份,請使用共享指標)

typedef boost::shared_ptr<ip::tcp::socket> socket_ptr;
socket_ptr sock1(new ip::tcp::socket(service));
socket_ptr sock2(sock1); // ok
socket_ptr sock3;           
sock3 = sock1; // ok

套接字緩衝區

當從一個套接字讀寫內容時,你需要一個緩衝區,用來儲存讀取和寫入的資料。緩衝區記憶體的有效時間必須比I/O操作的時間要長;你需要保證它們在I/O操作結束之前不被釋放。
對於同步操作來說,這很容易;當然,這個緩衝區在receive和send時都存在。

char buff[512];
...
sock.receive(buffer(buff));
strcpy(buff, "ok\n");
sock.send(buffer(buff));

但是在非同步操作時就沒這麼簡單了,看下面的程式碼片段:

// 非常差勁的程式碼 ...
void on_read(const boost::system::error_code & err, std::size_t read_bytes)
{ ... }
void func() {
    char buff[512];
    sock.async_receive(buffer(buff), on_read);
}

在我們呼叫async_receive()之後,buff就已經超出有效範圍,它的記憶體當然會被釋放。當我們開始從套接字接收一些資料時,我們會把它們拷貝到一片已經不屬於我們的記憶體中;它可能會被釋放,或者被其他程式碼重新開闢來存入其他的資料,結果就是:記憶體衝突。

對於上面的問題有幾個解決方案:

  • 使用全域性緩衝區
  • 建立一個緩衝區,然後在操作結束時釋放它
  • 使用一個集合物件管理這些套接字和其他的資料,比如緩衝區陣列

第一個方法顯然不是很好,因為我們都知道全域性變數非常不好。此外,如果兩個例項使用同一個緩衝區怎麼辦?

下面是第二種方式的實現:

void on_read(char * ptr, const boost::system::error_code & err, std::size_t read_bytes) {                       
    delete[] ptr;
}
....
char * buff = new char[512];
sock.async_receive(buffer(buff, 512), boost::bind(on_read,buff,_1,_2))

或者,如果你想要緩衝區在操作結束後自動超出範圍,使用共享指標

struct shared_buffer {
    boost::shared_array<char> buff;
    int size;
    shared_buffer(size_t size) : buff(new char[size]), size(size) {
    }
    mutable_buffers_1 asio_buff() const {
        return buffer(buff.get(), size);
    }
};


// 當on_read超出範圍時, boost::bind物件被釋放了,
// 同時也會釋放共享指標
void on_read(shared_buffer, const boost::system::error_code & err, std::size_t read_bytes) {}
sock.async_receive(buff.asio_buff(), boost::bind(on_read,buff,_1,_2));

shared_buffer類擁有實質的shared_array<>shared_array<>存在的目的是用來儲存shared_buffer例項的拷貝-當最後一個share_array<>元素超出範圍時,shared_array<>就被自動銷燬了,而這就是我們想要的結果。

因為Boost.Asio會給完成處理控制代碼保留一個拷貝,當操作完成時就會呼叫這個完成處理控制代碼,所以你的目的達到了。那個拷貝是一個boost::bind的仿函式,它擁有著實際的shared_buffer例項。這是非常優雅的!

第三個選擇是使用一個連線物件來管理套接字和其他資料,比如緩衝區,通常來說這是正確的解決方案但是非常複雜。在這一章的末尾我們會對這種方法進行討論。

緩衝區封裝函式

縱觀所有程式碼,你會發現:無論什麼時候,當我們需要對一個buffer進行讀寫操作時,程式碼會把實際的緩衝區物件封裝在一個buffer()方法中,然後再把它傳遞給方法呼叫:

char buff[512];
sock.async_receive(buffer(buff), on_read);

基本上我們都會把緩衝區包含在一個類中以便Boost.Asio的方法能遍歷這個緩衝區,比方說,使用下面的程式碼:

sock.async_receive(some_buffer, on_read);

例項some_buffer需要滿足一些需求,叫做ConstBufferSequence或者MutableBufferSequence(你可以在Boost.Asio的文件中檢視它們)。建立你自己的類去處理這些需求的細節是非常複雜的,但是Boost.Asio已經提供了一些類用來處理這些需求。所以你不用直接訪問這些緩衝區,而可以使用buffer()方法。

自信地講,你可以把下面列出來的型別都包裝到一個buffer()方法中:

  • 一個char[] const 陣列
  • 一個位元組大小的void *指標
  • 一個std::string型別的字串
  • 一個POD const陣列(POD代表純資料,這意味著構造器和釋放器不做任何操作)
  • 一個pod資料的std::vector
  • 一個包含pod資料的boost::array
  • 一個包含pod資料的std::array

下面的程式碼都是有效的:

struct pod_sample { int i; long l; char c; };
...
char b1[512];
void * b2 = new char[512];
std::string b3; b3.resize(128);
pod_sample b4[16];
std::vector<pod_sample> b5; b5.resize(16);
boost::array<pod_sample,16> b6;
std::array<pod_sample,16> b7;
sock.async_send(buffer(b1), on_read);
sock.async_send(buffer(b2,512), on_read);
sock.async_send(buffer(b3), on_read);
sock.async_send(buffer(b4), on_read);
sock.async_send(buffer(b5), on_read);
sock.async_send(buffer(b6), on_read);
sock.async_send(buffer(b7), on_read);

總的來說就是:與其建立你自己的類來處理ConstBufferSequence或者MutableBufferSequence的需求,不如建立一個能在你需要的時候保留緩衝區,然後返回一個mutable_buffers_1例項的類,而我們早在shared_buffer類中就這樣做了。

read/write/connect自由函式

Boost.Asio提供了處理I/O的自由函式,我們分四組來分析它們。

connect方法

這些方法把套接字連線到一個端點。

  • connect(socket, begin [, end] [, condition]):這個方法遍歷佇列中從start到end的端點來嘗試同步連線。begin迭代器是呼叫socket_type::resolver::query的返回結果(你可能需要回顧一下端點這個章節)。特別提示end迭代器是可選的;你可以忽略它。你還可以提供一個condition的方法給每次連線嘗試之後呼叫。用法是Iterator connect_condition(const boost::system::error_code & err,Iterator next);。你可以選擇返回一個不是next的迭代器,這樣你就可以跳過一些端點。
  • async_connect(socket, begin [, end] [, condition], handler):這個方法非同步地呼叫連線方法,在結束時,它會呼叫完成處理方法。用法是void handler(constboost::system::error_code & err, Iterator iterator);。傳遞給處理方法的第二個引數是連線成功端點的迭代器(或者end迭代器)。

它的例子如下:

using namespace boost::asio::ip;
tcp::resolver resolver(service);
tcp::resolver::iterator iter = resolver.resolve(tcp::resolver::query("www.yahoo.com","80"));
tcp::socket sock(service);
connect(sock, iter);

一個主機名可以被解析成多個地址,而connectasync_connect能很好地把你從嘗試每個地址然後找到一個可用地址的繁重工作中解放出來,因為它們已經幫你做了這些。

read/write方法

這些方法對一個流進行讀寫操作(可以是套接字,或者其他表現得像流的類):

  • async_read(stream, buffer [, completion] ,handler):這個方法非同步地從一個流讀取。結束時其處理方法被呼叫。處理方法的格式是:void handler(const boost::system::error_ code & err, size_t bytes);。你可以選擇指定一個完成處理方法。完成處理方法會在每個read操作呼叫成功之後呼叫,然後告訴Boost.Asio async_read操作是否完成(如果沒有完成,它會繼續讀取)。它的格式是:size_t completion(const boost::system::error_code& err, size_t bytes_transfered) 。當這個完成處理方法返回0時,我們認為read操作完成;如果它返回一個非0值,它表示了下一個async_read_some操作需要從流中讀取的位元組數。接下來會有一個例子來詳細展示這些。
  • async_write(stream, buffer [, completion], handler):這個方法非同步地向一個流寫入資料。引數的意義和async_read是一樣的。
  • read(stream, buffer [, completion]):這個方法同步地從一個流中讀取資料。引數的意義和async_read是一樣的。
  • write(stream, buffer [, completion]): 這個方法同步地向一個流寫入資料。引數的意義和async_read是一樣的。
async_read(stream, stream_buffer [, completion], handler)
async_write(strean, stream_buffer [, completion], handler)
write(stream, stream_buffer [, completion])
read(stream, stream_buffer [, completion]) 

首先,要注意第一個引數變成了流,而不單是socket。這個引數包含了socket但不僅僅是socket。比如,你可以用一個Windows的檔案控制代碼來替代socket。
當下面情況出現時,所有read和write操作都會結束:

  • 可用的緩衝區滿了(當讀取時)或者所有的緩衝區已經被寫入(當寫入時)
  • 完成處理方法返回0(如果你提供了這麼一個方法)
  • 錯誤發生時

下面的程式碼會非同步地從一個socket中間讀取資料直到讀取到’\n’:

io_service service;
ip::tcp::socket sock(service);
char buff[512];
int offset = 0;
size_t up_to_enter(const boost::system::error_code &, size_t bytes) {
    for ( size_t i = 0; i < bytes; ++i)
        if ( buff[i + offset] == '\n') 
            return 0;
    return 1; 
 }
void on_read(const boost::system::error_code &, size_t) {}
...
async_read(sock, buffer(buff), up_to_enter, on_read); 

Boost.Asio也提供了一些簡單的完成處理仿函式:

  • transfer_at_least(n)
  • transfer_exactly(n)
  • transfer_all()

例子如下:

char buff[512]; 
void on_read(const boost::system::error_code &, size_t) {} 
// 讀取32個位元組 
async_read(sock, buffer(buff), transfer_exactly(32), on_read);

上述的4個方法,不使用普通的緩衝區,而使用由Boost.Asio的std::streambuf類繼承來的stream_buffer方法。stl流和流緩衝區非常複雜;下面是例子:

io_service service;  
void on_read(streambuf& buf, const boost::system::error_code &, size_t) { 
    std::istream in(&buf);
    std::string line;
    std::getline(in, line);
    std::cout << "first line: " << line << std::endl; 
}
int main(int argc, char* argv[]) { 
    HANDLE file = ::CreateFile("readme.txt", GENERIC_READ, 0, 0, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
    windows::stream_handle h(service, file);
    streambuf buf;
    async_read(h, buf, transfer_exactly(256), boost::bind(on_read,boost::ref(buf),_1,_2));
    service.run(); 
}

在這裡,我向你們展示瞭如何在一個Windows檔案控制代碼上呼叫async_read。讀取前256個字元,然後把它們儲存到緩衝區中,當操作結束時。on_read被呼叫,再建立std::istream用來傳遞緩衝區,讀取第一行(std::getline),最後把它輸出到命令列中。

read_until/async_read_until方法

這些方法在條件滿足之前會一直讀取:

  • async_read_until(stream, stream_buffer, delim, handler):這個方法啟動一個非同步read操作。read操作會在讀取到某個分隔符時結束。分隔符可以是字元,std::string或者boost::regex。處理方法的格式為:void handler(const boost::system::error_code & err, size_t bytes);
  • async_read_until(strem, stream_buffer, completion, handler):這個方法和之前的方法是一樣的,但是沒有分隔符,而是一個完成處理方法。完成處理方法的格式為:pair< iterator,bool > completion(iterator begin, iterator end);,其中迭代器的型別為buffers_iterator< streambuf::const_buffers_type >。你需要記住的是這個迭代器是支援隨機訪問的。你掃描整個區間(begin,end),然後決定read操作是否應該結束。返回的結果是一個結果對,第一個成員是一個迭代器,它指向最後被這個方法訪問的字元;第二個成員指定read操作是否需要結束,需要時返回true,否則返回false。
  • read_until(stream, stream_buffer, delim):這個方法執行一個同步的read操作,引數的意義和async_read_until一樣。
  • read_until(stream, stream_buffer, completion):這個方法執行一個同步的read操作,引數的意義和async_read_until一樣。

下面這個例子在讀到一個指定的標點符號之前會一直讀取:

typedef buffers_iterator<streambuf::const_buffers_type> iterator;
std::pair<iterator, bool> match_punct(iterator begin, iterator end) {
    while ( begin != end)
        if ( std::ispunct(*begin))
            return std::make_pair(begin,true);
    return std::make_pair(end,false);
}
void on_read(const boost::system::error_code &, size_t) {}
...
streambuf buf;
async_read_until(sock, buf, match_punct, on_read);

如果我們想讀到一個空格時就結束,我們需要把最後一行修改為:

async_read_until(sock, buff, ' ', on_read);

*_at方法

這些方法用來在一個流上面做隨機存取操作。由你來指定readwrite操作從什麼地方開始(offset):

  • async_read_at(stream, offset, buffer [, completion], handler):這個方法在指定的流的offset處開始執行一個非同步的read操作,當操作結束時,它會呼叫handler。handler的格式為:void handler(const boost::system::error_code& err, size_t bytes);buffer可以是普通的wrapper()封裝或者streambuf方法。如果你指定一個completion方法,它會在每次read操作成功之後呼叫,然後告訴Boost.Asio async_read_at操作已經完成(如果沒有,則繼續讀取)。它的格式為:size_t completion(const boost::system::error_code& err, size_t bytes);。當completion方法返回0時,我們認為read操作完成了;如果返回一個非零值,它代表了下一次呼叫流的async_read_some_at方法的最大讀取位元組數。
  • async_write_at(stream, offset, buffer [, completion], handler):這個方法執行一個非同步的write操作。引數的意義和async_read_at是一樣的
  • read_at(stream, offset, buffer [, completion]):這個方法在一個執行的流上,指定的offset處開始read。引數的意義和async_read_at是一樣的
  • write_at(stream, offset, buffer [, completion]):這個方法在一個執行的流上,指定的offset處開始write。引數的意義和async_read_at是一樣的

這些方法不支援套接字。它們用來處理流的隨機訪問;也就是說,流是可以隨機訪問的。套接字顯然不是這樣(套接字是不可回溯的)。

下面這個例子告訴你怎麼從一個檔案偏移為256的位置讀取128個位元組:

io_service service;
int main(int argc, char* argv[]) {
    HANDLE file = ::CreateFile("readme.txt", GENERIC_READ, 0, 0, OPEN_ALWAYS, FILE_ATTRIBUTE_NORMAL | FILE_FLAG_OVERLAPPED, 0);
    windows::random_access_handle h(service, file);
    streambuf buf;
    read_at(h, 256, buf, transfer_exactly(128));
    std::istream in(&buf);
    std::string line;
    std::getline(in, line);
    std::cout << "first line: " << line << std::endl;
}

非同步程式設計

這部分對非同步程式設計時可能碰到的一些問題進行了深入的探究。我建議你先讀一遍,然後在接下來讀這本書的過程中,再經常回過頭來看看,從而增強你對這部分的理解。

非同步的需求

就像我之前所說的,同步程式設計比非同步程式設計簡單很多。這是因為,線性的思考是很簡單的(呼叫A,呼叫A結束,呼叫B,呼叫B結束,然後繼續,這是以事件處理的方式來思考)。後面你會碰到這種情況,比如:五件事情,你不知道它們執行的順序,也不知道他們是否會執行!

儘管非同步程式設計更難,但是你會更傾向於選擇使用它,比如:寫一個需要處理很多併發訪問的服務端。併發訪問越多,非同步程式設計就比同步程式設計越簡單。

假設:你有一個需要處理1000個併發訪問的應用,從客戶端發給服務端的每個資訊都會再返回給客戶端,以‘\n’結尾。

同步方式的程式碼,1個執行緒:

using namespace boost::asio;
struct client {
    ip::tcp::socket sock;
    char buff[1024]; // 每個資訊最多這麼大
    int already_read; // 你已經讀了多少
};
std::vector<client> clients;
void handle_clients() {
    while ( true)
        for ( int i = 0; i < clients.size(); ++i)
            if ( clients[i].sock.available() ) on_read(clients[i]);
}
void on_read(client & c) {
    int to_read = std::min( 1024 - c.already_read, c.sock.available());
    c.sock.read_some( buffer(c.buff + c.already_read, to_read));
    c.already_read += to_read;
    if ( std::find(c.buff, c.buff + c.already_read, '\n') < c.buff + c.already_read) {
        int pos = std::find(c.buff, c.buff + c.already_read, '\n') - c.buff;
        std::string msg(c.buff, c.buff + pos);
        std::copy(c.buff + pos, c.buff + 1024, c.buff);
        c.already_read -= pos;
        on_read_msg(c, msg);
    }
}
void on_read_msg(client & c, const std::string & msg) {
    // 分析訊息,然後返回
    if ( msg == "request_login")
        c.sock.write( "request_ok\n");
    else if ...
}

有一種情況是在任何服務端(和任何基於網路的應用)都需要避免的,就是程式碼無響應的情況。在我們的例子裡,我們需要handle_clients()方法儘可能少的阻塞。如果方法在某個點上阻塞,任何進來的資訊都需要等待方法解除阻塞才能被處理。

為了保持響應,只在一個套接字有資料的時候我們才讀,也就是說,if ( clients[i].sock.available() ) on_read(clients[i])。在on_read時,我們只讀當前可用的;呼叫read_until(c.sock, buffer(…), ‘\n’)會是一個非常糟糕的選擇,因為直到我們從一個指定的客戶端讀取了完整的訊息之前,它都是阻塞的(我們永遠不知道它什麼時候會讀取到完整的訊息)

這裡的瓶頸就是on_read_msg()方法;當它執行時,所有進來的訊息都在等待。一個良好的on_read_msg()方法實現會保證這種情況基本不會發生,但是它還是會發生(有時候向一個套接字寫入資料,緩衝區滿了時,它會被阻塞)
同步方式的程式碼,10個執行緒

using namespace boost::asio;
struct client {
   // ... 和之前一樣
    bool set_reading() {
        boost::mutex::scoped_lock lk(cs_);
        if ( is_reading_) return false; // 已經在讀取
        else { is_reading_ = true; return true; }
    }
    void unset_reading() {
        boost::mutex::scoped_lock lk(cs_);
        is_reading_ = false;
    }
private:
    boost::mutex cs_;
    bool is_reading_;
};
std::vector<client> clients;
void handle_clients() {
    for ( int i = 0; i < 10; ++i)
        boost::thread( handle_clients_thread);
}
void handle_clients_thread() {
    while ( true)
        for ( int i = 0; i < clients.size(); ++i)
            if ( clients[i].sock.available() )
                if ( clients[i].set_reading()) {
                    on_read(clients[i]);
                    clients[i].unset_reading();
                }
}
void on_read(client & c) {
    // 和之前一樣
}
void on_read_msg(client & c, const std::string & msg) {
    // 和之前一樣
}

為了使用多執行緒,我們需要對執行緒進行同步,這就是set_reading()set_unreading()所做的。set_reading()方法非常重要,比如你想要一步實現“判斷是否在讀取然後標記為讀取中”。但這是有兩步的(“判斷是否在讀取”和“標記為讀取中”),你可能會有兩個執行緒同時為一個客戶端判斷是否在讀取,然後你會有兩個執行緒同時為一個客戶端呼叫on_read,結果就是資料衝突甚至導致應用崩潰。

你會發現程式碼變得極其複雜。

同步程式設計有第三個選擇,就是為每個連線開闢一個執行緒。但是當併發的執行緒增加時,這就成了一種災難性的情況。

然後,讓我們來看非同步程式設計。我們不斷地非同步讀取。當一個客戶端請求某些東西時,on_read被呼叫,然後迴應,然後等待下一個請求(然後開始另外一個非同步的read操作)。

非同步方式的程式碼,10個執行緒

using namespace boost::asio;
io_service service;
struct client {
    ip::tcp::socket sock;
    streambuf buff; // 從客戶端取回結果
}
std::vector<client> clients;
void handle_clients() {
    for ( int i = 0; i < clients.size(); ++i)
        async_read_until(clients[i].sock, clients[i].buff, '\n', boost::bind(on_read, clients[i], _1, _2));
    for ( int i = 0; i < 10; ++i)
        boost::thread(handle_clients_thread);
}
void handle_clients_thread() {
    service.run();
}
void on_read(client & c, const error_code & err, size_t read_bytes) {
    std::istream in(&c.buff);
    std::string msg;
    std::getline(in, msg);
    if ( msg == "request_login")
        c.sock.async_write( "request_ok\n", on_write);
    else if ...
    ...
    // 等待同一個客戶端下一個讀取操作
    async_read_until(c.sock, c.buff, '\n', boost::bind(on_read, c, _1, _2));
}

發現程式碼變得有多簡單了吧?client結構裡面只有兩個成員,handle_clients()僅僅呼叫了async_read_until,然後它建立了10個執行緒,每個執行緒都呼叫service.run()。這些執行緒會處理所有來自客戶端的非同步read操作,然後分發所有向客戶端的非同步write操作。另外需要注意的一件事情是:on_read()一直在為下一次非同步read操作做準備(看最後一行程式碼)。

非同步run(), run_one(), poll(), poll_ one()

為了實現監聽迴圈,io_service類提供了4個方法,比如:run(), run_one(), poll()poll_one()。雖然大多數時候使用service.run()就可以,但是你還是需要在這裡學習其他方法實現的功能。

持續執行

再一次說明,如果有等待執行的操作,run()會一直執行,直到你手動呼叫io_service::stop()。為了保證io_service一直執行,通常你新增一個或者多個非同步操作,然後在它們被執行時,你繼續一直不停地新增非同步操作,比如下面程式碼:

using namespace boost::asio;
io_service service;
ip::tcp::socket sock(service);
char buff_read[1024], buff_write[1024] = "ok";
void on_read(const boost::system::error_code &err, std::size_t bytes);
void on_write(const boost::system::error_code &err, std::size_t bytes)
{
    soc