1. 程式人生 > >boost C++知識點(五)

boost C++知識點(五)

5 非同步輸入輸出Asio 5.1 概述 本章介紹了 Boost C++ 庫 Asio,它是非同步輸入輸出的核心。 名字本身就說明了一切:Asio 意即非同步輸入/輸出。 該庫可以讓 C++ 非同步地處理資料,且平臺獨立。 非同步資料處理就是指,任務觸發後不需要等待它們完成。 相反,Boost.Asio 會在任務完成時觸發一個應用。 非同步任務的主要優點在於,在等待任務完成時不需要阻塞應用程式,可以去執行其它任務。

非同步任務的典型例子是網路應用。 如果資料被髮送出去了,比如傳送至 Internet,通常需要知道資料是否傳送成功。 如果沒有一個象 Boost.Asio 這樣的庫,就必須對函式的返回值進行求值。 但是,這樣就要求待至所有資料傳送完畢,並得到一個確認或是錯誤程式碼。 而使用 Boost.Asio,這個過程被分為兩個單獨的步驟:第一步是作為一個非同步任務開始資料傳輸。 一旦傳輸完成,不論成功或是錯誤,應用程式都會在第二步中得到關於相應的結果通知。 主要的區別在於,應用程式無需阻塞至傳輸完成,而可以在這段時間裡執行其它操作。

5.2. I/O 服務與 I/O 物件 使用 Boost.Asio 進行非同步資料處理的應用程式基於兩個概念:I/O 服務和 I/O 物件。 I/O 服務抽象了作業系統的介面,允許第一時間進行非同步資料處理,而 I/O 物件則用於初始化特定的操作。 鑑於 Boost.Asio 只提供了一個名為 boost::asio::io_service 的類作為 I/O 服務,它針對所支援的每一個作業系統都分別實現了優化的類,另外庫中還包含了針對不同 I/O 物件的幾個類。 其中,類 boost::asio::ip::tcp::socket 用於通過網路傳送和接收資料,而類 boost::asio::deadline_timer 則提供了一個計時器,用於測量某個固定時間點到來或是一段指定的時長過去了。 以下第一個例子中就使用了計時器,因為與 Asio 所提供的其它 I/O 物件相比較而言,它不需要任何有關於網路程式設計的知識。

#include <boost/asio.hpp> 
#include <iostream> 

void handler(const boost::system::error_code &ec) 
{ 
  std::cout << "5 s." << std::endl; 
} 

int main() 
{ 
  boost::asio::io_service io_service; 
  boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(5)); 
  timer.async_wait(handler); 
  io_service.run(); 
} 

函式 main() 首先定義了一個 I/O 服務 io_service,用於初始化 I/O 物件 timer。 就象 boost::asio::deadline_timer 那樣,所有 I/O 物件通常都需要一個 I/O 服務作為它們的建構函式的第一個引數。 由於 timer 的作用類似於一個鬧鐘,所以 boost::asio::deadline_timer 的建構函式可以傳入第二個引數,用於表示在某個時間點或是在某段時長之後鬧鐘停止。 以上例子指定了五秒的時長,該鬧鐘在 timer 被定義之後立即開始計時。

雖然我們可以呼叫一個在五秒後返回的函式,但是通過呼叫方法 async_wait() 並傳入 handler() 函式的名字作為唯一引數,可以讓 Asio 啟動一個非同步操作。 請留意,我們只是傳入了 handler() 函式的名字,而該函式本身並沒有被呼叫。

async_wait() 的好處是,該函式呼叫會立即返回,而不是等待五秒鐘。 一旦鬧鐘時間到,作為引數所提供的函式就會被相應呼叫。 因此,應用程式可以在呼叫了 async_wait() 之後執行其它操作,而不是阻塞在這裡。

象 async_wait() 這樣的方法被稱為是非阻塞式的。 I/O 物件通常還提供了阻塞式的方法,可以讓執行流在特定操作完成之前保持阻塞。 例如,可以呼叫阻塞式的 wait() 方法,取代 boost::asio::deadline_timer 的呼叫。 由於它會阻塞呼叫,所以它不需要傳入一個函式名,而是在指定時間點或指定時長之後返回。

再看看上面的原始碼,可以留意到在呼叫 async_wait() 之後,又在 I/O 服務之上呼叫了一個名為 run() 的方法。這是必須的,因為控制權必須被作業系統接管,才能在五秒之後呼叫 handler() 函式。

