1. 程式人生 > >[譯]C++ 協程:理解 co_await 運算子

[譯]C++ 協程:理解 co_await 運算子

C++ 協程:理解 co_await 運算子

在之前關於 協程理論的部落格 中,我介紹了一些函式和協程在較高層次上的一些不同,但沒有詳細介紹 C++ 協程技術規範(N4680)中描述的語法和語義。

協程技術規範中,C++ 新增的關鍵新功能是能夠掛起協程,並能夠在之後恢復。技術規範為此提供的機制是通過新的 co_await

運算子去實現。

理解 co_await 運算子的工作原理可以幫助我們揭開協程行為的神祕面紗,並瞭解它們如何被暫停和掛起的。在這篇文章中,我將解釋 co_await 操作符的機制,並介紹 AwaitableAwaiter 型別的相關概念。

在深入講解 co_await 之前,我想簡要介紹一下協程的技術規範,以提供一些背景知識。

協程技術規範給我們提供了什麼?

  • 三個新的關鍵字: co_awaitco_yieldco_return
  • std::experimental 名稱空間的幾個新型別:
    • coroutine_handle<P>
    • coroutine_traits<Ts...>
    • suspend_always
    • suspend_never
  • 一種能夠讓庫的作者與協程互動並定製它們行為的通用機制。
  • 一個使非同步程式碼變得更加簡單的語言工具!

C++ 協程技術規範在語言中提供的工具,可以理解為協程的低階組合語言。 這些工具很難直接以安全的方式使用,主要是供庫作者使用,用於構建應用程式開發人員可以安全使用的更高級別的抽象。

未來會將這些新的低階工具交付給即將到來的語言標準(可能是 C++20),以及標準庫中伴隨的一些高階型別,這些高階型別封裝了這些低階構建塊,應用程式開發人員將可以通過一種安全的方式輕鬆訪問協程。

編譯器與庫的互動

有趣的是,協程技術規範實際上並沒有定義協程的語義。它沒有定義如何生成返回給呼叫者的值,沒有定義如何處理傳遞給 co_return

語句的返回值,如何處理傳遞出協程的異常,它也沒有定義應該恢復協程的執行緒。

相反,它指定了庫程式碼的通用機制,那就是通過實現符合特定介面的型別來定製協程的行為。然後,編譯器生成程式碼,在庫提供的型別例項上呼叫方法。這種方法類似於庫作者通過定義 begin() / end() 方法或 iterator 型別來定製基於範圍的 for 迴圈的實現。

協程技術規範沒有對協程的機制規定任何特定的語義,這使它成為一個強大的工具。它允許庫作者為各種不同目的來定義許多不同種類的協程。

例如,你可以定義一個非同步生成單個值的協程,或者一個延遲生成一系列值的協程,或者如果遇到 nullopt 值,則通過提前退出來簡化控制流以消耗 optional <T> 值的協程。

協程技術規範定義了兩種介面:Promise 介面和 Awaitable 介面。

Promise 介面指定用於自定義協程本身行為的方法。庫作者能夠自定義呼叫協程時發生的事件,如協程返回時(通過正常方式或通過未處理的異常返回),或者自定義協程中任何 co_awaitco_yield 表示式的行為。

Awaitable 介面指定控制 co_await 表示式語義的方法。當一個值為 co_await 時,程式碼被轉換為對 awaitable 物件上的方法的一系列呼叫。它可以指定:是否暫停當前協程,暫停排程協程以便稍後恢復後執行一些邏輯,還有在協程恢復後執行一些邏輯以產生 co_await 表示式的結果。

我將在以後的部落格中介紹 Promise 介面的細節,現在我們先來看看 Awaitable 藉口。

Awaiters 與 Awaitables:解釋操作符 co_await

co_await運算子是一個新的一元運算子,可以應用於一個值。例如:co_await someValue

co_await 運算子只能在協程的上下文中使用。這有點語義重複,因為根據定義,任何包含 co_await 運算子的函式體都將被編譯為協程。

支援 co_await 運算子的型別稱為 Awaitable 型別。

注意,co_await 運算子是否可以用作型別取決於 co_await 表示式出現的上下文。用於協程的 promise 型別可以通過其 await_transform 方法更改協程中的 co_await 表示式的含義(稍後將詳細介紹)。

