1. 程式人生 > >《CLR Via C#》讀書筆記:27.計算限制的非同步操作

《CLR Via C#》讀書筆記:27.計算限制的非同步操作

一、CLR 執行緒池基礎

一般來說如果計算機的 CPU 利用率沒有 100% ,那麼說明很多程序的部分執行緒沒有執行。可能在等待 檔案/網路/資料庫等裝置讀取或者寫入資料,又可能是等待按鍵、滑鼠移動等事件。

執行 I/O 限制的操作時,作業系統通過裝置驅動程式通知硬體幹活,而 CPU 處於一種空閒狀態。而在現代應用程式當中,使用執行緒池來執行計算限制的操作,而不是手動建立執行緒。

每個 CLR 都有自己獨立的執行緒池,並且由各自 CLR 控制的所有 AppDomain 所共享。

執行緒池本身維護了一個請求佇列,當程式需要執行一個非同步操作的時候,會將一個記錄項追加到佇列之中,然後由執行緒池將該記錄項分派給執行緒池執行緒,如果沒有執行緒則建立一個新的執行緒。執行緒任務處理完整之後,將該執行緒放入執行緒池中等待以後進行復用。

執行緒池本身是啟發式的,結合程式負載,他會自己根據當前執行緒池內執行緒的狀態銷燬/新增執行緒。

二、執行簡單的計算限制操作

通過 ThreadPool 靜態類,我們可以方便地使用執行緒池中的執行緒為我們執行一些計算限制的非同步操作。只需要呼叫 ThreadPoolQueueUserWorkItem(WaitCallBack callback) 方法,或者是他的另一個過載方法,接收一個 state 值作為引數。

他的兩個方法都是非阻塞的,呼叫之後會立即返回。

WaitCallBack 的方法簽名如下:

delegate void WaitCallBack(Object state);

在 CLR 的執行緒池中,將 callback 委託作為工作項新增到隊列當中,然後由執行緒池分發執行緒進行處理。

【注意】

一旦回撥方法丟擲了未處理的異常,CLR 會立即終止程序。

三、執行上下文

每個執行緒都有一個執行上下文的資料結構,包含由安全設定,宿主設定和邏輯呼叫上下文資料(AsyncLocal 與 CallContext)。

當在某個執行緒(例如主執行緒)使用了另外一個執行緒(輔助執行緒),就會產生執行上下文由呼叫者執行緒流向被呼叫執行緒。這會對效能造成一定的影響,這是因為執行上下文包含有大量地資訊。而如果輔助執行緒又呼叫了更多的輔助執行緒,這個時候執行上下問的複製開銷就非常大。

我們可以通過 ExecutionContext 類控制執行緒的執行上下文是否流向輔助執行緒,只有輔助執行緒不需要訪問執行上下文時可以阻止執行上下文流動。當阻止了執行上下文流動時,輔助執行緒會使用最後一次與其關聯的任意執行上下文,這個時候對於安全設定等就不可信,不應執行任何依賴於執行上下文的操作。

一般來說在主執行緒,可以通過 ExecutionContext.SuppressFlow(); 方法阻止執行上下文流動,然後再通過 ExecutionContext.RestoreFlow(); 恢復流動。

四、協作式取消和超時

.NET 提供了標準的取消操作模式,這個模式是協作式的,也就是你要取消的操作必須顯式宣告自己可以被取消。這是因為使用者在執行某些長耗時的計算限制操作的時候,可能會因為等待時間太長或者其他原因需要取消這個操作。

首先我們通過 System.Threading.CancellationTokenSource 物件管理或者取消物件狀態,使用時直接 new 一個即可,而該物件擁有一個 CancellationToken 物件。

這個 Token 物件用於傳遞給執行計算限制操作的方法,通過該 Token 的 IsCancellationRequested 屬性你可以在方法內部確認任務是否被取消,如果被取消你就可以進行返回操作。

例子如下:

static void Main(string[] args)
{
    var tokenSource = new CancellationTokenSource();

    ThreadPool.QueueUserWorkItem(z => Calculate(CancellationToken.None, 10000));

    Console.ReadKey();
    tokenSource.Cancel();

    Console.ReadLine();
}

private static void Calculate(CancellationToken token, int count)
{
    for (int i = 0; i < count; i++)
    {
        if (token.IsCancellationRequested)
        {
            Console.WriteLine("使用者提前終止操作,退出執行緒..");
            break;
        }
        
        Console.WriteLine(count);
        Thread.Sleep(200);
    }
    
    Console.WriteLine("計數完成.");
}

