利用雙緩沖隊列來減少鎖的競爭
在日常的開發中,日誌的記錄是必不可少的。但是我們也清楚對同一個文本進行寫日誌只能單線程的去寫,那麽我們也經常會使用簡單lock鎖來保證只有一個線程來寫入日誌信息。但是在多線程的去寫日誌信息的時候,由於記錄日誌信息是需要進行I/O交互的,導致我們占用鎖的時間會加長,從而導致大量線程的阻塞與等待。
這種場景下我們就會去思考,我們該怎麽做才能保證當有多個線程來寫日誌的時候我們能夠在不利用鎖的情況下讓他們依次排隊去寫呢?這個時候我們就可以考慮下使用雙緩沖隊列來完成。
所謂雙緩沖隊列就是有兩個隊列,一個是用來專門負責數據的寫入,另一個是專門負責數據的讀取,當邏輯線程讀取完數據後負責將自己的隊列與I/O線程的隊列進行交換。
我們該怎麽利用這雙緩沖隊列來完成我們想要的效果呢?
當有多個線程來寫日誌的時候,這個時候我們要這些要寫的信息先放到我們負責寫入的隊列當中,然後將邏輯讀的線程設為非阻塞。此時邏輯讀的線程就可以開始工作了。(一開始時邏輯讀的隊列是空的)在當邏輯讀的線程讀取他自己隊列的數據(並執行一些邏輯)之後,將邏輯讀的隊列的引用和負責寫入的隊列進行引用交換。這就是簡單的一個雙緩沖隊列實現的一個思路。具體實現代碼如下:
public class User { public string Mobile { get; set; } public string Pwd { get; set; } public override string ToString() { return $"{Mobile},{Pwd}"; } }
public class DoubleQueue { private ConcurrentQueue<User> _writeQueue; private ConcurrentQueue<User> _readQueue; private volatile ConcurrentQueue<User> _currentQueue; private AutoResetEvent _dataEvent; private ManualResetEvent _finishedEvent; private ManualResetEvent _producerEvent; public DoubleQueue() { _writeQueue = new ConcurrentQueue<User>(); _readQueue = new ConcurrentQueue<User>(); _currentQueue = _writeQueue; _dataEvent = new AutoResetEvent(false); _finishedEvent = new ManualResetEvent(true); _producerEvent = new ManualResetEvent(true); Task.Factory.StartNew(() => ConsumerQueue(), TaskCreationOptions.None); } public void ProducerFunc(User user) { _producerEvent.WaitOne(); _finishedEvent.Reset(); _currentQueue.Enqueue(user); _dataEvent.Set(); _finishedEvent.Set(); } public void ConsumerQueue() { ConcurrentQueue<User> consumerQueue; User user; int allcount = 0; Stopwatch watch = Stopwatch.StartNew(); while (true) { _dataEvent.WaitOne(); if (_currentQueue.Count > 0) { _producerEvent.Reset(); _finishedEvent.WaitOne(); consumerQueue = _currentQueue; _currentQueue = (_currentQueue == _writeQueue) ? _readQueue : _writeQueue; _producerEvent.Set(); while (consumerQueue.Count > 0) { if (consumerQueue.TryDequeue(out user)) { FluentConsole.White.Background.Red.Line(user.ToString()); allcount++; } FluentConsole.White.Background.Red.Line($"當前個數{allcount.ToString()},花費了{watch.ElapsedMilliseconds.ToString()}ms;"); System.Threading.Thread.Sleep(20); } } } } }
FluentConsole 是一個控制臺應用程序的輸出插件,開源的,有興趣的可以自己去玩玩。
internal class Program { private static object obj = new object(); private static void Main(string[] args) { DoubleQueue doubleQueue = new DoubleQueue(); Parallel.For(0, 3000, i => { User user = new User() { Mobile = i.ToString().PadLeft(11, ‘0‘), Pwd = i.ToString().PadLeft(8, ‘8‘) }; doubleQueue.ProducerFunc(user); }); Stopwatch watch = Stopwatch.StartNew(); int allcount = 0; Parallel.For(0, 3000, i => { User user = new User() { Mobile = i.ToString().PadLeft(11, ‘0‘), Pwd = i.ToString().PadLeft(8, ‘8‘) }; lock (obj) { FluentConsole.White.Background.Red.Line(user.ToString()); allcount++; FluentConsole.White.Background.Red.Line($"當前個數{allcount.ToString()},花費了{watch.ElapsedMilliseconds.ToString()}ms;"); System.Threading.Thread.Sleep(20); } }); FluentConsole.Black.Background.Red.Line("執行完成"); Console.Read(); } }
第一個利用雙緩沖隊列來執行,第二個利用lock鎖來執行。下面分別是第一種方法和第二種方法執行時CPU的消耗。
我們可以發現利用雙隊列緩沖的情況下我們減少了CPU的占有。但是我們可能會增加執行的時間。
參考文章:http://www.codeproject.com/Articles/27703/Producer-Consumer-Using-Double-Queues
別人在08年就已經想到了,而我卻在現在才稍微有點想法。
源碼下載
後面再大家的評論和建議之下,將代碼改為如下:
public class DoubleQueue { private ConcurrentQueue<User> _writeQueue; private ConcurrentQueue<User> _readQueue; private volatile ConcurrentQueue<User> _currentQueue; private AutoResetEvent _dataEvent; public DoubleQueue() { _writeQueue = new ConcurrentQueue<User>(); _readQueue = new ConcurrentQueue<User>(); _currentQueue = _writeQueue; _dataEvent = new AutoResetEvent(false); Task.Factory.StartNew(() => ConsumerQueue(), TaskCreationOptions.None); } public void ProducerFunc(User user) { _currentQueue.Enqueue(user); _dataEvent.Set(); } public void ConsumerQueue() { ConcurrentQueue<User> consumerQueue; User user; int allcount = 0; Stopwatch watch = Stopwatch.StartNew(); while (true) { _dataEvent.WaitOne(); if (!_currentQueue.IsEmpty) { _currentQueue = (_currentQueue == _writeQueue) ? _readQueue : _writeQueue; consumerQueue = (_currentQueue == _writeQueue) ? _readQueue : _writeQueue; while (!consumerQueue.IsEmpty) { while (!consumerQueue.IsEmpty) { if (consumerQueue.TryDequeue(out user)) { FluentConsole.White.Background.Red.Line(user.ToString()); allcount++; } } FluentConsole.White.Background.Red.Line($"當前個數{allcount.ToString()},花費了{watch.ElapsedMilliseconds.ToString()}ms;"); System.Threading.Thread.Sleep(20); } } } } }
利用雙緩沖隊列來減少鎖的競爭