1. 程式人生 > C++11執行緒安全佇列和安全棧

C++11執行緒安全佇列和安全棧

文章程式碼取自《C++11併發程式設計指南》，記錄於此方便日後檢視。
#include "stdafx.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <exception>
#include <functional>
#include <iostream>
#include <map>
#include <memory> // For std::shared_ptr<>
#include <mutex>
#include <queue>
#include <stack>
#include <string>
#include <thread>
#include <utility>
#include <vector>

using namespace std;
/// Exception thrown by threadsafe_stack::pop() when the stack holds no elements.
struct empty_stack : std::exception
{
	/// @return A static, human-readable description of the error.
	const char* what() const noexcept override
	{
		return "empty stack!";
	}
};

/// A stack whose member functions may be called concurrently from several threads.
///
/// Every member function locks the single internal mutex for its whole
/// duration. Reading the top element and removing it are merged into pop(), so
/// no other thread can modify the stack between the two steps.
///
/// @tparam T Element type. Elements are moved in and out, so move-only types
///           (for example std::unique_ptr) work as long as the stack itself
///           is not copy-constructed.
template<typename T = int>
class threadsafe_stack
{
public:
	threadsafe_stack() {}

	/// Copies the elements of @p other while holding other's mutex.
	threadsafe_stack(const threadsafe_stack& other)
	{
		// The copy is done in the body (not the initializer list) so that
		// other's mutex is held while its container is read.
		std::lock_guard<std::mutex> lock(other.m);
		data = other.data;
	}

	// Assignment is not supported.
	threadsafe_stack& operator=(const threadsafe_stack&) = delete;

	/// Pushes @p new_value onto the top of the stack.
	void push(T new_value)
	{
		std::lock_guard<std::mutex> lock(m);
		// new_value is already our own copy; move it instead of copying again.
		data.push(std::move(new_value));
	}

	/// Removes the top element and returns it.
	/// @return Shared pointer owning the removed element.
	/// @throws empty_stack if the stack is empty.
	std::shared_ptr<T> pop()
	{
		std::lock_guard<std::mutex> lock(m);
		// Check for emptiness under the same lock that guards the removal.
		if (data.empty()) throw empty_stack();

		// Build the return value before the element is removed: if the
		// allocation throws, the element is still on the stack.
		// NOTE: if T's move constructor throws, the top element may be left
		// in a moved-from state.
		std::shared_ptr<T> res(std::make_shared<T>(std::move(data.top())));
		data.pop();
		return res;
	}

	/// Removes the top element and move-assigns it to @p value.
	/// @param value Receives the removed element.
	/// @throws empty_stack if the stack is empty.
	void pop(T& value)
	{
		std::lock_guard<std::mutex> lock(m);
		if (data.empty()) throw empty_stack();
		value = std::move(data.top());
		data.pop();
	}

	/// @return true if the stack held no elements at the moment of the call.
	///         Another thread may change that immediately afterwards.
	bool empty() const
	{
		std::lock_guard<std::mutex> lock(m);
		return data.empty();
	}

private:
	std::stack<T> data;
	mutable std::mutex m; // mutable so that const members (empty, copy source) can lock it
};

/// A FIFO queue whose member functions may be called concurrently from several threads.
///
/// Every member function locks the single internal mutex. push() signals a
/// condition variable so that threads blocked in wait_and_pop() wake up when
/// an element becomes available.
///
/// @tparam T Element type. Elements are moved in and out, so move-only types
///           (for example std::unique_ptr) work as long as the queue itself
///           is not copy-constructed.
template<typename T = int>
class threadsafe_queue
{
public:
	threadsafe_queue() {}

	/// Copies the elements of @p other while holding other's mutex.
	threadsafe_queue(threadsafe_queue const& other)
	{
		std::lock_guard<std::mutex> lk(other.mut);
		data_queue = other.data_queue;
	}

	// Assignment is not supported (the mutex member already makes it
	// unavailable; this states the intent explicitly).
	threadsafe_queue& operator=(const threadsafe_queue&) = delete;

