1. 程式人生 > >Redis有序集合(sortSet)的底層實現

Redis有序集合(sortSet)的底層實現

Redis中支援的資料結構比Memcached要多,如基本的字串、雜湊表、列表、集合、可排序集,在這些基本資料結構上也提供了針對該資料結構的各種操作,這也是Redis之所以流行起來的一個重要原因,當然Redis能夠流行起來的原因,遠遠不只這一個,如支援高併發的讀寫、資料的持久化、高效的記憶體管理及淘汰機制...

從Redis的git提交歷史中,可以查到,2009/10/24在1.050版本,Redis開始支援可排序集,在該版本中,只提供了一條命令zadd,巨集定義如下所示:

1     {"zadd",zaddCommand,4,REDIS_CMD_BULK|REDIS_CMD_DENYOOM},

那麼什麼是可排序集呢? 從Redis 1.0開始就給我們提供了集合(Set)這種資料結構,集合就跟數學上的集合概念是一個道理【無序性,確定性,互異性】,集合裡的元素無法保證元素的順序,而業務上的需求,可能不止是一個集合,而且還要求能夠快速地對集合元素進行排序,於是乎,Redis中提供了可排序集這麼一種資料結構,似乎也是合情合理,無非就是在集合的基礎上增加了排序功能,也許有人會問,Redis中不是有Sort命令嘛,下面的操作不也是同樣可以達到對無序集的排序功能嘛,是的,是可以,但是在這裡我們一直強調的是快速這兩個字,而Sort命令的時間複雜度為O(N+M*Log(M)),可排序集獲取一定範圍內元素的時間複雜度為O(log(N) + M)

複製程式碼

[email protected]:/home/bjpengpeng/redis-3.0.1/src# ./redis-cli 
127.0.0.1:6379> sort set
1) "1"
2) "2"
3) "3"
4) "5"
127.0.0.1:6379> sort set desc
1) "5"
2) "3"
3) "2"
4) "1"
127.0.0.1:6379>

複製程式碼

在瞭解可排序集是如何實現之前,需要了解一種資料結構跳錶(Skip List),跳錶與AVL、紅黑樹...等相比,資料結構簡單,演算法易懂,但查詢的時間複雜度與平衡二叉樹/紅黑樹相當,跳錶的基本結構如下圖所示

上圖中整個跳錶結構存放了4個元素5->10->20->30,圖中的紅色線表示查詢元素30時,走的查詢路線,從Head指標數組裡最頂層的指標所指的20開始比較,與普通的連結串列查詢相比,跳錶的查詢可以跳躍元素,上圖中查詢30,發現30比20大,則查詢就是20開始,而普通連結串列的查詢必須一個元素一個元素的比較,時間複雜度為O(n)

有了上圖所示的跳錶基本結構,再看看如何向跳錶中插入元素,向跳錶中插入元素,由於元素所在層級的隨機性,平均起來也是O(logn),說白了,就是查詢元素應該插入在什麼位置,然後就是普通的移動指標問題,再想想往有序單鏈表的插入操作吧,時間複雜度是不是也是O(n),下圖所示是往跳錶中插入元素28的過程,圖中紅色線表示查詢插入位置的過程,綠色線表示進行指標的移動,將該元素插入

 

有了跳錶的查詢及插入那麼就看看在跳錶中如何刪除元素吧,跳錶中刪除元素的個程,查詢要刪除的元素,找到後,進行指標的移動,過程如下圖所示,刪除元素30

有了上面的跳錶基本結構圖及原理,自已設計及實現跳錶吧,這樣當看到Redis裡面的跳錶結構時我們會更加熟悉,更容易理解些,【下面是對Redis中的跳錶資料結構及相關程式碼進行精減後形成的可執行程式碼】,首先定義跳錶的基本資料結構如下所示

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

#include<stdio.h>

#include<stdlib.h>

#define ZSKIPLIST_MAXLEVEL 32

#define ZSKIPLIST_P 0.25

#include <math.h>

//跳錶節點

typedef struct zskiplistNode {

int key;

int value;

struct zskiplistLevel {

struct zskiplistNode *forward;

} level[1];

} zskiplistNode;

//跳錶

