1. 程式人生 > >15.6.2【Task使用】 組合異步操作

15.6.2【Task使用】 組合異步操作

中一 取字符 接受 具體化 別人 sync 承諾 complete 累加

  對於C# 5異步特性,我最喜歡的一點是它可以自然而然地組合在一起。這表現為兩種不同的 方式。最明顯的是,異步方法返回任務,並通常會調用其他返回任務的方法。這些方法可以是直 接的異步操作(如鏈的最底部),也可以是更多的異步方法。所有的包裝和拆包都需要將結果轉 換為任務,反向操作則由編譯器完成。

  另一種組合形式是,創建與操作無關的構建塊來管理任務的處理。這些構建塊無須知道任務 在做什麽,而只是單純待在 Task<T> 的抽象級別。這有點像LINQ操作符,只是面向的是任務而 不是序列。框架中內置了一些構建塊,但也可以自行創建。

1. 在單個調用中收集結果

  例如,嘗試獲取若幹URL。15.3.6節中一次性獲取了所有URL,並在完成任務後立即停止獲 取。假設這次要啟動多個並行請求,然後每得到一個URL就記錄下結果。記住,異步方法返回的 是已經運行的任務,因此可非常輕松地為每個URL啟動一個任務:

1             string[] urls = new string[] { "http://stackoverflow.com", "http://www.google.com", "http://csharpindepth.com" };
2             var tasks = urls.Select(async url =>
3             {
4                 using (var client = new HttpClient())
5                 {
6                     return
await client.GetStringAsync(url); 7 } 8 }).ToList();

  註意,需調用 ToList() 來具體化LINQ查詢。這保證了每個任務將只啟動一次。否則每次叠 代 tasks 時,將會再次獲取字符串。(如不釋放 HttpClient ,代碼會更加簡單,但即便如此,代 碼也不是很難看。)

  TPL提供了一個 Task.WhenAll 方法,從而將各有一個結果的多個任務組合成一個包含多個 結果的任務。常用的方法重載簽名如下所示:

        public static Task<TResult[]> WhenAll<TResult>(IEnumerable<Task<TResult>> tasks);

  這個聲明看上去非常糟糕,但在真正使用時,會發現其方法目的非常單純。你將得到一個 List<Task<string>> ,因此可以寫為:

            string[] results = await Task.WhenAll(tasks);

  所有任務均已結束,並將結果收集到一個數組中後,等待方可終止。本章前面講過,如果多 個任務拋出異常,則只有第一個異常會立即拋出,但可總是叠代這些任務,以找到具體失敗的任 務及其失敗原因,或使用代碼清單15-2中所示的 WithAggregatedException 擴展方法。

  如果只關註第一個返回的請求,則可使用 Task.WhenAny 方法。該方法不會等待第一個成功 完成的任務,而只會等待第一個到達終點狀態的任務。

  本例中,你可能想要點特別的做法。在任務完成後報告全部結果可能會更有用些。