【注意】

如果你要執行一個不允許被取消的操作,可以為方法傳遞一個 CancellationToken.None 物件,因為該物件沒有 Source 源,則不可能會被呼叫 Cancel() 進行取消。

註冊取消事件

CancellationToken 允許我們通過 Register() 方法註冊多個委託,這些被註冊了的委託會在 TokenSource 呼叫 Cancel 取消的時候優先呼叫,其呼叫的先後順序為註冊時的順序。

【注意】

呼叫 Register() 方法的時候,他有兩個 bool 型別的引數,分別是 useSyncContextuseExecutionContext。這兩個引數用於指定,是否要用呼叫執行緒的同步上下文或者執行上下文來呼叫回撥函式。

同時在註冊成功之後會返回一個 CancellationTokenRegistration 物件,通過呼叫該物件的 Dispose 方法可以刪除已經註冊的委託回撥,這樣在取消時就不會呼叫該回調。

TokenSource 連結

可以通過 CancellationTokenSource.CreateLinkedTokenSource() 連結兩個或多個物件,連結成功後會返回一個單獨的 TokenSource 物件。

一旦這個新物件連結的任何一個 TokenSource 物件被取消的時候,該物件也會被取消掉。

Cancel 的異常處理

在呼叫 TokenSource 的 Cancel() 方法時(預設為 false),該方法還有另外一個過載傳入 bool 值,如果為 true 的時候,有多個註冊的回撥委託,一旦某個出現異常直接會被丟擲該異常,不會等待其他回撥執行完畢。

如果為 false,則會等到所有回撥方法執行完成時,丟擲一個 AggregateException 異常,內部的 InnerExceptions 包含有所有在執行過程中產生的異常資訊集合。

超時取消

除了直接呼叫 Cancel() 立即取消操作之外,還有一個延遲取消的方法 CancelAfter(),通過傳遞具體的延遲時間,我們可以在指定的之間之後取消某個任務。(PS:有點像 Polly 的 TimeOut )

五、任務

為啥使用任務,雖然通過 ThreadPool 可以很方便地發起一次計算限制的操作,但是你不知道你的方法啥時候執行完成,也無法在操作完成之後獲得返回值。

使用任務執行一個計算限制操作有兩種方式,兩者也一樣的可以傳遞 CancellationToken 進行取消操作:

  1. new Task(Sum,20).Start();
  2. Task.Run(()=>Sum(20));

除此之外還可以在構造 Task 時 傳遞一些標誌位,用於任務排程器進行一些特殊處理。

等待任務完成並獲取結果

任務除了標準的無返回值的 Task 型別之外,還有一個包含有泛型引數的 Task<TResult> 型別,其中 TResult 引數就是任務的返回值型別。

在建立好 Task<TResult> 物件之後,可以通過 Task.Wait() 等待任務執行完成,Task 的 Wait() 方法會阻塞呼叫者執行緒直到任務執行完成。執行完成之後,可以通過 Task.Reuslt 獲取任務執行之後的返回值。

PS:

這裡獲取 Result 屬性值時,其內部也會呼叫 Wait() 方法。

如果該 Task 內的計算限制操作丟擲了未經處理的異常,這個異常會被吞噬掉,呼叫 Wait() 方法或者使用 Result 屬性的時候,這些異常資訊會被包裹在 AggregateException 內部並返回給呼叫者執行緒。

【注意】

不推薦直接呼叫 Wait() ,如果 Task 已經開始執行,該方法會阻塞呼叫者執行緒,直到執行完成。第二種情況是任務還沒有開始執行的時候,呼叫者執行緒不會被阻塞,Task 立即執行並返回。而排程器可能會使用呼叫者執行緒來執行這些操作,這個時候,如果呼叫者執行緒獲取了一個執行緒同步鎖,而 Task 因為也是在呼叫者執行緒執行,嘗試獲取鎖的時候,就會產生死鎖。

AggregateException 可能會包含有多個異常,這個時候可以使用該物件的 Handle(Func<Exception,bool> predicate) 方法來為每一個異常進行處理,處理返回 true,未處理返回 false。

在呼叫了 Handle 方法之後,仍然有異常沒有處理,這些沒有處理的異常又會造成一個新的 AggregateException 被丟擲。

