1. 程式人生 > >TPL Dataflow .Net 資料流元件,瞭解一下?

TPL Dataflow .Net 資料流元件,瞭解一下?

回顧上文

  作為單體程式,依賴的第三方服務雖不多,但是2C的程式還是有不少內容可講; 作為一個常規網際網路系統,無外乎就是接受請求、處理請求,輸出響應。

由於業務漸漸增長,資料處理的過程會越來越複雜和冗長,【連貫高效的處理資料】 越來越被看重,  .Net 提供了TPL  Dataflow元件使我們更高效的實現基於資料流和 流水線操作的程式碼。

    下圖是單體程式中 資料處理的用例圖。

 

 程式中用到的TPL Dataflow 元件,Dataflow是微軟前幾年給出的資料處理庫, 是由不同的處理塊組成,可將這些塊組裝成一個處理管道,"塊"對應處理管道中的"階段", 可類比AspNetCore 中Middleware 和pipeline.。

  • TPL Dataflow庫為訊息傳遞和並行化CPU密集型和I / O密集型應用程式提供了程式設計基礎,這些應用程式具有高吞吐量和低延遲。它還可以讓您明確控制資料的緩衝方式並在系統中移動。

  • 為了更好地理解資料流程式設計模型,請考慮從磁碟非同步載入影象並建立這些影象的應用程式。
    •   傳統的程式設計模型通常使用回撥和同步物件(如鎖)來協調任務和訪問共享資料, 從巨集觀看傳統模型: 任務是一步步緊接著完成的。

    •   通過使用資料流程式設計模型,您可以建立在從磁碟讀取影象時處理影象的資料流物件。在資料流模型下,您可以宣告資料在可用時的處理方式以及資料之間的依賴關係。由於執行時管理資料之間的依賴關係,因此通常可以避免同步訪問共享資料的要求。此外,由於執行時排程基於資料的非同步到達而工作,因此資料流可以通過有效地管理底層執行緒來提高響應性和吞吐量。    也就是說: 你定義的是任務內容和任務之間的依賴,不關注資料什麼時候流到這個任務 。

  •    需要注意的是:TPL Dataflow 非分散式資料流,訊息在程序內傳遞,   使用nuget引用 System.Threading.Tasks.Dataflow 包。

TPL Dataflow 核心概念

 1.  Buffer & Block

TPL Dataflow 內建的Block覆蓋了常見的應用場景,當然如果內建塊不能滿足你的要求,你也可以自定“塊”。

Block可以劃分為下面3類:

  • Buffering Only    【Buffer不是快取Cache的概念, 而是一個緩衝區的概念】

  • Execution

  • Grouping 

使用以上塊混搭處理管道, 大多數的塊都會執行一個操作,有些時候需要將訊息分發到不同Block,這時可使用特殊型別的緩衝塊給管道“”分叉”。

2. Execution Block

  可執行的塊有兩個核心元件:
  • 輸入、輸出訊息的緩衝區(一般稱為Input,Output佇列)

  • 在訊息上執行動作的委託

  訊息在輸入和輸出時能夠被緩衝:當Func委託的執行速度比輸入的訊息速度慢時,後續訊息將在到達時進行緩衝;當下一個塊的輸入緩衝區中沒有容量時,將在輸出時緩衝。

每個塊我們可以配置:

  • 緩衝區的總容量, 預設無上限

  • 執行操作委託的併發度, 預設情況下塊按照順序處理訊息,一次一個。

我們將塊連結在一起形成一個處理管道,生產者將訊息推向管道。

TPL Dataflow有一個基於pull的機制(使用Receive和TryReceive方法),但我們將在管道中使用塊連線和推送機制。

  • TransformBlock(Execution category)-- 由輸入輸出緩衝區和一個Func<TInput, TOutput>委託組成,消費的每個訊息,都會輸出另外一個,你可以使用這個Block去執行輸入訊息的轉換,或者轉發輸出的訊息到另外一個Block。

  • TransformManyBlock (Execution category) -- 由輸入輸出緩衝區和一個Func<TInput, IEnumerable<TOutput>>委託組成, 它為輸入的每個訊息輸出一個 IEnumerable<TOutput>

  • BroadcastBlock (Buffering category)-- 由只容納1個訊息的緩衝區和Func<T, T>委託組成。緩衝區被每個新傳入的訊息所覆蓋,委託僅僅為了讓你控制怎樣克隆這個訊息,不做訊息轉換。

            該塊可以連結到多個塊(管道的分叉),雖然它一次只緩衝一條訊息,但它一定會在該訊息被覆蓋之前將該訊息轉發到連結塊(連結塊還有緩衝區)。

  • ActionBlock (Execution category)-- 由緩衝區和Action<T>委託組成,他們一般是管道的結尾,他們不再給其他塊轉發訊息,他們只會處理輸入的訊息。

  • BatchBlock (Grouping category)-- 告訴它你想要的每個批處理的大小,它將累積訊息,直到它達到那個大小,然後將它作為一組訊息轉發到下一個塊。

  還有一下其他的Block型別:BufferBlock、WriteOnceBlock、JoinBlock、BatchedJoinBlock,我們暫時不會深入。

