1. 程式人生 > >我是如何一步步的在並行程式設計中將lock鎖次數降到最低實現無鎖程式設計

我是如何一步步的在並行程式設計中將lock鎖次數降到最低實現無鎖程式設計

在並行程式設計中,經常會遇到多執行緒間操作共享集合的問題,很多時候大家都很難逃避這個問題做到一種無鎖程式設計狀態,你也知道一旦給共享集合套上lock之後,併發和伸縮能力往往會造成很大影響,這篇就來談談如何儘可能的減少lock鎖次數甚至沒有。

一:緣由

1. 業務背景

昨天在review程式碼的時候,看到以前自己寫的這麼一段程式碼,精簡後如下:

        private static List<long> ExecuteFilterList(int shopID, List<MemoryCacheTrade> trades, List<FilterConditon> filterItemList, MatrixSearchContext searchContext)
        {
            var customerIDList = new List<long>();

            var index = 0;

            Parallel.ForEach(filterItemList, new ParallelOptions() { MaxDegreeOfParallelism = 4 },
                            (filterItem) =>
            {
                var context = new FilterItemContext()
                {
                    StartTime = searchContext.StartTime,
                    EndTime = searchContext.EndTime,
                    ShopID = shopID,
                    Field = filterItem.Field,
                    FilterType = filterItem.FilterType,
                    ItemList = filterItem.FilterValue,
                    SearchList = trades.ToList()
                };

                var smallCustomerIDList = context.Execute();

                lock (filterItemList)
                {
                    if (index == 0)
                    {
                        customerIDList.AddRange(smallCustomerIDList);
                        index++;
                    }
                    else
                    {
                        customerIDList = customerIDList.Intersect(smallCustomerIDList).ToList();
                    }
                }
            });

            return customerIDList;
        }

這段程式碼實現的功能是這樣的,filterItemList承載著所有原子化的篩選條件,然後用多執行緒的形式併發執行裡面的item,最後將每個item獲取的客戶人數集合在高層進行整體求交,畫個簡圖就是下面這樣。

2. 問題分析

其實這程式碼存在著一個很大的問題,在Parallel中直接使用lock鎖的話,filterItemList有多少個,我的lock就會鎖多少次,這對併發和伸縮性是有一定影響的,現在就來想想怎麼優化吧!

3. 測試案例

為了方便演示,我模擬了一個小案例,方便大家看到實時結果,修改後的程式碼如下:

        public static void Main(string[] args)
        {
            var filterItemList = new List<string>() { "conditon1", "conditon2", "conditon3", "conditon4", "conditon5", "conditon6" };
            ParallelTest1(filterItemList);
        }

        public static void ParallelTest1(List<string> filterItemList)
        {
            var totalCustomerIDList = new List<int>();

            bool isfirst = true;

            Parallel.ForEach(filterItemList, new ParallelOptions() { MaxDegreeOfParallelism = 2 }, (query) =>
            {
                var smallCustomerIDList = GetCustomerIDList(query);

                lock (filterItemList)
                {
                    if (isfirst)
                    {
                        totalCustomerIDList.AddRange(smallCustomerIDList);
                        isfirst = false;
                    }
                    else
                    {
                        totalCustomerIDList = totalCustomerIDList.Intersect(smallCustomerIDList).ToList();
                    }

                    Console.WriteLine($"{DateTime.Now} 被鎖了");
                }
            });

            Console.WriteLine($"最後交集客戶ID:{string.Join(",", totalCustomerIDList)}");
        }

        public static List<int> GetCustomerIDList(string query)
        {
            var dict = new Dictionary<string, List<int>>()
            {
                ["conditon1"] = new List<int>() { 1, 2, 4, 7 },
                ["conditon2"] = new List<int>() { 1, 4, 6, 7 },
                ["conditon3"] = new List<int>() { 1, 4, 5, 7 },
                ["conditon4"] = new List<int>() { 1, 2, 3, 7 },
                ["conditon5"] = new List<int>() { 1, 2, 4, 5, 7 },
                ["conditon6"] = new List<int>() { 1, 3, 4, 7, 9 },
            };

            return dict[query];
        }

