1. 程式人生 > >Boost Asio的使用技巧

Boost Asio的使用技巧

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow

也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!

               

最近嘗試使用了一下Boost.Asio,不知道是否因為各大公司都有自己相對成熟的網路庫的緣故,網路上Asio相關的資料實在不多,而且很多翻來覆去就是那幾個簡單的示例,所以打算自己小結一下。總的來說Boost.Asio是個非常易用的庫,避免了你在各種系統底層API之間的掙扎,讓你可以非常迅速的開發出高併發的網路伺服器程式。

基本概念

asio基於兩個概念:

  • I/O服務,抽象了作業系統的非同步介面 boost::asio::io_service::service
    • boost::asio::io_service
  • I/O物件,有多種物件 boost::asio::basic_io_object
    • boost::asio::ip::tcp::socket
    • boost::asio::ip::tcp::resolver
    • boost::asio::ip::tcp::acceptor
    • boost::asio::local::stream_protocol::socket
       本地連線
    • boost::asio::posix::stream_descriptor 面向流的檔案描述符,比如stdoutstdin
    • boost::asio::deadline_timer 定時器
    • boost::asio::signal_set 訊號處理

所有 I/O 物件通常都需要一個 I/O 服務作為它們的建構函式的第一個引數,比如:

boost::asio::io_service io_service;boost::asio::deadline_timer timer(io_service, boost::posix_time::seconds(5));

在一定條件下使用多個 io_service 是有好處的,每個 io_service 有自己的執行緒,最好是執行在各自的處理器核心上,這樣每一個非同步操作連同它們的控制代碼就可以區域性化執行。 如果沒有遠端的資料或函式需要訪問,那麼每一個 io_service 就象一個小的自主應用。 這裡的區域性和遠端是指象快取記憶體、記憶體頁這樣的資源。 由於在確定優化策略之前需要對底層硬體、作業系統、編譯器以及潛在的瓶頸有專門的瞭解,所以應該僅在清楚這些好處的情況下使用多個 io_service

Asio proactor

和我們熟知的Reactor不同,Asio使用Proactor模型:

  1. Initiator使用Asynchronous Operation Processor發起非同步I/O操作
  2. 儲存每個非同步I/O操作的引數,包括回撥函式的地址,並將其放入Completion Event Queue
  3. Proactor呼叫Asynchronous Event Demultiplexer檢測完成事件。
  4. 當檢測到I/O操作完成事件,從Completion Event Queue中取出對應的非同步I/O操作,並且dispatch到相應的Completion Handler。
  5. Completion Handler呼叫回撥函式。

可以看出asio本質就是維護著一個任務佇列,呼叫post()方法接收handler作為引數加入佇列,或者呼叫async_*()方法接收handler作為引數和對應的I/O物件加入佇列(handler實際藉助boost::bind成為一個closure,可以複製到佇列),在Linux系統下會在epoll空閒時或有I/O事件觸發後執行。但是asio與Reactor不同的地方在於前者當事件到來時會自動讀寫緩衝區,等I/O操作完成後才呼叫原先註冊的handler,把執行流交割;而Reactor當事件到來時,即交由呼叫者自己處理剩下的I/O操作。

I/O服務

work類

work類用於通知io_service是否可以結束,只要物件work(io_service)存在,io_service就不會結束。所以work類用起來更像是一個標識,比如:

boost::asio::io_service io_service;boost::asio::io_service::work* work = new boost::asio::io_service::work( io_service );// delete work; // 如果不註釋掉這一句,則run loop不會退出;一般用shared_ptr維護work物件,使用work.reset()來結束其生命週期。io_service.run()

run() vs poll()

run()poll()都迴圈執行I/O物件的事件,區別在於如果事件沒有被觸發(ready),run()會等待,但是poll()會立即返回。也就是說poll()只會執行已經觸發的I/O事件。

比如I/O物件socket1socket2socket3都綁定了socket.async_read_some()事件,而此時socket1socket3有資料過來。則呼叫poll()會執行socket1socket3相應的handler,然後返回;而呼叫run()也會執行socket1socket3的相應的handler,但會繼續等待socket2的讀事件。

stop()

呼叫 io_service.stop() 會中止 run loop,一般在多執行緒中使用。

post() vs dispatch()

post()dispatch()都是要求io_service執行一個handler,但是dispatch()要求立即執行,而post()總是先把該handler加入事件佇列。

