1. 程式人生 > >【Programa】.Net 並行執行程式的使用心得

【Programa】.Net 並行執行程式的使用心得

一、摘要

官方介紹:提供對並行迴圈和區域的支援。

名稱空間:using System.Threading.Tasks

三個靜態方法:Parallel.Invoke,Parallel.For,Parallel.ForEach

常用到的引數型別:ParallelLoopResult,ParallelOptions,ParallelLoopState

二、引數

我們先來介紹引數,明白了引數的作用,在選擇和呼叫三個靜態方法及其過載,就遊刃有餘了。

1、ParallelLoopResult:提供執行 Parallel 迴圈的完成狀態

屬性:

1)public bool IsCompleted { get; } 

   如果該迴圈已執行完成(該迴圈的所有迭代均已執行,並且該迴圈沒有收到提前結束的請求),則為 true;否則為 false。

2)public long? LowestBreakIteration { get; } 

  返回一個表示從中呼叫 Break 語句的最低迭代的整數

用途:判斷當並行迴圈結束時,是否因呼叫了break方法或stop方法而提前退出並行迴圈,或所有迭代均已執行。

判斷依據:

條件 結果
IsCompleted 執行完成

!IsCompleted &&

LowestBreakIteration==null

使用了Stop語句而提前終止

!IsCompleted &&

LowestBreakIteration!=null

 使用了Break語句而提前終止

 

 

 

 

 

 

 

 

 

 

2、ParallelLoopState:可用來使 Parallel 迴圈的迭代與其他迭代互動。此類的例項由 Parallel 類提供給每個迴圈;不能在您的使用者程式碼中建立例項。

屬性:

1)public bool ShouldExitCurrentIteration { get; }

  獲取迴圈的當前迭代是否應基於此迭代或其他迭代發出的請求退出。如果當前迭代應退出,則為 true;否則為 false。

2) public bool IsStopped { get; } 

  獲取迴圈的任何迭代是否已呼叫 System.Threading.Tasks.ParallelLoopState.Stop。如果任何迭代已停止迴圈,則為 true;否則為 false。

3) public bool IsExceptional { get; }

  獲取迴圈的任何迭代是否已引發相應迭代未處理的異常。如果引發了未經處理的異常,則為 true;否則為 false。

4) public long? LowestBreakIteration { get; }

  獲取從中呼叫 System.Threading.Tasks.ParallelLoopState.Break 的最低迴圈迭代。一個表示從中呼叫 Break 的最低迭代的整數。

方法:(在下面方法介紹時,有進一步的介紹)

1)Break():通知並行迴圈在執行完當前迭代之後儘快停止執行,可確保低索引步驟完成。且可確保正在執行的迭代繼續執行直到完成。

2)Stop():通知並行迴圈儘快停止執行。對於尚未執行的迭代不能會嘗試執行低索引迭代。不保證所有已執行的迭代都執行完。

用途:提早退出並行迴圈。

說明:

1)不能同時在同一個並行迴圈中同時使用Break和Stop。

2)Stop比Break更常用。break語句用在並行迴圈中的效果和用在序列迴圈中不同。Break用在並行迴圈中,委託的主體方法在每次迭代的時候被呼叫,退出委託的主體方法對並行迴圈的執行沒有影響。Stop停止迴圈比Break快。

3、ParallelOptions:儲存用於配置  Parallel 類的方法的操作的選項。

屬性

1)public CancellationToken CancellationToken { get; set; }

  獲取或設定傳播有關應取消操作的通知。

2)public int MaxDegreeOfParallelism { get; set; }

  獲取或設定此 ParallelOptions 例項所允許的最大並行度。

3)public TaskScheduler TaskScheduler { get; set; } [沒用過,不知道功效]

  獲取或設定與此 System.Threading.Tasks.ParallelOptions 例項關聯的 System.Threading.Tasks.TaskScheduler

說明:

1)通過設定CancellationToken來取消並行迴圈,當前正在執行的迭代會執行完,然後丟擲System.OperationCanceledException型別的異常。

2)TPL的方法總是會試圖利用所有可用核心以達到最好的效果,但是很可能.NET Framework內部使用的啟發式演算法所得到的注入和使用的執行緒數比實際需要的多(通常都會高於硬體執行緒數,這樣會更好地支援CPU和I/O混合型的工作負載)。

   通常將最大並行度設定為小於等於邏輯核心數。如果設定為等於邏輯核心數,那麼要確保不會影響其他程式的執行。設定為小於邏輯核心數是為了有空閒核心來處理其他緊急的任務。

用途:

1)從迴圈外部取消並行迴圈

2)指定並行度

 三、方法介紹

1、Parallel.Invoke

