1. 程式人生 > >Boost庫之asio io_service以及run、run_one、poll、poll_one區別

Boost庫之asio io_service以及run、run_one、poll、poll_one區別

key net 如何 sta alt erro erl ron eric

一、io_service的作用

io_servie 實現了一個任務隊列,這裏的任務就是void(void)的函數。Io_servie最常用的兩個接口是post和run,post向任務隊列中投遞任務,run是執行隊列中的任務,直到全部執行完畢,並且run可以被N個線程調用。Io_service是完全線程安全的隊列。

二、Io_servie的接口

提供的接口有run、run_one、poll、poll_one、stop、reset、dispatch、post,最常用的是run、post、stop

三、Io_servie 實現代碼的基本類結構:

Io_servie是接口類,為實現跨平臺,采用了策略模式,所有接口均有impl_type實現。根據平臺不同impl_type分為

win_iocp_io_service Win版本的實現,這裏主要分析Linux版本。

task_io_service 非win平臺下的實現,其代碼結構為:

detail/task_io_service_fwd.hpp 簡單聲明task_io_service名稱

detail/task_io_service.hpp 聲明task_io_service的方法和屬性

detail/impl/task_io_service.ipp 具體實現文件

隊列中的任務類型為opertioan,原型其實是typedef task_io_service_operation operation,其實現文件在detail/task_io_service_operation.hpp中,當隊列中的任務被執行時,就是task_io_service_operation:: complete被調用的時候。

四、Dispatch和post的區別

Post一定是PostQueuedCompletionStatus並且在GetQueuedCompletionStatus 之後執行。

Dispatch會首先檢查當前thread是不是io_service.run/runonce/poll/poll_once線程,如果是,則直接運行。

五、Io_servie::run方法的實現

Run方法執行隊列中的所有任務,直到任務執行完畢。

run方法首先構造一個idle_thread_info,和first_idle_thread_類型相同,即通過first_idle_thread_將所有線程串聯起來,它這個串聯不是立即串聯的,當該線程無任務可做是加入到first_idle_thread_的首部,有任務執行時,從first_idle_thread_中斷開。這很正常,因為first_idle_thread_維護的是當前空閑線程。

加鎖,循環執行do_one方法,直到do_one返回false

do_one每次執行一個任務。首先檢查隊列是否為空,若空將此線程追加到first_idle_thread_的首部,然後阻塞在條件變量上,直到被喚醒。

當被喚醒或是首次執行,若stopped_為true(即此時stop方法被調用了),返回0

隊列非空,pop出一個任務,檢查隊列無任務那麽簡單的解鎖,若仍有,調用wake_one_thread_and_unlock嘗試喚醒其他空閑線程執行。然後執行該任務,返回1.

實際上在執行隊列任務時有一個特別的判斷if (o ==&task_operation_),那麽將會執行task_->run,task_變量類型為reactor,在linux平臺實現為epoll_reactor,實現代碼文件為detail/impl/epoll_reactor.ipp,run方法實際上執行的是epoll_wait,run阻塞在epoll_wait上等待事件到來,並且處理完事件後將需要回調的函數push到io_servie的任務隊列中,雖然epoll_wait是阻塞的,但是它提供了interrupt函數,該interrupt是如何實現的呢,它向epoll_wait添加一個文件描述符,該文件描述符中有8個字節可讀,這個文件描述符是專用於中斷epoll_wait的,他被封裝到select_interrupter中,select_interrupter實際上實現是eventfd_select_interrupter,在構造的時候通過pipe系統調用創建兩個文件描述符,然後預先通過write_fd寫8個字節,這8個字節一直保留。在添加到epoll_wait中采用EPOLLET水平觸發,這樣,只要select_interrupter的讀文件描述符添加到epoll_wait中,立即中斷epoll_wait。

Run方法的原則是:

有任務立即執行任務,盡量使所有的線程一起執行任務

若沒有任務,阻塞在epoll_wait上等待io事件

若有新任務到來,並且沒有空閑線程,那麽先中斷epoll_wait,先執行任務

若隊列中有任務,並且也需要epoll_wait監聽事件,那麽非阻塞調用epoll_wait(timeout字段設置為0),待任務執行完畢在阻塞在epoll_wait上。

幾乎對線程的使用上達到了極致。

從這個函數中可以知道,在使用ASIO時,io_servie應該盡量多,這樣可以使其epoll_wait占用的時間片最多,這樣可以最大限度的響應IO事件,降低響應時延。但是每個io_servie::run占用一個線程,所以io_servie最佳應該和CPU的核數相同。

六、Io_servie::stop的實現

加鎖,調用stop_all_threads

設置stopped_變量為true,遍歷所有的空閑線程,依次喚醒

task_interrupted_設置為true,調用task_的interrupt方法。

七、reset和stop

文檔中reset的解釋是重置io_service以便下一次調用。

run,run_one,poll,poll_one是被stop掉導致退出,或者由於完成了所有任務(正常退出)導致退出時,在調用下一次 run,run_one,poll,poll_one之前,必須調用此函數。reset不能在run,run_one,poll,poll_one正在運行時調用。如果是消息處理handler(用戶代碼)拋出異常,則可以在處理之後直接繼續調用 io.run,run_one,poll,poll_one。

八、run,run_one,poll,poll_one的區別

run其實就是一直循環執行do_one,並且是以阻塞方式進行的(參數為true),而run_one同樣是以阻塞方式進行的,但只執行一次do_one;poll和run幾乎完全相同,只是它是以非阻塞方式執行do_one(參數為false),poll_one是以非阻塞方式執行一次do_one。

run,run_one,及poll,poll_one的實現代碼,如下:

[cpp] view plain copy
  1. // Run the event loop until stopped or no more work.
  2. size_t run(boost::system::error_code& ec)
  3. {
  4. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  5. {
  6. stop();
  7. ec = boost::system::error_code();
  8. return 0;
  9. }
  10. call_stack<win_iocp_io_service>::context ctx(this);
  11. size_t n = 0;
  12. while (do_one(true, ec))
  13. if (n != (std::numeric_limits<size_t>::max)())
  14. ++n;
  15. return n;
  16. }
  17. // Run until stopped or one operation is performed.
  18. size_t run_one(boost::system::error_code& ec)
  19. {
  20. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  21. {
  22. stop();
  23. ec = boost::system::error_code();
  24. return 0;
  25. }
  26. call_stack<win_iocp_io_service>::context ctx(this);
  27. return do_one(true, ec);
  28. }
  29. // Poll for operations without blocking.
  30. size_t poll(boost::system::error_code& ec)
  31. {
  32. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  33. {
  34. stop();
  35. ec = boost::system::error_code();
  36. return 0;
  37. }
  38. call_stack<win_iocp_io_service>::context ctx(this);
  39. size_t n = 0;
  40. while (do_one(false, ec))
  41. if (n != (std::numeric_limits<size_t>::max)())
  42. ++n;
  43. return n;
  44. }
  45. // Poll for one operation without blocking.
  46. size_t poll_one(boost::system::error_code& ec)
  47. {
  48. if (::InterlockedExchangeAdd(&outstanding_work_, 0) == 0)
  49. {
  50. stop();
  51. ec = boost::system::error_code();
  52. return 0;
  53. }
  54. call_stack<win_iocp_io_service>::context ctx(this);
  55. return do_one(false, ec);
  56. }
  57. do_one的函數原型
  58. size_t do_one(bool block, boost::system::error_code& ec)
  59. {
  60. BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, &completion_key, &overlapped, block ? timeout : 0);
  61. }

Boost庫之asio io_service以及run、run_one、poll、poll_one區別