1. 程式人生 > >Redis資料型別之有序集合zset

Redis資料型別之有序集合zset

Redis中的zset主要支援以下命令:

  1. zadd、zincrby
  2. zrem、zremrangebyrank、zremrangebyscore、zremrangebyrank
  3. zrange、zrevrange、zrangebyscore、zrevrangebyscore、zrangebylex、zrevrangebylex
  4. zcount、zcard、zscore、zrank、zrevrank
  5. zunionstore、zinterstore

zset的原始碼主要涉及redis.h和t_zset.c兩個檔案。

1、跳躍表skiplist

Redis中的zset在實現時用到了跳躍表skiplist這種資料結構

。skiplist是一種基於並聯連結串列的、隨機化的資料結構,由 William Pugh 在論文《Skip lists: a probabilistic alternative to balanced trees》中首次提出,可以實現平均複雜度為O(longN)的插入、刪除和查詢操作。

下面我們主要來介紹跳躍表在Redis中的實現和應用。關於跳躍表的原理和程式碼實現,我在網上找到一篇不錯的文章,如果你還不瞭解跳躍表相關知識,可以先看看這篇文章:skiplist 跳躍表詳解及其程式設計實現

1.1、跳躍表的儲存結構

Redis中的跳躍表實現和William Pugh在《Skip Lists: A Probabilistic Alternative to Balanced Trees》一文中描述的跳躍表基本一致,主要有以下三點進行了修改:

  1. Redis中的跳躍表允許有重複的分值score,以支援有序集合中多個元素可以有相同的分值score。
  2. 節點的比較操作不僅僅比較其分值score,同時還要比較其關聯的元素值value。
  3. 每個節點還有一個後退指標(相當於雙向連結串列中的prev指標),通個該指標,我們可以從表尾向表頭遍歷列表。這個屬性可以實現zset的一些逆向操作命令如zrevrange。

跳躍表的節點定義在redis.h標頭檔案中:

/* 跳躍表節點定義 */
typedef struct zskiplistNode {
    // 存放的元素值
    robj *obj;
    // 節點分值,排序的依據
    double score;
    // 後退指標
struct zskiplistNode *backward; // 層 struct zskiplistLevel { // 前進指標 struct zskiplistNode *forward; // 跨越的節點數量 unsigned int span; } level[]; } zskiplistNode;

跳躍表的定義如下:

/* 跳躍表定義 */
typedef struct zskiplist {
    // 跳躍表的頭結點和尾節點
    struct zskiplistNode *header, *tail;
    // 節點數量
    unsigned long length;
    // 目前跳躍表的最大層數
    int level;
} zskiplist;

跳躍表主要操作實現在t_zset.c中,主要包括以下操作:

// 建立一個層數為level的跳躍表節點
zskiplistNode *zslCreateNode(int level, double score, robj *obj);
// 建立一個跳躍表
zskiplist *zslCreate(void);
// 釋放指定的跳躍表節點
void zslFreeNode(zskiplistNode *node);
// 釋放跳躍表
void zslFree(zskiplist *zsl);
// 往跳躍表中插入一個新節點
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj);
// 刪除節點函式,供zslDelete、zslDeleteByScore和zslDeleteByRank函式呼叫 
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update);
// 從從跳躍表中刪除一個分值score、儲存物件為obj的節點
int zslDelete(zskiplist *zsl, double score, robj *obj);
// 如果range給定的數值範圍包含在跳躍表的分值範圍則返回1,否則返回0
int zslIsInRange(zskiplist *zsl, zrangespec *range);
// 返回跳躍表中第一個分值score在range指定範圍的節點
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range);
// 返回跳躍表中最後一個分值score在range指定範圍的節點
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range);
//  在跳躍表中刪除所有分值在給定範圍range內的節點
unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict);
// 刪除成員物件值在指定字典序範圍的節點
unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict);
//  在跳躍表中刪除給定排序範圍的節點
unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict);
// 返回指定元素在跳躍表中的排位
unsigned long zslGetRank(zskiplist *zsl, double score, robj *o);
// 返回指定排位上的節點
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank)

接下來我們舉兩個列子,讓大家能更好地理解跳躍表的儲存結構。

(1)、建立一個空的跳躍表

zslCreate用來建立並初始化一個新的跳躍表,一個空的跳躍表如下所示:

這裡寫圖片描述

其中level0 - level31是一個長度為32(由ZSKIPLIST_MAXLEVEL定義,值為32)的zskiplistLevel 結構體陣列,zskiplistLevel結構體包含span和forward兩個成員,這裡為了方便展示忽略了span。

(2)、插入操作

