Python3 與 C# 並發編程之~ 上篇
NetCore並發編程
示例代碼:https://github.com/lotapp/BaseCode/tree/master/netcore/4_Concurrency
先簡單說下概念(其實之前也有說,所以簡說下):
- 並發:同時做多件事情
- 多線程:並發的一種形式
- 並行處理:多線程的一種(線程池產生的一種並發類型,eg:異步編程)
- 響應式編程:一種編程模式,對事件進行響應(有點類似於JQ的事件)
Net裏面很少用進程,在以前基本上都是線程+池+異步+並行+協程
我這邊簡單引入一下,畢竟主要是寫Python的教程,Net只是幫你們回顧一下,如果你發現還沒聽過這些概念,或者你的項目中還充斥著各種Thread
和ThreadPool
https://docs.microsoft.com/zh-cn/dotnet/standard/parallel-processing-and-concurrency
1.異步編程(Task)
Task的目的其實就是為了簡化Thread
和ThreadPool
的代碼,下面一起看看吧:
異步用起來比較簡單,一般IO,DB,Net用的比較多,很多時候都會采用重試機制,舉個簡單的例子:
/// <summary> /// 模擬一個網絡操作(別忘了重試機制) /// </summary> /// <param name="url">url</param> /// <returns></returns> private async static Task<string> DownloadStringAsync(string url) { using (var client = new HttpClient()) { // 設置第一次重試時間 var nextDelay = TimeSpan.FromSeconds(1); for (int i = 0; i < 3; i++) { try { return await client.GetStringAsync(url); } catch { } await Task.Delay(nextDelay); // 用異步阻塞的方式防止服務器被太多重試給阻塞了 nextDelay *= 2; // 3次重試機會,第一次1s,第二次2s,第三次4s } // 最後一次嘗試,錯誤就拋出 return await client.GetStringAsync(url); } }
然後補充說下Task異常的問題,當你await的時候如果有異常會拋出,在第一個await處捕獲處理即可
如果async
和await
就是理解不了的可以這樣想:async
就是為了讓await
生效(為了向後兼容)
對了,如果返回的是void,你設置成Task就行了,觸發是類似於事件之類的方法才使用void,不然沒有返回值都是使用Task
項目裏經常有這麽一個場景:等待一組任務完成後再執行某個操作,看個引入案例:
/// <summary> /// 1.批量任務 /// </summary> /// <param name="list"></param> /// <returns></returns> private async static Task<string[]> DownloadStringAsync(IEnumerable<string> list) { using (var client = new HttpClient()) { var tasks = list.Select(url => client.GetStringAsync(url)).ToArray(); return await Task.WhenAll(tasks); } }
再舉一個場景:同時調用多個同效果的API,有一個返回就好了,其他的忽略
/// <summary>
/// 2.返回首先完成的Task
/// </summary>
/// <param name="list"></param>
/// <returns></returns>
private static async Task<string> GetIPAsync(IEnumerable<string> list)
{
using (var client = new HttpClient())
{
var tasks = list.Select(url => client.GetStringAsync(url)).ToArray();
var task = await Task.WhenAny(tasks); // 返回第一個完成的Task
return await task;
}
}
一個async方法被await調用後,當它恢復運行時就會回到原來的上下文中運行。
如果你的Task不再需要上下文了可以使用:task.ConfigureAwait(false)
,eg:寫個日記還要啥上下文?
逆天的建議是:在核心代碼裏面一種使用ConfigureAwait
,用戶頁面相關代碼,不需要上下文的加上
其實如果有太多await在上下文裏恢復那也是比較卡的,使用ConfigureAwait
之後,被暫停後會在線程池裏面繼續運行
再看一個場景:比如一個耗時操作,我需要指定它的超時時間:
/// <summary>
/// 3.超時取消
/// </summary>
/// <returns></returns>
private static async Task<string> CancellMethod()
{
//實例化取消任務
var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(3)); // 設置失效時間為3s
try
{
return await DoSomethingAsync(cts.Token);
}
// 任務已經取消會引發TaskCanceledException
catch (TaskCanceledException ex)
{
return "false";
}
}
/// <summary>
/// 模仿一個耗時操作
/// </summary>
/// <returns></returns>
private static async Task<string> DoSomethingAsync(CancellationToken token)
{
await Task.Delay(TimeSpan.FromSeconds(5), token);
return "ok";
}
異步這塊簡單回顧就不說了,留兩個擴展,你們自行探討:
- 進度方面的可以使用
IProgress<T>
,就當留個作業自己摸索下吧~ - 使用了異步之後盡量避免使用
task.Wait
ortask.Result
,這樣可以避免死鎖
Task其他新特征去官網看看吧,引入到此為止了。
2.並行編程(Parallel)
這個其實出來很久了,現在基本上都是用PLinq
比較多點,主要就是:
- 數據並行:重點在處理數據(eg:聚合)
- 任務並行:重點在執行任務(每個任務塊盡可能獨立,越獨立效率越高)
數據並行
以前都是Parallel.ForEach
這麽用,現在和Linq結合之後非常方便.AsParallel()
就OK了
說很抽象看個簡單案例:
static void Main(string[] args)
{
IEnumerable<int> list = new List<int>() { 1, 2, 3, 4, 5, 7, 8, 9 };
foreach (var item in ParallelMethod(list))
{
Console.WriteLine(item);
}
}
/// <summary>
/// 舉個例子
/// </summary>
private static IEnumerable<int> ParallelMethod(IEnumerable<int> list)
{
return list.AsParallel().Select(x => x * x);
}
正常執行的結果應該是:
1
4
9
25
64
16
49
81
並行之後就是這樣了(不管順序了):
25
64
1
9
49
81
4
16
當然了,如果你就是對順序有要求可以使用:.AsOrdered()
/// <summary>
/// 舉個例子
/// </summary>
private static IEnumerable<int> ParallelMethod(IEnumerable<int> list)
{
return list.AsParallel().AsOrdered().Select(x => x * x);
}
其實實際項目中,使用並行的時候:任務時間適中,太長不適合,太短也不適合
記得大家在項目裏經常會用到如Sum
,Count
等聚合函數,其實這時候使用並行就很合適
var list = new List<long>();
for (long i = 0; i < 1000000; i++)
{
list.Add(i);
}
Console.WriteLine(GetSumParallel(list));
private static long GetSumParallel(IEnumerable<long> list)
{
return list.AsParallel().Sum();
}
time dotnet PLINQ.dll
499999500000
real 0m0.096s
user 0m0.081s
sys 0m0.025s
不使用並行:(稍微多了點,CPU越密集差距越大)
499999500000
real 0m0.103s
user 0m0.092s
sys 0m0.021s
其實聚合有一個通用方法,可以支持復雜的聚合:(以上面sum為例)
.Aggregate(
seed:0,
func:(sum,item)=>sum+item
);
稍微擴展一下,PLinq也是支持取消的,.WithCancellation(CancellationToken)
Token的用法和上面一樣,就不復述了,如果需要和異步結合,一個Task.Run
就可以把並行任務交給線程池了
也可以使用Task的異步方法,設置超時時間,這樣PLinq超時了也就終止了
PLinq這麽方便,其實也是有一些小弊端的,比如它會直接最大程度的占用系統資源,可能會影響其他的任務,而傳統的Parallel則會動態調整
任務並行(並行調用)
這個PLinq好像沒有對應的方法,有新語法你可以說下,來舉個例子:
await Task.Run(() =>
Parallel.Invoke(
() => Task.Delay(TimeSpan.FromSeconds(3)),
() => Task.Delay(TimeSpan.FromSeconds(2))
));
取消也支持:
Parallel.Invoke(new ParallelOptions() { CancellationToken = token }, actions);
擴充說明
其實還有一些比如數據流和響應編程沒說,這個之前都是用第三方庫,剛才看官網文檔,好像已經支持了,所以就不賣弄了,感興趣的可以去看看,其實項目裏面有流數據相關的框架,eg:Spark
,都是比較成熟的解決方案了基本上也不太使用這些了。
然後還有一些沒說,比如NetCore裏面不可變類型(列表、字典、集合、隊列、棧、線程安全字典等等)以及限流、任務調度等,這些關鍵詞我提一下,也方便你去搜索自己學習拓展
先到這吧,其他的自己探索一下吧,最後貼一些Nuget庫,你可以針對性的使用:
數據流:Microsoft.Tpl.Dataflow
響應編程(Linq的Rx操作):Rx-Main
不可變類型:Microsoft.Bcl.Immutable
Python3 與 C# 並發編程之~ 上篇