1. 程式人生 > >轉載 三、並行編程 - Task同步機制。TreadLocal類、Lock、Interlocked、Synchronization、ConcurrentQueue以及Barrier等

轉載 三、並行編程 - Task同步機制。TreadLocal類、Lock、Interlocked、Synchronization、ConcurrentQueue以及Barrier等

而且 了解 字段 width i++ 類繼承 play int 安全

隨筆 - 353, 文章 - 1, 評論 - 5, 引用 - 0

三、並行編程 - Task同步機制。TreadLocal類、Lock、Interlocked、Synchronization、ConcurrentQueue以及Barrier等

目錄

  • 一、隔離執行:不共享數據,讓每個task都有一份自己的數據拷貝。
    • 1、傳統方式
    • 2、ThreadLocal類
  • 二、同步類型:通過調整task的執行,有序的執行task。
    • 常用的同步類型
    • 1、Lock鎖
    • 2、Interlocked 聯鎖
    • 3、Mutex互斥體
  • 三、申明性同步
  • 四、並發集合
  • 五、Barrier(屏障同步)


在並行計算中,不可避免的會碰到多個任務共享變量,實例,集合。雖然task自帶了兩個方法:task.ContinueWith()和Task.Factory.ContinueWhenAll()來實現任務串行化,但是這些簡單的方法遠遠不能滿足我們實際的開發需要,從.net 4.0開始,類庫給我們提供了很多的類來幫助我們簡化並行計算中復雜的數據同步問題。

回到頂部

一、隔離執行:不共享數據,讓每個task都有一份自己的數據拷貝。

對數據共享問題處理的方式是“分離執行”,我們通過把每個Task執行完成後的各自計算的值進行最後的匯總,也就是說多個Task之間不存在數據共享了,各自做各自的事,完全分離開來。

1、傳統方式

每個Task執行時不存在數據共享了,每個Task中計算自己值,最後我們匯總每個Task的Result。我們可以通過Task中傳遞的state參數來進行隔離執行:

技術分享圖片
    int
Sum = 0
;
    Task<int>[] tasks = new Task<int>[10];
    for (int i = 0; i < 10; i++)
    {
        tasks[i] = new Task<int>((obj) =>
        {
            var start = (int)obj;
            for (int j = 0; j < 1000; j++)
            {
                start = start + 1;
            }
            return start;
        }, Sum);
        tasks[i].Start();
    }
    Task.WaitAll(tasks);
    for (var i = 0; i < 10; i++)
    {
        Sum += tasks[i].Result;
    }
    Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
技術分享圖片

2、ThreadLocal類

在.Net中提供了System.Threading.ThreadLocal來創建分離。

ThreadLocal是一種提供線程本地存儲的類型,它可以給每個線程一個分離的實例,來提供每個線程單獨的數據結果。上面的程序我們可以使用TreadLocal:

技術分享圖片
    int Sum = 0;
    Task<int>[] tasks = new Task<int>[10];
    
var tl = new ThreadLocal<int>
();
    for (int i = 0; i < 10; i++)
    {
        tasks[i] = new Task<int>((obj) =>
        {
            tl.Value = (int)obj;
            for (int j = 0; j < 1000; j++)
            {
                tl.Value++;
            }
            returntl.Value;
        }, Sum);
        tasks[i].Start();
    }
    Task.WaitAll(tasks);
    for (var i = 0; i < 10; i++)
    {
        Sum += tasks[i].Result;
    }
    Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
技術分享圖片

但是我們要註意的一點TreadLocal是針對每個線程的,不是針對每個Task的。一個Tread中可能有多個Task。

ThreadLocal類舉例:

技術分享圖片
static ThreadLocal<string> local; 
static void Main() 
{ 
 //創建ThreadLocal並提供默認值 
local = new ThreadLocal<string>(() => "hehe"); 

 //修改TLS的線程 
Thread th = new Thread(() => 
 { 
     local.Value = "Mgen"; 
     Display(); 
 }); 

 th.Start(); 
 th.Join(); 
 Display(); 
} 

//顯示TLS中數據值 
static void Display() 
{ 
 Console.WriteLine("{0} {1}", Thread.CurrentThread.ManagedThreadId, local.Value); 
}
技術分享圖片 回到頂部

二、同步類型:通過調整task的執行,有序的執行task

同步類型是一種用來調度Task訪問臨界區域的一種特殊類型。在.Net 4.0中提供了多種同步類型給我們使用,主要分為:輕量級的、重量級的和等待處理型的,在下面我們會介紹常用的同步處理類型。