typedef struct zskiplist {

struct zskiplistNode *header;

int level;

} zskiplist;

在程式碼中我們定義了跳錶結構中儲存的資料為Key->Value這種形式的鍵值對,注意的是skiplistNode裡面內含了一個結構體,代表的是層級,並且定義了跳錶的最大層級為32級,下面的程式碼是建立空跳錶,以及層級的獲取方式

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

//建立跳錶的節點

zskiplistNode *zslCreateNode(int level, int key, int value) {

zskiplistNode *zn = (zskiplistNode *)malloc(sizeof(*zn)+level*sizeof(zn->level));

zn->key = key;

zn->value = value;

return zn;

}

//初始化跳錶

zskiplist *zslCreate(void) {

int j;

zskiplist *zsl;

zsl = (zskiplist *) malloc(sizeof(*zsl));

zsl->level = 1;//將層級設定為1

zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,NULL,NULL);

for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {

zsl->header->level[j].forward = NULL;

}

return zsl;

}

//向跳錶中插入元素時,隨機一個層級,表示插入在哪一層

int zslRandomLevel(void) {

int level = 1;

while ((rand()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))

level += 1;

return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;

}

在這段程式碼中,使用了隨機函式獲取過元素所在的層級,下面就是重點,向跳錶中插入元素,插入元素之前先查詢插入的位置,程式碼如下所示,程式碼中注意update[i]

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

//向跳錶中插入元素

zskiplistNode *zslInsert(zskiplist *zsl, int key, int value) {

zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;

int i, level;

x = zsl->header;

//在跳錶中尋找合適的位置並插入元素

for (i = zsl->level-1; i >= 0; i--) {

while (x->level[i].forward &&

(x->level[i].forward->key < key ||

(x->level[i].forward->key == key &&

x->level[i].forward->value < value))) {

x = x->level[i].forward;

}

update[i] = x;

}

//獲取元素所在的隨機層數

level = zslRandomLevel();

if (level > zsl->level) {

for (i = zsl->level; i < level; i++) {

update[i] = zsl->header;

}

zsl->level = level;

}

//為新建立的元素建立資料節點

x = zslCreateNode(level,key,value);

for (i = 0; i < level; i++) {

x->level[i].forward = update[i]->level[i].forward;

update[i]->level[i].forward = x;

}

return x;

}

下面是程式碼中刪除節點的操作,和插入節點類似

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

//跳錶中刪除節點的操作

void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {

int i;

for (i = 0; i < zsl->level; i++) {

if (update[i]->level[i].forward == x) {

update[i]->level[i].forward = x->level[i].forward;

}

}

//如果層數變了,相應的將層數進行減1操作

while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)

zsl->level--;

}

//從跳錶中刪除元素