	/// Appends @p new_value to the back of the queue and wakes one waiting consumer.
	void push(T new_value)
	{
		std::lock_guard<std::mutex> lk(mut);
		// new_value is already our own copy; move it instead of copying again.
		data_queue.push(std::move(new_value));
		data_cond.notify_one();
	}

	/// Blocks until the queue is non-empty, then moves the front element into @p value.
	/// @param value Receives the removed element.
	void wait_and_pop(T& value)
	{
		std::unique_lock<std::mutex> lk(mut);
		// The predicate form re-checks the condition after every wake-up.
		data_cond.wait(lk, [this]{ return !data_queue.empty(); });
		value = std::move(data_queue.front());
		data_queue.pop();
	}

	/// Blocks until the queue is non-empty, then removes and returns the front element.
	/// @return Shared pointer owning the removed element (never null).
	std::shared_ptr<T> wait_and_pop()
	{
		std::unique_lock<std::mutex> lk(mut);
		data_cond.wait(lk, [this]{ return !data_queue.empty(); });
		std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
		data_queue.pop();
		return res;
	}

	/// Removes the front element without blocking.
	/// @param value Receives the removed element; left untouched on failure.
	/// @return true if an element was removed, false if the queue was empty.
	bool try_pop(T& value)
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return false;
		value = std::move(data_queue.front());
		data_queue.pop();
		return true;
	}

	/// Removes the front element without blocking.
	/// @return Shared pointer owning the removed element, or a null pointer if
	///         the queue was empty.
	std::shared_ptr<T> try_pop()
	{
		std::lock_guard<std::mutex> lk(mut);
		if (data_queue.empty())
			return std::shared_ptr<T>();
		std::shared_ptr<T> res(std::make_shared<T>(std::move(data_queue.front())));
		data_queue.pop();
		return res;
	}

	/// @return true if the queue held no elements at the moment of the call.
	bool empty() const
	{
		std::lock_guard<std::mutex> lk(mut);
		return data_queue.empty();
	}

	/// @return Number of queued elements at the moment of the call, converted to int.
	int size() const
	{
		std::lock_guard<std::mutex> lk(mut);
		// The return type stays int for existing callers; make the narrowing explicit.
		return static_cast<int>(data_queue.size());
	}

private:
	mutable std::mutex mut; // mutable so that const members can lock it
	std::queue<T> data_queue;
	std::condition_variable data_cond;
};

// Shared state for the demo threads defined below.
threadsafe_stack<int> MyStack;          // filled by PushStackThread, drained by PopStackThread
threadsafe_queue<std::string> MyQueue;  // filled by WriteQueue, drained by ReadQueue
// Run flag polled by ReadQueue/WriteQueue. Direct-initialised on purpose:
// `= false` is copy-initialisation, which needs std::atomic's deleted copy
// constructor before C++17 and is rejected by conforming C++11/14 compilers.
std::atomic<bool> MyQueueFlag{false};
/// Pushes consecutive integers onto the global MyStack.
/// @param StartNum First value to push.
/// @param Sum      How many values to push (nothing is pushed when <= 0).
void PushStackThread(int StartNum, int Sum)
{
	int Pushed = 0;
	while (Pushed < Sum)
	{
		// Values are StartNum, StartNum + 1, ..., StartNum + Sum - 1.
		MyStack.push(StartNum + Pushed);
		++Pushed;
	}
}

void PopStackThread()
{
	while (!MyStack.empty())
	{
		try
		{
			shared_ptr<int> re = MyStack.pop();
			cout << *re << endl;
		}
		catch (empty_stack &ex)
		{
			cout << ex.what() << endl;
		}
	}
	cout << "end" << endl;
}