async_wait() 會啟動一個非同步操作並立即返回,而 run() 則是阻塞的。因此呼叫 run() 後程序執行會停止。 具有諷刺意味的是,許多作業系統只是通過阻塞函式來支援非同步操作。 以下例子顯示了為什麼這個限制通常不會成為問題。

#include <boost/asio.hpp> 
#include <iostream> 

void handler1(const boost::system::error_code &ec) 
{ 
  std::cout << "5 s." << std::endl; 
} 

void handler2(const boost::system::error_code &ec) 
{ 
  std::cout << "10 s." << std::endl; 
} 

int main() 
{ 
  boost::asio::io_service io_service; 
  boost::asio::deadline_timer timer1(io_service, boost::posix_time::seconds(5)); 
  timer1.async_wait(handler1); 
  boost::asio::deadline_timer timer2(io_service, boost::posix_time::seconds(10)); 
  timer2.async_wait(handler2); 
  io_service.run(); 
} 

上面的程式用了兩個 boost::asio::deadline_timer 型別的 I/O 物件。 第一個 I/O 物件表示一個五秒後觸發的鬧鐘,而第二個則表示一個十秒後觸發的鬧鐘。 每一段指定時長過去後,都會相應地呼叫函式 handler1() 和 handler2()。

在 main() 的最後,再次在唯一的 I/O 服務之上呼叫了 run() 方法。 如前所述,這個函式將阻塞執行,把控制權交給作業系統以接管非同步處理。 在作業系統的幫助下,handler1() 函式會在五秒後被呼叫,而 handler2() 函式則在十秒後被呼叫。

乍一看,你可能會覺得有些奇怪,為什麼非同步處理還要呼叫阻塞式的 run() 方法。 然而,由於應用程式必須防止被中止執行,所以這樣做實際上不會有任何問題。 如果 run() 不是阻塞的,main() 就會結束從而中止該應用程式。 如果應用程式不應被阻塞,那麼就應該在一個新的執行緒內部呼叫 run(),它自然就會僅僅阻塞那個執行緒。

一旦特定的 I/O 服務的所有非同步操作都完成了,控制權就會返回給 run() 方法,然後它就會返回。 以上兩個例子中,應用程式都會在鬧鐘到時間後馬上結束。

5.3. 可擴充套件性與多執行緒 用 Boost.Asio 這樣的庫來開發應用程式,與一般的 C++ 風格不同。 那些可能需要較長時間才返回的函式不再是以順序的方式來呼叫。 不再是呼叫阻塞式的函式,Boost.Asio 是啟動一個非同步操作。 而那些需要在操作結束後呼叫的函式則實現為相應的控制代碼。 這種方法的缺點是,本來順序執行的功能變得在物理上分割開來了,從而令相應的程式碼更難理解。

象 Boost.Asio 這樣的庫通常是為了令應用程式具有更高的效率。 應用程式不需要等待特定的函式執行完成,而可以在期間執行其它任務,如開始另一個需要較長時間的操作。

可擴充套件性是指,一個應用程式從新增資源有效地獲得好處的能力。 如果那些執行時間較長的操作不應該阻塞其它操作的話,那麼建議使用 Boost.Asio. 由於現今的PC機通常都具有多核處理器,所以執行緒的應用可以進一步提高一個基於 Boost.Asio 的應用程式的可擴充套件性。

如果在某個 boost::asio::io_service 型別的物件之上呼叫 run() 方法,則相關聯的控制代碼也會在同一個執行緒內被執行。 通過使用多執行緒,應用程式可以同時呼叫多個 run() 方法。 一旦某個非同步操作結束,相應的 I/O 服務就將在這些執行緒中的某一個之中執行控制代碼。 如果第二個操作在第一個操作之後很快也結束了,則 I/O 服務可以在另一個執行緒中執行控制代碼,而無需等待第一個控制代碼終止。

要注意,使用執行緒並不總是值得的。 以上例子的執行會導致不同資訊在標準輸出流上混合輸出,因為這兩個控制代碼可能會並行執行,訪問同一個共享資源:標準輸出流 std::cout。 這種訪問必須被同步,以保證每一條資訊在另一個執行緒可以向標準輸出流寫出另一條資訊之前被完全寫出。 在這種情形下使用執行緒並不能提供多少好處,如果各個獨立控制代碼不能獨立地並行執行。

多次呼叫同一個 I/O 服務的 run() 方法,是為基於 Boost.Asio 的應用程式增加可擴充套件性的推薦方法。 另外還有一個不同的方法:不要繫結多個執行緒到單個 I/O 服務,而是建立多個 I/O 服務。 然後每一個 I/O 服務使用一個執行緒。 如果 I/O 服務的數量與系統的處理器核心數量相匹配,則非同步操作都可以在各自的核心上執行。