為了更具體地在需要的地方,我喜歡使用術語 Normally Awaitable 來描述在協程型別中沒有 await_transform 成員的協程上下文中支援 co_await 運算子的型別。我喜歡使用術語 Contextually Awaitable 來描述一種型別,它在某些型別的協程的上下文中僅支援 co_await 運算子,因為協程的 promise 型別中存在 await_transform 方法。(我樂意接受這些名字的更好建議...)

Awaiter 型別是一種實現三個特殊方法的型別,它們被稱為 co_await 表示式的一部分:await_readyawait_suspendawait_resume

請注意,我在 C# async 關鍵字的機制中“借用”了 “Awaiter” 這個術語,該機制是根據 GetAwaiter() 方法實現的,該方法返回一個物件,其介面與 c++ 的 Awaiter 概念驚人的相似。有關 C# awaiters 的更多詳細資訊,請參閱這篇博文

請注意,型別可以是 Awaitable 型別和 Awaiter 型別。

當編譯器遇到 co_await <expr> 表示式時,實際上可以根據所涉及的型別將其轉換為許多可能的內容。

獲取 Awaiter

編譯器做的第一件事是生成程式碼,以獲取等待值的 Awaiter 物件。在 N4680 章節 5.3.8(3) 中,有很多步驟可以獲得 awaiter。

讓我們假設等待協程的 promise 物件具有型別 P,並且 promise 是對當前協程的 promise 物件的 l-value 引用。

如果 promise 型別 P 有一個名為 await_transform 的成員,那麼 <expr> 首先被傳遞給 promise.await_transform(<expr>) 以獲得 Awaitable 的值。 否則,如果 promise 型別沒有 await_transform 成員,那麼我們使用直接評估 <expr> 的結果作為 Awaitable 物件。

然後,如果 Awaitable 物件,有一個可用的運算子 co_await() 過載,那麼呼叫它來獲取 Awaiter 物件。 否則,awaitable 的物件被用作 awaiter 物件。

如果我們將這些規則編碼到 get_awaitable()get_awaiter() 函式中,它們可能看起來像這樣:

template<typename P, typename T>
decltype(auto) get_awaitable(P& promise, T&& expr)
{
  if constexpr (has_any_await_transform_member_v<P>)
    return promise.await_transform(static_cast<T&&>(expr));
  else
    return static_cast<T&&>(expr);
}

template<typename Awaitable>
decltype(auto) get_awaiter(Awaitable&& awaitable)
{
  if constexpr (has_member_operator_co_await_v<Awaitable>)
    return static_cast<Awaitable&&>(awaitable).operator co_await();
  else if constexpr (has_non_member_operator_co_await_v<Awaitable&&>)
    return operator co_await(static_cast<Awaitable&&>(awaitable));
  else
    return static_cast<Awaitable&&>(awaitable);
}
複製程式碼

等待 Awaiter

因此,假設我們已經封裝了將 <expr> 結果轉換為 Awaiter 物件到上述函式中的邏輯,那麼 co_await <expr> 的語義可以(大致)這樣轉換:

{
  auto&& value = <expr>;
  auto&& awaitable = get_awaitable(promise, static_cast<decltype(value)>(value));
  auto&& awaiter = get_awaiter(static_cast<decltype(awaitable)>(awaitable));
  if (!awaiter.await_ready())
  {
    using handle_t = std::experimental::coroutine_handle<P>;

    using await_suspend_result_t =
      decltype(awaiter.await_suspend(handle_t::from_promise(p)));

    <suspend-coroutine>

    if constexpr (std::is_void_v<await_suspend_result_t>)
    {
      awaiter.await_suspend(handle_t::from_promise(p));
      <return-to-caller-or-resumer>
    }
    else
    {
      static_assert(
         std::is_same_v<await_suspend_result_t, bool>,
         "await_suspend() must return 'void' or 'bool'.");

      if (awaiter.await_suspend(handle_t::from_promise(p)))
      {
        <return-to-caller-or-resumer>
      }
    }

    <resume-point>
  }

  return awaiter.await_resume();
}
複製程式碼