【注意】

如果不知道有哪些 Task 內部未處理的異常,可以通過象任務排程器的 UnobservedTaskException 事件登記一個回撥方法,如果存在一個沒有處理到的異常,則會呼叫你的回撥方法,並將異常傳遞給你。

除了 Task.Wait() 方法,還有等待一組任務的 Task.WaitAny()Task.WaitAll()。幾個方法都會阻塞呼叫者執行緒,前者當傳遞的一組任務有任意一個完成則立即返回該任務的索引,後者則是要等待這一組任務全部完成之後才會喚醒呼叫執行緒。

這兩個方法一旦被取消,都會丟擲 OperationCanceledException 異常。

取消任務

可以通過一個 CancellationTokenSource 來取消 Task,一樣的需要傳入的計算限制方法新增 CancellationToken 引數。

只不過呢,在 Task 任務內部我們不通過 IsCancellationRequested 來判斷任務是否取消,而是通過呼叫 Token 的 ThrowIfCancellationRequested() 方法來丟擲異常。

該方法會判斷當前任務是否被取消,如果被取消了,則丟擲異常。這是因為與直接通過執行緒池新增任務不同,執行緒池無法知道任務何時完成,而任務則可以表示是否完成,並且還能返回值。

任務完成時啟動新任務

之前說過通過呼叫 Task.Wait() 或者在任務尚未完成的時候呼叫 Task.Result 屬性,都會造成執行緒池建立新的執行緒。而我們可以通過在任務完成之後,立即開啟一個新的任務,這樣我們就可以通過新的任務知道前一個任務是否已經完成了。

建立一個的計算限制任務物件,我們在啟動了該任務物件之後,呼叫 Task.ContinueWith() 方法來建立一個延續任務,新的延續性任務會有一個 Task 引數,該引數就是最開始的任務。

而在使用 Task.ContinueWith() 時,他還可以傳遞一個標識位。這個標識位用於表明這個延續性任務是在第一個任務什麼情況下才會執行,一般有三種:OnlyOnCanceled(第一個任務取消時才被執行)、OnlyOnFaulted(第一個任務丟擲未處理異常時執行)、OnlyOnRanToCompletion(第一個任務順利完成時執行)。

啟動子任務

一個任務在其內部可以建立其子任務,只需要在內部構造 Task 物件的時候,傳遞一個標識位 TaskCreationOptions.AttachedToParent 將其與父任務關聯。這樣的話,除非其所有子任務執行完成,父任務不會被認為已經完成。

延續性任務也可以作為第一個任務的子任務,指定對應的標識位即可。

任務的內部構造

任務主要由以下幾部分構成:

  1. 任務唯一的 Task Id。
  2. 排程器的引用。
  3. 回撥方法的引用。
  4. 執行上下文的引用。
  5. 其他...

可以看到構造一個 Task 還是需要比較大的開銷的,如果你不需要 Task 的附加特性,完全可以使用 TaskPool.QueueUserworkItem 來獲得更好的效能與效率。

通過 Task 的只讀屬性 Task.Status,我們可以知道任務目前處於哪種狀態,其最終的狀態主要有 3 種,分別是:RanToCompletion(已完成)、Canceled(被取消)、Faulted(出現異常失敗),這三種狀態都屬於任務完成狀態。

另外值得注意的是,通過 ContinueWith()ContinueWhenAll()ContinueWhenAny() 等方法建立的任務狀態都為 WaitingForActivation,這個狀態代表任務會自動開始。

任務工廠

如果你需要在執行多個相同配置的 Task 物件,可以通過 TaskFactoryTaskFactory<TResult>,其大概含義與 Task 的含義相同。

在建立工廠時,可以傳遞一些常用的配置標識位和 CancellationToken 物件,之後我們可以通過 StartNew() 方法來統一執行一堆任務。

任務排程器

任務排程器一般有兩種,第一種是執行緒池任務排程器,一般用於服務端程式。還有一種是同步上下文任務排程器,一般用於 GUI 程式。

六、Parallel 的 For、Foreach、Invoke

For 與 Foreach 基本用於操作一個集合,然後迴圈處理其值。而如果在某個方法內部需要執行多個方法,則可以通過 Invoke 來進行執行。使用 Parallel 類可以讓 CPU 最大化地利用起來而不會阻塞主執行緒。

