1. 程式人生 > >Python3 與 C# 並發編程之~ 上篇

Python3 與 C# 並發編程之~ 上篇

動態 ken summary using 任務調度 影響 特征 可能 arp

NetCore並發編程

示例代碼:https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency

先簡單說下概念(其實之前也有說,所以簡說下):

  1. 並發:同時做多件事情
  2. 多線程:並發的一種形式
  3. 並行處理:多線程的一種(線程池產生的一種並發類型,eg:異步編程
  4. 響應式編程:一種編程模式,對事件進行響應(有點類似於JQ的事件)

Net裏面很少用進程,在以前基本上都是線程+池+異步+並行+協程

我這邊簡單引入一下,畢竟主要是寫Python的教程,Net只是幫你們回顧一下,如果你發現還沒聽過這些概念,或者你的項目中還充斥著各種ThreadThreadPool

的話,真的得系統的學習一下了,現在官網的文檔已經很完善了,記得早幾年啥都沒有,也只能挖那些外國開源項目:

https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency

1.異步編程(Task)

Task的目的其實就是為了簡化ThreadThreadPool的代碼,下面一起看看吧:

異步用起來比較簡單,一般IO,DB,Net用的比較多,很多時候都會采用重試機制,舉個簡單的例子:

/// <summary>
/// 模擬一個網絡操作(別忘了重試機制)
/// </summary>
/// <param name="url">url</param>
/// <returns></returns>
private async static Task<string> DownloadStringAsync(string url)
{
    using (var client = new HttpClient())
    {
        // 設置第一次重試時間
        var nextDelay = TimeSpan.FromSeconds(1);
        for (int i = 0; i < 3; i++)
        {
            try
            {
                return await client.GetStringAsync(url);
            }
            catch { }
            await Task.Delay(nextDelay); // 用異步阻塞的方式防止服務器被太多重試給阻塞了
            nextDelay *= 2; // 3次重試機會,第一次1s,第二次2s,第三次4s
        }
        // 最後一次嘗試,錯誤就拋出
        return await client.GetStringAsync(url);
    }
}

然後補充說下Task異常的問題,當你await的時候如果有異常會拋出,在第一個await處捕獲處理即可

如果asyncawait就是理解不了的可以這樣想:async就是為了讓await生效(為了向後兼容)

對了,如果返回的是void,你設置成Task就行了,觸發是類似於事件之類的方法才使用void,不然沒有返回值都是使用Task

項目裏經常有這麽一個場景:等待一組任務完成後再執行某個操作,看個引入案例:

/// <summary>
/// 1.批量任務
/// </summary>
/// <param name="list"></param>
/// <returns></returns>
private async static Task<string[]> DownloadStringAsync(IEnumerable<string> list)
{
    using (var client = new HttpClient())
    {
        var tasks = list.Select(url => client.GetStringAsync(url)).ToArray();
        return await Task.WhenAll(tasks);
    }
}

再舉一個場景:同時調用多個同效果的API,有一個返回就好了,其他的忽略

/// <summary>
/// 2.返回首先完成的Task
/// </summary>
/// <param name="list"></param>
/// <returns></returns>
private static async Task<string> GetIPAsync(IEnumerable<string> list)
{
    using (var client = new HttpClient())
    {
        var tasks = list.Select(url => client.GetStringAsync(url)).ToArray();
        var task = await Task.WhenAny(tasks); // 返回第一個完成的Task
        return await task;
    }
}

一個async方法被await調用後,當它恢復運行時就會回到原來的上下文中運行。

如果你的Task不再需要上下文了可以使用:task.ConfigureAwait(false),eg:寫個日記還要啥上下文?

逆天的建議是:在核心代碼裏面一種使用ConfigureAwait,用戶頁面相關代碼,不需要上下文的加上

其實如果有太多await在上下文裏恢復那也是比較卡的,使用ConfigureAwait之後,被暫停後會在線程池裏面繼續運行

再看一個場景:比如一個耗時操作,我需要指定它的超時時間:

 /// <summary>
/// 3.超時取消
/// </summary>
/// <returns></returns>
private static async Task<string> CancellMethod()
{
    //實例化取消任務
    var cts = new CancellationTokenSource();
    cts.CancelAfter(TimeSpan.FromSeconds(3)); // 設置失效時間為3s
    try
    {
        return await DoSomethingAsync(cts.Token);
    }
    // 任務已經取消會引發TaskCanceledException
    catch (TaskCanceledException ex)
    {

        return "false";
    }
}
/// <summary>
/// 模仿一個耗時操作
/// </summary>
/// <returns></returns>
private static async Task<string> DoSomethingAsync(CancellationToken token)
{
    await Task.Delay(TimeSpan.FromSeconds(5), token);
    return "ok";
}

異步這塊簡單回顧就不說了,留兩個擴展,你們自行探討:

  1. 進度方面的可以使用IProgress<T>,就當留個作業自己摸索下吧~
  2. 使用了異步之後盡量避免使用task.Wait or task.Result,這樣可以避免死鎖

Task其他新特征去官網看看吧,引入到此為止了。


2.並行編程(Parallel)

這個其實出來很久了,現在基本上都是用PLinq比較多點,主要就是:

  1. 數據並行:重點在處理數據(eg:聚合)
  2. 任務並行:重點在執行任務(每個任務塊盡可能獨立,越獨立效率越高)

數據並行

以前都是Parallel.ForEach這麽用,現在和Linq結合之後非常方便.AsParallel()就OK了

說很抽象看個簡單案例:

static void Main(string[] args)
{
    IEnumerable<int> list = new List<int>() { 1, 2, 3, 4, 5, 7, 8, 9 };
    foreach (var item in ParallelMethod(list))
    {
        Console.WriteLine(item);
    }
}
/// <summary>
/// 舉個例子
/// </summary>
private static IEnumerable<int> ParallelMethod(IEnumerable<int> list)
{
    return list.AsParallel().Select(x => x * x);
}

正常執行的結果應該是:

1
4
9
25
64
16
49
81

並行之後就是這樣了(不管順序了):

25
64
1
9
49
81
4
16

當然了,如果你就是對順序有要求可以使用:.AsOrdered()

/// <summary>
/// 舉個例子
/// </summary>
private static IEnumerable<int> ParallelMethod(IEnumerable<int> list)
{
    return list.AsParallel().AsOrdered().Select(x => x * x);
}

其實實際項目中,使用並行的時候:任務時間適中,太長不適合,太短也不適合

記得大家在項目裏經常會用到如SumCount等聚合函數,其實這時候使用並行就很合適

var list = new List<long>();
for (long i = 0; i < 1000000; i++)
{
    list.Add(i);
}
Console.WriteLine(GetSumParallel(list));
private static long GetSumParallel(IEnumerable<long> list)
{
    return list.AsParallel().Sum();
}

time dotnet PLINQ.dll

499999500000

real    0m0.096s
user    0m0.081s
sys 0m0.025s

不使用並行:(稍微多了點,CPU越密集差距越大)

499999500000

real    0m0.103s
user    0m0.092s
sys 0m0.021s

其實聚合有一個通用方法,可以支持復雜的聚合:(以上面sum為例)

.Aggregate(
            seed:0,
            func:(sum,item)=>sum+item
          );

稍微擴展一下,PLinq也是支持取消的,.WithCancellation(CancellationToken)

Token的用法和上面一樣,就不復述了,如果需要和異步結合,一個Task.Run就可以把並行任務交給線程池了

也可以使用Task的異步方法,設置超時時間,這樣PLinq超時了也就終止了

PLinq這麽方便,其實也是有一些小弊端的,比如它會直接最大程度的占用系統資源,可能會影響其他的任務,而傳統的Parallel則會動態調整


任務並行(並行調用)

這個PLinq好像沒有對應的方法,有新語法你可以說下,來舉個例子:

await Task.Run(() =>
    Parallel.Invoke(
        () => Task.Delay(TimeSpan.FromSeconds(3)),
        () => Task.Delay(TimeSpan.FromSeconds(2))
    ));

取消也支持:

Parallel.Invoke(new ParallelOptions() { CancellationToken = token }, actions);

擴充說明

其實還有一些比如數據流響應編程沒說,這個之前都是用第三方庫,剛才看官網文檔,好像已經支持了,所以就不賣弄了,感興趣的可以去看看,其實項目裏面有流數據相關的框架,eg:Spark,都是比較成熟的解決方案了基本上也不太使用這些了。

然後還有一些沒說,比如NetCore裏面不可變類型(列表、字典、集合、隊列、棧、線程安全字典等等)以及限流任務調度等,這些關鍵詞我提一下,也方便你去搜索自己學習拓展

先到這吧,其他的自己探索一下吧,最後貼一些Nuget庫,你可以針對性的使用:

數據流Microsoft.Tpl.Dataflow
響應編程(Linq的Rx操作):Rx-Main
不可變類型Microsoft.Bcl.Immutable

Python3 與 C# 並發編程之~ 上篇