程式人生 > ring buffer 親測好用:C++11 加入條件變數

ring buffer 親測好用 C++ 11 加入 條件變數

 

#pragma once
#include <atomic>
#include <condition_variable>
#include <mutex>
/**
 * Fixed-capacity circular (ring) buffer of bytes.
 *
 * Data is appended with write() and consumed with read(); both report how
 * many bytes were actually transferred. The read/write positions and the
 * empty/full flags are stored in atomics, and a mutex plus condition variable
 * are available to the implementation for signalling between threads.
 *
 * The class owns a raw heap buffer, so copying is disabled.
 */
class CCycleBuffer
{
public:
    /// Returns the current value of the "buffer is full" flag.
    bool isFull();

    /// Returns the current value of the "buffer is empty" flag.
    bool isEmpty();

    /// Discards all buffered data by resetting positions and flags.
    void empty();

    /// Number of bytes currently stored and available to read().
    int getReadableLength();

    /// Number of bytes that can currently be stored by write().
    int getWriteableLength();

    /// Creates a buffer able to hold @p size bytes.
    CCycleBuffer(int size);
    virtual ~CCycleBuffer();

    // The buffer owns m_pBuf (and holds a mutex), so it must not be copied.
    CCycleBuffer(const CCycleBuffer&) = delete;
    CCycleBuffer& operator=(const CCycleBuffer&) = delete;

    /**
     * Copies up to @p count bytes from @p buf into the buffer.
     * @return the number of bytes actually written.
     */
    int write(char* buf, int count);

    /**
     * Copies up to @p count bytes out of the buffer into @p buf.
     * @return the number of bytes actually read.
     */
    int read(char* buf, int count);

    /// Index of the next byte to be read.
    int getStart() const { return m_nReadPos; }

    /// Index of the next byte to be written.
    int getEnd() const { return m_nWritePos; }

private:
    std::atomic_bool m_bEmpty;    ///< True while the buffer holds no data.
    std::atomic_bool m_bFull;     ///< True while the buffer has no free space.
    char* m_pBuf;                 ///< Heap storage of m_nBufSize bytes.
    int m_nBufSize;               ///< Capacity in bytes.
    std::atomic_int m_nReadPos;   ///< Index of the next byte to read.
    std::atomic_int m_nWritePos;  ///< Index of the next byte to write.
    //std::mutex m_mutex;
    std::mutex mtx;                     ///< Mutex paired with cv.
    std::unique_lock<std::mutex>* lck;  ///< Lock handle for mtx; managed by the implementation file.
    std::condition_variable cv;         ///< Signalled by write() when data arrives.
    int test;                           ///< Debug scratch field; set to 0 by the constructor.
};

 

#include "CCycleBuffer.h"
#include <assert.h>
#include <memory.h>
// 定義 
CCycleBuffer::CCycleBuffer(int size)