await_suspend() 的呼叫返回時,await_suspend() 的返回值為 void 的版本無條件地將執行轉移回協程的呼叫者/恢復者,而返回值為 bool 的版本允許 awaiter 物件有條件地返回並立即恢復協程,而不返回呼叫者/恢復者。

await_suspen()bool 返回版本在 awaiter 可能啟動非同步操作(有時可以同步完成)的情況下非常有用。 在它同步完成的情況下,await_suspend() 方法可以返回 false 以指示應該立即恢復協程並繼續執行。

<suspend-coroutine> 處,編譯器生成一些程式碼來儲存協程的當前狀態並準備恢復。這包括儲存 <resume-point> 的斷點位置,以及將當前儲存在暫存器中的任何值溢位到協程快照記憶體中。

<suspend-coroutine> 操作完成後,當前的協程被認為是暫停的。你可以觀察到暫停的協程的第一個斷點是在 await_suspend() 的呼叫中。協程暫停後,就可以恢復或銷燬。

當操作完成後,await_suspend() 方法負責在將來的某個時刻排程並將協程恢復(或銷燬)。注意,從 await_suspend() 中返回 false 算作排程協程,以便在當前執行緒上立即恢復。

await_ready() 方法的目的,是允許你在已知操作同步完成而不需要掛起的情況下避免 <suspend-coroutine> 操作的成本。

<return-to-caller-or-resumer> 斷點處執行轉移回呼叫者或恢復者,彈出本地堆疊幀但保持協程幀活躍。

當(或者說如果)暫停的協程最終恢復時,執行將在 <resume-point> 斷點處重新開始。即緊接在呼叫 await_resume()方法之前獲取操作的結果。

await_resume() 方法呼叫的返回值成為 co_await 表示式的結果。await_resume() 方法也可以丟擲異常,在這種情況下異常從 co_await 表示式中丟擲。

注意,如果異常從 await_suspen() 丟擲,則協程會自動恢復,並且異常會從 co_await 表示式丟擲而不呼叫 await_resume()

協程控制代碼

你可能已經注意到 coroutine_handle <P> 型別的使用,該型別被傳遞給 co_await 表示式的 await_suspend() 呼叫。

該型別表示協程幀的非擁有控制代碼,可用於恢復協程的執行或銷燬協程幀。它還可以用於訪問協程的 promise 物件。

coroutine_handle 型別具有以下介面:

namespace std::experimental
{
  template<typename Promise>
  struct coroutine_handle;

  template<>
  struct coroutine_handle<void>
  {
    bool done() const;

    void resume();
    void destroy();

    void* address() const;
    static coroutine_handle from_address(void* address);
  };

  template<typename Promise>
  struct coroutine_handle : coroutine_handle<void>
  {
    Promise& promise() const;
    static coroutine_handle from_promise(Promise& promise);

    static coroutine_handle from_address(void* address);
  };
}
複製程式碼

在實現 Awaitable 型別時,你將在 coroutine_handle 上使用的主要方法是 .resume(),當操作完成並希望恢復等待的協程的執行時,應該呼叫這個方法。在 coroutine_handle 上呼叫 .resume() 將在 <resume-point> 重新喚醒一個掛起的協程。當協程接下來遇到一個 <return-to-caller-or-resumer> 時,對 .resume() 的呼叫將返回。

.destroy() 方法銷燬協程幀,呼叫任何範圍內變數的解構函式並釋放協程幀使用的記憶體。通常,你不需要(實際上應該是避免)呼叫 .destroy(),除非你是一個實現協程 promise 型別的庫編寫者。通常,協程幀將由從對協程的呼叫返回的某種 RAII(譯者注:資源獲取即初始化)型別擁有。 所以在沒有與 RAII 物件合作的情況下呼叫 .destroy() 可能會導致雙重銷燬的錯誤。

.promise() 方法返回對協程的 promise 物件的引用。但是,就像 .destroy() 那樣,它通常只在你建立協程 promise 型別時才有用。 你應該將協程的 promise 物件視為協程的內部實現細節。 對於大多數常規的 Awaitable 型別,你應該使用 coroutine_handle <void> 作為 await_suspend() 方法的引數型別,而不是 coroutine_handle <Promise>

