1. 程式人生 > >二、並行編程 - Task任務

二、並行編程 - Task任務

enter {0} window 通過 傳統 art gre div tel

初識Task

兩種構建Task的方式,只是StartNew方法直接構建出了一個Task之後又調用了其Start方法。

    Task.Factory.
StartNew
(() =>
    {
        Console.WriteLine("Hello word!");
    });

    Task task = new Task(() =>
    {
        Console.WriteLine("Hello,Word!");
    });
    task.Start();

在Task內部執行的內容我們稱作為Task的Body,Task提供了多個初始化重載的方法。

public Task(Action action);
public Task(Action<object> action, object state);
public Task(Action action, CancellationToken cancellationToken);
public Task(Action action, TaskCreationOptions creationOptions);

例如使用了重載方法的State參數:

    Task task2 = new Task((
obj
) => { Console.WriteLine("
Message: {0}", obj); }, "Say \"Hello\" from task2"); task2.Start();

任務返回值

使用返回值的Result屬性可獲取是在一個Task運行完成才會獲取的,所以task2是在task1運行完成後,才開始運行,也就是說上面的兩個result的值不管運行多少次都是不會變的。其中我們也可以通過CurrentId來獲取當前運行的Task的編號。

var loop = 0;
    var task1 = new Task<int>(() => 
    {
        for (var i = 0
; i < 1000; i++) loop += i; return loop; }); task1.Start(); var loopResut = task1.Result; var task2 = new Task<long>(obj=> { long res = 0; var looptimes = (int)obj; for (var i = 0; i < looptimes; i++) res += i; return res; },loopResut); task2.Start(); var resultTask2 = task2.Result; Console.WriteLine("任務1的結果‘:{0}\n任務2的結果:{1}", loopResut,resultTask2);

任務延續

所謂的延續的Task就是在第一個Task完成後自動啟動下一個Task。我們通過ContinueWith方法來創建延續的Task。我們假設有一個接受xml解析的服務,首先從某個地方接受文件,然後解析入庫,最後發送是否解析正確的回執。在每次調用ContinueWith方法時,每次會把上次Task的引用傳入進來,以便檢測上次Task的狀態,比如我們可以使用上次Task的Result屬性來獲取返回值。

    var 
ReceiveTask
 = new Task(() => ReceiveXml());
    var ResolveTask = 
ReceiveTask
.
ContinueWith
<bool>((r) => ResolveXml());
    var SendFeedBackTask = ResolveTask.
ContinueWith
<string>((s) => SendFeedBack(s.Result));
    ReceiveTask.Start();
    Console.WriteLine(SendFeedBackTask.Result);

上面的代碼我們也可以這麽寫:

   var SendFeedBackTask = Task.Factory.StartNew(() => ReceiveXml())
                            .ContinueWith<bool>(s => ResolveXml())
                            .ContinueWith<string>(r => SendFeedBack(r.Result));
    Console.WriteLine(SendFeedBackTask.Result);

分離嵌套任務

有些情況下我們需要創建嵌套的Task,嵌套裏面又分為分離的和不分離的。其創建的方式很簡單,就是在Task的body裏面創建一個新的Task。如果新的Task未指定AttachedToParent選項,那麽就是分離嵌套的。我們看下面這段代碼。下面的代碼中outTask.Wait()表示等待outTask執行完成。

var outTask = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Outer task beginning...");
    var childTask = Task.Factory.StartNew(() =>
    {
        Thread.SpinWait(3000000);
        Console.WriteLine("Detached nested task completed.");
    });
});
outTask.Wait();
Console.WriteLine("Outer task completed.");
Console.ReadKey();

我們可以看到運行結果是:

技術分享圖片

子任務

我們將上面的代碼加上TaskCreationOptions選項:

var outTask = Task.Factory.StartNew(() =>
{
    Console.WriteLine("Outer task beginning...");
    var childTask = Task.Factory.StartNew(() =>
    {
        Thread.SpinWait(3000000);
        Console.WriteLine("Detached nested task completed.");
    },TaskCreationOptions.AttachedToParent);
});
outTask.Wait();
Console.WriteLine("Outer task completed.");

看到運行結果:

技術分享圖片

取消任務

我們通過cancellation的tokens來取消一個Task。在很多Task的Body裏面包含循環,我們可以在輪詢的時候判斷IsCancellationRequested屬性是否為True,如果是True的話,就可以停止循環以及釋放資源,同時拋出OperationCanceledException異常出來。來看一段示例代碼:

var cts = new CancellationTokenSource(); var ct =
 cts.Token;

var task = Task.Factory.StartNew(() =>
{
    for (var i = 0; i < 10000000; i++)
    {
        if (ct.IsCancellationRequested)
        {
            Console.WriteLine("任務開始取消...");
            throw new OperationCanceledException(ct);
        }
         
    }
},ct);
 
ct.Register(() =>
{
    Console.WriteLine("已經取消");
});
 
Thread.Sleep(5000);
cts.Cancel();

try
{
    task.Wait();
}
catch (AggregateException e)
{
    foreach (var v in e.InnerExceptions)
        Console.WriteLine("msg: " + v.Message);
}

等待時間執行

在TPL中我們可以通過三種方式進行等待,一是通過CancellTaken的WaitHanle進行等待、第二種則是通過傳統的Tread.Sleep方法、第三種則通過Thread.SpainWait方法。

1、CancellToken方式:每次我們等待十秒鐘之後,再進行下次輸出。

var cts = new CancellationTokenSource();
    var ct = cts.Token;
 
    var task = new Task(() =>
    {
        for (var i = 0; i < 100000; i++)
        {
            var cancelled = 
ct.WaitHandle.WaitOne(1000
);
            Console.WriteLine(" {0}. Cancelled? {1}", i, cancelled);
            if (cancelled)
            {
                throw new OperationCanceledException(ct);
            }
        }
    }, ct);
    task.Start();

技術分享圖片

2、上面的功能如果我們要是通過Tread.Sleep方式實現:

            var task = new Task(() =>
            {
                for (var i = 0; i < 100000; i++)
                {
                    Thread.Sleep(10000);
                    
var cancelled =
 ct.IsCancellationRequested;
                    Console.WriteLine(" {0}. Cancelled? {1}",
                                i, cancelled); if (cancelled)
                    {
                        throw new OperationCanceledException(ct);
                    }
                }
            },ct);

3、Thread.SpainWait則跟上面兩種方式完全不同,上面的兩種方式都是會在線程調度程序不考慮改線程,直等到運行結束。而Thread.SpainWait的作用實質上會將處理器置於十分緊密的循環中,主要的作用是來實現同步鎖的作用。並不常用,大部分情況下我們可以通過Lock的方式來實現。

等待任務執行

在很多時候我們也許需要等待同時開啟的幾個線程完成之後再來做其他事,在TPL中提供了幾種方式來等待任務執行。Task.Wait等待單個任務完成;Task.WaitAll等待所有的Task完成、TaskAny等在其中的任何一個或則多個任務完成。

1、Task.Wait:

共有5個重載:Wait()、Wait(CancellToken)、Wait(Int32)、Wait(TimeSpan)、Wait(TimeSpan、CancellToken)。各個重載方法的含義:

1)Wait():等待整個任務完成或者取消或者出現異常;
2)Wait(CancellToken):等待任務直到CancellToken調用取消或者完成,或者出現異常;
3)Wait(Int32):等待任務,未完成則到指定的時間;
4)Wait(TimeSpan):同上;
5)Wait(TimeSpan、CancellToken):等待任務到指定時間,或者CancellToken調用取消或者任務完成。