{
    m_nBufSize = size;
    m_nReadPos 
= 0; m_nWritePos = 0; m_pBuf = new char[m_nBufSize]; m_bEmpty = true; m_bFull = false; lck = new std::unique_lock <std::mutex>(mtx); test = 0; } CCycleBuffer::~CCycleBuffer() { delete[] m_pBuf; } /************************************************************************/ /* 向緩衝區寫入資料,返回實際寫入的位元組數 */ /************************************************************************/ int CCycleBuffer::write(char* buf, int count) { std::atomic_int m_nReadPos; m_nReadPos = (int)this->m_nReadPos; if (count <= 0) return 0; // 緩衝區已滿,不能繼續寫入 if (m_bFull) { return 0; } else if (m_nReadPos == m_nWritePos)// 緩衝區為空時 { /* == 記憶體模型 == |← m_nBufSize →| |← (empty)→ ← (empty) →| |------------||------------------------------| ↑← leftcount →| m_nReadPos m_nWritePos */ int leftcount = m_nBufSize - m_nWritePos; if (leftcount > count) { memcpy(m_pBuf + m_nWritePos, buf, count); //m_mutex.lock(); m_nWritePos += count; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return count; } else { std::atomic_int tmp; int leftcount2=0; memcpy(m_pBuf + m_nWritePos, buf, leftcount); tmp = count - leftcount; leftcount2 = (m_nReadPos > tmp) ? 
tmp : m_nReadPos; memcpy(m_pBuf, buf + leftcount, leftcount2); //m_mutex.lock(); m_nWritePos = leftcount2; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return leftcount + m_nWritePos; } } else if (m_nReadPos < m_nWritePos)// 有剩餘空間可寫入 { /* == 記憶體模型 == |← m_nBufSize →| |← (empty)→ ← (data) → ← (empty) →| |------------||xxxxxxxxxxxxx||---------------| ↑ ↑← leftcount →| m_nReadPos m_nWritePos */ // 剩餘緩衝區大小(從寫入位置到緩衝區尾) int leftcount = m_nBufSize - m_nWritePos; int test = m_nWritePos; if (leftcount > count) // 有足夠的剩餘空間存放 { memcpy(m_pBuf + m_nWritePos, buf, count); //m_mutex.lock(); m_nWritePos += count; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return count; } else // 剩餘空間不足 { // 先填充滿剩餘空間,再回頭找空間存放 std::atomic_int tmp; int leftcount2=0; memcpy(m_pBuf + test, buf, leftcount); tmp = count - leftcount; leftcount2 = (m_nReadPos > tmp) ? tmp : m_nReadPos; memcpy(m_pBuf, buf + leftcount, leftcount2); //m_mutex.lock(); m_nWritePos = leftcount2; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return leftcount + m_nWritePos; } } else { /* == 記憶體模型 == |← m_nBufSize →| |← (data) → ← (empty) → ← (data) →| |xxxxxxxxxxxx||---------------||xxxxxxxxxxxxxxx| | ↑← leftcount →↑ | m_nWritePos m_nReadPos */ int leftcount = m_nReadPos - m_nWritePos; if (leftcount > count) { // 有足夠的剩餘空間存放 memcpy(m_pBuf + m_nWritePos, buf, count); //m_mutex.lock(); m_nWritePos += count; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return count; } else { // 剩餘空間不足時要丟棄後面的資料 memcpy(m_pBuf + m_nWritePos, buf, leftcount); //m_mutex.lock(); m_nWritePos += leftcount; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return leftcount; } } } 
/************************************************************************/ /* 從緩衝區讀資料,返回實際讀取的位元組數 */ /************************************************************************/ int CCycleBuffer::read(char* buf, int count) { std::atomic_int m_nWritePos; m_nWritePos = (int)this->m_nWritePos; if (count <= 0) return 0; //m_bFull = false; if (m_bEmpty) // 緩衝區空,不能繼續讀取資料 { cv.wait(*lck); //return 0; } else if (m_nReadPos == m_nWritePos) // 緩衝區滿時 { /* == 記憶體模型 == |← m_nBufSize →| |← (data) → ← (data) →| |xxxxxxxxxxxx||xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx| | ↑← leftcount →| m_nWritePos m_nReadPos */ int leftcount = m_nBufSize - m_nReadPos; if (leftcount > count) { memcpy(buf, m_pBuf + m_nReadPos, count); //m_mutex.lock(); m_nReadPos += count; m_bEmpty = (m_nReadPos == this->m_nWritePos); //m_mutex.unlock(); m_bFull = false; return count; } else { std::atomic_int tmp; int leftcount2; memcpy(buf, m_pBuf + m_nReadPos, leftcount); tmp = count - leftcount; leftcount2 = (m_nWritePos > tmp) ? tmp : m_nWritePos; memcpy(buf + leftcount, m_pBuf, leftcount2); //m_mutex.lock(); m_nReadPos = leftcount2; m_bEmpty = (m_nReadPos == this->m_nWritePos); //m_mutex.unlock(); m_bFull = false; return leftcount + m_nReadPos; } } else if (m_nReadPos < m_nWritePos) // 寫指標在前(未讀資料是連線的) { /* == 記憶體模型 == |← m_nBufSize →| |← (empty)→ ← (data) → ← (empty) →| |------------||xxxxxxxxxxxxxxx||---------------| ↑← leftcount →↑ | m_nReadPos m_nWritePos */ int leftcount = m_nWritePos - m_nReadPos; int c = (leftcount > count) ? 
count : leftcount; memcpy(buf, m_pBuf + m_nReadPos, c); //m_mutex.lock(); m_nReadPos += c; m_bEmpty = (m_nReadPos == this->m_nWritePos); //m_mutex.unlock(); m_bFull = false; return c; } else // 讀指標在前(未讀資料可能是不連線的) { /* == 記憶體模型 == |← m_nBufSize →| |← (data) → ← (empty) → ← (data) →| |xxxxxxxxxxxx||---------------||xxxxxxxxxxxxxxx| | ↑ ↑← leftcount →| m_nWritePos m_nReadPos */ int leftcount = m_nBufSize - m_nReadPos; if (leftcount > count) // 未讀緩衝區夠大,直接讀取資料 { memcpy(buf, m_pBuf + m_nReadPos, count); //m_mutex.lock(); m_nReadPos += count; m_bEmpty = (m_nReadPos == this->m_nWritePos); //m_mutex.unlock(); m_bFull = false; return count; } else // 未讀緩衝區不足,需回到緩衝區頭開始讀 { memcpy(buf, m_pBuf + m_nReadPos, leftcount); std::atomic_int tmp; int leftcount2; tmp = count - leftcount; leftcount2 = (m_nWritePos > tmp) ? tmp : m_nWritePos; memcpy(buf + leftcount, m_pBuf, leftcount2); //m_mutex.lock(); m_nReadPos = leftcount2; m_bEmpty = (m_nReadPos == this->m_nWritePos); //m_mutex.unlock(); m_bFull = false; return leftcount + m_nReadPos; } } } /************************************************************************/ /* 獲取緩衝區有效資料長度 */ /************************************************************************/ int CCycleBuffer::getReadableLength() { if (m_bEmpty) { return 0; } else if (m_bFull) { return m_nBufSize; } else if (m_nReadPos < m_nWritePos) { return m_nWritePos - m_nReadPos; } else { return m_nBufSize - m_nReadPos + m_nWritePos; } } int CCycleBuffer::getWriteableLength() { return m_nBufSize-getReadableLength(); } void CCycleBuffer::empty() { m_nReadPos = 0; m_nWritePos = 0; m_bEmpty = true; m_bFull = false; } bool CCycleBuffer::isEmpty() { return m_bEmpty; } bool CCycleBuffer::isFull() { return m_bFull; }

 

#include <iostream>                // std::cout
#include <thread>                // std::thread
#include <mutex>                // std::mutex, std::unique_lock
#include <condition_variable>    // std::condition_variable
#include <chrono>
#include "CCycleBuffer.h"
// Build-time test selection: define TESTTHREAD to build the condition-variable
// demo, TESTRINGBUF to build the ring-buffer writer/reader demo.
//#define TESTTHREAD 
#define TESTRINGBUF

#ifdef TESTTHREAD
std::mutex mtx; // Global mutex.
std::condition_variable cv; // Global condition variable.
bool ready = false; // Global flag; main() sets it to true before calling cv.notify_all().
#endif
CCycleBuffer rbuf(10); // Ring buffer shared by the writer and reader threads, constructed with size 10.
char buf[] = "123456"; // Source data; the writer thread passes it to rbuf.write() with a count of 5.
char  buf4r[6] = { 0 }; // Destination for the reader thread, which reads one byte at a time into it.
/**
 * Demo entry point.
 *
 * With TESTTHREAD defined, ten threads wait on the global condition variable
 * until main() sets `ready` and wakes them; each then prints its id.
 *
 * With TESTRINGBUF defined, a writer thread stores 5 bytes of `buf` into
 * `rbuf` every two seconds (when more than 5 bytes are free) while a reader
 * thread reads one byte at a time and prints it. Both loops run forever, so
 * in that configuration main() never returns.
 *
 * Each demo owns and joins its own threads, so the file builds with either,
 * both, or neither macro defined.
 */
int main()
{
    std::cout<<"begin main..."<<std::endl;
#ifdef TESTTHREAD
    {
        std::thread racers[10];
        // Spawn 10 threads.
        for (int i = 0; i < 10; ++i) {
            racers[i] = std::thread([](int id) {
                std::unique_lock <std::mutex> lck(mtx);
                // Wait until the global flag is true. The predicate covers
                // spurious wakeups and a notification sent before this
                // thread started waiting.
                cv.wait(lck, [] { return ready; });
                // Woken up: print this thread's id.
                std::cout << "thread " << id << '\n';
            }, i);
        }

        std::cout << "10 threads ready to race...\n";
        {
            // Hold the mutex only while changing the flag. Keeping it locked
            // during join() would stop the woken threads from ever
            // re-acquiring it.
            std::lock_guard <std::mutex> guard(mtx);
            ready = true; // Set the global flag.
        }
        cv.notify_all(); // Wake all waiting threads.

        for (auto & th : racers)
            th.join();
    }
#endif
#ifdef TESTRINGBUF
    {
        std::thread workers[2];

        // Writer: every 2 seconds, store 5 bytes if more than 5 are free.
        workers[0] = std::thread([]() {
            std::cout<<"begin write thread ..."<<std::endl;
            while (true)
            {
                if (5 < rbuf.getWriteableLength())
                {
                    rbuf.write(buf, 5);
                }
                std::this_thread::sleep_for(std::chrono::seconds(2));
            }
        });

        // Reader: fetch one byte at a time and print it.
        workers[1] = std::thread([]() {
            std::cout<<"begin read thread ..."<<std::endl;
            while (true)
            {
                const int ret = rbuf.read(buf4r, 1);
                if (1 == ret)
                {
                    std::cout << "Read:["<<buf4r[0]<<"]"<<std::endl;
                }
                else
                {
                    std::cout << " read nothing " << std::endl;
                }
            }
        });

        for (auto & th : workers)
            th.join();
    }
#endif // TESTRINGBUF
    return 0;
}