1. 程式人生 > >併發系列64章(TPL 資料流)第七章

併發系列64章(TPL 資料流)第七章

前言

什麼是TPL?全稱:transmission control protocol

傳輸層對應於OSI七層參考模型的傳輸層,它提供兩種端到端的通訊服務。

然後思維方式回到為什麼有這個TPL 資料流上。

TPL 資料流庫向具有高吞吐量和低滯後時間的佔用大量 CPU 和 I/O 操作的應用程式的並行化和訊息傳遞提供了基礎。 它還能顯式控制快取資料的方式以及在系統中移動的方式。 

為了更好地瞭解資料流程式設計模型,請考慮一個以非同步方式從磁碟載入影象並建立複合影象的應用程式。 

傳統程式設計模型通常需要使用回撥和同步物件(例如鎖)來協調任務和訪問共享資料。 

通過使用資料流程式設計模型,您可以從磁碟讀取時建立處理影象的資料流物件。 

在資料流模型下,您可以聲明當資料可用時的處理方式,以及資料之間的所有依賴項。 由於執行時管理資料之間的依賴項,因此通常可以避免這種要求來同步訪問共享資料。

此外,因為執行時計劃基於資料的非同步到達,所以資料流可以通過有效管理基礎執行緒提高響應能力和吞吐量。

分析一下,這段話。

TPL 資料流庫向具有高吞吐量和低滯後時間的佔用大量 CPU 和 I/O 操作的應用程式的並行化和訊息傳遞提供了基礎。

解決一個問題就是:

高吞吐量和低滯後時間的佔用大量 CPU 和 I/O 操作的應用程式。

如何解決的:

應用程式的並行化和訊息傳遞提供了基礎。通過並行解決的。

例子:

非同步方式從磁碟載入影象並建立複合影象的應用程式

遇到的問題:

協調任務和訪問共享資料 需要 回撥和同步。

就是說共享資料的時候,需要同步。

總結問題:共享資料代價大。

如果解決的:

由於執行時管理資料之間的依賴項,因此通常可以避免這種要求來同步訪問共享資料。

總結:解決了依賴,那麼不需要同步了。

綜上所述:TPL 資料流庫的作用在於解決資料之間的依賴,避免同步訪問共享資料。

正文

連結資料流塊

var multiplyBlock = new TransformBlock<int, int>(item=>item*2);
var subtractBlock = new TransformBlock<int, int>(item=> { Console.WriteLine(item);  return item - 2; });
multiplyBlock.LinkTo(subtractBlock);
multiplyBlock.Post(10);
Console.ReadKey();

打印出來就是20了。

傳遞出錯資訊

需要處理資料流網格中發生的錯誤

傳遞出錯資訊

如果資料流塊的委託丟擲異常,這個塊就是故障塊。一但資料流進入了故障狀態,就會刪除所有資料(停止接收新的資料)。

什麼意思呢?

static async void datalfow()
{
	var multiplyBlock = new TransformBlock<int, int>(item =>
	{
		if (item == 1)
		{
			throw new InvalidOperationException("not good");
		}
		return item * 2;
	}
	);
	var subtractBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item); return item - 2; });
	multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
	try
	{
		multiplyBlock.Post(10);
		multiplyBlock.Post(1);
		multiplyBlock.Post(20);
		await subtractBlock.Completion;
	}
	catch(AggregateException e)
	{
		Console.WriteLine(e);
	}
}

結果是:


有沒有發現multiplyBlock.Post(20);,沒有執行?

因為一但一個有錯誤,那麼就會終止,並銷燬資料。

這裡和上面不同的是,new DataflowLinkOptions { PropagateCompletion = true}。

多個這個東西,那麼這個有啥用呢?

因為我們連結塊的時候,這個庫不會幫助我們傳遞塊執行的狀態,如果不傳遞的話,下一個塊是不曉得上一個塊到底啥情況,這樣不利於我們捕獲異常。

