1. 程式人生 > >Boost.Asio的使用技巧

Boost.Asio的使用技巧

客戶端 回調函數 wait opera 不變 自主 內核 異步調用 patch

  • 基本概念
    • Asio proactor
  • I/O服務
    • work類
    • run() vs poll()
    • stop()
    • post() vs dispatch()
    • buffer類
    • 緩沖區管理
  • I/O對象
    • socket
    • 信號處理
    • 定時器
    • strand
  • 參考

最近嘗試使用了一下Boost.Asio,不知道是否因為各大公司都有自己相對成熟的網絡庫的緣故,網絡上Asio相關的資料實在不多,而且很多翻來覆去就是那幾個簡單的示例,所以打算自己小結一下。總的來說Boost.Asio是個非常易用的庫,避免了你在各種系統底層API之間的掙紮,讓你可以非常迅速的開發出高並發的網絡服務器程序。

基本概念

asio基於兩個概念:

  • I/O服務,抽象了操作系統的異步接口 boost::asio::io_service::service

    • boost::asio::io_service
  • I/O對象,有多種對象 boost::asio::basic_io_object

    • boost::asio::ip::tcp::socket
    • boost::asio::ip::tcp::resolver
    • boost::asio::ip::tcp::acceptor
    • boost::asio::local::stream_protocol::socket 本地連接
    • boost::asio::posix::stream_descriptor
      面向流的文件描述符,比如stdout, stdin
    • boost::asio::deadline_timer 定時器
    • boost::asio::signal_set 信號處理

所有 I/O 對象通常都需要一個 I/O 服務作為它們的構造函數的第一個參數,比如:

boost::asio::io_service io_service;
boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(5));

在一定條件下使用多個 io_service 是有好處的,每個 io_service 有自己的線程,最好是運行在各自的處理器內核上,這樣每一個異步操作連同它們的句柄就可以局部化執行。 如果沒有遠端的數據或函數需要訪問,那麽每一個 io_service

就象一個小的自主應用。 這裏的局部和遠端是指象高速緩存、內存頁這樣的資源。 由於在確定優化策略之前需要對底層硬件、操作系統、編譯器以及潛在的瓶頸有專門的了解,所以應該僅在清楚這些好處的情況下使用多個 io_service

Asio proactor

和我們熟知的Reactor不同,Asio使用Proactor模型:

技術分享

  1. Initiator使用Asynchronous Operation Processor發起異步I/O操作
  2. 保存每個異步I/O操作的參數,包括回調函數的地址,並將其放入Completion Event Queue
  3. Proactor調用Asynchronous Event Demultiplexer檢測完成事件。
  4. 當檢測到I/O操作完成事件,從Completion Event Queue中取出對應的異步I/O操作,並且dispatch到相應的Completion Handler。
  5. Completion Handler調用回調函數。

可以看出asio本質就是維護著一個任務隊列,調用post()方法接收handler作為參數加入隊列,或者調用async_*()方法接收handler作為參數和對應的I/O對象加入隊列(handler實際借助boost::bind成為一個closure,可以復制到隊列),在Linux系統下會在epoll空閑時或有I/O事件觸發後執行。但是asio與Reactor不同的地方在於前者當事件到來時會自動讀寫緩沖區,等I/O操作完成後才調用原先註冊的handler,把執行流交割;而Reactor當事件到來時,即交由調用者自己處理剩下的I/O操作。

I/O服務

work類

work類用於通知io_service是否可以結束,只要對象work(io_service)存在,io_service就不會結束。所以work類用起來更像是一個標識,比如:

boost::asio::io_service io_service;
boost::asio::io_service::work* work = new boost::asio::io_service::work( io_service );
// delete work; // 如果不註釋掉這一句,則run loop不會退出;一般用shared_ptr維護work對象,使用work.reset()來結束其生命周期。
io_service.run()

run() vs poll()

run()poll()都循環執行I/O對象的事件,區別在於如果事件沒有被觸發(ready),run()會等待,但是poll()會立即返回。也就是說poll()只會執行已經觸發的I/O事件。

比如I/O對象socket1, socket2, socket3都綁定了socket.async_read_some()事件,而此時socket1socket3有數據過來。則調用poll()會執行socket1socket3相應的handler,然後返回;而調用run()也會執行socket1socket3的相應的handler,但會繼續等待socket2的讀事件。

stop()

調用 io_service.stop() 會中止 run loop,一般在多線程中使用。

