1. 程式人生 > >使用無鎖隊列(環形緩沖區)註意事項

使用無鎖隊列(環形緩沖區)註意事項

size sign until read 定義 log 消費 ucc student

環形緩沖區是生產者和消費者模型中常用的數據結構。生產者將數據放入數組的尾端,而消費者從數組的另一端移走數據,當達到數組的尾部時,生產者繞回到數組的頭部。如果只有一個生產者和一個消費者,那麽就可以做到免鎖訪問環形緩沖區(Ring Buffer)。寫入索引只允許生產者訪問並修改,只要寫入者在更新索引之前將新的值保存到緩沖區中,則讀者將始終看到一致的數據結構。同理,讀取索引也只允許消費者訪問並修改。

環形緩沖區實現原理圖

技術分享圖片

如圖所示,當讀者和寫者指針相等時,表明緩沖區是空的,而只要寫入指針在讀取指針後面時,表明緩沖區已滿。

清單 9. 2.6.10 環形緩沖區實現代碼

技術分享圖片
 /*

 * __kfifo_put - puts some data into the FIFO, no locking version

 * Note that with only one concurrent reader and one concurrent

 * writer, you don‘t need extra locking to use these functions.

 */

 unsigned int __kfifo_put(struct kfifo *fifo,

       unsigned char *buffer, unsigned int len)

 {

  unsigned int l;

  len = min(len, fifo->size - fifo->in + fifo->out);

  /* first put the data starting from fifo->in to buffer end */

  l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));

  memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

  /* then put the rest (if any) at the beginning of the buffer */

  memcpy(fifo->buffer, buffer + l, len - l);

  fifo->in += len;

  return len;

 }

 

 /*

 * __kfifo_get - gets some data from the FIFO, no locking version

 * Note that with only one concurrent reader and one concurrent

 * writer, you don‘t need extra locking to use these functions.

 */

 unsigned int __kfifo_get(struct kfifo *fifo,

     unsigned char *buffer, unsigned int len)

 {

  unsigned int l;

  len = min(len, fifo->in - fifo->out);

  /* first get the data from fifo->out until the end of the buffer */

  l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));

  memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);

  /* then get the rest (if any) from the beginning of the buffer */

  memcpy(buffer + l, fifo->buffer, len - l);

  fifo->out += len;

  return len;

 }
技術分享圖片

需要註意的是

使用ring_buffer_get(kfifo_get)或者ring_buffer_put(kfifo_put)時,如果返回參數與傳入參數len不相等時,則操作失敗

我們定義一個

//註意student_info 共17字節 按照內存排列占24字節

typedef struct student_info

{

uint64_t stu_id; //8個字節

uint32_t age; //4字節

uint32_t score;//4字節

char sex;//1字節

}student_info;

我們建立一個環形緩沖區,裏面只有64字節大小(雖然我們實際使用時大小遠大於此),向裏面多次存入

24字節student_info,看有什麽反應

技術分享圖片
//打印學生信息

void print_student_info(const student_info *stu_info)

{

    assert(stu_info);

    printf("id:%lu\t",stu_info->stu_id);

    printf("age:%u\t",stu_info->age);

     printf("sex:%d\t",stu_info->sex);

    printf("score:%u\n",stu_info->score);

}

 

student_info * get_student_info(time_t timer)

{

    student_info *stu_info = (student_info *)malloc(sizeof(student_info));

    srand(timer);

    stu_info->stu_id = 10000 + rand() % 9999;

    stu_info->age = rand() % 30;

    stu_info->score = rand() % 101;

     stu_info->sex=rand() % 2;

    print_student_info(stu_info);

    return stu_info;

}

void print_ring_buffer_len(struct ring_buffer *ring_buf)

{

     //用於打印緩沖區長度

     uint32_t ring_buf_len = 0;

     //取得已經使用緩沖區長度 size-ring_buf_len為未使用緩沖區的長度

     ring_buf_len=ring_buffer_len(ring_buf);

     printf("no use ring_buf_len:%d\n",(ring_buf->size-ring_buf_len));

}

int main(int argc, char *argv[])

