1. 程式人生 > >Qt實現環形緩衝區的兩種方法

Qt實現環形緩衝區的兩種方法

一個環形buffer,在尾部追加資料,從頭部讀取資料,適合用作IO的緩衝區。

一.使用QList和QByteArray

這個方法參考的是Qt原始碼中的QRingBuffer類,這個類不是Qt API的一部分,所以Qt助手裡是查不到的,它的存在只是為了服務其他的原始碼。

QRingBuffer的原始檔在D:\Qt\Qt5.7.0\5.7\Src\qtbase\src\corelib\tools目錄中,由qringbuffer_p.h和qringbuffer.cpp實現。

QRingBuffer實現的環形緩衝區大概如下圖所示。

qringbuffer.h

#ifndef QRINGBUFFER_P_H
#define QRINGBUFFER_P_H

#include <QByteArray>
#include <QList>

#ifndef QRINGBUFFER_CHUNKSIZE
#define QRINGBUFFER_CHUNKSIZE 4096
#endif
enum
{
    //1G-1位元組
    MaxAllocSize = (1 << (std::numeric_limits<int>::digits - 1)) - 1
};

enum
{
    //1G-1-16位元組
    MaxByteArraySize = MaxAllocSize - sizeof(QtPrivate::remove_pointer<QByteArray::DataPtr>::type)
};


class QRingBuffer
{
public:
    //預設分配QRINGBUFFER_CHUNKSIZE大小的buffer
    QRingBuffer(int growth = QRINGBUFFER_CHUNKSIZE) :
        head(0), tail(0), tailBuffer(0), basicBlockSize(growth), bufferSize(0) { }
    ~QRingBuffer(){}
    //獲取環形緩衝區指定位置的指標
    //length,輸出這個指定位置到緩衝區結尾的長度
    char *readPointerAtPosition(qint64 pos, qint64 &length);
    //申請空間:從尾開始,返回新空間的指標
    char *reserve(qint64 bytes);
    //申請空間:從頭開始,返回新空間的指標
    char *reserveFront(qint64 bytes);
    //縮短空間
    void truncate(qint64 pos)
    {
        if (pos < bufferSize)
            chop(bufferSize - pos);
    }
    //判斷buffers資料是否為空
    bool isEmpty()
    {
        return bufferSize == 0;
    }
    //從頭讀取一個字元,並轉換為int返回
    int getChar()
    {
        if (isEmpty())
            return -1;
        char c = *readPointer();
        free(1);
        return int(uchar(c));
    }
    //在緩衝區尾部新增字元
    void putChar(char c)
    {
        char *ptr = reserve(1);
        *ptr = c;
    }
    //在緩衝區頭部新增字元
    void ungetChar(char c)
    {
        if (head > 0) {
            --head;
            buffers.first()[head] = c;
            ++bufferSize;
        } else {
            char *ptr = reserveFront(1);
            *ptr = c;
        }
    }
    //清空緩衝區
    void clear();
    //讀取maxLength長度資料到data中,如果buffers中的資料少於maxLength,則讀取所有資料,
    //返回讀取資料的長度
    qint64 read(char *data, qint64 maxLength);
    //讀取buffers中的第一個buffer
    QByteArray read();
    //從指定位置pos拷貝maxLength長度的資料到data中
    //返回實際擷取的資料長度
    qint64 peek(char *data, qint64 maxLength, qint64 pos = 0);
    //擴充套件最後一個buffer
    void append(const char *data, qint64 size);
    //在最後新增一個新buffer
    void append(const QByteArray &qba);
    //從頭釋放lenght長度空間,一般需要配合reserve使用
    qint64 skip(qint64 length)
    {
        qint64 bytesToSkip = qMin(length, bufferSize);

        free(bytesToSkip);
        return bytesToSkip;
    }
    //從尾釋放length長度空間,一般需要配合reserve使用
    void chop(qint64 length);
    //讀取一行,包括該行的結束標誌'\n'
    qint64 readLine(char *data, qint64 maxLength);

    bool canReadLine()
    {
        return indexOf('\n', bufferSize) >= 0;
    }
private:
    //獲取下一個資料塊的大小
    //如果只剩一個buffer,返回最後一個buffer所含資料的大小;否則返回第一個buffer所含資料的大小。
    qint64 nextDataBlockSize()
    {
        return (tailBuffer == 0 ? tail : buffers.first().size()) - head;
    }
    //獲取緩衝區第一個有效資料的指標
    char *readPointer()
    {
        return bufferSize == 0 ? Q_NULLPTR : (buffers.first().data() + head);
    }
    qint64 indexOf(char c, qint64 maxLength, qint64 pos = 0);
    //釋放空間
    void free(qint64 bytes);
private:
    QList<QByteArray> buffers;
    //標識第一個buffer資料起始位置和最後一個buffer資料的結尾位置
    int head, tail;
    //大小為buffers.size()-1,如果為0,說明只剩一個buffer
    int tailBuffer;
    //初始分配空間的大小
    int basicBlockSize;
    //buffers資料總大小
    qint64 bufferSize;
};

