C++11實現生產者消費者模式
阿新 • • 發佈:2019-01-22
併發程式設計中,資料產生和資料處理在不同的執行緒,這些執行緒傳遞資料常用的就是生產者消費者模式
以下是模仿Java的BlockingQueue實現的生產者消費者模式:
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
enum PopResult{ POP_OK, POP_STOP, POP_UNEXPECTED };
template<class T>
class BlockingQueue : public std::queue <T>
{
public:
std::mutex m_lock;
std::condition_variable m_cond;
bool m_stopFlag = false;
virtual ~BlockingQueue() = default;
void push(const T& value)
{
std::lock_guard<decltype(m_lock)> lock(m_lock);
queue::push(value);
m_cond.notify_one();
}
void push(T&& value)
{
std::lock_guard<decltype(m_lock)> lock(m_lock);
queue::push(std::move(value));
m_cond.notify_one();
}
PopResult pop(T& out)
{
std::unique_lock<decltype(m_lock)> lock(m_lock);
if (m_stopFlag) // 停止
return POP_STOP;
if (empty())
m_cond.wait(lock);
if (m_stopFlag) // 停止
return POP_STOP;
if (empty()) // 意外喚醒?
return POP_UNEXPECTED;
out = std::move(front());
queue::pop();
return POP_OK;
}
void Stop()
{
std::lock_guard<decltype(m_lock)> lock(m_lock);
m_stopFlag = true;
m_cond.notify_all();
}
};
使用方法:
#include <iostream>
#include <thread>
BlockingQueue<int> g_queue;
// 生產者執行緒
void Produce()
{
for (int i = 0; i < 10; ++i)
g_queue.push(i);
}
// 消費者執行緒
void Consume()
{
int data;
while (true)
{
// 取資料
PopResult res = g_queue.pop(data);
if (res == POP_STOP) // 執行緒應該停止
break;
if (res == POP_UNEXPECTED) // 意外喚醒
continue;
// 處理資料
std::cout << data << std::endl;
}
}
int _tmain(int argc, _TCHAR* argv[])
{
// 啟動生產者執行緒和消費者執行緒(也可以啟動多個執行緒)
std::thread produceThread(Produce);
std::thread consumerThread(Consume);
// 等待資料處理完
Sleep(1000);
produceThread.join();
// 停止執行緒
g_queue.Stop();
consumerThread.join();
return 0;
}