{

    uint32_t size = 0;

     //用於判斷存儲或者取得數據的字節數

     uint32_t oklen = 0;

    struct ring_buffer *ring_buf = NULL;

    //64字節

    size=BUFFER_SIZE;

    ring_buf = ring_buffer_alloc(size);

     printf("input student\n");

     {

         student_info *stu_info;

         student_info stu_temp;

         uint32_t student_len=sizeof(student_info);

         printf("ring_buf_len:%d\n",ring_buf->size);

         printf("student_len:%d\n",student_len);

         //此時環形緩沖區沒有數據我們去取數據當然為空

         memset(&stu_temp,0,student_len);

         oklen=ring_buffer_get(ring_buf, (void *)(&stu_temp), student_len);

         if(oklen==student_len)

         {

            printf("get student data\n");

         }

         else

         {

            printf("no student data\n");

         }

         printf("\n");

        //第一次調用時用字節結束後還有64-24 =40字節

         stu_info = get_student_info(976686458);

         oklen = ring_buffer_put(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

           printf("1 put student data success\n");

         }

         else

         {

            printf("1 put student data failure\n");

         }

         print_ring_buffer_len(ring_buf);

 

         printf("\n");

         //第二次調用時用字節結束後還有64-48 =16字節

         stu_info = get_student_info(976686464);

         oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

           printf("2 put student data success\n");

         }

         else

         {

            printf("2 put student data failure\n");

         }

         print_ring_buffer_len(ring_buf);

 

         printf("\n");

         //第三次調用時需要用字節但只有字節失敗

         //把字節都寫滿了

        //驗證了在調用__kfifo_put函數或者__kfifo_get函數時,如果返回參數與傳入參數len不相等時,則操作失敗

         stu_info = get_student_info(976686445);

         oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

           printf("3 put student data success\n");

         }

         else

         {

            printf("3 put student data failure\n");

         }

         print_ring_buffer_len(ring_buf);

       

         printf("\n");

         //第四次調用時需要用字節但無字節

         ////驗證了在調用__kfifo_put函數或者__kfifo_get函數時,如果返回參數與傳入參數len不相等時,則操作失敗

         stu_info = get_student_info(976686421);

         oklen= ring_buffer_put(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

           printf("4 put student data success\n");

         }

         else

         {

            printf("4 put student data failure\n");

         }

         print_ring_buffer_len(ring_buf);

 

         printf("\n");

         //現在開始取學生數據裏面保存了個學生數據我們取三次看效果

         printf("output student\n");

 

         printf("\n");

         //第一次取得數據並打印

         memset(stu_info,0,student_len);

          oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

            print_student_info(stu_info);

           printf("1 get student data success\n");

         }

         else

         {

            printf("1 get student data failure\n");

         }

         print_ring_buffer_len(ring_buf);

 

         printf("\n");

         ////第二次取得數據並打印

         memset(stu_info,0,student_len);

          oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

            print_student_info(stu_info);

           printf("2 get student data success\n");

         }

         else

         {

            printf("2 get student data failure\n");

         }

         print_ring_buffer_len(ring_buf);

 

         printf("\n");

        //第三次取得數據失敗

         memset(stu_info,0,student_len);

         oklen=ring_buffer_get(ring_buf, (void *)stu_info, student_len);

         if(oklen==student_len)

         {

            print_student_info(stu_info);

           printf("3 get student data success\n");

         }

         else

         {

             printf("3 get student data failure\n");

         }

         print_ring_buffer_len(ring_buf);

 

     }

 

     return 1;

}
技術分享圖片

技術分享圖片

結論:在使用ring_buffer_get(kfifo_get)或者ring_buffer_put(kfifo_put)時,如果返回參數與傳入參數len不相等時,則操作失敗。代碼下載:tessc.rar(http://files.cnblogs.com/dragonsuc/tessc.rar)

需要註意的地方:

1.只有一個線程負責讀,另一個線程負責寫的時候,數據是線程安全的。上面的實現是基於這個原理實現的,當有多個線程讀或者多個線程寫的時候,不保證數據的正確性。
所以使用的時候,一個線程寫,一個線程讀。網絡應用中比較常用,就是開一個線程接口數據,然後把數據寫入隊列。然後開一個調度線程讀取網絡數據,然後分發到處理線程。

2.數據長度默認宏定義了一個長度,超過這個長度的時候,後續的數據會寫入失敗。

本文參考文章:

http://blog.csdn.net/mergerly/article/details/39009473

http://www.cnblogs.com/Anker/p/3481373.html

使用無鎖隊列(環形緩沖區)註意事項