1. 程式人生 > >模仿Linux核心kfifo實現的迴圈快取

模仿Linux核心kfifo實現的迴圈快取

想實現個迴圈緩衝區(Circular Buffer),搜了些資料多數是基於迴圈佇列的實現方式。使用一個變數存放緩衝區中的資料長度或者空出來一個空間來判斷緩衝區是否滿了。偶然間看到分析Linux核心的迴圈緩衝佇列kfifo的實現,確實極其巧妙。kfifo主要有以下特點:

  • 保證緩衝空間的大小為2的次冪,不是的向上取整為2的次冪。
  • 使用無符號整數儲存輸入(in)和輸出(out)的位置,在輸入輸出時不對in和out的值進行模運算,而讓其自然溢位,並能夠保證in-out的結果為緩衝區中已存放的資料長度,這也是最能體現kfifo實現技巧的地方;
  • 使用記憶體屏障(Memory Barrier)技術,實現單消費者和單生產者對kfifo
    的無鎖併發訪問,多個消費者、生產者的併發訪問還是需要加鎖的。

本文主要以下三個部分:

  • 關於2的次冪問題,判斷是不是2的次冪以及向上取整為2的次冪
  • Linux核心中kfifo的實現及簡要分析
  • 根據kfifo實現的迴圈緩衝區,並進行一些測試

關於記憶體屏障的本文不作過多分析,可以參考WikiMemory Barrier。另外,本文所涉及的整數都預設為無符號整數,不再做一一說明。

1. 2的次冪

  • 判斷一個數是不是2的次冪
    kfifo要保證其快取空間的大小為2的次冪,如果不是則向上取整為2的次冪。其對於2的次冪的判斷方式也是很巧妙的。如果一個整數n是2的次冪,則二進位制模式必然是1000...
    ,而n-1的二進位制模式則是0111...,也就是說n和n-1的每個二進位制位都不相同,例如:8(1000)和7(0111);n不是2的次冪,則n和n-1的二進位制必然有相同的位都為1的情況,例如:7(0111)和6(0110)。這樣就可以根據 n & (n-1)的結果來判斷整數n是不是2的次冪,實現如下:
/*
    判斷n是否是2的冪
    若n為2的次冪,   則 n & (n-1) == 0,也就是n和n-1的各個位都不相同。例如 8(1000)和7(0111)
    若n不是2的次冪, 則 n & (n-1) != 0,也就是n和n-1的各個位肯定有相同的,例如7(0111)和6(0110)
*/
static inline bool is_power_of_2(uint32_t n) { return (n != 0 && ((n & (n - 1)) == 0)); }
  • 將數字向上取整為2的次冪
    如果設定的緩衝區大小不是2的次冪,則向上取整為2的次冪,例如:設定為5,則向上取為8。上面提到整數n是2的次冪,則其二進位制模式為100...,故如果正數k不是n的次冪,只需找到其最高的有效位1所在的位置(從1開始計數)pos,然後1 << pos即可將k向上取整為2的次冪。實現如下:
static inline uint32_t roundup_power_of_2(uint32_t a)
{
    if (a == 0)
        return 0;

    uint32_t position = 0;
    for (int i = a; i != 0; i >>= 1)
        position++;

    return static_cast<uint32_t>(1 << position);
}

2. Linux實現kfifo及分析

Linux核心中kfifo實現技巧,主要集中在放入資料的put方法和取資料的get方法。程式碼如下:

unsigned int __kfifo_put(struct kfifo *fifo, unsigned char *buffer, unsigned int len)   
{   
    unsigned int l;   

    len = min(len, fifo->size - fifo->in + fifo->out);   

    /*  
     * Ensure that we sample the fifo->out index -before- we  
     * start putting bytes into the kfifo.  
     */   

    smp_mb();   

    /* first put the data starting from fifo->in to buffer end */   
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));   
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);   

    /* then put the rest (if any) at the beginning of the buffer */   
    memcpy(fifo->buffer, buffer + l, len - l);   

    /*  
     * Ensure that we add the bytes to the kfifo -before-  
     * we update the fifo->in index.  
     */   

    smp_wmb();   

    fifo->in += len;   

    return len;   
}  