不過一般不會將所有 For 與 Foreach 都替換為並行化的查詢,這是因為某些迴圈會修改共享資料,這個時候使用 Parallel 的操作則會破壞資料,雖然可以通過增加執行緒同步鎖來解決,不過這樣會造成單執行緒訪問,無法享受並行處理的好處。

同時 Parallel 方法本身也會有開銷,當然在大量重複性任務中這種開銷可以忽略不計,但是如果僅為幾十個短耗時的計算限制任務啟用 Parallel 就會得不償失。

這三種操作都接受一個ParallelOptions 物件用於配置最大並行的工作項數目與排程器。

Parallel 的 For 與 Foreach 的一個過載方法允許傳入 3 個委託,他們分別是:

  • 任務區域性初始化委託(localInit):該委託是在每次工作項處理之前被呼叫。
  • 任務主體委託(body):具體的工作項處理邏輯,參與工作的各個執行緒都會呼叫一次。
  • 任務區域性終結器委託(localFinally):本委託是在每個工作項處理完成之後都會被呼叫。

從上述邏輯來看,可以看作區域性初始化委託為一個父任務,後面兩個為子級連續任務的構造。

例項:

static void Main(string[] args)
{
    var files = new List<string>();
    files.AddRange(Directory.GetFiles(@"E:\Program Files","*.*",SearchOption.AllDirectories));
    files.AddRange(Directory.GetFiles(@"E:\Program Files (x86)","*.*",SearchOption.AllDirectories));
    files.AddRange(Directory.GetFiles(@"E:\Project","*.*",SearchOption.AllDirectories));
    files.AddRange(Directory.GetFiles(@"E:\Cache","*.*",SearchOption.AllDirectories));
    files.AddRange(Directory.GetFiles(@"E:\Windows Kits","*.*",SearchOption.AllDirectories));
    files.AddRange(Directory.GetFiles(@"C:\Program Files\dotnet","*.*",SearchOption.AllDirectories));
    
    Console.WriteLine($"總檔案數量:{files.Count}");
    long allFileCount = 0;

    var watch = new Stopwatch();
    watch.Start();
    Parallel.ForEach<string, long>(files,
        localInit: () =>
        {
            // 初始化檔案大小為 0,
            // 這裡的引數型別取決於任務返回的引數
            return 0;
        },
        body: (fileName, parallelStatus, index, fileCount) =>
        {
            // 計算檔案大小並返回
            long count = 0;
            try
            {
                var info = new FileInfo(fileName);
                count = info.Length;
            }
            catch (Exception e)
            {
            }
            
            // 這裡因為該任務會被執行緒池複用,所以要進行累加
            return count + fileCount;
        },
        localFinally: fileCount => { Interlocked.Add(ref allFileCount, fileCount); }
    );
    
    watch.Stop();
    Console.WriteLine($"並行效率:{watch.ElapsedMilliseconds} ms");
    Console.WriteLine($"檔案總大小:{allFileCount / 1024 / 1024 / 1024} Gb");


    allFileCount = 0;
    watch.Reset();
    
    watch.Start();
    foreach (var file in files)
    {
        long count = 0;
        
        try
        {
            var info = new FileInfo(file);
            count = info.Length;
        }
        catch (Exception e)
        {
        }
        
        allFileCount+=count;
    }
    
    watch.Stop();
    Console.WriteLine($"單執行緒效率:{watch.ElapsedMilliseconds} ms");
    Console.WriteLine($"檔案總大小:{allFileCount / 1024 / 1024 / 1024} Gb");


    Console.ReadLine();
}

效能提升:

通過 Parallel 的 Foreach 與普通的 foreach 遍歷計算,效能總體提升了約 56%,越耗時的操作提升的效果就越明顯。

在 Body 的主體委託當中,傳入了一個 ParallelLoopState 物件,該物件用於每個執行緒與其他任務進行互動。主要有兩個方法 Stop()Break(),前者用於停止迴圈,後者用於跳出迴圈,並且跳出迴圈之後,其 LowestBreakIteration 會返回呼叫過 Break() 方法的最低項。

並且 Parallel 還會返回一個 ParallelLoopResult 物件,該通過該物件我們可以得知這些迴圈是否正常完成。

七、並行語言整合查詢 PLINQ

LINQ 預設查詢的方式是一個執行緒順序處理資料集合中的所有項,稱之為順序查詢。而 PLINQ 就是將這些操作分散到各個 CPU 並行執行,通過 AsParallel() 擴充套件方法可以將 IEnumerable<TSource> 轉換為 ParallelQuery<TSource>