coroutine_handle <P> :: from_promise(P&promise) 函式允許從對協程的 promise 物件的引用重構協程控制代碼。注意,你必須確保型別 P 與用於協程幀的具體 promise 型別完全匹配; 當具體的 promise 型別是 Derived 時,試圖構造 coroutine_handle <Base> 會出現未定義的行為的錯誤。

.address()/from_address() 函式允許將協程控制代碼轉換為 void* 指標。這主要是為了允許作為 “context(上下文)”引數傳遞到現有的 C 風格的 API 中,因此你可能會發現在某些情況下實現 Awaitable 型別很有用。但是,在大多數情況下,我發現有必要將附加資訊傳遞給這個 'context' 引數中的回撥,因此我通常最終將 coroutine_handle 儲存在結構中並將指標傳遞給 'context' 引數中的結構而不是使用 .address() 返回值。

無同步的非同步程式碼

co_await 運算子的一個強大的設計功能是在協程掛起之後但在執行返回給呼叫者/恢復者之前執行程式碼的能力。

這允許 Awaiter 物件在協程已經被掛起之後發起非同步操作,將被掛起的協程的(控制代碼) coroutine_handle 傳遞給運算子,當操作完成時(可能在另一個執行緒上)它可以安全地恢復操作,而不需要任何額外的同步。

例如,當協程已經掛起時,在 await_suspend() 內啟動非同步讀操作意味著我們可以在操作完成時恢復協程,而不需要任何執行緒同步來協調啟動操作的執行緒和完成操作的執行緒。

Time     Thread 1                           Thread 2
  |      --------                           --------
  |      ....                               Call OS - Wait for I/O event
  |      Call await_ready()                    |
  |      <supend-point>                        |
  |      Call await_suspend(handle)            |
  |        Store handle in operation           |
  V        Start AsyncFileRead ---+            V
                                  +----->   <AsyncFileRead Completion Event>
                                            Load coroutine_handle from operation
                                            Call handle.resume()
                                              <resume-point>
                                              Call to await_resume()
                                              execution continues....
           Call to AsyncFileRead returns
         Call to await_suspend() returns
         <return-to-caller/resumer>
複製程式碼

在利用這種方法時要特別注意的一件事情是,如果你開始將協程控制代碼釋出到其他執行緒的操作,那麼另一個執行緒可以在 await_suspend() 返回之前恢復另一個執行緒上的協程,繼續與 await_suspend() 方法的其餘部分同時執行。

協程恢復時首先要做的是呼叫 await_resume() 來獲取結果,然後經常會立即銷燬 Awaiter 物件(即 await_suspend() 呼叫的 this 指標)。在 await_suspend() 返回之前,協程可能會執行完成,銷燬協程和 promise 物件。

所以在 await_suspend() 方法中,如果可以在另一個執行緒上同時恢復協程,你需要確保避免訪問 this 指標或協程的 .promise() 物件,因為兩者都已經可能已被銷燬。一般來說,在啟動操作並計劃恢復協程之後,唯一可以安全訪問的是 await_suspend() 中的區域性變數。

與 Stackful 協程的比較

我想稍微多做一些說明,比較一下協程技術規範中的 stackless 協程在協程掛起後與一些現有的常見的協程工具(如 Win32 纖程或 boost::context )一起執行邏輯的能力。

對於許多 stackful 協程框架,一個協程的暫停操作與另一個協程的恢復操作相結合,形成一個 “context-switch(上下文切換)” 操作。使用這種 “context-switch” 操作,通常在掛起當前協程之後,而在將執行轉移到另一個協程之前,沒有機會執行邏輯。

這意味著,如果我們想在 stackful 協程之上實現類似的非同步檔案讀取操作,那麼我們必須在掛起協程之前啟動操作。因此,可以在協程暫停之前在另一個執行緒上完成操作,並且有資格恢復。在另一個執行緒上完成的操作和協程掛起之間的這種潛在競爭需要某種執行緒同步來仲裁,並決定勝利者。

通過使用 trampoline context 可以解決這個問題,該上下文可以在初始化上下文被掛起後代表啟動上下文啟動操作。然而,這將需要額外的基礎設施和額外的上下文切換以使其工作,並且這引入的開銷可能大於它試圖避免同步的成本。