post() vs dispatch()

post()dispatch()都是要求io_service執行一個handler,但是dispatch()要求立即執行,而post()總是先把該handler加入事件隊列。

什麽時候需要使用post()?當不希望立即調用一個handler,而是異步調用該handler,則應該調用post()把該handler交由io_service放到事件隊列裏去執行。比如,Boost.Asio自帶的聊天室示例,其中實現了一個支持異步IO的聊天室客戶端,是個很好的例子。

chat_client.cpp 的write()函數之所以要使用post(),是為了避免臨界區同步問題。write()調用和do_write()async_write()的執行分別屬於兩個線程,前者會往write_msgs_裏寫數據,而後者會從write_msgs_裏讀數據,如果不使用post(),而直接調用do_write(),顯然需要使用鎖來同步write_msgs_。但是使用post()相當於由io_service來調度write_msgs_的讀寫,這就在一個線程內完成,無需額外的鎖機制。

buffer類

buffer類分mutable_bufferconst_buffer兩個類,buffer類特別簡單,僅有兩個成員變量:指向數據的指針 和 相應的數據長度。buffer類本身並不申請內存,只是提供了一個對現有內存的封裝。

需要註意的是,所有async_write()async_read()之類函數接受的buffer類型是 MutableBufferSequence / ConstBufferSequence,這意味著它們既可以接受boost::asio::buffer,也可以接受 std::vector<boost::asio::buffer> 這樣的類型。

緩沖區管理

緩沖區的生命期是使用asio最需要重視的兩件事之一,緩沖區之所以需要重視的原因在於Asio異步調用Reference裏的這段描述:

Although the buffers object may be copied as necessary, ownership of the underlying memory blocks is retained by the caller, which must guarantee that they remain valid until the handler is called.

這意味著緩沖區從發起異步調用到handler被執行,這段時間內需要交由io_service控制,這個限制常常導致asio的某些代碼變得可能比Reactor相應代碼還要麻煩一些。

還是舉上面聊天室的那個例子。chat_client.cpp 的do_write()函數收到用戶輸入數據後,之所以把該數據保存到std::deque<std::string> write_msgs_ 隊列,而不是存到類似char data[]的數組裏,然後去調用async_write(..data..)發送數據,是為了避免這種情況:輸入數據速度過快,當上一次async_write()調用的handler還沒有來得及處理,又收到一份新的數據,如果直接保存到data,會導致覆蓋上一次async_write()的緩沖區。async_write()要求這個緩沖區從調用async_write()開始,直到handler處理這個時間段是不變的。

同樣的,在do_write()函數裏調用async_write()函數之前,先判斷write_msgs_隊列是否為空,也是為了保證async_write()總是從write_msgs_隊列頭取得有效的數據,而在handle_write()裏當數據發送完畢後,再pop_front()彈出已經發送的數據包。以此避免出現前一個async_write()的handler還沒執行完畢,就把隊列頭彈出去,導致對應的緩沖區失效問題。

這裏主要還是因為async_write()async_read()的區別,前者是主動發起的,後者可以由io_service控制,所以後者不用擔心這種緩沖區被覆蓋問題。因為在同一個線程裏,哪怕需要讀取的事件觸發得再快,也需要由io_service逐一處理。

在這個聊天室的例子裏,如果不考慮把數據按用戶輸入順序發送出去的話,可以使用更簡單的辦法來處理do_write()函數,例如:

:::c++
void do_write(chat_message msg)
{
    chat_message* pmsg = new chat_message(msg); // implement copy ctor for chat_message firstly
    boost::asio::async_write(socket_,
            boost::asio::buffer(pmsg->data(), pmsg->length()),
            boost::bind(&chat_client::handle_write, this,
                    boost::asio::placeholders::error, pmsg));
}

void handle_write(const boost::system::error_code& error, chat_message* pmsg)
{
    if (!error) {

    }else{
        do_close();
    }
    delete pmsg;
}

這裏相當於給每個異步調用分配一塊屬於自己的內存,異步調用完成即自動釋放掉,有些類似於閉包了。如果不希望頻繁new/delete內存,也可以考慮使用boost::circular_buffer一次性分配內存後逐項使用。

I/O對象

socket

Boost.Asio最常用的對象應該就是socket了,常用的函數一般有這幾個:

  • 讀寫TCP socket的時候,一般使用read(), async_read(), write(), async_write(),為了避免所謂的short reads and writes,一般不使用receive(), async_receive(), send(), async_send()
  • 讀寫有連接的UDP socket的時候,一般使用receive(), async_receive(), send(), async_send()
  • 讀寫無連接的UDP socket的時候,一般使用receive_from(), async_receive_from(), send_to(), async_send_to()