常用的同步類型

首先來看看.Net 4.0中常見的幾種同步類型以及處理的相關問題:

同步類型以及解決問題

  • lock關鍵字、Montor類、SpinLock類:有序訪問臨界區域
  • Interlocked類:數值類型的增加或則減少
  • Mutex類:交叉同步
  • WaitAll方法:同步多個鎖定(主要是Task之間的調度)
  • 申明性的同步(如Synchronization):使類中的所有的方法同步

1、Lock鎖

其實最簡單同步類型的使用辦法就是使用lock關鍵字。在使用lock關鍵字時,首先我們需要創建一個鎖定的object,而且這個object需要所有的task都能訪問,其次能我們需要將我們的臨界區域包含在lock塊中。我們之前例子中代碼可以這樣加上lock:

技術分享圖片
            int Sum = 0; 
            Task[] tasks = new Task[10]; 
     
var obj = new Object();
for (int i = 0; i < 10; i++)
            {
                tasks[i] = new Task(() =>
                {
                    for (int j = 0; j < 1000; j++)
                    {
                        lock (obj)
                        { Sum = Sum + 1; }
                    }
                });
                tasks[i].Start();
            }
            Task.WaitAll(tasks);
            Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
技術分享圖片

其實lock關鍵字是使用Monitor的一種簡短的方式,lock關鍵字自動通過調用Monitor.Enter\Monitor.Exit方法來處理獲得鎖以及釋放鎖。

2、Interlocked 聯鎖

Interlocked通過使用操作系統或則硬件的一些特性提供了一些列高效的靜態的同步方法。其中主要提供了這些方法:Exchange、Add、Increment、CompareExchange四種類型的多個方法的重載。我們將上面的例子中使用Interlocked:

技術分享圖片
            int Sum = 0; 
            Task[] tasks = new Task[10]; 
            for (int i = 0; i < 10; i++)
            {
                tasks[i] = new Task(() =>
                {
                    for (int j = 0; j < 1000; j++)
                    {
                        Interlocked.Increment(ref Sum);
                    }
                }); 
             tasks[i].Start();
            } 
            Task.WaitAll(tasks);
            Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
技術分享圖片

3、Mutex互斥體

Mutex也是一個同步類型,在多個線程進行訪問的時候,它只向一個線程授權共享數據的獨立訪問。我們可以通過Mutex中的WaitOne方法來獲取Mutex的所有權,但是同時我們要註意的是,我們在一個線程中多少次調用過WaitOne方法,就需要調用多少次ReleaseMutex方法來釋放Mutex的占有。上面的例子我們通過Mutex這樣實現:

技術分享圖片
int Sum = 0; 
Task[] tasks = new Task[10]; 
var mutex = new
 Mutex();
for (int i = 0; i < 10; i++)
{
    tasks[i] = new Task(() =>
    {
        for (int j = 0; j < 1000; j++)
        {
            bool lockAcquired = 
mutex.WaitOne();
            try
            {
                Sum++;
            }
            finally
            {
                if (lockAcquired) mutex.ReleaseMutex();
            }
        }
    });
    tasks[i].Start();
} 
Task.WaitAll(tasks);
Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, Sum);
技術分享圖片 回到頂部

三、申明性同步

我們可以通過使用Synchronization 特性來標識一個類,從而使一個類型的字段以及方法都實現同步化。在使用Synchronization 時,我們需要將我們的目標同步的類繼承於System.ContextBoundObject類型。我們來看看之前的例子我們同步標識Synchronization 的實現:

技術分享圖片
[Synchronization]
class SumClass : ContextBoundObject
{
    private int _Sum;

    public void Increment()
    {
        _Sum++;
    }

    public int GetSum()
    {
        return _Sum;
    }
}
class Program
{
    static void Main(string[] args)
    {
        var sum =
new
 SumClass();
        Task[] tasks = new Task[10];
        for (int i = 0; i < 10; i++)
        {
            tasks[i] = new Task(() =>
            {
                for (int j = 0; j < 1000; j++)
                {
                    sum.Increment();
                }
            });
            tasks[i].Start();
        }
        Task.WaitAll(tasks);
        Console.WriteLine("Expected value {0}, Parallel value: {1}", 10000, sum.GetSum());
 
    }
}
技術分享圖片 回到頂部

四、並發集合