避免記憶體分配

非同步操作通常需要儲存一些每個操作的狀態,以跟蹤操作的進度。這種狀態通常需要在操作期間持續,並且只有在操作完成後才會釋放。

例如,呼叫非同步 Win32 I/O 函式需要你分配並傳遞指向 OVERLAPPED 結構的指標。呼叫者負責確保此指標保持有效,直到操作完成。

使用傳統的基於回撥的 API,通常需要在堆上分配此狀態以確保其具有適當的生命週期。如果你執行了許多操作,則可能需要為每個操作分配並釋放此狀態。如果效能成為了問題,那麼可以使用自定義分配器從記憶體池中分配這些狀態物件。

同時,我們可以在使用協程時,通過利用協程幀中的區域性變數在協程掛起後還會保持活躍的特性,避免為操作狀態在堆上分配記憶體。

通過將每個操作狀態放置在 Awaiter 物件中,我們可以從協程幀有效地 “borrow(借用)” 儲存器,用於在 co_await 表示式的持續時間記憶體儲每個操作狀態。一旦操作完成,協程就會恢復並且銷燬 Awaiter 物件,從而釋放協程幀中的記憶體以供其他區域性變數使用。

最終,協程幀仍然可以在堆上分配。但是,一旦分配了,協程幀就可以使用這個堆分配來執行許多非同步操作。

你想想,協程幀就像一種高效能的 arena 記憶體分配器。編譯器在編譯時計算出所有區域性變數所需的 arena 總大小,然後能夠根據需要將記憶體分配給區域性變數,而開銷為零!試著用自定義分配器打敗它;)

示例:實現簡單的執行緒同步原語

既然我們已經介紹了 co_await 運算子的許多機制,我想通過實現一個基本可等待同步原語來展示如何將這些知識付諸實踐:非同步手動重置事件。

這個事件的基本要求是,它需要通過多個併發執行協程來成為 Awaitable 狀態,當等待時,需要掛起等待的協程,直到某個執行緒呼叫 .set() 方法,此時任何等待的協程都將恢復。如果某個執行緒已經呼叫了 .set(),那麼協程應該繼續,而不是掛起。

理想情況下,我們還希望將其設定為 noexcept,不需要在堆上分配,也不需要無鎖的實現。

2017/11/23 更新:增加 async_manual_reset_event 示例

示例用法如下所示:

T value;
async_manual_reset_event event;

// A single call to produce a value
void producer()
{
  value = some_long_running_computation();

  // Publish the value by setting the event.
  event.set();
}

// Supports multiple concurrent consumers
task<> consumer()
{
  // Wait until the event is signalled by call to event.set()
  // in the producer() function.
  co_await event;

  // Now it's safe to consume 'value'
  // This is guaranteed to 'happen after' assignment to 'value'
  std::cout << value << std::endl;
}
複製程式碼

讓我們首先考慮一下這個事件可能存在的狀態:not setset

當它處於 'not set' 狀態時,有一隊(可能為空的)協程正在等待它變為 'set' 狀態。

當它處於 ‘set’ 狀態時,不會有任何等待的協程,因為 co_wait 狀態下的事件可以在不暫停的情況下繼續。

這個狀態實際上可以用一個 std :: atomic <void *> 來表示。

  • 為 ‘set’ 狀態保留一個特殊的指標值。在這種情況下,我們將使用事件的 this 指標,因為我們知道不能與任何列表項相同的地址。
  • 否則,事件處於 ‘not set’ 狀態,並且該值是指向等待協程結構的單鏈表的頭部的指標。

我們可以通過將節點儲存在放置在協程幀內的 ‘awaiter’ 物件中,從而避免為堆上的連結串列分配節點的額外呼叫。

讓我們從一個類介面開始,如下所示:

class async_manual_reset_event
{
public:

  async_manual_reset_event(bool initiallySet = false) noexcept;

  // No copying/moving
  async_manual_reset_event(const async_manual_reset_event&) = delete;
  async_manual_reset_event(async_manual_reset_event&&) = delete;
  async_manual_reset_event& operator=(const async_manual_reset_event&) = delete;
  async_manual_reset_event& operator=(async_manual_reset_event&&) = delete;