unsigned int __kfifo_get(struct kfifo *fifo,unsigned char *buffer, unsigned int len)   
{   
    unsigned int l;   

    len = min(len, fifo->in - fifo->out);   

    /*  
     * Ensure that we sample the fifo->in index -before- we  
     * start removing bytes from the kfifo.  
     */   

    smp_rmb();   

    /* first get the data from fifo->out until the end of the buffer */   
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));   
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);   

    /* then get the rest (if any) from the beginning of the buffer */   
    memcpy(buffer + l, fifo->buffer, len - l);   

    /*  
     * Ensure that we remove the bytes from the kfifo -before-  
     * we update the fifo->out index.  
     */   

    smp_mb();   

    fifo->out += len;   

    return len;   
}   

put返回實際儲存到緩衝區中的資料長度,get返回的是實際取到的資料長度。在上面程式碼中,需要注意到在寫入、取出時候的兩次min運算。關於kfifo的分析,已有很多資料了,也可參考眉目傳情之匠心獨運的kfifo

Linux核心實現的kfifo的有以下特點:

  • 使用記憶體屏障 Memory Barrier
  • 初始化緩衝區空間時要保證緩衝區的大小為2的次冪
  • 使用無符號整數儲存in和out(輸入輸出的指標),並且在放入取出資料的時候不做模運算,讓其自然溢位。

優點:

  1. 實現單消費者和單生產者的無鎖併發訪問。多消費者和多生產者的時候還是需要加鎖的。
  2. 使用與運算in & (size-1)代替模運算
  3. 在更新in或者out的值時不做模運算,而是讓其自動溢位。這應該是kfifo實現最牛叉的地方了,利用溢位後的值參與運算,並且能夠保證結果的正確。溢位運算保證了以下幾點:

    • in - out為緩衝區中的資料長度
    • size - in + out 為緩衝區中空閒空間
    • in == out時緩衝區為空
    • size == (in - out)時緩衝區滿了

3. 模仿kfifo實現的迴圈緩衝

主要是模仿其無符號溢位的運算方法,並沒有利用記憶體屏障實現單生產者和單消費者的無鎖併發訪問。初始化及輸入輸出的程式碼如下:

struct kfifo{
    uint8_t *buffer;
    uint32_t in; // 輸入指標
    uint32_t out; // 輸出指標
    uint32_t size; // 緩衝區大小,必須為2的次冪

    kfifo(uint32_t _size)
    {
        if (!is_power_of_2(_size))
            _size = roundup_power_of_2(_size);

        buffer = new uint8_t[_size];
        in = 0;
        out = 0;
        size = _size;
    }

    // 返回實際寫入緩衝區中的資料
    uint32_t put(const uint8_t *data, uint32_t len)
    {
        // 當前緩衝區空閒空間
        len = min(len,size - in + out);

        // 當前in位置到buffer末尾的長度
        auto l = min(len, size - (in  & (size - 1)));

        // 首先複製資料到[in,buffer的末尾]
        memcpy(buffer + (in & (size - 1)), data, l);

        // 複製剩餘的資料(如果有)到[buffer的起始位置,...]
        memcpy(buffer, data + l, len - l);

        in += len; // 直接加,不作模運算。當溢位時,從buffer的開始位置重新開始

        return len;
    }

    // 返回實際讀取的資料長度
    uint32_t get(uint8_t *data, uint32_t len)
    {
        // 緩衝區中的資料長度
        len = min(len, in - out);

        // 首先從[out,buffer end]讀取資料
        auto l = min(len, size - (out & (size - 1)));
        memcpy(data, buffer + (out & (size - 1)), l);

        // 從[buffer start,...]讀取資料
        memcpy(data + l, buffer, len - l);

        out += len; // 直接加,不錯模運算。溢位後,從buffer的起始位置重新開始

        return len;
    }

在初始化緩衝空間的時候要驗證size是否為2的次冪,如果不是則向上取整為2的次冪。下面著重分析下在放入取出資料時對指標inout的處理,以及在溢位後怎麼能夠保證in - out仍然為緩衝區中的已有的資料長度。

put和get方法詳解

在向緩衝區中put資料的時候,需要兩個引數:要put的資料指標data和期望能夠put的資料長度len,返回值是實際存放到緩衝區中的資料長度(當緩衝區中空間不足時該值小於len)。下面詳細的解釋下put中每個語句的作用。

  • put函式中的第一句是len = min(len,size - in + out)計算實際向緩衝區中寫入資料的大小。如果想要寫入的資料len大於緩衝區中的空閒空間size - in + out,則只填充滿緩衝空間。

因為是迴圈緩衝區,所以其空閒空間有兩部分:從in到緩衝空間的末尾->[in,buffer end]和緩衝空間的起始位置到out->[buffer start,out]。