什麼時候需要使用post()?當不希望立即呼叫一個handler,而是非同步呼叫該handler,則應該呼叫post()把該handler交由io_service放到事件佇列裡去執行。比如,Boost.Asio自帶的聊天室示例,其中實現了一個支援非同步IO的聊天室客戶端,是個很好的例子。

chat_client.cpp 的write()函式之所以要使用post(),是為了避免臨界區同步問題。write()呼叫和do_write()async_write()的執行分別屬於兩個執行緒,前者會往write_msgs_裡寫資料,而後者會從write_msgs_裡讀資料,如果不使用post(),而直接呼叫do_write(),顯然需要使用鎖來同步write_msgs_。但是使用post()相當於由io_service來排程write_msgs_的讀寫,這就在一個執行緒內完成,無需額外的鎖機制。

buffer類

buffer類分mutable_bufferconst_buffer兩個類,buffer類特別簡單,僅有兩個成員變數:指向資料的指標 和 相應的資料長度。buffer類本身並不申請記憶體,只是提供了一個對現有記憶體的封裝。

需要注意的是,所有async_write()async_read()之類函式接受的buffer型別是MutableBufferSequence / ConstBufferSequence,這意味著它們既可以接受boost::asio::buffer,也可以接受 std::vector<boost::asio::buffer> 這樣的型別。

緩衝區管理

緩衝區的生命期是使用asio最需要重視的兩件事之一,緩衝區之所以需要重視的原因在於Asio非同步呼叫Reference裡的這段描述:

Although the buffers object may be copied as necessary, ownership of the underlying memory blocks is retained by the caller, which must guarantee that they remain valid until the handler is called.

這意味著緩衝區從發起非同步呼叫到handler被執行,這段時間內需要交由io_service控制,這個限制常常導致asio的某些程式碼變得可能比Reactor相應程式碼還要麻煩一些。

還是舉上面聊天室的那個例子。chat_client.cpp 的do_write()函式收到使用者輸入資料後,之所以把該資料儲存到std::deque<std::string> write_msgs_ 佇列,而不是存到類似char data[]的數組裡,然後去呼叫async_write(..data..)傳送資料,是為了避免這種情況:輸入資料速度過快,當上一次async_write()呼叫的handler還沒有來得及處理,又收到一份新的資料,如果直接儲存到data,會導致覆蓋上一次async_write()的緩衝區。async_write()要求這個緩衝區從呼叫async_write()開始,直到handler處理這個時間段是不變的。

同樣的,在do_write()函式裡呼叫async_write()函式之前,先判斷write_msgs_佇列是否為空,也是為了保證async_write()總是從write_msgs_佇列頭取得有效的資料,而在handle_write()裡當資料傳送完畢後,再pop_front()彈出已經發送的資料包。以此避免出現前一個async_write()的handler還沒執行完畢,就把佇列頭彈出去,導致對應的緩衝區失效問題。

這裡主要還是因為async_write()async_read()的區別,前者是主動發起的,後者可以由io_service控制,所以後者不用擔心這種緩衝區被覆蓋問題。因為在同一個執行緒裡,哪怕需要讀取的事件觸發得再快,也需要由io_service逐一處理。

在這個聊天室的例子裡,如果不考慮把資料按使用者輸入順序傳送出去的話,可以使用更簡單的辦法來處理do_write()函式,例如:

