1. 程式人生 > >C# 多執行緒七之Parallel

C# 多執行緒七之Parallel

1、簡介

關於Parallel不想說太多,因為它是Task的語法糖,至少我是這麼理解的,官方文件也是這麼說的,它本身就是基本Task的.假設我們有一個集合,不管是什麼集合,我們要遍歷它,首先想到的是For(如何涉及到修改或者讀可以用for)或者Foreach(如果單純的讀),但是它兩是同步的去操作集合,但是使用Parallel的靜態For或者Foreach那就可以讓多個執行緒參與這個工作,這樣就能充分的利用CPU,但是你需要考慮CPU上下文產生的效能消耗,以及Parallel本身的效能消耗,所以,這也能解釋為什麼,你的迴圈裡面執行的是不耗時的操作,使用for或者foreach的速度比使用Parallel的要快,所以使用Parallel還是要慎重.而且使用Parallel還需要注意的一點就是,不能有多執行緒爭用問題,就是你的迴圈體裡面不能有操作靜態資源的操作.如果真的需要,那你可以加鎖,但是那就失去它的優勢了.

 

2、使用注意點

(1)、不能操作共享資源,程式碼如下:

        static int shareData = 10;
        static void Main(string[] args)
        {
            Parallel.For(1, 100000, i => Add(i));
            Console.Write(shareData);
            Console.ReadKey();
        }

        static void Add(int i)
        {
            shareData 
+= i; }

程式碼邏輯很簡單,1+2+3+......+100000這個過程中每次都加一個10,怎麼說都是正數,但是,信不信它能給你加出個負數來(可能是記憶體溢位了),而且每次的結果都不一樣(重要)!自己試試吧!

 

解決方案很簡單,加個鎖,程式碼如下:

    class ParallerStudy
    {
        static int shareData = 10;
        static object lockObj = new object();
        static void Main(string[] args)
        {
            Parallel.For(
1, 100000, i => Add(i)); Console.Write(shareData); Console.ReadKey(); } static void Add(int i) { lock (lockObj) { shareData += i; } } }

這個肯定是正確的值,因為每次的輸出都是這個,這裡因為如果給迴圈的最終值設小的話,他好像是同步去做了,不會有問題,所以這裡給了個100000,這個時候它會開多個執行緒去做.

 

(2)、它可以向Task一樣丟擲異常,都是AggregateException,程式碼如下:

這裡就給截圖了,不寫程式碼了.

 

(3)、效能開銷

這個不用多說,它是基於Task,開銷還是有的,如果不清楚,去看我前面的文章

 

(4)、支援取消

 

取消貌似只能取消整個Parallel運算,不支援取消內部的方法,我試了不行,而且必須在執行Parallel之前取消它,之後都不行.很其怪,可能我的呼叫方式有問題,如果你們有好的方法,歡迎在下面評論.

 

(4)、可以設定最多的執行緒數 

實戰中有演示

 

(5)、排程器

這裡就不介紹了,後續的隨筆中會介紹

 

(6)、三個重要的委託

實戰中有演示 

 

3、實戰

(1)、下面寫個使用Parallel多執行緒去讀檔案的例子

程式碼如下:

    class ParallerStudy
    {
        static void Main(string[] args)
        {
            var targetPath = $"{AppDomain.CurrentDomain.BaseDirectory}Test";
            var totalLength = DictionaryFilesContent(targetPath, "", SearchOption.TopDirectoryOnly);
            Console.WriteLine("{0}目錄下所有的檔案長度總和為:{1}", targetPath, totalLength);
            Console.ReadKey();
        }

        /// <summary>
        /// 多執行緒讀取多個檔案的內容
        /// </summary>
        /// <param name="i"></param>
        /// <returns></returns>
        static long DictionaryFilesContent(string path, string searchPattern, SearchOption searchOptions)
        {
            var opts = new ParallelOptions();
            //只允許開六個執行緒去做這個事情
            opts.MaxDegreeOfParallelism =3;
            var files = Directory.EnumerateFiles(path);
            long totalFileLength = 0;
            Parallel.ForEach<string, long>(
                files,
                opts,
                //初始委託,該方法會線上程執行主要任務前執行,可用於引數校驗等操作
                () =>
                {
                    Console.WriteLine("開啟讀取檔案,當前執行緒Id為{0}", Thread.CurrentThread.ManagedThreadId);
                    return 0;
                },
                //主體委託,開始幹正事
                (file, loopstate, index, taskLocalCount) =>
                {
                    long fileLength = 0;
                    FileStream fs = null;
                    try
                    {
                        fs = File.OpenRead(file);
                        fileLength = fs.Length;
                        Console.WriteLine("當前執行緒Id為{1},當前檔名為{2}", index, Thread.CurrentThread.ManagedThreadId, fs.Name);

                    }
                    catch (IOException) { }//排除拒絕訪問的檔案
                    finally
                    {
                        if (fs != null)
                            fs.Dispose();
                    }
                    return taskLocalCount + fileLength;
                },
                //終結委託,一般用於彙總主體委託的結果值,所以如果這裡涉及訪問共享資源的話,一般會用同步構造,也就是加鎖等操作
                taskLocalCount =>
                {
                    //taskLocalCount=taskLocalCount + fileLength,單個檔案的長度
                    //同步構造,不需要加鎖,當每個執行緒讀取完對應檔案的長度後,將長度加到totalFileLength中,這個時候多個執行緒訪問這個變數可能會出現
                    //多執行緒爭用問題,但是使用Interlocked.Add相當與給totalFileLength加鎖
                    Interlocked.Add(ref totalFileLength, taskLocalCount);
                });
            return totalFileLength;
        }
    }

看這個例子前,還在想真的有這麼厲害嗎?其實也就那樣,根據輸出可以發現,一個開了3個執行緒,去讀10個檔案,我還在想這裡面會不會有多執行緒爭用問題,但是沒有,你看它怎麼做的,每個執行緒只會去讀一個檔案,讀的快的,立即去讀另外的檔案,我執行了N次,發現並沒有一個檔案多個執行緒讀的問題,所以每個執行緒只會去讀一個檔案,自然就不會有多執行緒爭用問題了.

 

(2)、關於ParallelLoopState的用法

Stop()和Break方法最常用,當子任務處理批量的任務時,如果滿足某種條件,則告訴其餘的任務不需要在處理了.

這個物件主要用於Parallel開啟的子任務群,它們內部之間的交流,程式碼如下:

        static void Main(string[] args)
        {
            var targetPath = $"{AppDomain.CurrentDomain.BaseDirectory}Test";
            var totalLength = DictionaryFilesContent(targetPath, "", SearchOption.TopDirectoryOnly);
            Console.WriteLine("{0}目錄下所有的檔案長度總和為:{1}", targetPath, totalLength);
            Console.ReadKey();
        }

        /// <summary>
        /// 多執行緒讀取多個檔案的內容
        /// </summary>
        /// <param name="i"></param>
        /// <returns></returns>
        static long DictionaryFilesContent(string path, string searchPattern, SearchOption searchOptions)
        {
            var opts = new ParallelOptions();
            //只允許開六個執行緒去做這個事情
            opts.MaxDegreeOfParallelism =3;
            var files = Directory.EnumerateFiles(path);
            long totalFileLength = 0;
            Parallel.ForEach<string, long>(
                files,
                opts,

                //初始委託,該方法會線上程執行主要任務前執行,可用於引數校驗等操作
                () =>
                {
                    Console.WriteLine("開啟讀取檔案,當前執行緒Id為{0}", Thread.CurrentThread.ManagedThreadId);
                    return 0;
                },
                //主體委託,開始幹正事
                (file, loopstate, index, taskLocalCount) =>
                {
                    long fileLength = 0;
                    FileStream fs = null;
                    try
                    {
                        //處理完第3項,就不要在處理了,這個第三項的意思是不是第三個檔案,也可能是第五個檔案
                        if (index == 3)
                        {
                            loopstate.Stop();
                        }
                        fs = File.OpenRead(file);
                        fileLength = fs.Length;
                        Console.WriteLine("當前執行緒Id為{1},當前檔名為{2}", index, Thread.CurrentThread.ManagedThreadId, fs.Name);

                    }
                    catch (IOException) { }//排除拒絕訪問的檔案
                    finally
                    {
                        if (fs != null)
                            fs.Dispose();
                    }
                    return taskLocalCount + fileLength;
                },
                //終結委託,一般用於彙總主體委託的結果值,所以如果這裡涉及訪問共享資源的話,一般會用同步構造,也就是加鎖等操作
                taskLocalCount =>
                {
                    //taskLocalCount=taskLocalCount + fileLength,單個檔案的長度
                    //同步構造,不需要加鎖,當每個執行緒讀取完對應檔案的長度後,將長度加到totalFileLength中,這個時候多個執行緒訪問這個變數可能會出現
                    //多執行緒爭用問題,但是使用Interlocked.Add相當與給totalFileLength加鎖
                    Interlocked.Add(ref totalFileLength, taskLocalCount);
                });
            return totalFileLength;
        }

 

 還有其它的一些用法,這裡就不介紹了,Api裡面都有介紹.

 

(3)、Parallel的返回值

就說一個LowestBreakIteration,如果這個返回值為null,說明子任務群有個呼叫了Stop方法,如果不為null,說明有個呼叫了Break方法且值為呼叫Break任務對應的Index值.