  bool is_set() const noexcept;

  struct awaiter;
  awaiter operator co_await() const noexcept;

  void set() noexcept;
  void reset() noexcept;

private:

  friend struct awaiter;

  // - 'this' => set state
  // - otherwise => not set, head of linked list of awaiter*.
  mutable std::atomic<void*> m_state;

};
複製程式碼

我們有一個相當直接和簡單的介面。在這一點上,需要關注的是它有一個 operator co_await() 方法,它返回了一個尚未定義的 awaiter 型別。

現在讓我們來定義 awaiter 型別

定義 Awaiter 型別

首先,它需要知道它將等待哪個 async_manual_reset_event 的物件,因此它需要一個對這一事件和對應建構函式的應用來進行初始化。

它還需要充當 awaiter 值連結串列中的節點,因此它需要持有指向列表中下一個 awaiter 物件的指標。

它還需要儲存正在執行 co_await 表示式的等待協程的coroutine_handle,以便在事件變為 'set' 狀態時事件可以恢復協程。我們不關心協程的 promise 型別是什麼,所以我們只使用 coroutine_handle <>(這是 coroutine_handle <void> 的簡寫)。

最後,它需要實現 Awaiter 介面,因此需要三種特殊方法:await_readyawait_suspendawait_resume。 我們不需要從 co_await 表示式返回一個值,因此 await_resume 可以返回 void

當我們將這些都放在一起,awaiter 的基本類介面如下所示:

struct async_manual_reset_event::awaiter
{
  awaiter(const async_manual_reset_event& event) noexcept
  : m_event(event)
  {}

  bool await_ready() const noexcept;
  bool await_suspend(std::experimental::coroutine_handle<> awaitingCoroutine) noexcept;
  void await_resume() noexcept {}

private:

  const async_manual_reset_event& m_event;
  std::experimental::coroutine_handle<> m_awaitingCoroutine;
  awaiter* m_next;
};
複製程式碼

現在,當我們執行 co_await 一個事件時,如果事件已經設定,我們不希望等待協程暫停。 因此,如果事件已經設定,我們可以定義 await_ready() 來返回 true

bool async_manual_reset_event::awaiter::await_ready() const noexcept
{
  return m_event.is_set();
}
複製程式碼

接下來,讓我們看一下 await_suspend() 方法。這通常是 awaitable 型別會發生莫名其妙的事情的地方。

首先,它需要將等待協程的控制代碼存入 m_awaitingCoroutine 成員,以便事件稍後可以在其上呼叫 .resume()

然後,當我們完成了這一步,我們需要嘗試將 awaiter 自動加入到 waiters 的連結串列中。如果我們成功加入它,然後我們返回 true ,以表明我們不想立即恢復協程,否則,如果我們發現事件已併發地更改為 set 狀態,那麼我們返回 false ,以表明協程應立即恢復。

bool async_manual_reset_event::awaiter::await_suspend(
  std::experimental::coroutine_handle<> awaitingCoroutine) noexcept
{
  // Special m_state value that indicates the event is in the 'set' state.
  const void* const setState = &m_event;

  // Remember the handle of the awaiting coroutine.
  m_awaitingCoroutine = awaitingCoroutine;

  // Try to atomically push this awaiter onto the front of the list.
  void* oldValue = m_event.m_state.load(std::memory_order_acquire);
  do
  {
    // Resume immediately if already in 'set' state.
    if (oldValue == setState) return false; 

    // Update linked list to point at current head.
    m_next = static_cast<awaiter*>(oldValue);

    // Finally, try to swap the old list head, inserting this awaiter
    // as the new list head.
  } while (!m_event.m_state.compare_exchange_weak(
             oldValue,
             this,
             std::memory_order_release,
             std::memory_order_acquire));

  // Successfully enqueued. Remain suspended.
  return true;
}
複製程式碼

注意,在載入舊狀態時,我們使用 'acquire' 檢視記憶體順序,如果我們讀取特殊的 'set' 值時,那麼我們就可以看到在呼叫 'set()' 之前發生的寫操作。

如果 compare-exchange 執行成功,我們需要 ‘release’ 的狀態,以便後續的 ‘set()’ 呼叫將看到我們對 m_awaitingconoutine 的寫入,以及之前對協程狀態的寫入。