而從並行操作切換回順序操作,只需要呼叫 ParallelEnumableAsSequential() 方法即可。

經過 PLINQ 處理後的資料項其結果是無序的,如果需要有序結果,可以呼叫 AsOrdered() 方法。但是該方法比較損耗效能,一般不推薦使用,如果需要排序應該直接使用與 LINQ 同名的 PLINQ 擴充套件方法。

PLINQ 一般會自己分析使用最好的查詢方式進行查詢,有時候使用順序查詢效能更好。

  • WithCancellation() :允許取消某個 PLINQ 查詢。
  • WithExecutionMode():允許配置 PLINQ 執行模式,是否強制並行查詢。
  • WithMergeOptions():允許配置結果的緩衝與合併方式。
  • WithDegreeOfParallelism():允許配置查詢的最大並行數。

PS:

不建議在多執行緒環鏡中使用 Console.Write() 進行輸出,因為 Console 類內部會對執行緒進行同步,確保只有一個執行緒可以訪問控制檯視窗,這樣會損害效能。

八、定時計算限制操作

通過 CLR 提供的 Timer 定時器,我們可以傳入一個回撥方法。這樣的話計時器會可以根據傳入的週期,來定時將我們的回撥方法通過執行緒池執行緒進行呼叫。

同時計時器還允許傳入一個 dueTime 引數來指定這個計時器首次呼叫回撥方法時需要等待多久(立即執行可以傳入 0),而 period 可以指定 Timer 呼叫回撥方法的週期。

【原理】

線上程池內部所有的 Timer 物件只使用了一個執行緒,當某個 Timer 到期的時候,這個執行緒就會被喚醒。該執行緒通過 ThreadPool.QueueUserWorkItem() 方法將一個工作項新增到執行緒池佇列,這樣你的回撥方法就會得到執行。

【注意】

如果回撥方法執行的時常超過了你設定的週期時常,這樣會造成多個執行緒都在執行你的回撥。因為 Timer 不知道你的回撥執行完成沒有,他只會到期執行你的回撥方法。

解決措施是構造一個 Timer 的時候,為 period 指定一個 Timeout.Infinite 常量,這樣計時器只會觸發一次。之後在你的回撥方法執行完成之後,在其內部通過 Timer.Change() 方法指定一個執行週期,並且設定其 dueTime 為立即執行。

這樣做了之後,你的 Timer 就會確保你的回撥被執行完成之後再開始下一個週期。

這一點可以參考 Abp 實現的 AbpTimer 物件。

九、執行緒池如何管理執行緒

CLR 允許開發人員設定執行緒池最大工作者執行緒數,但是一般不要輕易設定該值,但你可以通過 ThreadPool.GetMaxThreads()ThreadPool.GetMinThreads()GetAvailableThreads() 方法來獲取一些相關資訊。

通過 ThreadPool.QueueUserWorkItem() 方法和 Timer 類處理的工作項總是儲存到 CLR 執行緒池的 全域性佇列 中。工作者執行緒採用一個 FIFO 演算法將工作項從 全域性佇列 取出,因為所有工作者執行緒都有可能去這個佇列拿去工作項,這個時候會使用 執行緒同步鎖 以確保工作項只會被工作者執行緒處理一次。這可能會造成效能瓶頸,對伸縮性和效能會造成某些限制。

預設的任務排程器中,非工作者執行緒排程 Task 時都是存放在全域性佇列,而工作者執行緒排程 Task 則是存放在他自己的本地佇列。

工作者執行緒處理 Task 的步驟:

  • 首先從本地佇列採用 LIFO 演算法取得一個 Task 進行處理。
  • 如果本地佇列沒有 Task,則從其他的工作者執行緒本地佇列拿一個 Task 自己來處理。(會使用執行緒同步鎖)
  • 所有本地佇列都為空,則工作者執行緒會使用 FIFO 演算法去全域性佇列拿一個 Task 進行處理。
  • 如果全域性佇列為空,則執行緒處於休眠狀態,時間過長則銷燬自身。

PS:

結合上下文,說明工作項首先被新增到了全域性佇列,然後由工作者執行緒取到自己的本地佇列進行處理。

執行緒池會動態地根據工作項的多少動態地調整工作者執行緒的數量,一般不需要開發人員進行管控。