當多個線程對某個非線程安全容器並發地進行讀寫操作時,這些操作將導致不可預估的後果或者會導致報錯。為了解決這個問題我們可以使用lock關鍵字或者Monitor類來給容器上鎖。但鎖的引入使得我們的代碼更加復雜,同時也帶來了更多的同步消耗。而.NET Framework 4提供的線程安全且可拓展的並發集合能夠使得我們的並行代碼更加容易編寫,此外,鎖的使用次數的減少也減少了麻煩的死鎖與競爭條件的問題。.NET Framework 4主要提供了如下幾種並發集合:BlockingCollection,ConcurrentBag,ConcurrentDictionary,ConcurrentQueue,ConcurrentStack。這些集合通過使用一種叫做比較並交換(compare and swap, CAS)指令和內存屏障的技術來避免使用重量級的鎖。

在.Net 4.0中提供了很多並發的集合類型來讓我們處理數據同步的集合的問題,這裏面包括:

1.ConcurrentQueue:提供並發安全的隊列集合,以先進先出的方式進行操作;
2.ConcurrentStack:提供並發安全的堆棧集合,以先進後出的方式進行操作;
3.ConcurrentBag:提供並發安全的一種無序集合;
4.ConcurrentDictionary:提供並發安全的一種key-value類型的集合。

我們在這裏只做ConcurrentQueue的一個嘗試,並發隊列是一種線程安全的隊列集合,我們可以通過Enqueue()進行排隊、TryDequeue()進行出隊列操作:

技術分享圖片
for (var j = 0; j < 10; j++)
    {
        var queue = new
ConcurrentQueue<int>
();
        var count = 0;
        for (var i = 0; i < 1000; i++)
        {
            queue.Enqueue(i);
        }
        var tasks = new Task[10];
        for (var i = 0; i < tasks.Length; i++)
        {
            tasks[i] = new Task(() =>
            {
                while (queue.Count > 0)
                {
                    int item;
                    var isDequeue = queue.
TryDequeue
(out item);
                    if(isDequeue) Interlocked.Increment(ref count);
                }
            });
            tasks[i].Start();
        }
        try
        {
            Task.WaitAll(tasks);
        }
        catch (AggregateException e)
        {
            e.Handle((ex) =>
            {
                Console.WriteLine("Exception Message:{0}",ex.Message);
                return true;
            });
        }
        Console.WriteLine("Dequeue items count :{0}", count);
    }
技術分享圖片 回到頂部

五、Barrier(屏障同步)

barrier叫做屏障,就像下圖中的“紅色線”,如果我們的屏障設為4個task就認為已經滿了的話,那麽執行中先到的task必須等待後到的task,通知方式也就是barrier.SignalAndWait(),屏障中線程設置操作為new Barrier(4,(i)=>{})。SignalAndWait給我們提供了超時的重載,為了能夠取消後續執行

技術分享圖片

技術分享圖片
//四個task執行
  static Task[] tasks = new Task[4];

  static 
Barrier
 barrier = null;

  static void Main(string[] args)
  {
      barrier 
= new Barrier(tasks.Length, (i)
 =>
      {
          Console.WriteLine("**********************************************************");
          Console.WriteLine("\n屏障中當前階段編號:{0}\n", i.CurrentPhaseNumber);
          Console.WriteLine("**********************************************************");
      });

      for (int j = 0; j < tasks.Length; j++)
      {
          tasks[j] = Task.Factory.StartNew((obj) =>
          {
              var single = Convert.ToInt32(obj);

              LoadUser(single);
              barrier.SignalAndWait();

              LoadProduct(single);
              barrier.SignalAndWait();

              LoadOrder(single);
              barrier.SignalAndWait();
          }, j);
      }

      Task.WaitAll(tasks);

      Console.WriteLine("指定數據庫中所有數據已經加載完畢!");

      Console.Read();
  }

  static void LoadUser(int num)
  {
      Console.WriteLine("當前任務:{0}正在加載User部分數據!", num);
  }

  static void LoadProduct(int num)
  {
      Console.WriteLine("當前任務:{0}正在加載Product部分數據!", num);
  }

  static void LoadOrder(int num)
  {
      Console.WriteLine("當前任務:{0}正在加載Order部分數據!", num);
  }
技術分享圖片

技術分享圖片

轉載 三、並行編程 - Task同步機制。TreadLocal類、Lock、Interlocked、Synchronization、ConcurrentQueue以及Barrier等