1. 程式人生 > >Boost lockfree deque 生產者與消費者多對多線程應用

Boost lockfree deque 生產者與消費者多對多線程應用

cas 生產者 pointers data thread prev included class signed

  boost庫中有一個boost::lockfree::queue類型的 隊列,對於一般的需要隊列的程序,其效率都算不錯的了,下面使用一個用例來說明。

  程序是一個典型的生產者與消費者的關系,都可以使用多線程,其效率要比使用上層的互斥鎖要快很多,因為它直接使用底層的原子操作來進行同步數據的。

  freedeque.h

  1 #pragma once#ifndef INCLUDED_UTILS_LFRINGQUEUE  
  2 #define INCLUDED_UTILS_LFRINGQUEUE  
  3 
  4 #define _ENABLE_ATOMIC_ALIGNMENT_FIX  
  5
#define ATOMIC_FLAG_INIT 0 6 7 8 #pragma once 9 10 11 #include <vector> 12 #include <mutex> 13 #include <thread> 14 #include <atomic> 15 #include <chrono> 16 #include <cstring> 17 #include <iostream> 18 19
// Lock free ring queue 20 21 template < typename _TyData, long _uiCount = 100000 > 22 class lfringqueue 23 { 24 public: 25 lfringqueue(long uiCount = _uiCount) : m_lTailIterator(0), m_lHeadIterator(0), m_uiCount(uiCount) 26 { 27 m_queue = new _TyData*[m_uiCount]; 28
memset(m_queue, 0, sizeof(_TyData*) * m_uiCount); 29 } 30 31 ~lfringqueue() 32 { 33 if (m_queue) 34 delete[] m_queue; 35 } 36 37 bool enqueue(_TyData *pdata, unsigned int uiRetries = 1000) 38 { 39 if (NULL == pdata) 40 { 41 // Null enqueues are not allowed 42 return false; 43 } 44 45 unsigned int uiCurrRetries = 0; 46 while (uiCurrRetries < uiRetries) 47 { 48 // Release fence in order to prevent memory reordering 49 // of any read or write with following write 50 std::atomic_thread_fence(std::memory_order_release); 51 52 long lHeadIterator = m_lHeadIterator; 53 54 if (NULL == m_queue[lHeadIterator]) 55 { 56 long lHeadIteratorOrig = lHeadIterator; 57 58 ++lHeadIterator; 59 if (lHeadIterator >= m_uiCount) 60 lHeadIterator = 0; 61 62 // Don‘t worry if this CAS fails. It only means some thread else has 63 // already inserted an item and set it. 64 if (std::atomic_compare_exchange_strong(&m_lHeadIterator, &lHeadIteratorOrig, lHeadIterator)) 65 { 66 // void* are always atomic (you wont set a partial pointer). 67 m_queue[lHeadIteratorOrig] = pdata; 68 69 if (m_lEventSet.test_and_set()) 70 { 71 m_bHasItem.test_and_set(); 72 } 73 return true; 74 } 75 } 76 else 77 { 78 // The queue is full. Spin a few times to check to see if an item is popped off. 79 ++uiCurrRetries; 80 } 81 } 82 return false; 83 } 84 85 bool dequeue(_TyData **ppdata) 86 { 87 if (!ppdata) 88 { 89 // Null dequeues are not allowed! 
90 return false; 91 } 92 93 bool bDone = false; 94 bool bCheckQueue = true; 95 96 while (!bDone) 97 { 98 // Acquire fence in order to prevent memory reordering 99 // of any read or write with following read 100 std::atomic_thread_fence(std::memory_order_acquire); 101 //MemoryBarrier(); 102 long lTailIterator = m_lTailIterator; 103 _TyData *pdata = m_queue[lTailIterator]; 104 //volatile _TyData *pdata = m_queue[lTailIterator]; 105 if (NULL != pdata) 106 { 107 bCheckQueue = true; 108 long lTailIteratorOrig = lTailIterator; 109 110 ++lTailIterator; 111 if (lTailIterator >= m_uiCount) 112 lTailIterator = 0; 113 114 //if ( lTailIteratorOrig == atomic_cas( (volatile long*)&m_lTailIterator, lTailIterator, lTailIteratorOrig )) 115 if (std::atomic_compare_exchange_strong(&m_lTailIterator, &lTailIteratorOrig, lTailIterator)) 116 { 117 // Sets of sizeof(void*) are always atomic (you wont set a partial pointer). 118 m_queue[lTailIteratorOrig] = NULL; 119 120 // Gets of sizeof(void*) are always atomic (you wont get a partial pointer). 121 *ppdata = (_TyData*)pdata; 122 123 return true; 124 } 125 } 126 else 127 { 128 bDone = true; 129 m_lEventSet.clear(); 130 } 131 } 132 *ppdata = NULL; 133 return false; 134 } 135 136 137 long countguess() const 138 { 139 long lCount = trycount(); 140 141 if (0 != lCount) 142 return lCount; 143 144 // If the queue is full then the item right before the tail item will be valid. If it 145 // is empty then the item should be set to NULL. 
146 long lLastInsert = m_lTailIterator - 1; 147 if (lLastInsert < 0) 148 lLastInsert = m_uiCount - 1; 149 150 _TyData *pdata = m_queue[lLastInsert]; 151 if (pdata != NULL) 152 return m_uiCount; 153 154 return 0; 155 } 156 157 long getmaxsize() const 158 { 159 return m_uiCount; 160 } 161 162 bool HasItem() 163 { 164 return m_bHasItem.test_and_set(); 165 } 166 167 void SetItemFlagBack() 168 { 169 m_bHasItem.clear(); 170 } 171 172 private: 173 long trycount() const 174 { 175 long lHeadIterator = m_lHeadIterator; 176 long lTailIterator = m_lTailIterator; 177 178 if (lTailIterator > lHeadIterator) 179 return m_uiCount - lTailIterator + lHeadIterator; 180 181 // This has a bug where it returns 0 if the queue is full. 182 return lHeadIterator - lTailIterator; 183 } 184 185 private: 186 std::atomic<long> m_lHeadIterator; // enqueue index 187 std::atomic<long> m_lTailIterator; // dequeue index 188 _TyData **m_queue; // array of pointers to the data 189 long m_uiCount; // size of the array 190 std::atomic_flag m_lEventSet = ATOMIC_FLAG_INIT; // a flag to use whether we should change the item flag 191 std::atomic_flag m_bHasItem = ATOMIC_FLAG_INIT; // a flag to indicate whether there is an item enqueued 192 }; 193 194 #endif //INCLUDED_UTILS_LFRINGQUEUE

  

/*
* File:   main.cpp
* Author: Peng
*
* Created on February 22, 2014, 9:55 PM
*/
#include <iostream> 
#include <string>  
#include "freedeque.h" 
#include <sstream>  
#include <boost/thread/thread.hpp>  
#include <boost/lockfree/queue.hpp>    
#include <boost/atomic.hpp>  
#include<boost/thread/lock_guard.hpp>
#include<boost/thread/mutex.hpp>
#include<boost/date_time/posix_time/posix_time.hpp>

// Number of producer threads feeding the lock-free queue.
const int NUM_ENQUEUE_THREAD = 5;
// Number of consumer threads draining the queue.
const int NUM_DEQUEUE_THREAD = 10;
// Items produced by each producer thread.
const long NUM_ITEM = 50000;
// Total Data objects shared by all producers (one contiguous array).
const long NUM_DATA = NUM_ENQUEUE_THREAD * NUM_ITEM;

// Payload type used by the queue test: an int plus its decimal string form
// (the string exists only to give the object a non-trivial size).
class Data {
public:
	Data(int i = 0) : m_iData(i), m_szDataString(std::to_string(i)) {}

	// Orders Data objects by their integer value.
	bool operator< (const Data & aData) const
	{
		return m_iData < aData.m_iData;
	}

	// Mutable access to the stored integer.
	int& GetData()
	{
		return m_iData;
	}
private:
	int m_iData;
	std::string m_szDataString;
};

// Backing store for every queued item; producers enqueue pointers into this
// array, so it must outlive all consumer threads.
Data* g_arrData = new Data[NUM_DATA];
// Serializes std::cout access inside print().
boost::mutex mtx;

// NOTE(review): unused in this program; 0.5 * NUM_DATA is double arithmetic
// converted back to long.
constexpr long size = 0.5 * NUM_DATA;
// The lock-free ring queue under test (10000 pointer slots).
lfringqueue < Data, 10000> LockFreeQueue;
// NOTE(review): declared for comparison with the hand-rolled queue but never
// used anywhere in this program.
boost::lockfree::queue<Data*> BoostQueue(10000);

// Producer thread body: enqueues pointers to the n-th contiguous slice of
// g_arrData (NUM_ITEM elements starting at n * NUM_ITEM). Always returns true.
// Note: a failed enqueue (queue full past the retry limit) is silently dropped,
// matching the original behavior.
bool GenerateRandomNumber_FindPointerToTheNumber_EnQueue(int n)
{
	const long base = NUM_ITEM * n;
	for (long offset = 0; offset < NUM_ITEM; ++offset)
	{
		LockFreeQueue.enqueue(&g_arrData[base + offset]);
	}
	return true;
}



// Prints the integer payload of a dequeued item, holding the global mutex so
// that output lines from concurrent consumers do not interleave.
// Null pointers are ignored.
void print(Data* pData) {
	if (pData == nullptr)
		return;

	boost::lock_guard<boost::mutex> guard(mtx);
	std::cout << pData->GetData() << std::endl;
}

// Consumer thread body: drains the global LockFreeQueue forever, printing each
// item; sleeps 5 ms whenever the queue is empty.
//
// NOTE(review): this loop has no exit condition, so consumer threads never
// terminate and main()'s join on them blocks forever (the original author
// acknowledges the program cannot exit). The trailing `return true;` is
// unreachable. A real fix needs a shutdown signal shared with the producers.
bool Dequeue()
{
	Data *pData = NULL;
	
	while (true)
	{
		if (LockFreeQueue.dequeue(&pData) && pData)
		{
			print(pData);
		}
		else {
			// Queue momentarily empty: back off briefly instead of spinning.
			boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(5));
		}
	}

	return true;
}

// Test driver: initializes the shared data array, starts producer and
// consumer threads, then joins them.
//
// NOTE(review): Dequeue() loops forever, so the consumer joins below never
// return — the program cannot exit normally, and the producer joins plus the
// final delete[] are unreachable in practice. Consumers are also joined
// before producers, which would deadlock even with finite consumers if the
// queue drained early. Left as-is to match the original experiment.
int main(int argc, char** argv)
{
	// Fill the shared array with sequential values 0..NUM_DATA-1.
	for (int i = 0; i < NUM_DATA; ++i)
	{
		Data data(i);
		//DataArray[i] = data;
		*(g_arrData + i) = data;
	}

	std::thread PublishThread[NUM_ENQUEUE_THREAD];
	std::thread ConsumerThread[NUM_DEQUEUE_THREAD];
	std::chrono::duration<double> elapsed_seconds;

	// Each producer enqueues its own slice of g_arrData.
	for (int i = 0; i < NUM_ENQUEUE_THREAD; i++)
	{
		PublishThread[i] = std::thread(GenerateRandomNumber_FindPointerToTheNumber_EnQueue, i);
	}

	for (int i = 0; i < NUM_DEQUEUE_THREAD; i++)
	{
		ConsumerThread[i] = std::thread{ Dequeue };
	}

	// These joins never return: Dequeue() has no termination condition.
	for (int i = 0; i < NUM_DEQUEUE_THREAD; i++)
	{
		ConsumerThread[i].join();
	}

	for (int i = 0; i < NUM_ENQUEUE_THREAD; i++)
	{
		PublishThread[i].join();
	}

	delete[] g_arrData;
	return 0;
}

  說明:模板文件是原作者寫的,為了驗證其正確性,後面的測試程序我改寫了一下,最後測試程序是無法退出來的,這裏只是測試,沒有進一步完善了。

  在測試中發現 deque 應該是有大小限制的,再增大 data 的數據量後程序會阻塞在某個地方,沒有進一步查找原因,以後有時間再做修改,對於一般的工程都夠用了。

Boost lockfree deque 生產者與消費者多對多線程應用