3. Pipeline Chain React

  當輸入緩衝區達到上限容量,為其供貨的上游塊的輸出緩衝區將開始填充,當輸出緩衝區已滿時,該塊必須暫停處理,直到緩衝區有空間,這意味著一個Block的處理瓶頸可能導致所有前面的塊的緩衝區被填滿。

  但是不是所有的塊變滿時,都會暫停,BroadcastBlock 有允許1個訊息的緩衝區,每個訊息都會被覆蓋, 因此如果這個廣播塊不能將訊息轉發到下游,則在下個訊息到達的時候訊息將丟失,這在某種意義上是一種限流(比較生硬).

程式設計實踐

    將按照上圖實現TPL Dataflow 

①  定義Dataflow  pipeline
        public EqidPairHandler(IHttpClientFactory httpClientFactory, RedisDatabase redisCache, IConfiguration con, LogConfig logConfig, ILoggerFactory loggerFactory)
        {
            _httpClient = httpClientFactory.CreateClient("bce-request");
            _redisDB0 = redisCache[0];
            _redisDB = redisCache;
            _logger = loggerFactory.CreateLogger(nameof(EqidPairHandler));
            var option = new DataflowLinkOptions { PropagateCompletion = true };

            publisher = _redisDB.RedisConnection.GetSubscriber();
            _eqid2ModelTransformBlock = new TransformBlock<EqidPair, EqidModel>
              (
                   // redis piublih 沒有做在TransformBlock fun裡面, 因為publih失敗可能影響後續的block傳遞
                   eqidPair => EqidResolverAsync(eqidPair),
                   new ExecutionDataflowBlockOptions
                   {
                       MaxDegreeOfParallelism = con.GetValue<int>("MaxDegreeOfParallelism")
                   }
              );
            // https://docs.microsoft.com/en-us/dotnet/standard/parallel-programming/walkthrough-creating-a-dataflow-pipeline
            _logBatchBlock = new LogBatchBlock<EqidModel>(logConfig, loggerFactory);
            _logPublishBlock = new ActionBlock<EqidModel>(x => PublishAsync(x) );

            _broadcastBlock = new BroadcastBlock<EqidModel>(x => x); // 由只容納一個訊息的快取區和拷貝函式組成
            _broadcastBlock.LinkTo(_logBatchBlock.InputBlock, option);
            _broadcastBlock.LinkTo(_logPublishBlock, option);
            _eqid2ModelTransformBlock.LinkTo(_broadcastBlock, option);
        }