而自由函數boost::asio::async_write()和類成員函數socket.async_write_some()的有什麽區別呢(boost::asio::async_read()socket.async_read_some()類似):

  • boost::asio::async_write() 異步寫,立即返回。但它可以保證寫完整個緩沖區的內容,否則將報錯。
  • boost::asio::async_write() 是通過調用n次socket.async_write_some() 來實現的,所以代碼必須確保在boost::asio::async_write()執行的時候,沒有其他的寫操作在同一socket上執行。
  • 在調用boost::asio::async_write()的時候,如果指定buffer的length沒有寫完或出錯,是不會回調相應的handler的,它將一直在run loop中執行;直到buffer裏所有的數據都寫完或出錯(此時handler裏返回的長度肯定會小於buffer length),才會調用handler繼續處理;而socket.async_write_some()不會有這樣的問題,它只會嘗試寫一次,寫完的長度會在handler的參數裏返回。

所以,這裏強調使用asio時第二件需要重視的事情,就是handler的返回值(一般可能聲明為boost::asio::placeholders::error)。因為asio裏所有的任務都由io_service異步執行,只有執行成功或者失敗之後才會回調handler,所以返回值是你了解當前異步操作狀況的唯一辦法,記住不要忽略任何handler的返回值處理。

信號處理

Boost.Asio的信號處理非常簡單,聲明一個信號集合,然後把相應的異步handler綁上就可以了。如果你希望在一個信號集裏處理所有的信號,那麽你可以根據handler的第二個參數,來獲取當前觸發的是那個信號。比如:

boost::asio::signal_set signals(io_service, SIGINT, SIGTERM);
signals.add(SIGUSR1); // 也可以直接用add函數添加信號

signals.async_wait(boost::bind(handler, _1, _2));

void handler(
  const boost::system::error_code& error,
  int signal_number // 通過這個參數獲取當前觸發的信號值
);

定時器

Boost.Asio的定時器用起來根信號集一樣簡單,但由於它太過簡單,也有不方便的地方。比如,在一個UDP伺服器裏,一般收到的每個UDP包中都會包含一個sequence number,用於標識該UDP,以應對包處理超時情況。假設每個UDP包處理時間只有100ms,如果超時則直接給客戶端返回超時標記。這種最簡單的定時器常用的一些Reactor框架都有很完美的解決方案,一般是建一個定時器鏈表來實現,但是Asio中的定時器沒法單獨完成這個工作。

boost::asio::deadline_timer 只有兩種狀態:超時和未超時。所以,只能很土的對每個UDP包創建一個定時器,然後借助std::mapboost::shared_ptr保存sequence number到定時器的映射,根據定時器handler的返回值判斷該定時器是超時,還是被主動cancel。

strand

在多線程中,多個I/O對象的handler要訪問同一塊臨界區,此時可以使用strand來保證這些handler之間的同步。

示例:

我們向定時器註冊 func1 和 func2,它們可能會同時訪問全局的對象(比如 std::cout )。這時我們希望對 func1 和 func2 的調用是同步的,即執行其中一個的時候,另一個要等待。

這時就可以用到 boost::asio::strand 類,它可以把幾個cmd包裝成同步執行的。例如,我們向定時器註冊 func1 和 func2 時,可以改為:

boost::asio::strand  the_strand;
t1.async_wait(the_strand.wrap(func1));      //包裝為同步執行的
t2.async_wait(the_strand.wrap(func2));

這樣就保證了在任何時刻,func1 和 func2 都不會同時在執行。

還有就是如果你希望把一個io_service對象綁定到多個線程。此時需要boost::asio::strand來確保handler不會被同時執行,因為異步操作,比如async_writeasync_receive_from之類會影響到臨界區buffer。

具體可參考asio examples裏的示例:HTTP Server 2和HTTP Server 3的connection.hpp設計。

參考

關於boost asio最好的參考書當然就是官方文檔和它的源碼,此外還有兩個不錯的資料:

  • The Boost C++ Libraries, Chapter 7: Asynchronous Input and Output 這本書已經有中文翻譯了
  • A guide to getting started with boost::asio

from:http://blog.jqian.net/post/boost-asio.html

Boost.Asio的使用技巧