跳躍表中的元素是按分值score排序的,如果我們往跳躍表中插入了a、b、c、d四個元素,對應的分值為3、5、7、9,則對應的跳躍表結構如下所示:

這裡寫圖片描述

2、zset編碼方式

有序集合zset主要有兩種編碼方式:REDIS_ENCODING_ZIPLIST和REDIS_ENCODING_SKIPLIST。ziplist可以表示較小的有序集合, skiplist表示任意大小的有序集合。

前面我們介紹List資料型別時,List以ziplist作為預設編碼。但在zset中則採取不同的策略,zset會根據zadd命令新增的第一個元素的長度大小來選擇建立編碼方式。具體而言:如果滿足下面兩個條件之一則使用ziplist編碼方式:

  1. Redis中server.zset_max_ziplist_entries的值不為0。
  2. 第一個元素值的長度小於server.zset_max_ziplist_value(預設值為64)。

反之,則使用skiplist編碼方式。

該過程實現在在zaddGenericCommand函式中,這裡只擷取部分程式碼用作展示:

    ...
    /* Lookup the key and create the sorted set if does not exist. */
    // 取出有序集合物件
    zobj = lookupKeyWrite(c->db,key);
    // 如果key指定的有序集合物件不存在則建立一個
    if (zobj == NULL) {
        // server.zset_max_ziplist_entries的預設值為128
        // server.zset_max_ziplist_value的預設值為64
        if (server.zset_max_ziplist_entries == 0 ||
            server.zset_max_ziplist_value < sdslen(c->argv[3]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetZiplistObject();
        }
        dbAdd(c->db,key,zobj);
    } 
    // 如果key指定的物件存在,還需要進一步檢查其型別是否是zset
    else {

        if (zobj->type != REDIS_ZSET) {
            addReply(c,shared.wrongtypeerr);
            goto cleanup;
        }
    }
    ...

如果zset當前使用REDIS_ENCODING_ZIPLIST編碼,當滿足下面兩個條件之一時會轉換為REDIS_ENCODING_SKIPLIST編碼:

  1. 當待新增的新字串長度超過server.zset_max_ziplist_value (預設值為64)時。
  2. ziplist中儲存的節點數量超過server.zset_max_ziplist_entries(預設值為128)時。

兩種編碼的轉換由zsetConvert函式實現。

2.1、ziplist編碼的zset儲存結構

在zset中,每個元素包含兩個成員:元素值、分值。如果使用ziplist編碼如何來儲存這兩個成員呢?Redis用ziplist中相鄰的兩個節點來存放zset中的一個元素,這兩個節點分別儲存元素值和分值。為了方便描述,我們稱這兩個為“元素值節點”和“分值節點”。同時,為了維持zset的有序性,ziplist中的節點兩兩一組並按分值score從小到大排序。

所以ziplist編碼的zset儲存結構如下所示:

這裡寫圖片描述

2.2、skiplist編碼的zset儲存結構

skiplist編碼的有序集合定義在redis.h標頭檔案中:

/* 有序集合結構體 */
typedef struct zset {
    // 字典,維護元素值和分值的對映關係
    dict *dict;
    // 按分值對元素值排序序,支援O(longN)數量級的查詢操作
    zskiplist *zsl;
} zset;

看到這裡你會不會感到奇怪:不是說以skiplist編碼嗎,為什麼還會有dict在裡面?這裡解釋一下dict的作用。在上面介紹中我們說過跳躍表可以實現平均複雜度為O(longN)的插入、刪除和查詢操作,這是zset高效執行的基礎。但是zset還需要支援諸如獲取元素值對應的分值、判斷某元素值是否存在zset中等命令,對於這些操作,如果在跳躍表的基礎上實現效果並不好。所以作者增加了一個dict來維護元素值和分值的對映關係(鍵為元素值、值為分值),這樣就能快速獲取指定成員的分值,彌補skiplist在這方面的不足。

到這裡,我們已經瞭解了zset兩種編碼方式的儲存結構。類似我們前面介紹的List型別,zset相關函式的主要功能之一就是要在ziplist和skiplist這兩種結構上維護一份統一的zset操作介面,以遮蔽底層的差異。這些操作沒有什麼難點,這裡就不一一贅述,大家可以參看我後面提供的註釋原始碼。

3、範圍操作命令

zset中有很多跟範圍相關的命令,大致可以歸納為以下三種:

  1. 獲取或刪除指定排位區間內的元素,如zrange、zrevrange、zremrangebyrank命令。
  2. 獲取或刪除指定分值區間內的元素,如zrangebyscore、zrevrangebyscore、zremrangebyscore命令。
  3. 獲取或刪除指定字典序區間內的元素,如zrangebylex、zremrangebylex。對於這種情況,需要注意:只有當插入到有序集合(Sorted set)中的所有元素的分值score都相同時,使用zrangebylex或zremrangebylex命令可以認為儲存在有序集合中的元素是按字典序排序(Lexicographical ordering)的,然後返回或刪除元素值在最小值 min 及最大值 max 之間的所有元素。如果有序集合中的元素存在不同的分值,所返回或刪除的元素是不確定的。

為了方便範圍操作,Redis在redis.h標頭檔案中定了了分值區間結構體和字典區間結構體:

/* Struct to hold a inclusive/exclusive range spec by score comparison. */
/* 指明某個區間為開區間 or 閉區間 的結構體 */
typedef struct {
    // 最小值、最大值
    double min, max;
    // 是否包含最小值、是否包含最大值(0表示包含、1表示不包含)
    int minex, maxex; /* are min or max exclusive? */
} zrangespec;

/* Struct to hold an inclusive/exclusive range spec by lexicographic comparison. */
/* 以字典順序表示的開區間 or 閉區間 */
typedef struct {
    robj *min, *max;  /* May be set to shared.(minstring|maxstring) */
    // 是否包含最小值、是否包含最大值(0表示包含、1表示不包含)
    int minex, maxex; /* are min or max exclusive? */
} zlexrangespec;

我們以zslDeleteRangeByScorezzlDeleteRangeByScore函式為例,總結這類區間操作的一般過程。

zslDeleteRangeByScore函式用於刪除skiplist編碼的有序集合中分值在指定範圍的元素,實現如下:

unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict) {
    // update陣列用來記錄降層節點
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long removed = 0;
    int i;

    x = zsl->header;
    // 從前往後遍歷,記錄降層節點,方面以後修改指標
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward && (range->minex ?
            x->level[i].forward->score <= range->min :
            x->level[i].forward->score < range->min))
                x = x->level[i].forward;
        update[i] = x;
    }

    /* Current node is the last with score < or <= min. */
    // 定位到第一次中待刪除的第一個節點
    x = x->level[0].forward;

    /* Delete nodes while in range. */
    // 刪除range指定範圍內的所有節點
    while (x &&
           (range->maxex ? x->score < range->max : x->score <= range->max))
    {
        // 記錄下一個節點的位置
        zskiplistNode *next = x->level[0].forward;
        // 刪除節點
        zslDeleteNode(zsl,x,update);
        // 刪除dict中相應的元素
        dictDelete(dict,x->obj);
        zslFreeNode(x);
        // 記錄刪除節點個數
        removed++;
        // 指向下一個節點
        x = next;
    }
    return removed;
}