#endif // QRINGBUFFER_P_H
qringbuffer.cpp
#include "qringbuffer.h"
#include <string.h>

char *QRingBuffer::readPointerAtPosition(qint64 pos, qint64 &length)
{
    if (pos >= 0)
    {
        pos += head;
        for (int i = 0; i < buffers.size(); ++i)
        {
            length = (i == tailBuffer ? tail : buffers[i].size());
            if (length > pos)
            {
                length -= pos;
                return buffers[i].data() + pos;
            }
            pos -= length;
        }
    }

    length = 0;
    return 0;
}

void QRingBuffer::free(qint64 bytes)
{
    Q_ASSERT(bytes <= bufferSize);

    while (bytes > 0)
    {
        const qint64 blockSize = buffers.first().size() - head;
        if (tailBuffer == 0 || blockSize > bytes)
        {
            if (bufferSize <= bytes)
            {
                if (buffers.first().size() <= basicBlockSize)
                {
                    bufferSize = 0;
                    head = tail = 0;
                } else
                {
                    clear();
                }
            }
            else
            {
                Q_ASSERT(bytes < MaxByteArraySize);
                head += int(bytes);
                bufferSize -= bytes;
            }
            return;
        }
        bufferSize -= blockSize;
        bytes -= blockSize;
        buffers.removeFirst();
        --tailBuffer;
        head = 0;
    }
}

char *QRingBuffer::reserve(qint64 bytes)
{
    if (bytes <= 0 || bytes >= MaxByteArraySize)
        return 0;

    if (buffers.isEmpty())
    {
        buffers.append(QByteArray());
        buffers.first().resize(qMax(basicBlockSize, int(bytes)));
    }
    else
    {
        const qint64 newSize = bytes + tail;
        //如果超過最後一個buffer所含資料的大小,則最後一個buffer需要從新分配
        if (newSize > buffers.last().size())
        {
            //滿足以下條件時,將最後一個buffer的容積縮小到其當前所含資料的大小,
            //然後新開闢一個buffer,並將該buffer資料的結尾位置tail設定為0
            if (newSize > buffers.last().capacity() && (tail >= basicBlockSize
                    || newSize >= MaxByteArraySize))
            {
                buffers.last().resize(tail);
                buffers.append(QByteArray());
                ++tailBuffer;
                tail = 0;
            }
            //將最後一個buffer進行擴容
            buffers.last().resize(qMax(basicBlockSize, tail + int(bytes)));
        }
    }
    char *writePtr = buffers.last().data() + tail;
    bufferSize += bytes;
    Q_ASSERT(bytes < MaxByteArraySize);
    tail += int(bytes);
    return writePtr;
}

char *QRingBuffer::reserveFront(qint64 bytes)
{
    if (bytes <= 0 || bytes >= MaxByteArraySize)
        return 0;

    if (head < bytes)
    {
        if (buffers.isEmpty())
        {
            buffers.append(QByteArray());
        }
        else
        {
            buffers.first().remove(0, head);
            if (tailBuffer == 0)
                tail -= head;
        }

        head = qMax(basicBlockSize, int(bytes));
        if (bufferSize == 0)
        {
            tail = head;
        }
        else
        {
            buffers.prepend(QByteArray());
            ++tailBuffer;
        }
        buffers.first().resize(head);
    }

    head -= int(bytes);
    bufferSize += bytes;
    return buffers.first().data() + head;
}

void QRingBuffer::chop(qint64 length)
{
    Q_ASSERT(length <= bufferSize);

    while (length > 0)
    {
        if (tailBuffer == 0 || tail > length)
        {
            if (bufferSize <= length)
            {
                if (buffers.first().size() <= basicBlockSize)
                {
                    bufferSize = 0;
                    head = tail = 0;
                }
                else
                {
                    clear();
                }
            }
            else
            {
                Q_ASSERT(length < MaxByteArraySize);
                tail -= int(length);
                bufferSize -= length;
            }
            return;
        }

        bufferSize -= tail;
        length -= tail;
        buffers.removeLast();
        --tailBuffer;
        tail = buffers.last().size();
    }
}

void QRingBuffer::clear()
{
    if (buffers.isEmpty())
        return;

    buffers.erase(buffers.begin() + 1, buffers.end());
    buffers.first().clear();

    head = tail = 0;
    tailBuffer = 0;
    bufferSize = 0;
}

qint64 QRingBuffer::indexOf(char c, qint64 maxLength, qint64 pos)
{
    if (maxLength <= 0 || pos < 0)
        return -1;

    qint64 index = -(pos + head);
    for (int i = 0; i < buffers.size(); ++i)
    {
        qint64 nextBlockIndex = qMin(index + (i == tailBuffer ? tail : buffers[i].size()),
                                           maxLength);

        if (nextBlockIndex > 0)
        {
            const char *ptr = buffers[i].data();
            if (index < 0)
            {
                ptr -= index;
                index = 0;
            }

            const char *findPtr = reinterpret_cast<const char *>(memchr(ptr, c,
                                                                 nextBlockIndex - index));
            if (findPtr)
                return qint64(findPtr - ptr) + index + pos;

            if (nextBlockIndex == maxLength)
                return -1;
        }
        index = nextBlockIndex;
    }
    return -1;
}

qint64 QRingBuffer::read(char *data, qint64 maxLength)
{
    const qint64 bytesToRead = qMin(bufferSize, maxLength);
    qint64 readSoFar = 0;
    while (readSoFar < bytesToRead)
    {
        const qint64 bytesToReadFromThisBlock = qMin(bytesToRead - readSoFar,
                                                     nextDataBlockSize());
        if (data)
            memcpy(data + readSoFar, readPointer(), bytesToReadFromThisBlock);
        readSoFar += bytesToReadFromThisBlock;
        free(bytesToReadFromThisBlock);
    }
    return readSoFar;
}


QByteArray QRingBuffer::read()
{
    if (bufferSize == 0)
        return QByteArray();

    QByteArray qba(buffers.takeFirst());

    //避免調整大小時不必要的記憶體分配,使QByteArray更高效
    qba.reserve(0);
    if (tailBuffer == 0)
    {
        qba.resize(tail);
        tail = 0;
    } else
    {
        --tailBuffer;
    }
    qba.remove(0, head);
    head = 0;
    bufferSize -= qba.size();
    return qba;
}


qint64 QRingBuffer::peek(char *data, qint64 maxLength, qint64 pos)
{
    qint64 readSoFar = 0;

    if (pos >= 0)
    {
        pos += head;
        for (int i = 0; readSoFar < maxLength && i < buffers.size(); ++i)
        {
            qint64 blockLength = (i == tailBuffer ? tail : buffers[i].size());

            if (pos < blockLength)
            {
                blockLength = qMin(blockLength - pos, maxLength - readSoFar);
                memcpy(data + readSoFar, buffers[i].data() + pos, blockLength);
                readSoFar += blockLength;
                pos = 0;
            }
            else
            {
                pos -= blockLength;
            }
        }
    }

    return readSoFar;
}

void QRingBuffer::append(const char *data, qint64 size)
{
    char *writePointer = reserve(size);
    if (size == 1)
        *writePointer = *data;
    else if (size)
        ::memcpy(writePointer, data, size);
}

void QRingBuffer::append(const QByteArray &qba)
{
    if (tail == 0)
    {
        if (buffers.isEmpty())
            buffers.append(qba);
        else
            buffers.last() = qba;
    }
    else
    {
        buffers.last().resize(tail);
        buffers.append(qba);
        ++tailBuffer;
    }
    tail = qba.size();
    bufferSize += tail;
}

qint64 QRingBuffer::readLine(char *data, qint64 maxLength)
{
    if (!data || --maxLength <= 0)
        return -1;

    qint64 i = indexOf('\n', maxLength);
    i = read(data, i >= 0 ? (i+1) : maxLength);

    data[i] = '\0';
    return i;
}
main.cpp
#include <qringbuffer.h>
#include <QDebug>