int zslDelete(zskiplist *zsl, int key, int value) {

zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;

int i;

x = zsl->header;

//尋找待刪除元素

for (i = zsl->level-1; i >= 0; i--) {

while (x->level[i].forward &&

(x->level[i].forward->key < key ||

(x->level[i].forward->key == key &&

x->level[i].forward->value < value))) {

x = x->level[i].forward;

}

update[i] = x;

}

x = x->level[0].forward;

if (x && key == x->key && x->value == value) {

zslDeleteNode(zsl, x, update);

//別忘了釋放節點所佔用的儲存空間

free(x);

return 1;

else {

//未找到相應的元素

return 0;

}

return 0;

}

最後,附上一個不優雅的測試樣例

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

//將連結串列中的元素打印出來

void printZslList(zskiplist *zsl) {

zskiplistNode  *x;

x = zsl->header;

for (int i = zsl->level-1; i >= 0; i--) {

zskiplistNode *p = x->level[i].forward;

while (p) {

printf(" %d|%d ",p->key,p->value);

p = p->level[i].forward;

}

printf("\n");

}

}

int main() {

zskiplist *list = zslCreate();

zslInsert(list,1,2);

zslInsert(list,4,5);

zslInsert(list,2,2);

zslInsert(list,7,2);

zslInsert(list,7,3);

zslInsert(list,7,3);

printZslList(list);

//zslDelete(list,7,2);

printZslList(list);

}

有了上面的跳錶理論基礎,理解Redis中跳錶的實現就不是那麼難了

Redis中跳錶的基本資料結構定義如下,與基本跳錶資料結構相比,在Redis中實現的跳錶其特點是不僅有前向指標,也存在後向指標,而且在前向指標的結構中存在span跨度欄位,這個跨度欄位的出現有助於快速計算元素在整個集合中的排名

複製程式碼

//定義跳錶的基本資料節點
typedef struct zskiplistNode {
    robj *obj; // zset value
    double score;// zset score
    struct zskiplistNode *backward;//後向指標
    struct zskiplistLevel {//前向指標
        struct zskiplistNode *forward;
        unsigned int span;
    } level[];
} zskiplistNode;

typedef struct zskiplist {
    struct zskiplistNode *header, *tail;
    unsigned long length;
    int level;
} zskiplist;

//有序集資料結構
typedef struct zset {
    dict *dict;//字典存放value,以value為key
    zskiplist *zsl;
} zset;

複製程式碼

將如上資料結構轉化成更形式化的圖形表示,如下圖所示

 

在上圖中,可以看到header指標指向的是一個具有固定層級(32層)的表頭節點,為什麼定義成32,是因為定義成32層理論上對於2^32-1個元素的查詢最優,而2^32=4294967296個元素,對於絕大多數的應用來說,已經足夠了,所以就定義成了32層,到於為什麼查詢最優,你可以將其想像成一個32層的完全二叉排序樹,算算這個樹中節點的數量

Redis中有序集另一個值得注意的地方就是當Score相同的時候,是如何儲存的,當集合中兩個值的Score相同,這時在跳錶中儲存會比較這兩個值,對這兩個值按字典排序儲存在跳錶結構中

有了上述的資料結構相關的基礎知識,來看看Redis對zskiplist/zskiplistNode的相關操作,原始碼如下所示(原始碼均出自t_zset.c)

建立跳錶結構的原始碼

複製程式碼

//#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
zskiplist *zslCreate(void) {
    int j;
    zskiplist *zsl;
    //分配記憶體
    zsl = zmalloc(sizeof(*zsl));
    zsl->level = 1;//預設層級為1
    zsl->length = 0;//跳錶長度設定為0
    zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
    for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
        //因為沒有任何元素,將表頭節點的前向指標均設定為0
        zsl->header->level[j].forward = NULL;
        //將表頭節點前向指標結構中的跨度欄位均設為0
        zsl->header->level[j].span = 0;
    }
    //表頭後向指標設定成0
    zsl->header->backward = NULL;
    //表尾節點設定成NULL
    zsl->tail = NULL;
    return zsl;
}

複製程式碼

在上述程式碼中呼叫了zslCreateNode這個函式,函式的原始碼如下所示=

複製程式碼

zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
    zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
    zn->score = score;
    zn->obj = obj;
    return zn;
}

複製程式碼

執行完上述程式碼之後會建立如下圖所示的跳錶結構

 

建立了跳錶的基本結構,下面就是插入操作了,Redis中原始碼如下所示

複製程式碼

zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x; //update[32]
    unsigned int rank[ZSKIPLIST_MAXLEVEL];//rank[32]
    int i, level;
    redisAssert(!isnan(score));
    x = zsl->header;
    //尋找元素插入的位置 
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
            (x->level[i].forward->score < score || //以下是得分相同的情況下,比較value的字典排序
                (x->level[i].forward->score == score &&compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    //產生隨機層數
    level = zslRandomLevel();
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        //記錄最大層數
        zsl->level = level;
    }
    //產生跳錶節點
    x = zslCreateNode(level,score,obj);
    for (i = 0; i < level; i++) {
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;
        //更新跨度
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }
    //此種情況只會出現在隨機出來的層數小於最大層數時
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

複製程式碼

上述原始碼中,有一個產生隨機層數的函式,原始碼如下所示:

複製程式碼

int zslRandomLevel(void) {
    int level = 1;
    //#define ZSKIPLIST_P 0.25 
    while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
        level += 1;
    //#ZSKIPLIST_MAXLEVEL 32
    return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}

複製程式碼

圖形化的形式描述如下圖所示:

理解了插入操作,其他查詢,刪除,求範圍操作基本上類似