zzlDeleteRangeByScore函式用於刪除ziplist編碼的有序集合中分值在指定範圍的元素,實現如下:

unsigned char *zzlDeleteRangeByScore(unsigned char *zl, zrangespec *range, unsigned long *deleted) {
    unsigned char *eptr, *sptr;
    double score;
    unsigned long num = 0;

    if (deleted != NULL) *deleted = 0;

    // 指向ziplist中分值落在指定範圍的第一個節點
    eptr = zzlFirstInRange(zl,range);
    if (eptr == NULL) return zl;

    /* When the tail of the ziplist is deleted, eptr will point to the sentinel
     * byte and ziplistNext will return NULL. */
    // 一直刪除節點一直遇到不在range指定範圍內的節點為止
    while ((sptr = ziplistNext(zl,eptr)) != NULL) {
        score = zzlGetScore(sptr);
        if (zslValueLteMax(score,range)) {
            /* Delete both the element and the score. */
            zl = ziplistDelete(zl,&eptr);
            zl = ziplistDelete(zl,&eptr);
            num++;
        } else {
            /* No longer in range. */
            break;
        }
    }

    if (deleted != NULL) *deleted = num;
    return zl;
}

我們可以看到,zset範圍操作的一般過程是:

  1. 先找到ziplist或skiplist中落在指定區間內開始迭代的第一個節點。比如分值指定分值區間為[3,10],如果是正向操作的話,第一步先找出第一個分值大於或等於3的節點,接下來往後遍歷;如果是逆向操作的話,第一步先找出第一個分值小於或等於10的節點,接下來往前遍歷。
  2. 上一步中得到開始遍歷節點,接下來就從該節點開始依次正向或逆向遍歷直到遇到不滿足要求的節點後跳出。

zset的實現分析大概就講這麼多吧,關於細節方面的東西大家還是需要看看原始碼,這樣子更好把握。