#include <boost/asio.hpp> 
#include <boost/thread.hpp> 
#include <iostream> 

void handler1(const boost::system::error_code &ec) 
{ 
  std::cout << "5 s." << std::endl; 
} 

void handler2(const boost::system::error_code &ec) 
{ 
  std::cout << "5 s." << std::endl; 
} 

boost::asio::io_service io_service1; 
boost::asio::io_service io_service2; 

void run1() 
{ 
  io_service1.run(); 
} 

void run2() 
{ 
  io_service2.run(); 
} 

int main() 
{ 
  boost::asio::deadline_timer timer1(io_service1, boost::posix_time::seconds(5)); 
  timer1.async_wait(handler1); 
  boost::asio::deadline_timer timer2(io_service2, boost::posix_time::seconds(5)); 
  timer2.async_wait(handler2); 
  boost::thread thread1(run1); 
  boost::thread thread2(run2); 
  thread1.join(); 
  thread2.join(); 
} 

前面的那個使用兩個計時器的例子被重寫為使用兩個 I/O 服務。 這個應用程式仍然基於兩個執行緒;但是現在每個執行緒被繫結至不同的 I/O 服務。 此外,兩個 I/O 物件 timer1 和 timer2 現在也被繫結至不同的 I/O 服務。

這個應用程式的功能與前一個相同。 在一定條件下使用多個 I/O 服務是有好處的,每個 I/O 服務有自己的執行緒,最好是執行在各自的處理器核心上,這樣每一個非同步操作連同它們的控制代碼就可以區域性化執行。 如果沒有遠端的資料或函式需要訪問,那麼每一個 I/O 服務就象一個小的自主應用。 這裡的區域性和遠端是指象快取記憶體、記憶體頁這樣的資源。 由於在確定優化策略之前需要對底層硬體、作業系統、編譯器以及潛在的瓶頸有專門的瞭解,所以應該僅在清楚這些好處的情況下使用多個 I/O 服務。

5.4. 網路程式設計 雖然 Boost.Asio 是一個可以非同步處理任何種類資料的庫,但是它主要被用於網路程式設計。 這是由於,事實上 Boost.Asio 在加入其它 I/O 物件之前很久就已經支援網路功能了。 網路功能是非同步處理的一個很好的例子,因為通過網路進行資料傳輸可能會需要較長時間,從而不能直接獲得確認或錯誤條件。

Boost.Asio 提供了多個 I/O 物件以開發網路應用。 以下例子使用了 boost::asio::ip::tcp::socket 類來建立與中另一臺PC的連線,並下載 ‘Highscore’ 主頁;就象一個瀏覽器在指向 www.highscore.de 時所要做的。

#include <boost/asio.hpp> 
#include <boost/array.hpp> 
#include <iostream> 
#include <string> 

boost::asio::io_service io_service; 
boost::asio::ip::tcp::resolver resolver(io_service); 
boost::asio::ip::tcp::socket sock(io_service); 
boost::array<char, 4096> buffer; 

void read_handler(const boost::system::error_code &ec, std::size_t bytes_transferred) 
{ 
  if (!ec) 
  { 
    std::cout << std::string(buffer.data(), bytes_transferred) << std::endl; 
    sock.async_read_some(boost::asio::buffer(buffer), read_handler); 
  } 
} 

void connect_handler(const boost::system::error_code &ec) 
{ 
  if (!ec) 
  { 
    boost::asio::write(sock, boost::asio::buffer("GET / HTTP 1.1\r\nHost: highscore.de\r\n\r\n")); 
    sock.async_read_some(boost::asio::buffer(buffer), read_handler); 
  } 
} 

void resolve_handler(const boost::system::error_code &ec, boost::asio::ip::tcp::resolver::iterator it) 
{ 
  if (!ec) 
  { 
    sock.async_connect(*it, connect_handler); 
  } 
} 

int main() 
{ 
  boost::asio::ip::tcp::resolver::query query("www.highscore.de", "80"); 
  resolver.async_resolve(query, resolve_handler); 
  io_service.run(); 
} 

網際網路使用了所謂的IP地址來標識每臺PC。 IP地址實際上只是一長串數字,難以記住。 而記住象 www.highscore.de 這樣的名字就容易得多。 為了在網際網路上使用類似的名字,需要通過一個叫作域名解析的過程將它們翻譯成相應的IP地址。 這個過程由所謂的域名解析器來完成,對應的 I/O 物件是:boost::asio::ip::tcp::resolver。