1)public static void Invoke(params Action[] actions);儘可能並行執行提供的每個操作。

    public class Test
    {
        private void Action()
        {
            Thread.Sleep(1000);
            Console.WriteLine("Action :ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        private void Action1()
        {
            Thread.Sleep(2000);
            Console.WriteLine("Action1:ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        public void Parallel_Invoke()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                Parallel.Invoke(Action, Action1);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

執行結果:

注:Action()休眠1s,Action1()休眠2s,執行總耗時為2043ms,可以看做 nvoke方法只有在actions全部執行完才會返回,並且耗時取決於最大耗時的方法。

2)public static void Invoke(ParallelOptions parallelOptions, params Action[] actions);執行所提供的每個操作,而且儘可能並行執行,除非使用者取消了操作。

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();
        private void Action()
        {
            Thread.Sleep(1000);
            //標記取消並行操作
            parallelOptions.CancellationToken = new CancellationToken(true);
            Console.WriteLine("Action :ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        private void Action1()
        {
            Thread.Sleep(2000);
            Console.WriteLine("Action1:ThreadID-{0}", Thread.CurrentThread.ManagedThreadId);
        }
        public void Parallel_Invoke()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                Parallel.Invoke(parallelOptions, Action1, Action, Action1, Action1, Action1);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

說明:

1)Invoke方法只有在actions全部執行完才會返回。

2)不能保證actions中的所有操作同時執行。比如actions大小為4,但硬體執行緒數為2,那麼同時執行的運算元最多為2。

3)actions中的操作並行的執行且與順序無關,若編寫與執行順序有關的併發程式碼,應選擇其他方法。

4)如果使用Invoke載入多個操作,多個操作執行時間迥異,總的執行時間以消耗時間最長操作為基準,這會導致很多邏輯核心長時間處於空閒狀態。

 2、Parallel.For

 1)public static ParallelLoopResult For(int fromInclusive, int toExclusive, Action<int> body); 

    public class Test
    {
        private void Action(int i)
        {
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        public void Parallel_For()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //fromInclusive: 開始索引(含)。
                //toExclusive: 結束索引(不含)。
                //body: 將為每個迭代呼叫一次的委託。
                Parallel.For(0, 5, Action);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

執行結果:

注:可以看出,方法並不是順序執行

2)public static ParallelLoopResult For(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Action<int, ParallelLoopState> body);

使用ParallelLoopState.Break() 退出迭代:

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();

        private void Action(int i, ParallelLoopState parallelLoopState)
        {
            //當執行到 索引等於5時,我們呼叫Break()
            if (i > 5)
            {
                parallelLoopState.Break();
            }
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        public void Parallel_For()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //設定最大並行數為3
                parallelOptions.MaxDegreeOfParallelism = 3;
                Parallel.For(0, 10, parallelOptions, Action);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

 執行結果:

使用ParallelLoopState.Stop() 退出迭代:

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();

        private void Action(int i, ParallelLoopState parallelLoopState)
        {
            //當執行到 索引等於5時,我們呼叫Stop()
            if (i > 5)
            {
                parallelLoopState.Stop();
            }
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        public void Parallel_For()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //設定最大並行數為3
                parallelOptions.MaxDegreeOfParallelism = 3;
                Parallel.For(0, 10, parallelOptions, Action);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

 

執行結果:

注:當使用Break()退出迭代時,程式保證了索引小於5的方法都執行完成,即可確保低索引步驟完成;當使用Stop()退出迭代時,程式執行到索引為5時,就立即退出了(正在進行的迭代方法,會執行完成),不確保低索引執行完成。

3)public static ParallelLoopResult For<TLocal>(int fromInclusive, int toExclusive, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<int, ParallelLoopState, TLocal, TLocal> body, Action<TLocal> localFinally);

    public class Test
    {
        ParallelOptions parallelOptions = new ParallelOptions();

        private void Action(int i, ParallelLoopState parallelLoopState)
        {
            //當執行到 索引等於5時,我們呼叫Stop()
            if (i > 5)
            {
                parallelLoopState.Stop();
            }
            Console.WriteLine("Action :ThreadID-{0}|i={1}", Thread.CurrentThread.ManagedThreadId, i);
        }
        private string LocalInit()
        {
            var init = "go";
            Console.WriteLine("LocalInit:ThreadID-{0}|init={1}", Thread.CurrentThread.ManagedThreadId, init);
            return init;
        }
        private void LocalFinally(string x)
        {
            Console.WriteLine("LocalFinally:ThreadID-{0}|result={1}", Thread.CurrentThread.ManagedThreadId, x);
        }
        private string Body(int i, ParallelLoopState parallelLoopState, string x)
        {
            x = x + "_" + i;
            Console.WriteLine("Body:ThreadID-{0}|i={1}|x={2}", Thread.CurrentThread.ManagedThreadId, i, x);
            return x;
        }
        public void Parallel_For()
        {
            Console.WriteLine("開始:********");
            Stopwatch stopwatch = new Stopwatch();
            stopwatch.Start();
            try
            {
                //設定最大並行數為3
                parallelOptions.MaxDegreeOfParallelism = 3;
                //LocalInit: 用於返回每個任務的本地資料的初始狀態的函式委託。
                //Body: 將為每個迭代呼叫一次的委託。
                //LocalFinally: 用於對每個任務的本地狀態執行一個最終操作的委託。
                //<TLocal>: 執行緒本地資料的型別。
                Parallel.For(0, 5, parallelOptions, LocalInit, Body, LocalFinally);
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
            stopwatch.Stop();
            Console.WriteLine("結束:***{0}***", stopwatch.ElapsedMilliseconds);
        }
    }

執行結果:

注:LocalInit()執行了3次,Body()執行了5次,LocalFinally()執行了3次(這只是執行的一種情況),可以看出在同一執行緒中,init引數是共享傳遞的,即LocalInit()=>Body()..n次迭代..Body()=>LoaclFinally()

* localInit只是在每個 Task/Thread 開始參與到對集合元素的處理時執行一次, 【而不是針對每個集合元素都執行一次】類似的, localFinally只有在 Task/Thread 完成所有分配給它的任務之後,才被執行一次。
 CLR會為每個 Task/Thread 維護一個thread - local storage,可以理解為 Task/Thread 在整個執行過程中的狀態。 當一個 Task/Thread 參與到執行中時,localInit中返回的TLocal型別值會被作為這個狀態的初始值,隨著body的執行,
 這個狀態值會被改變,而body的返回型別也是TLocal,意味著每一次body執行結束,會把最新的TLocal值返回給CLR, 而CLR會把這個值設定到 Task/Thread 的thread - local storage上去,從而實現 Task/Thread 狀態的更新。
 最後,localFinally可以返回這個狀態值,作為 Task/Thread 完成它所負責的所有處理任務後的最終結果。

說明:

1)不支援浮點。

2)無法保證迭代的執行順序。

3)如果fromInclusive大於或等於toExclusive,方法立即返回而不會執行任何迭代。

4)對於body引數中含有的ParallelLoopState例項,其作用為提早中斷並行迴圈。