補全事件類的其餘部分

現在我們已經定義了 awaiter 型別,讓我們回過頭來看看 async_manual_reset_event 方法的實現。

首先是建構函式。它需要初始化為 'not set' 狀態和空的 waiters 連結串列(即 nullptr)或初始化為 'set' 狀態(即 this)。

async_manual_reset_event::async_manual_reset_event(
  bool initiallySet) noexcept
: m_state(initiallySet ? this : nullptr)
{}
複製程式碼

接下來,is_set() 方法非常簡單 - 如果它具有特殊值 this,則為 'set':

bool async_manual_reset_event::is_set() const noexcept
{
  return m_state.load(std::memory_order_acquire) == this;
}
複製程式碼

然後是 reset() 方法,如果它處於 'set' 狀態,我們希望它轉換為 'not set' 狀態,否則保持原樣。

void async_manual_reset_event::reset() noexcept
{
  void* oldValue = this;
  m_state.compare_exchange_strong(oldValue, nullptr, std::memory_order_acquire);
}
複製程式碼

使用 set() 方法,我們希望通過使用特殊的 'set' 值(this)將當前狀態來轉換到 'set' 狀態,然後檢查原本的值是什麼。 如果有任何等待的協程,那麼我們希望在返回之前依次順序恢復它們。

void async_manual_reset_event::set() noexcept
{
  // Needs to be 'release' so that subsequent 'co_await' has
  // visibility of our prior writes.
  // Needs to be 'acquire' so that we have visibility of prior
  // writes by awaiting coroutines.
  void* oldValue = m_state.exchange(this, std::memory_order_acq_rel);
  if (oldValue != this)
  {
    // Wasn't already in 'set' state.
    // Treat old value as head of a linked-list of waiters
    // which we have now acquired and need to resume.
    auto* waiters = static_cast<awaiter*>(oldValue);
    while (waiters != nullptr)
    {
      // Read m_next before resuming the coroutine as resuming
      // the coroutine will likely destroy the awaiter object.
      auto* next = waiters->m_next;
      waiters->m_awaitingCoroutine.resume();
      waiters = next;
    }
  }
}
複製程式碼

最後,我們需要實現 operator co_await() 方法。這隻需要構造一個 awaiter 物件。

async_manual_reset_event::awaiter
async_manual_reset_event::operator co_await() const noexcept
{
  return awaiter{ *this };
}
複製程式碼

我們終於完成它了,一個可等待的非同步手動重置事件,具有無鎖,無記憶體分配,noexcept 實現。

如果你想嘗試一下程式碼,或者看看它編譯到 MSVC 和 Clang 下面的程式碼,可以看看 godbolt 上檢視。

你還可以在 cppcoro 庫中找到此類的實現,以及許多其他有用的 awaitable 型別,例如 async_mutexasync_auto_reset_event

結語

這篇文章介紹瞭如何根據 AwaitableAwaiter 概念實現和定義運算子 co_await

它還介紹瞭如何實現一個等待的非同步執行緒同步原語,該原語利用了在協程幀上分配 awaiter 物件的事實,以避免額外的堆分配。

我希望這篇文章已經幫助你對 co_await 這個新的運算子有了更好的理解。

在下一篇部落格中,我將探討 Promise 概念以及協程型別作者如何定製其協程的行為。

致謝

我要特別感謝 Gor Nishanov 在過去幾年中耐心而熱情地回答了我關於協程的許多問題。

此外,還有 Eric Niebler 對本文的早期草稿進行稽核並提供反饋。

如果發現譯文存在錯誤或其他需要改進的地方,歡迎到 掘金翻譯計劃 對譯文進行修改並 PR,也可獲得相應獎勵積分。文章開頭的 本文永久連結 即為本文在 GitHub 上的 MarkDown 連結。


掘金翻譯計劃 是一個翻譯優質網際網路技術文章的社群,文章來源為 掘金 上的英文分享文章。內容覆蓋 AndroidiOS前端後端區塊鏈產品設計人工智慧等領域,想要檢視更多優質譯文請持續關注 掘金翻譯計劃官方微博知乎專欄