1. 程式人生 > >Linux C/C++多執行緒學習:生產者消費者問題

Linux C/C++多執行緒學習:生產者消費者問題

#include <iostream>
#include <mutex>
#include <condition_variable>
#include <unistd.h>
#include <thread>
using namespace std;

const int kProduceItems = 10;
const int kRepositorySize = 4;

template<class T>
class Repository {
public:
    T items_buff[kRepositorySize];
    mutex mtx; // 生產者消費者互斥量
    mutex produce_mutex; // 生產計數互斥量
    mutex consume_mutex; // 消費計數互斥量
    size_t produce_item_count;
    size_t consume_item_count;
    size_t produce_position; // 下一個生產的位置
    size_t consume_position; // 下一個消費的位置
    condition_variable repo_not_full; // 倉庫不滿條件變數
    condition_variable repo_not_empty; // 倉庫不空條件變數

    Repository() {
        produce_item_count = 0;
        consume_item_count = 0;
        produce_position = 0;
        consume_position = 0;
    };

    void Init() {
        fill_n(items_buff, sizeof(items_buff)/sizeof(items_buff[0]), 0);
        produce_item_count = 0;
        consume_item_count = 0;
        produce_position = 0;
        consume_position = 0;
    }
};

template<class T>
class Factory {
private:
    Repository<T> repo;

    void ProduceItem(T item) {
        unique_lock<mutex> lock(repo.mtx);
        // +1 後判斷,因為在初始時,兩者位於同一位置(因此倉庫中最大存在 kRepositorySize-1 個產品)
        while ((repo.produce_position+1) % kRepositorySize == repo.consume_position) {
            cout << "Repository is full, waiting..." << endl;
            (repo.repo_not_full).wait(lock); // 阻塞時釋放鎖,被喚醒時獲得鎖
        }
        repo.items_buff[repo.produce_position++] = item;
        if (repo.produce_position == kRepositorySize)
            repo.produce_position = 0;
        (repo.repo_not_empty).notify_all(); // 喚醒所有因空阻塞的程序
        lock.unlock();
    }

    T ConsumeItem() {
        unique_lock<mutex> lock(repo.mtx);
        while (repo.consume_position == repo.produce_position) {
            cout << "Repository is empty, waiting ..." << endl;
            (repo.repo_not_empty).wait(lock);
        }
        T data = repo.items_buff[repo.consume_position++];
        if (repo.consume_position == kRepositorySize)
            repo.consume_position = 0;
        (repo.repo_not_full).notify_all();
        lock.unlock();
        return data;
    }

public:
    void Reset() {
        repo.Init();
    }

    void ProduceTask() {
        bool ready_to_exit = false;
        while (true) {
            sleep(1); // 如果不sleep ,執行太快,一個程序會完成所有生產
            unique_lock<mutex> lock(repo.produce_mutex);

            if (repo.produce_item_count < kProduceItems) {
                ++(repo.produce_item_count);
                T item = repo.produce_item_count;
                cout << "producer id: "<< this_thread::get_id() << " is producing "
                     << item << "^th item..." << endl;
                ProduceItem(item);
            } else {
                ready_to_exit = true;
            }

            lock.unlock();
            // sleep(1);
            if (ready_to_exit)
                break;
        }
        printf("Producer thread %lld is exiting...\n", std::this_thread::get_id());
    }

    void ConsumeTask() {
        bool ready_to_exit =false;
        while (true) {
            sleep(1); // 如果不sleep ,執行太快,一個程序會消費所有產品
            unique_lock<mutex> lock(repo.consume_mutex);

            if (repo.consume_item_count < kProduceItems) {
                T item = ConsumeItem();
                cout << "consumer id: " << this_thread::get_id() << " is consuming "
                     << item << "^th item" << endl;
                ++(repo.consume_item_count);
            } else {
                ready_to_exit = true;
            }

            lock.unlock();
            // sleep(1);
            if (ready_to_exit)
                break;
        }
        printf("Consumer thread %lld is exiting...\n", std::this_thread::get_id());
    }
};

int main() {
    cout << "Main thread id :" << this_thread::get_id() << endl;

    Factory<int> myfactory;

    thread producer1(&Factory<int>::ProduceTask, &myfactory);
    thread producer2(&Factory<int>::ProduceTask, &myfactory);
    thread producer3(&Factory<int>::ProduceTask, &myfactory);

    thread consumer1(&Factory<int>::ConsumeTask, &myfactory);
    thread consumer2(&Factory<int>::ConsumeTask, &myfactory);
    thread consumer3(&Factory<int>::ConsumeTask, &myfactory);

    producer1.join();
    producer2.join();
    producer3.join();

    consumer1.join();
    consumer2.join();
    consumer3.join();

    return 0;
}