1. 程式人生 > >C# 多執行緒+佇列處理大批量資料,進而縮短處理時間

C# 多執行緒+佇列處理大批量資料,進而縮短處理時間

public void DealData(){

               int pageSize = 200;

//建立佇列

                        var queue = new MessageQueueManager<Model>();

                        queue.Setup();
                        //開啟生產者執行緒
                        var producerManager = new TaskManager();
                        producerManager.Setup(5);
                        producerManager.Start((index) =>
                        {
                            var pageIndex = Convert.ToInt32(index) + 1;
                            while (true)
                            {
                                try
                                {
                                    var modelList = GetModelList(pageIndex,pageSize);

                                    if (modelList == null)
                                    {
                                        break;
                                    }

                                    //迴圈學習統計加入消費者佇列
                                    foreach (var model in modelList )
                                    {
                                        //自旋
                                        while (!queue.IsCanEnqueue)
                                        {
                                            Thread.Sleep(5 * 1000);
                                        }
                                        queue.Enqueue(model);
                                    }
                                    if (modelList.Count < pageSize)
                                    {
                                        break;
                                    }
                                    pageIndex += taskCount;
                                }
                                catch (Exception ex)
                                {
                                    pageIndex += taskCount;
                                    if (pageIndex > 2000)
                                    {
                                        break;
                                    }
                                    continue;
                                }
                            }
                        });


                        //消費者執行緒
                        var consumerManager = new TaskManager();
                        //這裡設定一個執行緒進行處理資料
                        consumerManager.Setup(1);
                        consumerManager.Start(() =>
                        {
                            //從佇列中取出資料進行處理
                            queue.Dequeue((info) =>
                            {
                                try
                                {

   //處理資料

                                   // AddOrUpdate( info);
                                }
                                catch (Exception ex)
                                {
                                    
                                }
                            });
                        });


                        producerManager.Wait();
                        queue.Cancel();

                        consumerManager.Wait();

}

/// <summary>
    /// 訊息器
    /// </summary>
    public class MessageQueueManager<T>
    {
        private readonly BlockingCollection<T> _messageCollection = new BlockingCollection<T>(10000);


        private CancellationTokenSource _cancellationTokenSource;
        private CancellationToken _cancellationToken;


        public void Enqueue(T message)
        {
            if (_messageCollection.Count < _messageCollection.BoundedCapacity)
                _messageCollection.Add(message);
            if (_cancellationToken.IsCancellationRequested && !_messageCollection.IsCompleted)
                _messageCollection.CompleteAdding();
        }


        public void Dequeue(Action<T> writeAction)
        {
            foreach (var message in _messageCollection.GetConsumingEnumerable())
            {
                writeAction(message);
                Thread.Sleep(10);
            }
        }


        public void Setup()
        {
            _cancellationTokenSource = new CancellationTokenSource();
            _cancellationToken = _cancellationTokenSource.Token;
        }


        public void Cancel()
        {
            if (!_cancellationTokenSource.IsCancellationRequested)
                _cancellationTokenSource.Cancel(false);
            if (!_messageCollection.IsCompleted) _messageCollection.CompleteAdding();
        }


        public bool IsCanEnqueue
        {
            get
            {
                return _messageCollection.BoundedCapacity > _messageCollection.Count;
            }
        }
    }

//執行緒管理器

 public class TaskManager
    {
        public List<Task> _taskList;
        public int _total;


        public void Setup(int count)
        {
            _total = count;
            _taskList = new List<Task>(count);
        }


        public void Start(Action doing)
        {
            for (int i = 0; i < _total; i++)
            {
                var task = Task.Factory.StartNew(doing);
                _taskList.Add(task);
            }
        }


        public void Start(Action<int> doing)
        {
            for (int i = 0; i < _total; i++)
            {
                var task = Task.Factory.StartNew((index) => doing((int)index), i);
                _taskList.Add(task);
            }
        }


        public void Wait()
        {
            Task.WaitAll(_taskList.ToArray());
        }
    }