/// Adds 1000 to @p elem in place.
void Add(int &elem)
{
	elem += 1000;
}
/// Consumer loop: while MyQueueFlag is set, tries to take one string from the
/// global MyQueue and prints it; when the queue is empty it reports the failure
/// and backs off for 50 ms before retrying.
void ReadQueue()
{
	const std::chrono::milliseconds RetryDelay(50);

	while (MyQueueFlag.load())
	{
		std::string Item;
		const bool GotItem = MyQueue.try_pop(Item);
		if (!GotItem)
		{
			std::cout << "try_pop failed!" << std::endl;
			std::this_thread::sleep_for(RetryDelay);
			continue;
		}
		std::cout << "try_pop value is " << Item << std::endl;

		// Alternatives that were tried here: the shared_ptr-returning try_pop()
		// and both wait_and_pop() overloads. wait_and_pop() blocks until an
		// element arrives, so a reader using it does not re-check MyQueueFlag
		// while the queue stays empty.
	}
}
/// Producer loop: while MyQueueFlag is set, pushes the decimal text of an
/// increasing counter ("1", "2", ...) onto the global MyQueue. When more than
/// 1000 items are pending it sleeps 10 ms instead, so the queue cannot grow
/// without bound if the consumers fall behind.
void WriteQueue()
{
	const int MaxPending = 1000;
	const std::chrono::milliseconds BackoffDelay(10);

	int Cnt = 0;
	while (MyQueueFlag)
	{
		if (MyQueue.size() > MaxPending)
		{
			std::this_thread::sleep_for(BackoffDelay);
			continue;
		}
		// std::to_string(int) yields the same text as the "%d" format, without
		// a fixed-size buffer or the MSVC-only sprintf_s.
		MyQueue.push(std::to_string(++Cnt));
	}
}
/// Program entry point (MSVC `_tmain` form).
///
/// Runs the queue demo: one WriteQueue producer and 100 ReadQueue consumers
/// run for one minute, then MyQueueFlag is cleared and every thread is joined.
/// The stack demos and the for_each demo are kept but compiled out (`#if 0`);
/// each is self-contained so it can be enabled on its own.
///
/// @param argc Unused.
/// @param argv Unused.
/// @return Always 0.
int _tmain(int argc, _TCHAR* argv[])
{
	(void)argc;
	(void)argv;

#if 0 // Demo: single-threaded stack usage
	for (int index = 0; index < 10; ++index)
	{
		MyStack.push(index + 1);
	}

	for (int index = 0; index < 10; ++index)
	{
		std::shared_ptr<int> pValue = MyStack.pop();
		std::cout << *pValue << std::endl;
	}
#endif

#if 0 // Demo: multi-threaded stack usage
	{
		std::vector<std::thread> PushThreads;
		for (int index = 0; index < 10; ++index)
		{
			PushThreads.emplace_back(PushStackThread, index * 10, 10);
		}
		// Join (not detach) so that all 100 values really are on the stack
		// before the pop threads start.
		for (std::thread& Pusher : PushThreads)
		{
			Pusher.join();
		}

		std::vector<std::thread> PopThreads;
		for (int index = 0; index < 10; ++index)
		{
			PopThreads.emplace_back(PopStackThread);
		}
		for (std::thread& Popper : PopThreads)
		{
			Popper.join();
		}
	}
#endif

#if 0 // Demo: std::for_each with a free function
	{
		std::vector<int> MyAdd;
		for (int index = 0; index < 100; ++index)
		{
			MyAdd.push_back(index);
		}
		std::for_each(MyAdd.begin(), MyAdd.end(), Add);
	}
#endif

	// Queue demo: one producer, many consumers.
	const int ReaderCount = 100;

	MyQueueFlag = true;
	std::thread Writer(WriteQueue);

	std::vector<std::thread> Readers;
	Readers.reserve(ReaderCount);
	for (int index = 0; index < ReaderCount; ++index)
	{
		Readers.emplace_back(ReadQueue);
	}

	// Let the demo run, then ask every worker loop to stop.
	std::this_thread::sleep_for(std::chrono::minutes(1));
	MyQueueFlag = false;

	Writer.join();
	for (std::thread& Reader : Readers)
	{
		Reader.join();
	}

	return 0;
}