linux核心分析--核心中的資料結構之佇列(二)
核心中的佇列是以位元組形式儲存資料的,所以獲取資料的時候,需要知道資料的大小。
如果從佇列中取得資料時指定的大小不對的話,取得資料會不完整或過大。
核心中關於佇列定義的標頭檔案位於:<linux/kfifo.h> include/linux/kfifo.h
標頭檔案中定義的函式的實現位於:kernel/kfifo.c
核心佇列程式設計需要注意的是:
佇列的size在初始化時,始終設定為2的n次方
使用佇列之前將佇列結構體中的鎖(spinlock)釋放
1. kfifo概述
kfifo 是核心裡面的一個First In First Out資料結構,它採用環形迴圈佇列的資料結構來實現;它提供一個無邊界的位元組流服務,最重要的一點是,它使用並行無鎖程式設計技術,即當它用於只有一個入隊執行緒和一個出隊執行緒的場情時,兩個執行緒可以併發操作,而不需要任何加鎖行為,就可以保證kfifo的執行緒安全。
struct kfifo { unsignedchar *buffer; /* the buffer holdingthe data */ unsignedint size; /* the size of the allocatedbuffer */ unsignedint in; /* data is added at offset (in% size) */ unsignedint out; /* data is extracted fromoff. (out % size) */ spinlock_t*lock; /* protects concurrentmodifications */ };
這是kfifo的資料結構,kfifo主要提供了兩個操作,__kfifo_put(入隊操作)和__kfifo_get(出隊操作)。 它的各個資料成員如下:
buffer, 用於存放資料的快取
size, buffer空間的大小,在初化時,將它向上擴充套件成2的冪
lock, 如果使用不能保證任何時間最多隻有一個讀執行緒和寫執行緒,需要使用該lock實施同步。
in, out, 和buffer一起構成一個迴圈佇列。 in指向buffer中隊尾,而且out指向buffer中的隊頭,in是資料被開始存放的偏移位置,out是佇列開始取出資料的偏移位置。它的結構如示圖如下:
+--------------------------------------------------------------+
| |<----------data---------->| |
+--------------------------------------------------------------+
^ ^
| |
out in
在put和get函式中堆in,out做了很巧妙的處理!
2. kfifo_alloc 分配kfifo記憶體和初始化工作
struct kfifo*kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
unsignedchar *buffer;
structkfifo *ret;
/*
* round upto the next power of 2, since our 'let the indices
* wrap'tachnique works only in this case.
*/
if (size& (size - 1)) {
BUG_ON(size > 0x80000000);
size = roundup_pow_of_two(size);
}
buffer =kmalloc(size, gfp_mask);
if(!buffer)
returnERR_PTR(-ENOMEM);
ret =kfifo_init(buffer, size, gfp_mask, lock);
if (IS_ERR(ret))
kfree(buffer);
returnret;
}
這裡值得一提的是,kfifo->size的值總是在呼叫者傳進來的size引數的基礎上向2的冪擴充套件,這是核心一貫的做法。這樣的好處不言而喻--對kfifo->size取模運算可以轉化為與運算,如下:
kfifo->in % kfifo->size 可以轉化為 kfifo->in & (kfifo->size – 1)
在kfifo_alloc函式中,使用size & (size – 1)來判斷size 是否為2冪,如果條件為真,則表示size不是2的冪,然後呼叫roundup_pow_of_two將之向上擴充套件為2的冪。在kfifo_alloc函 數中看到呼叫了kfifo_init函式:
/**
30 * kfifo_init - allocates a new FIFO using apreallocated buffer
31 * @buffer: the preallocated buffer to beused.
32 * @size: the size of the internal buffer,this have to be a power of 2.
33 * @gfp_mask: get_free_pages mask, passed tokmalloc()
34 * @lock: the lock to be used to protect thefifo buffer
35 *
36 * Do NOT pass the kfifo to kfifo_free() afteruse! Simply free the
37 * &struct kfifo with kfree().
38 */
39 struct kfifo*kfifo_init(unsigned char *buffer, unsigned int size,
40 gfp_t gfp_mask, spinlock_t *lock)
41 {
42 struct kfifo *fifo;
43
44 /* size must be a power of 2 */
45 BUG_ON(!is_power_of_2(size));
46
47 fifo = kmalloc(sizeof(struct kfifo),gfp_mask);
48 if (!fifo)
49 return ERR_PTR(-ENOMEM);
50
51 fifo->buffer = buffer;
52 fifo->size = size;
53 fifo->in = fifo->out = 0;
54 fifo->lock = lock;
55
56 return fifo;
57 }
3. __kfifo_put和__kfifo_get,巧妙的入隊和出隊操作,無鎖併發
__kfifo_put是入隊操作,它先將資料放入buffer裡面,最後才修改in引數;__kfifo_get是出隊操作,它先將資料從buffer中移走,最後才修改out。你會發現in和out兩者各司其職。
unsigned int__kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsignedint l;
len =min(len, fifo->size - fifo->in + fifo->out);
/*
* Ensurethat we sample the fifo->out index -before- we
* startputting bytes into the kfifo.
*/
smp_mb();
/* firstput the data starting from fifo->in to buffer end */
l =min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)),buffer, l);
/* then putthe rest (if any) at the beginning of the buffer */
memcpy(fifo->buffer, buffer + l, len - l);
/*
* Ensurethat we add the bytes to the kfifo -before-
* weupdate the fifo->in index.
*/
smp_wmb();
fifo->in+= len; //每次累加,到達最大值後溢位,自動轉為0
returnlen;
}
unsigned int__kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsignedint l;
len =min(len, fifo->in - fifo->out);
/*
* Ensurethat we sample the fifo->in index -before- we
* startremoving bytes from the kfifo.
*/
smp_rmb();
/* firstget the data from fifo->out until the end of the buffer */
l =min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size -1)), l);
/* then getthe rest (if any) from the beginning of the buffer */
memcpy(buffer + l, fifo->buffer, len - l);
/*
* Ensurethat we remove the bytes from the kfifo -before-
* weupdate the fifo->out index.
*/
smp_mb();
fifo->out += len; //每次累加,到達最大值後溢位,自動轉為0
returnlen;
}
從上面的程式碼發現,put和get是非常精簡的,下面分情況討論函式的執行過程:
佇列的隊頭隊尾下標不受佇列長度的限制,就算隊頭下標大於佇列長度,也一樣可以使用,原理就在於,資料不是全部放在隊頭(fifo->out)和隊尾(fifo->in)之間的記憶體空間,而是把超出隊頭隊尾之間長度的資料放到整個佇列buffer的開始處
1、入隊操作
藍色部分為真實資料所在記憶體段,白色部分其實為邏輯上假定的資料所在地,也就是說,為了給使用者一種真正的佇列感覺——從尾部推進資料,從頭部拉取資料,那麼就必須讓fifo->out和fifo->in只能一直往一個方向推進,但是由於fifo所分配的buffer是有限的一段連續記憶體,fifo->out和fifo->in遲早要“越界”(這裡的越界是in和out大於size,in和out是整數,是相對於buffer的偏移量), 入佇列的操作,保證fifo->out和fifo->in是一直往右推進的。
如果fifo->in小於buffer->size,那麼先放完buffer->size-fifo->in這段記憶體空間,剩下的部分,轉移到buffer的可用空間開頭存放;如果fifo->in大於buffer->size,那麼直接把要入佇列的資料放到buffer可用空間開頭。
在put的函式中,有兩次取值min和兩次memcpy,
情況1、如果fifo->in小於fifo->size,並且in和out都沒有越界(都小於size)
那麼第一個min中的fifo->size-fifo->in+fifo->out是佇列空餘的空間大小(這裡要求所存放的字串長度小於佇列空餘大小)
第二個min中的fifo->in &(fifo->size - 1)是in相對於buffer的偏移位置
如果從in開始到佇列末尾便可存放字串,第一個min取len,第二個min取len,第一個memcpy是從buffer的in開始拷貝len個字元,第二個memcpy拷貝0個字元。
如果從in開始到佇列末尾不能存放所有的字元,第一個min取值為len,第二個min取值fifo->size- (fifo->in & (fifo->size - 1)),也就是從in到佇列末尾的大小;第一個memcpy是講佇列從in開始到佇列末尾填充滿,填充的長度也就是l,第二個memcpy是從佇列開始填充剩餘的len-l長度的字串。
in累加len的長度,in一直累加,這也是一個巧妙之處,是利用了unsigned int 的迴環。
情況2、如果fifo->in小於fifo->size, out沒有越界,in超過了size
如果in一旦超多size,那麼佇列的開頭一定存放了資料,並且佇列末尾沒有空餘,將資料存放到對壘開始的地方。
那麼第一個min中的fifo->size-fifo->in+fifo->out是佇列空餘的空間大小
第二個min中的fifo->in& (fifo->size - 1)是in相對於buffer的偏移位置
這個時候fifo->in > fifo->out,但是fifo->in & (fifo->size - 1) < fifo->out
此時需要從佇列的開始存放資料,第一個min取len,第二個min取len,因為此時fifo->size- (fifo->in & (fifo->size - 1))肯定小於fifo->size-fifo->in+fifo->out;第一個memcpy是從佇列開頭的in存放len的位元組,第二個memcpy不拷貝位元組。
情況3、如果fifo->in小於fifo->size, out沒有越界,in和out都超過了size
這種情況的複製情況和上面一樣,都是第一次直接拷貝結束。
分析,因為我們只考慮了放,沒有考慮取,在取的過程也是Out一直累加,如果累加到超過了unsigned int,那麼又從0開始賦值。同時裡面關於in和out的操作都是和size取模運算,所以in和out都會在size範圍內。這也是籠統的理解,但是這個理解是正確的。
在put的過程中,只要字串長度小於佇列空餘空間大小,in和Out無論從數值上怎麼越界,第一個min取值為len,第二個min中的fifo->size- (fifo->in & (fifo->size - 1))一定是從in偏移量開始到佇列末尾的空餘大小,這個值和len相比較,取其中較小者,換句話說,如果從in開始到佇列末尾的空餘空間長度大於字串長度,那麼第一個memcpy搞定一切,第二個memcpy不再從buffer的頭部開始複製資料;如果從in開始到佇列末尾的空餘空間大小小於字串長度,那麼從in開始的空間先複製滿,然後從空開始複製剩餘的字元。無論哪種情況都是這個過程。
2、對於get函式
情況1:fifo->in大於fifo->size而fifo->out小於fifo->size(即只有fifo->in“越界”),則先讀取fifo->out到fifo->size-1的那一段,大小為l個byte,然後再讀取剩下的從buffer開頭,大小為len-l個byte的資料(如下圖所示,即先讀data A段, 再讀出data B段);
情況2:fifo->in和fifo->out都“越界”了,那麼l = min(len, fifo->size - (fifo->out & (fifo->size -1))); 這一語句便起作用了,此時fifo->out&fifo->size-1的結果即實際要讀的資料所在的記憶體地址相對於buffer起始地址的偏移值(如下圖所示,左邊為實際上存在於記憶體中的data A段, 而右邊虛線框為邏輯上的data A段的位置);
分析:對於get的情況, 假設整個過程中fifo->in> fifo->out始終成立,並且假設get的字串長度始終小於佇列總資料的長度,第一個min的取值為min。第二個min中的fifo->size - (fifo->out & (fifo->size - 1))表示從out到佇列末尾的字串長度,如果這段字串的長度大於len,那麼第一個memcpy直接拷貝所要獲取的字串長度,第二個不拷貝,如果小於len的長度,從out開始拷貝到結束,然後從佇列的開始拷貝剩餘的字串。