int main()
{
    //測試環形緩衝區的寫入和讀取+++++++++++++++++++++++++++++++
    qDebug()<<QStringLiteral("測試環形緩衝區的寫入和讀取+++++++++++++++++++++++++++++++");
    //方法1
    QRingBuffer ringBuffer;
    ringBuffer.append("CSDN ",5);
    ringBuffer.append("blog ",5);
    QByteArray qba("http://blog.csdn.net/caoshangpa");
    ringBuffer.append(qba);
    QByteArray head=ringBuffer.read();
    QByteArray tail=ringBuffer.read();
    qDebug()<<head<<tail;
    //方法2
    ringBuffer.clear();
    ringBuffer.append("CSDN ",5);
    ringBuffer.append("blog ",5);
    ringBuffer.append(qba);
    char str[100]={'\0'};
    ringBuffer.read(str,100);
    qDebug()<<str;
    //測試在緩衝區頭和尾新增字元+++++++++++++++++++++++++++++++
    qDebug()<<QStringLiteral("測試在緩衝區頭和尾新增字元+++++++++++++++++++++++++++++++");
    ringBuffer.clear();
    ringBuffer.append("CSDN ",5);
    ringBuffer.append("blog ",5);
    ringBuffer.append(qba);
    //頭
    ringBuffer.ungetChar('{');
    //尾
    ringBuffer.putChar('}');
    memset(str,0,100);
    ringBuffer.read(str,100);
    qDebug()<<str;
    //測試讀取一行+++++++++++++++++++++++++++++++++++++++++++
    qDebug()<<QStringLiteral("測試讀取一行+++++++++++++++++++++++++++++++++++++++++++");
    ringBuffer.clear();
    ringBuffer.append("CSDN ",5);
    ringBuffer.append("blog\n",5);
    ringBuffer.append(qba);
    memset(str,0,100);
    if(ringBuffer.canReadLine())
    ringBuffer.readLine(str,100);
    qDebug()<<str;
    //測試拷貝資料+++++++++++++++++++++++++++++++++++++++++++
    qDebug()<<QStringLiteral("測試拷貝資料+++++++++++++++++++++++++++++++++++++++++++");
    ringBuffer.clear();
    ringBuffer.append("CSDN ",5);
    ringBuffer.append("blog ",5);
    ringBuffer.append(qba);
    memset(str,0,100);
    ringBuffer.peek(str,10),
    qDebug()<<str;
    //證明peek只是拷貝資料
    memset(str,0,100);
    ringBuffer.read(str,100);
    qDebug()<<str;
    //測試釋放指定長度緩衝區+++++++++++++++++++++++++++++++++++++++++++
    qDebug()<<QStringLiteral("測試釋放指定長度緩衝區+++++++++++++++++++++++++++++++++++++++++++");
    ringBuffer.clear();
    ringBuffer.append("CSDN ",5);
    ringBuffer.append("blog ",5);
    ringBuffer.append(qba);
    memset(str,0,100);
    ringBuffer.skip(10),
    ringBuffer.read(str,100);
    qDebug()<<str;
    //測試申請指定長度緩衝區+++++++++++++++++++++++++++++++++++++++++++
    qDebug()<<QStringLiteral("測試申請指定長度緩衝區+++++++++++++++++++++++++++++++++++++++++++");
    ringBuffer.clear();
    ringBuffer.reserve(100);
    qint64 lenToTail=0;
    char * pos=ringBuffer.readPointerAtPosition(10,lenToTail);
    char * test="CSDN blog http://blog.csdn.net/caoshangpa";
    memcpy(pos,test,strlen(test));
    ringBuffer.skip(10);
    ringBuffer.chop(100-10-strlen(test));
    qDebug()<<ringBuffer.read();
    qDebug()<<lenToTail;
    system("pause");
}
測試結果



二.使用QSemaphore

用定時器將當前時間儲存到QStringList物件中,然後通過現場去QStringList物件中取出並列印,通過兩個QSemaphore物件進行同步,形成環形緩衝區。

thread.h

#ifndef THREAD_H
#define THREAD_H

#include <QThread>
#include <QSemaphore>
#include <QStringList>

class Thread : public QThread
{
    Q_OBJECT
public:
    Thread();
    ~Thread();

protected:
    void run();

private slots:
   void slotTimeout();

};

#endif // THREAD_H
thread.cpp
#include "thread.h"
#include <QDebug>
#include <QTimer>
#include <QTime>
//1024是這個list的緩衝區大小,當然可以隨意設定;80的意思是初始空閒的緩衝區大小是1024
QSemaphore freeBytes(1024);
//初始被使用的緩衝區大小是0
QSemaphore usedBytes(0);
QStringList timeList;
Thread::Thread()
{
    QTimer *timer=new QTimer(this);
    connect(timer,SIGNAL(timeout()),this,SLOT(slotTimeout()));
    timer->start(50);
}

Thread::~Thread()
{

}

void Thread::run()
{
    //獲取並列印list中的時間
    while(true)
    {
        usedBytes.acquire();
        qDebug()<<timeList.takeFirst();
        freeBytes.release();
    }
}
//通過定時器給list存入當前時間
void Thread::slotTimeout()
{
    freeBytes.acquire();//先申請一個位置,相當於空閒的緩衝區變為1023
    timeList.append(QTime::currentTime().toString("now : hh:mm:ss:zzz"));
    usedBytes.release();//給使用的大小+1
}
main.cpp
#include <QCoreApplication>
#include "thread.h"
int main(int argc, char *argv[])
{
    QCoreApplication a(argc, argv);
    Thread *thread=new Thread;
    thread->start();
    return a.exec();
}
測試結果