void do_write(chat_message msg){    chat_message* pmsg = new chat_message(msg); // implement copy ctor for chat_message firstly    boost::asio::async_write(socket_,            boost::asio::buffer(pmsg->data(), pmsg->length()),            boost::bind(&chat_client::handle_write, this,                    boost::asio::placeholders::error, pmsg));}void handle_write(const boost::system::error_code& error, chat_message* pmsg){    if (!error) {    }else{        do_close();    }    delete pmsg;}

這裡相當於給每個非同步呼叫分配一塊屬於自己的記憶體,非同步呼叫完成即自動釋放掉,有些類似於閉包了。如果不希望頻繁new/delete記憶體,也可以考慮使用boost::circular_buffer一次性分配記憶體後逐項使用。

I/O物件

socket

Boost.Asio最常用的物件應該就是socket了,常用的函式一般有這幾個:

  • 讀寫TCP socket的時候,一般使用read()async_read()write()async_write(),為了避免所謂的short reads and writes,一般不使用receive()async_receive()send()async_send()
  • 讀寫有連線的UDP socket的時候,一般使用receive()async_receive()send(),async_send()
  • 讀寫無連線的UDP socket的時候,一般使用receive_from()async_receive_from(),send_to()async_send_to()

而自由函式boost::asio::async_write()和類成員函式socket.async_write_some()的有什麼區別呢(boost::asio::async_read()socket.async_read_some()類似):

  • boost::asio::async_write() 非同步寫,立即返回。但它可以保證寫完整個緩衝區的內容,否則將報錯。
  • boost::asio::async_write() 是通過呼叫n次socket.async_write_some() 來實現的,所以程式碼必須確保在boost::asio::async_write()執行的時候,沒有其他的寫操作在同一socket上執行。
  • 在呼叫boost::asio::async_write()的時候,如果指定buffer的length沒有寫完或出錯,是不會回撥相應的handler的,它將一直在run loop中執行;直到buffer裡所有的資料都寫完或出錯(此時handler裡返回的長度肯定會小於buffer length),才會呼叫handler繼續處理;而socket.async_write_some()不會有這樣的問題,它只會嘗試寫一次,寫完的長度會在handler的引數裡返回。

所以,這裡強調使用asio時第二件需要重視的事情,就是handler的返回值(一般可能宣告為boost::asio::placeholders::error)。因為asio裡所有的任務都由io_service非同步執行,只有執行成功或者失敗之後才會回撥handler,所以返回值是你瞭解當前非同步操作狀況的唯一辦法,記住不要忽略任何handler的返回值處理。

訊號處理

Boost.Asio的訊號處理非常簡單,宣告一個訊號集合,然後把相應的非同步handler綁上就可以了。如果你希望在一個訊號集裡處理所有的訊號,那麼你可以根據handler的第二個引數,來獲取當前觸發的是那個訊號。比如:

boost::asio::signal_set signals(io_service, SIGINT, SIGTERM);signals.add(SIGUSR1); // 也可以直接用add函式新增訊號signals.async_wait(boost::bind(handler, _1, _2));void handler(  const boost::system::error_code& error,  int signal_number // 通過這個引數獲取當前觸發的訊號值);

定時器

Boost.Asio的定時器用起來根訊號集一樣簡單,但由於它太過簡單,也有不方便的地方。比如,在一個UDP伺服器裡,一般收到的每個UDP包中都會包含一個sequence number,用於標識該UDP,以應對包處理超時情況。假設每個UDP包處理時間只有100ms,如果超時則直接給客戶端返回超時標記。這種最簡單的定時器常用的一些Reactor框架都有很完美的解決方案,一般是建一個定時器連結串列來實現,但是Asio中的定時器沒法單獨完成這個工作。

boost::asio::deadline_timer 只有兩種狀態:超時和未超時。所以,只能很土的對每個UDP包建立一個定時器,然後藉助std::mapboost::shared_ptr儲存sequence number到定時器的對映,根據定時器handler的返回值判斷該定時器是超時,還是被主動cancel。

strand

在多執行緒中,多個I/O物件的handler要訪問同一塊臨界區,此時可以使用strand來保證這些handler之間的同步。

示例:

我們向定時器註冊 func1 和 func2,它們可能會同時訪問全域性的物件(比如 std::cout )。這時我們希望對 func1 和 func2 的呼叫是同步的,即執行其中一個的時候,另一個要等待。

這時就可以用到 boost::asio::strand 類,它可以把幾個cmd包裝成同步執行的。例如,我們向定時器註冊 func1 和 func2 時,可以改為:

boost::asio::strand  the_strand;t1.async_wait(the_strand.wrap(func1));      //包裝為同步執行的t2.async_wait(the_strand.wrap(func2));

這樣就保證了在任何時刻,func1 和 func2 都不會同時在執行。

還有就是如果你希望把一個io_service物件繫結到多個執行緒。此時需要boost::asio::strand來確保handler不會被同時執行,因為非同步操作,比如async_writeasync_receive_from之類會影響到臨界區buffer。

具體可參考asio examples裡的示例:HTTP Server 2HTTP Server 3的connection.hpp設計。

參考

關於boost asio最好的參考書當然就是官方文件和它的原始碼,此外還有兩個不錯的資料: