1. 程式人生 > >dpdk rte_ring無鎖佇列 及 核心kfifo

dpdk rte_ring無鎖佇列 及 核心kfifo

kfifo是核心裡面的一個FIFO資料結構,採用環形迴圈佇列的資料結構來實現;它提供一個無邊界的位元組流服務,最重要的一點是,它使用並行無鎖程式設計技術,即當它用於只有一個入隊執行緒和一個出隊執行緒的場情時,兩個執行緒可以併發操作,而不需要任何加鎖行為,就可以保證kfifo的執行緒安全。
kfifo程式碼既然肩負著這麼多特性,那我們先一敝它的程式碼:
struct kfifo {
    unsigned char *buffer;    /* the buffer holding the data */
    unsigned int size;    /* the size of the allocated buffer */
    unsigned int in;    /* data is added at offset (in % size) */
    unsigned int out;    /* data is extracted from off. (out % size) */
    spinlock_t *lock;    /* protects concurrent modifications */ //緊用在多生產者或多消費者環境。
};
+--------------------------------------------------------------+
|            |<----------data---------->|                      |
+--------------------------------------------------------------+
             ^                          ^                      ^
             |                          |                      |
            out                        in                     size

struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
    unsigned char *buffer;
    struct kfifo *ret;

    /*
     * round up to the next power of 2, since our 'let the indices
     * wrap' tachnique works only in this case.
     */
    if (size & (size - 1)) {
        BUG_ON(size > 0x80000000);
        size = roundup_pow_of_two(size);
    }

    buffer = kmalloc(size, gfp_mask);
    if (!buffer)
        return ERR_PTR(-ENOMEM);

    ret = kfifo_init(buffer, size, gfp_mask, lock);

    if (IS_ERR(ret))
        kfree(buffer);

    return ret;
}
unsigned int __kfifo_put(struct kfifo *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;
    /* 本次存放的資料量,當資料量大於剩餘空間時取len */
    len = min(len, fifo->size - fifo->in + fifo->out);

    smp_mb();
    /* 計算fifo->in後面還有多少可用空間 */
    l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

    memcpy(fifo->buffer, buffer + l, len - l);

    smp_wmb();

    fifo->in += len;

    return len;
}

unsigned int __kfifo_get(struct kfifo *fifo,
             unsigned char *buffer, unsigned int len)
{
    unsigned int l;
     /* 本次取的資料量,當len大於kinfo->buf資料量時時取len */
    len = min(len, fifo->in - fifo->out);

    smp_rmb();

   /* 計算fifo->out後面有多少資料可以取 */
    l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
    memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
    memcpy(buffer + l, fifo->buffer, len - l);

    smp_mb();

    fifo->out += len;

    return len;
}

保持一個原則:先存取資料然後增加in/out.
空閒的空間:fifo->size - fifo->in + fifo->out
已佔用的空間:fifo->in - fifo->out
佇列空:fifo->in == fifo->out
佇列滿:(fifo->in - fifo->out)&(fifo->size-1) == 0

smp_mp:記憶體屏障用來告訴CPU保順執行。