1. 程式人生 > >c++使用細粒度鎖以及傀儡節點的執行緒安全佇列

c++使用細粒度鎖以及傀儡節點的執行緒安全佇列

研究了一下執行緒安全的資料結構,參考了 《c++併發程式設計實戰》這本書上的程式碼寫了一個能夠編譯執行的版本

這份程式碼解決的核心問題是在細粒度鎖的併發情況下可能出現的多個鎖同時鎖住一個節點

解決的方法是使用傀儡節點即在佇列為“空”的情況下依然有兩個節點一個為空的頭節點一個為尾的空的傀儡節點

實現程式碼:

/*
 * wait_lock_thread_queue.cpp
 *
 *  Created on: Apr 28, 2018
 *      Author: [email protected]
 *      使用鎖和等待以及實現細粒度鎖
 *      的執行緒安全佇列
 *      而且通過新增傀儡節點來解決佇列
 *      在併發模式下只有一個節點時
 *      push和pop操作時要用兩個
 *      互斥元同時鎖住一個節點的
 *      問題。
 */
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <future>

template<typename T>
class threadsafe_queue
{
public:
	threadsafe_queue():
		//為了在一開始就有一個傀儡節點可以使用,建立一個新的空節點
		_head(new node),_tail(_head.get())
	{}

	threadsafe_queue(const threadsafe_queue& other)=delete;

	threadsafe_queue& operator=(const threadsafe_queue& othre)=delete;

	std::shared_ptr<T> try_pop()
	{
		std::unique_ptr<node> old_head = try_pop_head();
		return old_head?old_head->data:std::shared_ptr<T>();
	}

	bool try_pop(T& value)
	{
		std::unique_ptr<node> const old_head = try_pop_head(value);
		return old_head;
	}

	std::shared_ptr<T> wait_and_pop()
	{
		std::unique_ptr<node> const old_head = wait_pop_head();
		std::cout<<"old_head->data = "<<*old_head->data<<std::endl;
		return old_head->data;
	}

	void wait_and_pop(T& value)
	{
		std::unique_ptr<node> const old_head = wait_pop_head(value);
	}

	void push(T new_value)
	{
		/*
		 * 建立一個T的新的例項
		 * 並且在shared_ptr<>中取得所有權
		 * 使用make_shared是為了避免第二次為引用計數分配記憶體的開銷
		 */
		std::shared_ptr<T> new_data(
				std::make_shared<T>(std::move(new_value)));
		//建立的新的節點將會作為新的傀儡節點,因此無需向建構函式提供new_value
		std::unique_ptr<node> p(new node);
		//加鎖
		std::lock_guard<std::mutex> tail_lock(_tail_mutex);
		//將舊的傀儡節點上的資料設定為新分配的new_value的副本
		_tail->data = new_data;
		node* const new_tail = p.get();
		_tail->next = std::move(p);
		_tail = new_tail;
		_data_cond.notify_one();
	}

	bool empty()
	{
		std::lock_guard<std::mutex> head_lock(_head_mutex);
		return (_head.get() == get_tail());
	}
private:
	struct node
	{
		std::shared_ptr<T> data;
		std::unique_ptr<node> next;
	};

	std::mutex _head_mutex;
	std::unique_ptr<node> _head;
	std::mutex _tail_mutex;
	node* _tail;
	std::condition_variable _data_cond;
private:
	node* get_tail()
	{
		std::lock_guard<std::mutex> tail_lock(_tail_mutex);
		return _tail;
	}

	std::unique_ptr<node> pop_head()
	{
		std::unique_ptr<node> old_head = std::move(_head);
		_head = std::move(old_head->next);
		return old_head;
	}

	//確保資料被相關的wait_pop_head修改時持有相同的鎖,所以返回一個鎖的例項
	std::unique_lock<std::mutex> wait_for_data()
	{
		std::unique_lock<std::mutex> head_lock(_head_mutex);
		//判斷是否是空佇列不是則返回鎖
		_data_cond.wait(head_lock,[&]{return _head.get() != get_tail();});
		return std::move(head_lock);
	}

	std::unique_ptr<node> wait_pop_head()
	{
		std::unique_lock<std::mutex> head_lock(wait_for_data());
		return pop_head();
	}

	std::unique_ptr<node> wait_pop_head(T& value)
	{
		std::unique_lock<std::mutex> head_lock(wait_for_data());
		value = std::move(*_head->data);
		return pop_head();
	}

	std::unique_ptr<node> try_pop_head()
	{
		std::lock_guard<std::mutex> head_lock(_head_mutex);
		if(_head.get() == get_tail())
		{
			return std::unique_ptr<node>();
		}
		return pop_head();
	}

	std::unique_ptr<node> try_pop_head(T& value)
	{
		std::lock_guard<std::mutex> head_lock(_head_mutex);
		if(_head.get == get_tail())
		{
			return std::unique_ptr<node>();
		}
		value = std::move(*_head->data);
		return pop_head;
	}
};

int main()
{
	threadsafe_queue<int> my_queue;
	int c = 0;
	//建立執行緒時使用lambda
		std::thread d(
	   [&]{
			int d = 0;
			for(;;)
			{
				if(!my_queue.empty())
					{
					d++;
					if((d%10) == 0)
					{
						std::cout<<"執行緒d:d 求餘10=0 開始刪除一個佇列元素 d = "<<d<<std::endl;
						std::cout<<"執行緒d pop的值是:"<<*my_queue.wait_and_pop()<<std::endl;
						c++;
						if(c > 70)
						{
							return;
						}
					}
				}
			}
		  }
		);

		std::thread d2(
	   [&]{
			int d = 0;
				for(;;)
				{
					if(!my_queue.empty())
						{
						d++;
						if((d%10) == 0)
						{
							std::cout<<"執行緒d2:d 求餘10=0 開始刪除一個佇列元素 d = "<<(int)(d)<<std::endl;
							std::cout<<"執行緒d2 pop的值是:"<<*my_queue.wait_and_pop()<<std::endl;
							c++;
							if(c > 70)
							{
								return;
							}
						}
					}
				}
		}
		);

	for(int i = 0;i<3;++i)
	{
		std::thread t(
		[&]{
			int j;
			for(;;)
			{
//				std::cout<<"執行緒id:"<<i<<"push "<<j<<std::endl;
				++j;
				my_queue.push(j);
			}
		  });
		   t.detach();
	}
	d.join();
	d2.join();


	return 0;
}

執行:g++ file_name.cpp -pthread -std=c++11編譯後執行