因為接收資料需要一個成功的連線,進而需要一次成功的域名解析,所以這三個不同的非同步操作要以三個不同的控制代碼來啟動。 resolve_handler() 訪問 I/O 物件 sock,用由迭代器 it 所提供的解析後地址建立一個連線。 而 sock 也在 connect_handler() 的內部被使用,傳送 HTTP 請求並啟動資料的接收。 因為所有這些操作都是非同步的,各個控制代碼的名字被作為引數傳遞。 取決於各個控制代碼,需要相應的其它引數,如指向解析後地址的迭代器 it 或用於儲存接收到的資料的緩衝區 buffer。

開始執行後,該應用將建立一個型別為 boost::asio::ip::tcp::resolver::query 的物件 query,表示一個查詢,其中含有名字 www.highscore.de 以及網際網路常用的埠80。 這個查詢被傳遞給 async_resolve() 方法以解析該名字。 最後,main() 只要呼叫 I/O 服務的 run() 方法,將控制交給作業系統進行非同步操作即可。

當域名解析的過程完成後,resolve_handler() 被呼叫,檢查域名是否能被解析。 如果解析成功,則存有錯誤條件的物件 ec 被設為0。 只有在這種情況下,才會相應地訪問 socket 以建立連線。 伺服器的地址是通過型別為 boost::asio::ip::tcp::resolver::iterator 的第二個引數來提供的。

呼叫了 async_connect() 方法之後,connect_handler() 會被自動呼叫。 在該控制代碼的內部,會訪問 ec 物件以檢查連線是否已建立。 如果連線是有效的,則對相應的 socket 呼叫 async_read_some() 方法,啟動讀資料操作。 為了儲存接收到的資料,要提供一個緩衝區作為第一個引數。 在以上例子中,緩衝區的型別是 boost::array,它來自 Boost C++ 庫 Array,定義於 boost/array.hpp. 請留意,read_handler() 在將資料寫出至 std::cout 之後,會再次呼叫 async_read_some() 方法。 這是必需的,因為無法保證僅在一次非同步操作中就可以接收到整個網頁。 async_read_some() 和 read_handler() 的交替呼叫只有當連線被破壞時才中止,如當 web 伺服器已經傳送完整個網頁時。 這種情況下,在 read_handler() 內部將報告一個錯誤,以防止進一步將資料輸出至標準輸出流,以及進一步對該 socket 呼叫 async_read() 方法。 這時該例程將停止,因為沒有更多的非同步操作了。

#include <boost/asio.hpp> 
#include <string> 

boost::asio::io_service io_service; 
boost::asio::ip::tcp::endpoint endpoint(boost::asio::ip::tcp::v4(), 80); 
boost::asio::ip::tcp::acceptor acceptor(io_service, endpoint); 
boost::asio::ip::tcp::socket sock(io_service); 
std::string data = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, world!"; 

void write_handler(const boost::system::error_code &ec, std::size_t bytes_transferred) 
{ 
} 

void accept_handler(const boost::system::error_code &ec) 
{ 
  if (!ec) 
  { 
    boost::asio::async_write(sock, boost::asio::buffer(data), write_handler); 
  } 
} 

int main() 
{ 
  acceptor.listen(); 
  acceptor.async_accept(sock, accept_handler); 
  io_service.run(); 
} 

接收器初始化完成後,main() 首先呼叫 listen() 方法將接收器置於接收狀態,然後再用 async_accept() 方法等待初始連線。 用於傳送和接收資料的 socket 被作為第一個引數傳遞。

當一個PC試圖建立一個連線時,accept_handler() 被自動呼叫。 如果該連線請求成功,就執行自由函式 boost::asio::async_write() 來通過 socket 傳送儲存在 data 中的資訊。 boost::asio::ip::tcp::socket 還有一個名為 async_write_some() 的方法也可以傳送資料;不過它會在傳送了至少一個位元組之後呼叫相關聯的控制代碼。 該控制代碼需要計算還剩餘多少位元組,並反覆呼叫 async_write_some() 直至所有位元組傳送完畢。 而使用 boost::asio::async_write() 可以避免這些,因為這個非同步操作僅在緩衝區的所有位元組都被髮送後才結束。

在這個例子中,當所有資料傳送完畢,空函式 write_handler() 將被呼叫。 由於所有非同步操作都已完成,所以應用程式終止。 與其它PC的連線也被相應關閉。