public class LogBatchBlock<T> : ILogDestination<T> where T : IModelBase
    {
        private readonly string _dirPath;
        private readonly Timer _triggerBatchTimer;
        private readonly Timer _openFileTimer;
        private DateTime? _nextCheckpoint;
        private TextWriter _currentWriter;
        private readonly LogHead _logHead;
        private readonly object _syncRoot = new object();
        private readonly ILogger _logger;
        private readonly BatchBlock<T> _packer;
        private readonly ActionBlock<T[]> batchWriterBlock;
        private readonly TimeSpan _logFileIntervalTimeSpan;

        /// <summary>
        /// Generate  request log file.
        /// </summary>
        public LogBatchBlock(LogConfig logConfig, ILoggerFactory loggerFactory)
        {
            _logger = loggerFactory.CreateLogger<LogBatchBlock<T>>();

            _dirPath = logConfig.DirPath;
            if (!Directory.Exists(_dirPath))
            {
                Directory.CreateDirectory(_dirPath);
            }
            _logHead = logConfig.LogHead;

            _packer = new BatchBlock<T>(logConfig.BatchSize);
            batchWriterBlock = new ActionBlock<T[]>(models => WriteToFile(models));     
            _packer.LinkTo(batchWriterBlock, new DataflowLinkOptions { PropagateCompletion = true });

            _triggerBatchTimer = new Timer(state =>
            {
                _packer.TriggerBatch();
            }, null, TimeSpan.Zero, TimeSpan.FromSeconds(logConfig.Period));

            _logFileIntervalTimeSpan = TimeSpan.Parse(logConfig.LogFileInterval);
            _openFileTimer = new Timer(state =>
            {
                AlignCurrentFileTo(DateTime.Now);
            }, null, TimeSpan.Zero, _logFileIntervalTimeSpan);
        }

        public ITargetBlock<T> InputBlock => _packer;

        private void AlignCurrentFileTo(DateTime dt)
        {
            if (!_nextCheckpoint.HasValue)
            {
                OpenFile(dt);
            }
            if (dt >= _nextCheckpoint.Value)
            {
                CloseFile();
                OpenFile(dt);
            }
        }

        private void OpenFile(DateTime now, string fileSuffix = null)
        {
            string filePath = null;
            try
            {
                var currentHour = now.Date.AddHours(now.Hour);
                _nextCheckpoint = currentHour.Add(_logFileIntervalTimeSpan);
                int hourConfiguration = _logFileIntervalTimeSpan.Hours;
                int minuteConfiguration = _logFileIntervalTimeSpan.Minutes;
                filePath = $"{_dirPath}/u_ex{now.ToString("yyMMddHH")}{fileSuffix}.log";

                var appendHead = !File.Exists(filePath);
                if (filePath != null)
                {
                    var stream = new FileStream(filePath, FileMode.Append, FileAccess.Write);
                    var sw = new StreamWriter(stream, Encoding.Default);
                    if (appendHead)
                    {
                        sw.Write(GenerateHead());
                    }
                    _currentWriter = sw;
                    _logger.LogDebug($"{currentHour} TextWriter has been created.");
                }

            }
            catch (UnauthorizedAccessException ex)
            {
                _logger.LogWarning("I/O error or specific type of scecurity error,{0}", ex);
                throw;
            }
            catch (Exception e)
            {
                if (fileSuffix == null)
                {
                    _logger.LogWarning($"OpenFile failed:{e.StackTrace.ToString()}:{e.Message}.", e.StackTrace);
                    OpenFile(now, $"-{Guid.NewGuid()}");
                }
                else
                {
                    _logger.LogError($"OpenFile failed after retry: {filePath}", e);
                    throw;
                }
            }
        }

        private void CloseFile()
        {
            if (_currentWriter != null)
            {
                _currentWriter.Flush();
                _currentWriter.Dispose();
                _currentWriter = null;
                _logger.LogDebug($"{DateTime.Now} TextWriter has been disposed.");
            }
            _nextCheckpoint = null;
        }

        private string GenerateHead()
        {
            StringBuilder head = new StringBuilder();
            head.AppendLine("#Software: " + _logHead.Software)
                .AppendLine("#Version: " + _logHead.Version)
                .AppendLine($"#Date: {DateTime.UtcNow.ToString("yyyy-MM-dd HH:mm:ss")}")
                .AppendLine("#Fields: " + _logHead.Fields);
            return head.ToString();
        }

        private void WriteToFile(T[] models)
        {
            try
            {
                lock (_syncRoot)
                {
                    var flag = false;
                    foreach (var model in models)
                    {
                        if (model == null)
                            continue;
                        flag = true;
                        AlignCurrentFileTo(model.ServerLocalTime);
                        _currentWriter.WriteLine(model.ToString());
                    }
                    if (flag)
                        _currentWriter.Flush();
                }
            }
            catch (Exception ex)
            {
                _logger.LogError("WriteToFile Error : {0}", ex.Message);
            }
        }

        public bool AcceptLogModel(T model)
        {
            return _packer.Post(model);
        }

        public string GetDirPath()
        {
            return _dirPath;
        }

        public async Task CompleteAsync()
        {
            _triggerBatchTimer.Dispose();
            _openFileTimer.Dispose();
            _packer.TriggerBatch();
            _packer.Complete();
            await InputBlock.Completion;
            lock (_syncRoot)
            {
                CloseFile();
            }
        }
    }
仿IIS日誌儲存程式碼

② 異常處理

  上述程式在部署時就遇到相關的坑位,在測試環境_eqid2ModelTransformBlock 內Func委託穩定執行,程式並未出現異樣;

  部署到生產之後, 該Pipeline會執行一段時間就停止工作,一直很困惑, 後來通過監測_eqid2ModelTransformBlock.Completion 屬性,該塊提前進入“完成態”   :   程式在執行某次Func委託時報錯,Block提前進入完成態

TransfomrBlock.Completion 一個Task物件,當TPL Dataflow不再處理訊息並且能保證不再處理訊息的時候,就被定義為完成態, Task物件的TaskStatus列舉值將標記此Block進入完成態的真實原因

- TaskStatus.RanToCompletion       根據Block定義的任務成功完成

- TaskStatus.Fault                            因為未處理的異常 導致"過早的完成"

- TaskStatus.Cancled                       因為取消操作 導致 "過早的完成"

  我們需要小心處理異常, 一般情況下我們使用try、catch包含所有的執行程式碼以確保所有的異常都被處理。

 

    可將TPL Dataflow 做為程序內訊息佇列,本文只是一個入門參考,更多複雜用法還是看官網, 你需要記住的是, 這是一個.Net 程序內資料流元件, 能讓你專注於流程。

 

作者:JulianHuang

感謝您的認真閱讀,如有問題請大膽斧正;覺得有用,請下方或加關注。

本文歡迎轉載,但請保留此段宣告,且在文章頁面明顯位置註明本文的作者及原文連結。