------ output ------
2020/04/21 15:53:34 被鎖了
2020/04/21 15:53:34 被鎖了
2020/04/21 15:53:34 被鎖了
2020/04/21 15:53:34 被鎖了
2020/04/21 15:53:34 被鎖了
2020/04/21 15:53:34 被鎖了
最後交集客戶ID:1,7

二:第一次優化

從結果中可以看到,filterItemList有6個,鎖次數也是6次,那如何降低呢? 其實實現Parallel程式碼的FCL大神也考慮到了這個問題,從底層給了一個很好的過載,如下所示:


public static ParallelLoopResult ForEach<TSource, TLocal>(OrderablePartitioner<TSource> source, ParallelOptions parallelOptions, Func<TLocal> localInit, Func<TSource, ParallelLoopState, long, TLocal, TLocal> body, Action<TLocal> localFinally);

這個過載很特別,多了兩個引數localInit和localFinally,過會說一下什麼意思,先看修改後的程式碼體會一下


        public static void ParallelTest2(List<string> filterItemList)
        {
            var totalCustomerIDList = new List<int>();
            var isfirst = true;

            Parallel.ForEach<string, List<int>>(filterItemList,
              new ParallelOptions() { MaxDegreeOfParallelism = 2 },
              () => { return null; },
             (query, loop, index, smalllist) =>
             {
                 var smallCustomerIDList = GetCustomerIDList(query);

                 if (smalllist == null) return smallCustomerIDList;

                 return smalllist.Intersect(smallCustomerIDList).ToList();
             },
            (finalllist) =>
            {
                lock (filterItemList)
                {
                    if (isfirst)
                    {
                        totalCustomerIDList.AddRange(finalllist);
                        isfirst = false;
                    }
                    else
                    {
                        totalCustomerIDList = totalCustomerIDList.Intersect(finalllist).ToList();
                    }
                    Console.WriteLine($"{DateTime.Now} 被鎖了");
                }
            });
            Console.WriteLine($"最後交集客戶ID:{string.Join(",", totalCustomerIDList)}");
        }

------- output ------
2020/04/21 16:11:46 被鎖了
2020/04/21 16:11:46 被鎖了
最後交集客戶ID:1,7
Press any key to continue . . .

很好,這次優化將lock次數從6次降到了2次,這裡我用了 new ParallelOptions() { MaxDegreeOfParallelism = 2 } 設定了併發度為最多2個CPU核,程式跑起來後會開兩個執行緒,將一個大集合劃分為2個小集合,相當於1個集合3個條件,第一個執行緒在執行3個條件的起始處會執行你的localInit函式,在3個條件迭代完之後再執行你的localFinally,第二個執行緒也是按照同樣方式執行自己的3個條件,說的有點晦澀,畫一張圖說明吧。

三: 第二次優化

如果你瞭解Task<T>這種帶有返回值的Task,這就好辦了,多少個filterItemList就可以開多少個Task,反正Task底層是使用執行緒池承載的,所以不用怕,這樣就完美的實現無鎖程式設計。


        public static void ParallelTest3(List<string> filterItemList)
        {
            var totalCustomerIDList = new List<int>();
            var tasks = new Task<List<int>>[filterItemList.Count];

            for (int i = 0; i < filterItemList.Count; i++)
            {
                tasks[i] = Task.Factory.StartNew((query) =>
                {
                    return GetCustomerIDList(query.ToString());
                }, filterItemList[i]);
            }

            Task.WaitAll(tasks);

            for (int i = 0; i < tasks.Length; i++)
            {
                var smallCustomerIDList = tasks[i].Result;
                if (i == 0)
                {
                    totalCustomerIDList.AddRange(smallCustomerIDList);
                }
                else
                {
                    totalCustomerIDList = totalCustomerIDList.Intersect(smallCustomerIDList).ToList();
                }
            }

            Console.WriteLine($"最後交集客戶ID:{string.Join(",", totalCustomerIDList)}");
        }

------ output -------

最後交集客戶ID:1,7
Press any key to continue . . .

四:總結

我們將原來的6個lock優化到了無鎖程式設計,但並不說明無鎖程式設計就一定比帶有lock的效率高,大家要結合自己的使用場景合理的使用和混合搭配。

好了,本篇就說到這裡,希望對您有幫助。


如您有更多問題與我互動,掃描下方進來吧~