而這種傳遞做法,我們只要在最後的處理模組,統一處理錯誤就可以。

斷開連結

這個我從來就沒有遇到過。是這樣子的,適用一種這樣的場景。

比如說有一個數據塊需要動態替換,需要斷開現有的模組然後接上新的資料塊。

static async void datalfow()
{
	var multiplyBlock = new TransformBlock<int, int>(item =>
	{
		if (item == 1)
		{
			throw new InvalidOperationException("not good");
		}
		Console.WriteLine("item:" + item);
		return item * 2;
	}
	);
	var subtractBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item-2); return item - 2; });
	var appendBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item+2); return item + 2; });
	var link=multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
	try
	{
		for (int i = 0; i < 20; i++)
		{
			multiplyBlock.Post(2);
			if (i==10)
			{
				await Task.Delay(1000);
				link.Dispose();
				multiplyBlock.LinkTo(appendBlock, new DataflowLinkOptions { PropagateCompletion = true });
			}
		}
		await subtractBlock.Completion;
	}
	catch(AggregateException e)
	{
		Console.WriteLine(e);
	}
}

結果是:

值得注意的是,我這裡了一個:

await Task.Delay(1000);

這是模擬動態執行的時候,因為當我post結束的時候,資料塊連結還沒開始傳遞。

注:

除非保證連結是空閒的情況下,否則在斷開資料塊的連結時候會出現競爭。

競爭的是先斷開還是先傳遞。

但是這種競爭是安全的,他會保證要不斷開,要不傳遞帶下一個資料塊。

限制流量

前面我們都是線性連結,就是一條路走到黑。但是呢,有時候出現分叉的時候,那麼該如何均衡呢?

之所以考慮均衡,是因為比如傳遞到下一個資料塊的時候,是會有快取的。如果有條分叉,一條分叉無限去快取,那另外一條可能吃不上飯了。

static async void datalfow()
{
	var multiplyBlock = new TransformBlock<int, int>(item =>
	{
		if (item == 1)
		{
			throw new InvalidOperationException("not good");
		}
		Console.WriteLine("item:" + item);
		return item * 2;
	}
	);
	var subtractBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item-2); return item - 2; });
	var appendBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item+2); return item + 2; });
	multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
	multiplyBlock.LinkTo(appendBlock, new DataflowLinkOptions { PropagateCompletion = true });
	try
	{
		for (int i = 0; i < 100; i++)
		{
			multiplyBlock.Post(2);
		}
		await subtractBlock.Completion;
	}
	catch(AggregateException e)
	{
		Console.WriteLine(e);
	}
}

這種就屬於沒吃上飯的情況。

static async void datalfow()
{
	var multiplyBlock = new TransformBlock<int, int>(item =>
	{
		if (item == 1)
		{
			throw new InvalidOperationException("not good");
		}
		Console.WriteLine("item:" + item);
		return item * 2;
	}
	);
	var options = new DataflowBlockOptions {BoundedCapacity=1 };
	var subtractBlock = new TransformBlock<int, int>(item => {
		return item - 2;
	}, options);
	var appendBlock = new TransformBlock<int, int>(item => { Console.WriteLine(item+2); return item + 2; }, options);
	multiplyBlock.LinkTo(subtractBlock, new DataflowLinkOptions { PropagateCompletion = true});
	multiplyBlock.LinkTo(appendBlock, new DataflowLinkOptions { PropagateCompletion = true });
	try
	{
		for (int i = 0; i < 100; i++)
		{
			multiplyBlock.Post(2);
		}
		await subtractBlock.Completion;
	}
	catch(AggregateException e)
	{
		Console.WriteLine(e);
	}
}

限制快取為1,那麼這時候我們就會相互切換。

下一章

整理:

1.資料流塊的並行處理

2.建立自定義資料流塊

參考

https://www.cnblogs.com/yswenli/p/8042594.h