5)只有在迭代全部完成以後才會返回結果,否則迴圈將一直阻塞。

3、Parallel.ForEach 

1)public static ParallelLoopResult ForEach(IEnumerable<TSource> source, Action<TSource> body);

2)public static ParallelLoopResult ForEach<TSource>(IEnumerable<TSource> source, ParallelOptions parallelOptions, Action<TSource, ParallelLoopState> body);

3)public static ParallelLoopResult ForEach<TSource>(Partitioner<TSource> source, Action<TSource> body);

用法及其過載和Parallel.For不盡相同(把fromInclusive-toExclusive 換成你想遍歷的集合List),就不在這裡贅述了(關鍵是現在好餓啊...要去吃飯了)

四、異常處理

1)異常優先於從迴圈外部取消和使用Break()方法或Stop()方法提前退出並行迴圈。

2)並行迴圈體丟擲一個未處理的異常,並行迴圈就不能再開始新的迭代。

3)預設情況下當某次迭代丟擲一個未處理異常,那麼正在執行的迭代如果沒丟擲異常,正在執行的迭代會執行完。
***當所有迭代都執行完(有可能其他的迭代在執行的過程中也丟擲異常),並行迴圈將在呼叫它的執行緒中丟擲異常。
***並行迴圈執行的過程中,可能有多個迭代丟擲異常,所以一般使用AggregateException來捕獲異常。AggregateException繼承自Exception。
***為了防止僅使用AggregateException未能捕獲某些異常,使用AggregateException的同時還要使用Exception。

異常捕獲:

try
{
    //Do something
}
catch(AggregateException e)
{
    Foreach(Exception ex in e.InnerExceptions)
    {
        //Do something
    }
}
catch(Exception e)
{
    //Do something
}

五、總結

1.Parallel執行方法組,不是順序的,和方法位置先後,索引大小無關;

2.迭代全部完成以後才會返回結果(前提沒有Break、Stop、Exception),否則迴圈將一直阻塞;

3.總體耗時一般取決於耗時最長的方法;

4.某一方法出現異常,程式將停止新的迭代,當前正在進行的方法,會繼續執行完成,後丟擲異常。

5.可以去吃一碗牛肉麵了...