C# 多執行緒+佇列處理大批量資料,進而縮短處理時間
public void DealData(){
int pageSize = 200;
//建立佇列
var queue = new MessageQueueManager<Model>();queue.Setup();
//開啟生產者執行緒
var producerManager = new TaskManager();
producerManager.Setup(5);
producerManager.Start((index) =>
{
var pageIndex = Convert.ToInt32(index) + 1;
while (true)
{
try
{
var modelList = GetModelList(pageIndex,pageSize);
if (modelList == null)
{
break;
}
//迴圈學習統計加入消費者佇列
foreach (var model in modelList )
{
//自旋
while (!queue.IsCanEnqueue)
{
Thread.Sleep(5 * 1000);
}
queue.Enqueue(model);
}
if (modelList.Count < pageSize)
{
break;
}
pageIndex += taskCount;
}
catch (Exception ex)
{
pageIndex += taskCount;
if (pageIndex > 2000)
{
break;
}
continue;
}
}
});
//消費者執行緒
var consumerManager = new TaskManager();
//這裡設定一個執行緒進行處理資料
consumerManager.Setup(1);
consumerManager.Start(() =>
{
//從佇列中取出資料進行處理
queue.Dequeue((info) =>
{
try
{
//處理資料
}
catch (Exception ex)
{
}
});
});
producerManager.Wait();
queue.Cancel();
consumerManager.Wait();
}
/// <summary>
/// 訊息器
/// </summary>
public class MessageQueueManager<T>
{
private readonly BlockingCollection<T> _messageCollection = new BlockingCollection<T>(10000);
private CancellationTokenSource _cancellationTokenSource;
private CancellationToken _cancellationToken;
public void Enqueue(T message)
{
if (_messageCollection.Count < _messageCollection.BoundedCapacity)
_messageCollection.Add(message);
if (_cancellationToken.IsCancellationRequested && !_messageCollection.IsCompleted)
_messageCollection.CompleteAdding();
}
public void Dequeue(Action<T> writeAction)
{
foreach (var message in _messageCollection.GetConsumingEnumerable())
{
writeAction(message);
Thread.Sleep(10);
}
}
public void Setup()
{
_cancellationTokenSource = new CancellationTokenSource();
_cancellationToken = _cancellationTokenSource.Token;
}
public void Cancel()
{
if (!_cancellationTokenSource.IsCancellationRequested)
_cancellationTokenSource.Cancel(false);
if (!_messageCollection.IsCompleted) _messageCollection.CompleteAdding();
}
public bool IsCanEnqueue
{
get
{
return _messageCollection.BoundedCapacity > _messageCollection.Count;
}
}
}
//執行緒管理器
public class TaskManager
{
public List<Task> _taskList;
public int _total;
public void Setup(int count)
{
_total = count;
_taskList = new List<Task>(count);
}
public void Start(Action doing)
{
for (int i = 0; i < _total; i++)
{
var task = Task.Factory.StartNew(doing);
_taskList.Add(task);
}
}
public void Start(Action<int> doing)
{
for (int i = 0; i < _total; i++)
{
var task = Task.Factory.StartNew((index) => doing((int)index), i);
_taskList.Add(task);
}
}
public void Wait()
{
Task.WaitAll(_taskList.ToArray());
}
}