1. 程式人生 > >(一)通用工具之同步佇列(sync_queue)

(一)通用工具之同步佇列(sync_queue)

介紹

我們經常需要在多執行緒間通訊,例如網路通訊執行緒和邏輯執行緒,網路執行緒需要把收到的資料 傳遞到 邏輯執行緒進行處理;同樣 邏輯執行緒 需要把傳送的資料,傳遞到網路執行緒進行傳送。 這時我們就需要一種資料結構 同步佇列。 由於C++11 對執行緒提供了支援,我們需要一種支援 先入先出的資料結構即可 ,STL庫裡面已經有現成的 std::deuqe, std::queue。但C++11 引入了右值引用,類的成員函式添加了移動建構函式,利用這個特性讓 std::vector在某些操作情況下可能效能更佳。 同步佇列的操作和普通的一樣:隊尾插入,隊頭出隊。

入隊

為了和stl裡面的容器操作介面保持一致,入隊函式如下
		void push_back(const T& x)
		{
			std::unique_lock<std::mutex> lck(_mutex);
			_queue.push_back(x);
		}

這個函式很簡單,就是對容器加鎖,然後插入資料,防止多個執行緒同時對佇列操作。 C++11 中許多STL容器的插入操作 引入了一個新的函式 emplace_back/emplace, 這個同樣是和右值引用相關,如果插入的資料是右值,那就會呼叫這個資料的移動建構函式,而不是拷貝建構函式,這樣就會比 push_back() 少一次拷貝。同樣我們也實現這個操作:
		template<typename _Tdata>
		void emplace_back(_Tdata&& v)
		{
			std::unique_lock<std::mutex> lck(_mutex);
			_queue.emplace_back(std::forward<_Tdata>(v));
		}

和push_back 一樣,先加鎖,再操作容器。_Tdata 是一個未定的引用型別,可以是右值或者左值,由具體傳入的引數確定(詳見C++11相關資料)。由於這個模板函式被呼叫後就已經例項化,Tdata 將具有確定的型別,在函式內部將會變為左值,std::forward 被稱為完美轉發,將會保持引數的原有型別,傳遞給另一個函式。這樣我們就可以把右值引用型別引數傳遞給 容器的 emplace_back 函式。

出隊

出隊函式入下
		T pop_front()
		{
			std::unique_lock<std::mutex> lck(_mutex);
			if(_queue.empty())
			{
				return T();
			}
			assert(!_queue.empty());
			T t(_queue.front());
			_queue.pop_front();
			return t;
		}

先加鎖,如果佇列為空,則返回一個預設的物件。不為空則彈出隊首資料。下面將會提供一個獲取佇列長度的函式。 使用的時候應該 先檢查長度 再出隊操作。

獲取佇列長度

		size_t size()
		{
			std::unique_lock<std::mutex> lck(_mutex);
			return _queue.size();
		}

返回容器資料個數即可。

Move操作

一般情況下 我們是 邊入隊,邊出隊,由於每個操作都是對佇列中的一個元素的操作,可能更加頻繁的加鎖解鎖。使用std::move 返回容器的右值引用物件,這樣可以獲取容器中的所有元素,並且清空容器,這是一個批量操作,比單個元素操作更高效。
		std::deque<T> move()
		{
			std::unique_lock<std::mutex> lck(_mutex);
			auto tmp = std::move(_queue);
			m_notFull.notify_one();
			return std::move(tmp);
		}

實現程式碼

#pragma once
#include <mutex>
#include <condition_variable>
#include <cassert>
#include <type_traits>
#include <atomic>

namespace moon
{
	template<typename T, typename TContainer = std::deque<T>  , size_t max_size = 50>
	class sync_queue
	{
	public:
		sync_queue()
			:m_exit(false)
		{
		}

		sync_queue(const sync_queue& t) = delete;
		sync_queue& operator=(const sync_queue& t) = delete;

		void push_back(const T& x)
		{
			std::unique_lock<std::mutex> lck(m_mutex);
			m_notFull.wait(lck, [this] {return m_exit || (m_queue.size() < max_size); });
			m_queue.push_back(x);
		}

		template<typename _Tdata>
		void emplace_back(_Tdata&& v)
		{
			std::unique_lock<std::mutex> lck(m_mutex);
			m_notFull.wait(lck, [this] {return m_exit || (m_queue.size() < max_size); });
			m_queue.emplace_back(std::forward<_Tdata>(v));
		}

		size_t size()
		{
			std::unique_lock<std::mutex> lck(m_mutex);
			return m_queue.size();
		}

		//替代pop_front
		TContainer move()
		{
			std::unique_lock<std::mutex> lck(m_mutex);
			auto tmp = std::move(m_queue);
			m_notFull.notify_one();
			return std::move(tmp);
		}

		//當程式退出時呼叫此函式,觸發條件變數
		void exit()
		{
			m_exit = true;
		}

	private:
		std::mutex							m_mutex;
		std::condition_variable		m_notFull;
		TContainer							m_queue;
		std::atomic_bool					m_exit;
	};

}


TContainer支援 std::vector,std::deque. 這裡取消了pop_front 操作,因為 std::vector 沒有pop_front, 為了統一 使用 move函式。可以給佇列限制大小,防止一直入隊,佔用太多記憶體。

示例

這個示例演示了使用同步佇列進行 非同步 加法計算

#include <thread>
#include "sync_queue.h"

struct SAddContext
{
	SAddContext()
		:a(0),b(b)
	{

	}
	int a;
	int b;
};

struct SAddResult
{
	SAddResult()
		:a(0), b(0),result(0)
	{

	}
	int a;
	int b;
	int result;
};

int main()
{
	moon::sync_queue<SAddContext> que1;//main thread - calculate thread
	moon::sync_queue<SAddResult> que2;//calculate thread - print thread

	std::thread calculate([&que1,&que2]() {
		while (1)
		{
			//如果佇列為空 ,等待
			if (que1.size() == 0)
			{
				std::this_thread::sleep_for(std::chrono::milliseconds(100));
			}

			//獲取所有非同步計算請求
			auto data = que1.move();
			for (auto& dat : data)
			{
				SAddResult sr;
				sr.a = dat.a;
				sr.b = dat.b;
				sr.result = dat.a + dat.b;
				que2.push_back(sr);
			}
		}
	});

	std::thread printThread([&que2]() {
		while (1)
		{
			//如果佇列為空 ,等待
			if (que2.size() == 0)
			{
				std::this_thread::sleep_for(std::chrono::milliseconds(100));
			}

			auto data = que2.move();
			for (auto& dat : data)
			{
				printf("%d + %d = %d\r\n", dat.a, dat.b, dat.result);
			}
		}
	});

	int x = 0;
	int y = 0;

	while (std::cin >> x >> y)
	{
		SAddContext sc;
		sc.a = x;
		sc.b = y;
		que1.push_back(sc);
	}
};