1. 程式人生 > >C++11實現生產者消費者模式

C++11實現生產者消費者模式

併發程式設計中,資料產生和資料處理在不同的執行緒,這些執行緒傳遞資料常用的就是生產者消費者模式

以下是模仿Java的BlockingQueue實現的生產者消費者模式:

#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>


enum PopResult{ POP_OK, POP_STOP, POP_UNEXPECTED };

template<class T>
class BlockingQueue : public std::queue
<T>
{ public: std::mutex m_lock; std::condition_variable m_cond; bool m_stopFlag = false; virtual ~BlockingQueue() = default; void push(const T& value) { std::lock_guard<decltype(m_lock)> lock(m_lock); queue::push(value); m_cond.notify_one(); } void
push(T&& value) { std::lock_guard<decltype(m_lock)> lock(m_lock); queue::push(std::move(value)); m_cond.notify_one(); } PopResult pop(T& out) { std::unique_lock<decltype(m_lock)> lock(m_lock); if (m_stopFlag) // 停止 return
POP_STOP; if (empty()) m_cond.wait(lock); if (m_stopFlag) // 停止 return POP_STOP; if (empty()) // 意外喚醒? return POP_UNEXPECTED; out = std::move(front()); queue::pop(); return POP_OK; } void Stop() { std::lock_guard<decltype(m_lock)> lock(m_lock); m_stopFlag = true; m_cond.notify_all(); } };

使用方法:

#include <iostream>
#include <thread>


BlockingQueue<int> g_queue;


// 生產者執行緒
void Produce()
{
    for (int i = 0; i < 10; ++i)
        g_queue.push(i);
}

// 消費者執行緒
void Consume()
{
    int data;
    while (true)
    {
        // 取資料
        PopResult res = g_queue.pop(data);
        if (res == POP_STOP) // 執行緒應該停止
            break;
        if (res == POP_UNEXPECTED) // 意外喚醒
            continue;

        // 處理資料
        std::cout << data << std::endl;
    }
}

int _tmain(int argc, _TCHAR* argv[])
{
    // 啟動生產者執行緒和消費者執行緒(也可以啟動多個執行緒)
    std::thread produceThread(Produce);
    std::thread consumerThread(Consume);

    // 等待資料處理完
    Sleep(1000);

    produceThread.join();
    // 停止執行緒
    g_queue.Stop();
    consumerThread.join();

    return 0;
}