  • auto l = min(len, size - (in & (size - 1))); 這個是判斷[in,buffer end]這部分空間是否足夠寫入資料
  • memcpy(buffer + (in & (size - 1)), data, l); 向[in,buffer end]這部分空間寫入資料
  • memcpy(buffer, data + l, len - l); 如果資料還沒有寫完,則向[buffer start,out]這部分空間寫入資料。
  • in += len 更新in,不做模運算,讓其自然溢位。

get和put很類似,首先判斷是否有足夠的資料取出;在取資料時首先從out取到buffer的末尾,如果不夠則從buffer的開始位置取;最後更新out時也是不做模運算,讓其溢位。看參看上面put的語句解釋,這裡就不再多說。

無符號溢位運算

kfifo之所以如次的簡潔,很大一部分要歸功於其in和out的溢位運算。這裡就解釋下在溢位的情況下,如何保證in - out仍然為緩衝區中的資料長度。首先來看圖:


  • 緩衝區為空
    這裡寫圖片描述
  • put 一堆資料後
    這裡寫圖片描述
  • get 一堆資料後
    這裡寫圖片描述
  • put的資料長度超過in到buffer末尾的長度,有一部分從put到buffer的起始位置
    這裡寫圖片描述

以上圖片引用自linux核心資料結構之kfifo,其對kfifo的分析也很詳細。
前三種情況下從圖中可以很清晰的看出in - out為緩衝區中的已有的資料長度,但是最後一種發現in跑到了out的前面,這時候in - out不是應該為負的麼,怎麼能是資料長度?這正是kfifo的高明之處,in和out都是無符號整數,那麼在in < out 時in - out就是負數,把這個負數當作無符號來看時,其值仍然是緩衝區中的資料長度。這和in累加到溢位的情況基本一致,這裡放在一起說。

這裡使用8位無符號整數來儲存in和out,方便溢位。這裡假設out = 100,in = 255,size = 256,如下圖

/*
    --------------------------------------
    |             |                  |   |
    --------------------------------------
                out = 100           in = 250
    這時緩衝區中已有的資料為:in - out = 150,空閒空間為:size - (in - out) = 106

    向緩衝區中put10個數據後
    --------------------------------------
    |    |       |                       |
    --------------------------------------
        in      out
    這時候 in + 10 = 260 溢位變為in = 4;這是 in - out = 4 - 100 = -96,仍然溢位-96十六進位制為`0xA0`,將其直接轉換為有符號數`0xA0 = 160`,在沒put之前的資料為150,put10個後,緩衝區中的資料剛好為160,剛好為溢位計算結果。
*/

進行上述運算的前提是,size必須為2的次冪。假如size = 257,則上述的執行就不會成功。

測試例項

上面描述都是基於運算推導的,下面據結合本文中的程式碼進行下驗證。
測試程式碼如下:設定空間大小為128,in和out為8位無符號整數

int main()
{
    uint8_t output[512] = { 0 };
    uint8_t data[256] = { 0 };
    for (int i = 0; i < 256; i++)
        data[i] = i;

    kfifo fifo(128);
    fifo.put(data, 100);

    fifo.get(output, 50);

    fifo.put(data, 30);

    auto c = fifo.put(data + 10, 92);

    cout << "Empty:" << fifo.isEmpty() << endl;
    cout << "Left Space:" << fifo.left() << endl;
    cout << "Length:" << fifo.length() << endl;
    uint8_t a = fifo.size - fifo.in + fifo.out;
    uint8_t b = fifo.in - fifo.out;

    cout << "=======================================" << endl;
    fifo.get(output, 128);
    cout << "Empty:" << fifo.isEmpty() << endl;
    cout << "Left Space:" << fifo.left() << endl;
    cout << "Length:" << fifo.length() << endl;

    cout << "======================================" << endl;
    fifo.put(output, 100);
    cout << "Empty:" << fifo.isEmpty() << endl;
    auto d = static_cast<uint8_t>(fifo.left());
    auto e = static_cast<uint8_t>(fifo.length());
    printf("Left Space:%d\n", d); 
    printf("Length:%d\n", e);

    getchar();
    return 0;
}

執行結果:
這裡寫圖片描述

  • 第一個輸出是將緩衝區填滿的狀態
  • 第二個輸出是將緩衝區取空的狀態
  • 第三個是in溢位的情況,具體來看看:
    在第二個輸出將緩衝區取空的時候,in = out = 178。接著,向緩衝區put了100個數據,這時候in += 100會溢位,溢位後in = 22。看輸出結果:put前緩衝區為空,put100個數據後,緩衝區的空閒空間為28,資料長度為100,是正確的。