2. 在全部完成時收集結果

  Task.WhenAll 是.NET內置的轉換構建塊(transformational building block),接下來將介紹如何以類似的方式構建自己的方法。TAP文檔中含有類似的示例代碼,從而創建了 Interleaved 方法,這裏將介紹另一版本。

  代碼清單15-12旨在傳遞一個輸入任務的序列,並返回一個輸出任務的序列。兩個序列中任 務的結果是相同的,但存在一個重要差異,即輸出任務的完成順序與輸入完全一樣,因此可以一 次 await 一個任務,並可立即得到任務結果。這聽上去有些神奇,對我來說也是如此,因此我們 來看看代碼,研究一下它的工作原理。

 1         public static IEnumerable<Task<T>> InCompletionOrder<T>(this IEnumerable<Task<T>> source)
 2         {
 3             var inputs = source.ToList();
 4             var boxes = inputs.Select(x => new TaskCompletionSource<T>()).ToList();
 5 
 6             int currentIndex = -1;
 7             foreach (var task in inputs)
 8             {
 9                 task.ContinueWith(completed =>
10                 {
11                     var nextBox = boxes[Interlocked.Increment(ref currentIndex)];
12                     PropagateResult(completed, nextBox);
13                 }, TaskContinuationOptions.ExecuteSynchronously);
14             }
15             return boxes.Select(box => box.Task);
16         }

  代碼清單15-12依賴TPL中一個非常重要的類型,即 TaskCompletionSource<T> 。該類型 可用於創建一個尚未含有結果的 Task ,並在之後提供結果(或異常)。它和 AsyncTaskMethod Builder<T> 都建立在相同的基礎結構之上。後者為異步方法提供返回的 Task ,並在方法體完成 時,將帶結果的任務向外傳播。

  為什麽會用這麽奇怪的變量名( boxes )呢?我常常把任務想象成紙箱,這些紙箱承諾 (promise)在某個時刻,其內部會含有值或錯誤。 TaskCompletionSource<T> 就像是背面有洞 的箱子,你可以把它給別人,然後再偷偷地把值從洞口塞進去 ① 。這正是 PropagateResult 方法 的作用,不過它沒那麽有意思,所以此處不予列出,基本上它會將已完成的 Task<T> 的結果傳播 到 TaskCompletionSource<T> 中。如果原始任務正常完成,則將返回值復制到 Task CompletionSource<T> 中。如果原始任務產生了錯誤,則可將異常復制到 TaskCompletion Source<T> 中。取消原始任務後, TaskCompletionSource<T> 也會隨之被取消。

  真正聰明的部分是(我對此說法不承擔任何責任——有人發郵件建議我加入這一免責聲明), 在該方法運行時,它並不知道哪個 TaskCompletionSource<T> 會對應哪個輸入任務,而只是將 相同的後續操作附加到各任務上,然後由後續操作來尋找下一個 TaskCompletionSource<T> (通過對一個計數器進行原子地累加)並傳播結果。也就是說,它會按照原始任務的輸出順序對箱子進行填充。

  圖15-5展示了三個輸入任務,以及相應的由方法返回的輸出任務。即使輸入任務的順序與方 法返回的順序不同,輸出任務的順序也會與之相同。 有了這個絕妙的擴展方法後,即可編寫代碼清單15-13,從而得到一組URL,並行地對每個 URL發起請求,並在請求完成時寫下各頁面的長度,然後返回總長度。

 1         static void Main()
 2         {
 3             var task = ShowPageLengthsAsync("http://stackoverflow.com", "http://www.google.com", "http://csharpindepth.com");
 4             Console.WriteLine("Total length: {0}", task.Result);
 5         }
 6         static async Task<int> ShowPageLengthsAsync(params string[] urls)
 7         {
 8             var tasks = urls.Select(async url =>
 9             { 
10                 using (var client = new HttpClient())
11                 {
12                     return await client.GetStringAsync(url);
13                 }
14             }).ToList();
15 
16             int total = 0;
17             foreach (var task in tasks.InCompletionOrder())
18             {
19                 string page = await task;
20                 Console.WriteLine("Got page length {0}", page.Length);
21                 total += page.Length;
22             }
23             return total;
24         }

  代碼清單15-13存在兩個小問題。
  1.一個任務失敗,則整個異步操作都將失敗,並且不會保留結果。這也許沒問題,但也可能希望能夠將每次失敗記錄下來。(與.NET 4不同,不處理任務異常,則默認不會讓進程當掉,但至少應考慮對其他任務產生的影響。)
  2.失去對頁面轉向具體URL的跟蹤。
這兩個問題都可以通過少量代碼輕松解決,也可以進一步提取成可復用的構建塊。舉這些例子並不是為了滿足個別需求,而是為了讓你接受組合帶來的各種可能性。TAP白皮書中並不是只有 Interleaved 這一個例子,它還包括很多概念,並附帶一些有助於理解的示例。

15.6.2【Task使用】 組合異步操作