ring buffer 親測好用 C++ 11 加入 條件變數
阿新 • • 發佈:2018-12-17
#pragma once #include <atomic> #include <mutex> class CCycleBuffer { public: bool isFull(); bool isEmpty(); void empty(); int getReadableLength(); int getWriteableLength(); CCycleBuffer(int size); virtual~CCycleBuffer(); int write(char* buf, intcount); int read(char* buf, int count); int getStart() { return m_nReadPos; } int getEnd() { return m_nWritePos; } private: std::atomic_bool m_bEmpty, m_bFull; char* m_pBuf; int m_nBufSize; std::atomic_int m_nReadPos; std::atomic_int m_nWritePos;//std::mutex m_mutex; std::mutex mtx; std::unique_lock <std::mutex> *lck; std::condition_variable cv; int test; };
#include "CCycleBuffer.h" #include <assert.h> #include <memory.h> // 定義 CCycleBuffer::CCycleBuffer(int size) { m_nBufSize = size; m_nReadPos= 0; m_nWritePos = 0; m_pBuf = new char[m_nBufSize]; m_bEmpty = true; m_bFull = false; lck = new std::unique_lock <std::mutex>(mtx); test = 0; } CCycleBuffer::~CCycleBuffer() { delete[] m_pBuf; } /************************************************************************/ /* 向緩衝區寫入資料,返回實際寫入的位元組數 */ /************************************************************************/ int CCycleBuffer::write(char* buf, int count) { std::atomic_int m_nReadPos; m_nReadPos = (int)this->m_nReadPos; if (count <= 0) return 0; // 緩衝區已滿,不能繼續寫入 if (m_bFull) { return 0; } else if (m_nReadPos == m_nWritePos)// 緩衝區為空時 { /* == 記憶體模型 == |← m_nBufSize →| |← (empty)→ ← (empty) →| |------------||------------------------------| ↑← leftcount →| m_nReadPos m_nWritePos */ int leftcount = m_nBufSize - m_nWritePos; if (leftcount > count) { memcpy(m_pBuf + m_nWritePos, buf, count); //m_mutex.lock(); m_nWritePos += count; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return count; } else { std::atomic_int tmp; int leftcount2=0; memcpy(m_pBuf + m_nWritePos, buf, leftcount); tmp = count - leftcount; leftcount2 = (m_nReadPos > tmp) ? tmp : m_nReadPos; memcpy(m_pBuf, buf + leftcount, leftcount2); //m_mutex.lock(); m_nWritePos = leftcount2; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return leftcount + m_nWritePos; } } else if (m_nReadPos < m_nWritePos)// 有剩餘空間可寫入 { /* == 記憶體模型 == |← m_nBufSize →| |← (empty)→ ← (data) → ← (empty) →| |------------||xxxxxxxxxxxxx||---------------| ↑ ↑← leftcount →| m_nReadPos m_nWritePos */ // 剩餘緩衝區大小(從寫入位置到緩衝區尾) int leftcount = m_nBufSize - m_nWritePos; int test = m_nWritePos; if (leftcount > count) // 有足夠的剩餘空間存放 { memcpy(m_pBuf + m_nWritePos, buf, count); //m_mutex.lock(); m_nWritePos += count; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return count; } else // 剩餘空間不足 { // 先填充滿剩餘空間,再回頭找空間存放 std::atomic_int tmp; int leftcount2=0; memcpy(m_pBuf + test, buf, leftcount); tmp = count - leftcount; leftcount2 = (m_nReadPos > tmp) ? tmp : m_nReadPos; memcpy(m_pBuf, buf + leftcount, leftcount2); //m_mutex.lock(); m_nWritePos = leftcount2; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return leftcount + m_nWritePos; } } else { /* == 記憶體模型 == |← m_nBufSize →| |← (data) → ← (empty) → ← (data) →| |xxxxxxxxxxxx||---------------||xxxxxxxxxxxxxxx| | ↑← leftcount →↑ | m_nWritePos m_nReadPos */ int leftcount = m_nReadPos - m_nWritePos; if (leftcount > count) { // 有足夠的剩餘空間存放 memcpy(m_pBuf + m_nWritePos, buf, count); //m_mutex.lock(); m_nWritePos += count; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return count; } else { // 剩餘空間不足時要丟棄後面的資料 memcpy(m_pBuf + m_nWritePos, buf, leftcount); //m_mutex.lock(); m_nWritePos += leftcount; m_bFull = (this->m_nReadPos == m_nWritePos); //m_mutex.unlock(); if(m_bEmpty) cv.notify_all(); m_bEmpty = false; return leftcount; } } } /************************************************************************/ /* 從緩衝區讀資料,返回實際讀取的位元組數 */ /************************************************************************/ int CCycleBuffer::read(char* buf, int count) { std::atomic_int m_nWritePos; m_nWritePos = (int)this->m_nWritePos; if (count <= 0) return 0; //m_bFull = false; if (m_bEmpty) // 緩衝區空,不能繼續讀取資料 { cv.wait(*lck); //return 0; } else if (m_nReadPos == m_nWritePos) // 緩衝區滿時 { /* == 記憶體模型 == |← m_nBufSize →| |← (data) → ← (data) →| |xxxxxxxxxxxx||xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx| | ↑← leftcount →| m_nWritePos m_nReadPos */ int leftcount = m_nBufSize - m_nReadPos; if (leftcount > count) { memcpy(buf, m_pBuf + m_nReadPos, count); //m_mutex.lock(); m_nReadPos += count; m_bEmpty = (m_nReadPos == this->m_nWritePos); //m_mutex.unlock(); m_bFull = false; return count; } else { std::atomic_int tmp; int leftcount2; memcpy(buf, m_pBuf + m_nReadPos, leftcount); tmp = count - leftcount; leftcount2 = (m_nWritePos > tmp) ? tmp : m_nWritePos; memcpy(buf + leftcount, m_pBuf, leftcount2); //m_mutex.lock(); m_nReadPos = leftcount2; m_bEmpty = (m_nReadPos == this->m_nWritePos); //m_mutex.unlock(); m_bFull = false; return leftcount + m_nReadPos; } } else if (m_nReadPos < m_nWritePos) // 寫指標在前(未讀資料是連線的) { /* == 記憶體模型 == |← m_nBufSize →| |← (empty)→ ← (data) → ← (empty) →| |------------||xxxxxxxxxxxxxxx||---------------| ↑← leftcount →↑ | m_nReadPos m_nWritePos */ int leftcount = m_nWritePos - m_nReadPos; int c = (leftcount > count) ? count : leftcount; memcpy(buf, m_pBuf + m_nReadPos, c); //m_mutex.lock(); m_nReadPos += c; m_bEmpty = (m_nReadPos == this->m_nWritePos); //m_mutex.unlock(); m_bFull = false; return c; } else // 讀指標在前(未讀資料可能是不連線的) { /* == 記憶體模型 == |← m_nBufSize →| |← (data) → ← (empty) → ← (data) →| |xxxxxxxxxxxx||---------------||xxxxxxxxxxxxxxx| | ↑ ↑← leftcount →| m_nWritePos m_nReadPos */ int leftcount = m_nBufSize - m_nReadPos; if (leftcount > count) // 未讀緩衝區夠大,直接讀取資料 { memcpy(buf, m_pBuf + m_nReadPos, count); //m_mutex.lock(); m_nReadPos += count; m_bEmpty = (m_nReadPos == this->m_nWritePos); //m_mutex.unlock(); m_bFull = false; return count; } else // 未讀緩衝區不足,需回到緩衝區頭開始讀 { memcpy(buf, m_pBuf + m_nReadPos, leftcount); std::atomic_int tmp; int leftcount2; tmp = count - leftcount; leftcount2 = (m_nWritePos > tmp) ? tmp : m_nWritePos; memcpy(buf + leftcount, m_pBuf, leftcount2); //m_mutex.lock(); m_nReadPos = leftcount2; m_bEmpty = (m_nReadPos == this->m_nWritePos); //m_mutex.unlock(); m_bFull = false; return leftcount + m_nReadPos; } } } /************************************************************************/ /* 獲取緩衝區有效資料長度 */ /************************************************************************/ int CCycleBuffer::getReadableLength() { if (m_bEmpty) { return 0; } else if (m_bFull) { return m_nBufSize; } else if (m_nReadPos < m_nWritePos) { return m_nWritePos - m_nReadPos; } else { return m_nBufSize - m_nReadPos + m_nWritePos; } } int CCycleBuffer::getWriteableLength() { return m_nBufSize-getReadableLength(); } void CCycleBuffer::empty() { m_nReadPos = 0; m_nWritePos = 0; m_bEmpty = true; m_bFull = false; } bool CCycleBuffer::isEmpty() { return m_bEmpty; } bool CCycleBuffer::isFull() { return m_bFull; }
#include <iostream> // std::cout #include <thread> // std::thread #include <mutex> // std::mutex, std::unique_lock #include <condition_variable> // std::condition_variable #include <chrono> #include "CCycleBuffer.h" //#define TESTTHREAD #define TESTRINGBUF #ifdef TESTTHREAD std::mutex mtx; // 全域性互斥鎖. std::condition_variable cv; // 全域性條件變數. bool ready = false; // 全域性標誌位. #endif CCycleBuffer rbuf(10); char buf[] = "123456"; char buf4r[6] = { 0 }; int main() { std::cout<<"begin main..."<<std::endl; #ifdef TESTTHREAD std::thread* threads[10]; // spawn 10 threads: for (int i = 0; i < 10; ++i) { threads[i] = new std::thread([](int id){ std::unique_lock <std::mutex> lck(mtx); //while (!ready) // 如果標誌位不為 true, 則等待... cv.wait(lck); // 當前執行緒被阻塞, 當全域性標誌位變為 true 之後, // 執行緒被喚醒, 繼續往下執行列印執行緒編號id. std::cout << "thread " << id << '\n'; },i); } std::cout << "10 threads ready to race...\n"; std::unique_lock <std::mutex> lck(mtx); ready = true; // 設定全域性標誌位為 true. cv.notify_all(); // 喚醒所有執行緒. for (auto & th : threads) th->join(); #endif #ifdef TESTRINGBUF int ret; std::thread* threads[2]; //bzero(buf4r,sizeof(buf4r)); //ret = rbuf.getWriteableLength(); //ret = rbuf.write(buf,4); //ret = rbuf.read(buf4r,2); threads[0] = new std::thread([]() { std::cout<<"begin write thread ..."<<std::endl; while (true) { if (5 < rbuf.getWriteableLength()) { rbuf.write(buf, 5); } std::this_thread::sleep_for(std::chrono::seconds(2)); } }); threads[1] = new std::thread([]() { int ret; static int cnt = 0; bool noData; std::cout<<"begin read thread ..."<<std::endl; while (true) { ret = rbuf.read(buf4r, 1); if (1 == ret) { noData = false; //std::cout<<"read thread ..."<<std::endl; std::cout << "Read:["<<buf4r[0]<<"]"<<std::endl; if (0 == ++cnt%5) { //std::cout << std::endl; } //std::cout << "xxx" << std::endl; } else { //if (noData == false) { //noData = true; std::cout << " read nothing " << std::endl; } } } }); #endif // TESTRINGBUF for (auto &th: threads) { th->join(); } //getchar(); return 0; }