static void Main(string[] args)
{
    var tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;
        Task task = createTask(token,6);    task.Start();
    Console.WriteLine("Wait() complete.");
    task.Wait();
    Console.WriteLine("Task Completed.");
    
    task = createTask(token,3);
    task.Start();
    Console.WriteLine("Wait(2) secs for task to complete.");
    bool completed = task.Wait(2000);
    Console.WriteLine("Wait ended - task completed: {0}", completed);
    
    task = createTask(token,4);
    task.Start();
    Console.WriteLine("Wait(2,token) for task to complete.");
    completed = task.Wait(2000, token);
    Console.WriteLine("Wait ended - task completed: {0} task cancelled {1}",
    completed, task.IsCanceled);
    Console.WriteLine("Main method complete. Press enter to finish.");
    Console.ReadLine();
}
static Task createTask(CancellationToken token,int loop)
{
    return new Task(() =>
    {
        for (int i = 0; i < loop; i++)
        {
            token.ThrowIfCancellationRequested();
            Console.WriteLine("Task - Int value {0}", i);
            token.WaitHandle.WaitOne(1000);
        }
    }, token);
}

循環都會等待1秒鐘,這樣我們可以看看Wait(2000)的效果,看看運行後的效果:

技術分享圖片

2、Task.WaitAll方法

是等待所有的任務完成,也有5個重載, 也可以傳遞時間以及Token參數,進行等待時間以及取消Token的控制。

   var tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;
    var task1 = createTask(token,2);
    var task2 = createTask(token, 5);
    task1.Start();
    task2.Start();
    Console.WriteLine("Waiting for tasks to complete.");
    Task.WaitAll(task1, task2);
    Console.WriteLine("Tasks Completed.");

技術分享圖片

3、Task.WaitAny

等待任何一個任務完成,完成之後返回其完成的任務的Index:

var tokenSource = new CancellationTokenSource();
    CancellationToken token = tokenSource.Token;
    var task1 = createTask(token,2);
    var task2 = createTask(token, 5);
    task1.Start();
    task2.Start();
    Console.WriteLine("Waiting for tasks to complete.");
    var index = Task.WaitAny(task1, task2);
    Console.WriteLine("Tasks Completed.Index is {0}",index);

技術分享圖片

異常處理

在TPL中,異常的觸發器主要是這幾個:
Task.Wait(), Task.WaitAll(), Task,WaitAny(),Task.Result。而在TPL出現的異常都會以AggregateException的示例拋出,我們在進行基本的異常處理時,可以通過查看AggregateException的InnerExceptions來進行內部異常的捕獲:

var tokenSource = new CancellationTokenSource();
    var token = tokenSource.Token;
    var task1 = new Task(() =>
    {
        throw new NullReferenceException() { Source="task1"};
    });
    var task2 = new Task(() =>
    {
        throw new ArgumentNullException("a", "a para can not be null") { Source="task2"};
    });
    task1.Start(); task2.Start(); 
    try
    {
        Task.WaitAll(task1, task2);
    }
    catch(AggregateException ex)
    {
        foreach (Exception inner in ex.InnerExceptions)
        {
            Console.WriteLine("Exception type {0} from {1}",
            inner.GetType(), inner.Source);
        }
    }

同時,我們還可以通過Task的幾個屬性來判斷Task的狀態,如:IsCompleted, IsFaulted, IsCancelled,Exception。另外,AggregateException中還提供了Handle方法來給我們方法來給我們處理每個內部 異常,每個異常發生時都會調用Handle傳入的delegate ,同時我們需要通過返回True,False來告訴異常是否已經被處理,比如對於OperationCanceledException我們知道是取消了Task,是肯定可以處理的:

try
    {
        Task.WaitAll(task1, task2, task3, task4);
    }
    catch(AggregateException ex)
    {
        ex.Handle((e) =>
        {
            if (e is OperationCanceledException)
            {
                return true;
            }
            else
            {
                return false;
            }
        });
        
    }

二、並行編程 - Task任務