我們再來學習如何從跳躍表中查詢資料,跳躍表本質上是一個連結串列,但它允許我們像陣列一樣定位某個索引區間內的節點,並且與陣列不同的是,跳躍表允許我們將頭節點L0層的前驅節點(即跳躍表分值最小的節點)zsl->header.level[0].forward當成索引0的節點,尾節點zsl->tail(跳躍表分值最大的節點)當成索引zsl->length-1的節點,索引按分值從小到大遞增查詢;也允許我們將尾節點當成索引0的節點,頭節點L0層的前驅節點當做索引zsl->length-1的節點,索引按分值從大到小遞增查詢。當我們呼叫下面的方法按照索引區間來查詢時,會把我們的索引轉換成跨度,然後查詢落在跨度的第一個節點,之後根據reverse(逆向狀態)決定是要正向查詢還是逆向查詢。

假設我們要進行正向查詢(即:索引按分值從小到大遞增查詢),給定的索引區間是[0,2],那麼我們要找到跨度為1的節點,然後從跨度為1的節點L0層逐個遞進,直到停留在跨度為3的節點,頭節點L0層的前驅節點zsl->header.level[0].forward在跳躍表的索引為0,跨度為1,在跨度為1的節點從L0層逐個遞進,一直遞進到跨度為3的節點,這樣便完成了索引區間[0,2]的查詢。如果我們要進行逆向查詢(即:索引按分值從大到小遞增查詢),索引區間依舊是[0,2],那麼我們要找到跨度為跨度為zsl->length-0=zsl->length的節點,那自然是尾節點,找到跨度區間的第一個節點後,我們通過backward指標逐個後退,一直後退到跨度為zsl->length-2的節點,如此便完成查詢。

void zrangeGenericCommand(client *c, int reverse) {
robj *key = c->argv[1];
robj *zobj;
int withscores = 0;
long start;
long end;
long llen;
long rangelen; //讀取起始索引和終止索引,如果存在一個索引讀取失敗,則退出
if ((getLongFromObjectOrReply(c, c->argv[2], &start, NULL) != C_OK) ||
(getLongFromObjectOrReply(c, c->argv[3], &end, NULL) != C_OK))
return;
//判斷是否要返回分值
if (c->argc == 5 && !strcasecmp(c->argv[4]->ptr, "withscores")) {
withscores = 1;
} else if (c->argc >= 5) {
addReply(c, shared.syntaxerr);
return;
}
//判斷key是否存在,如果不存在則退出,如果存在但型別不為ZSET也退出。
if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL
|| checkType(c, zobj, OBJ_ZSET))
return; /*
* Sanitize indexes.
* 審查索引,這裡主要針對傳入索引為負數的情況,大家都知道,如果一個
* 跳躍表的節點個數為N,我們要從起始節點查詢到末尾節點,可以用[0,N-1]
* 或者[0,-1],當傳入的end<0時,這裡會重新規正end的索引,llen為zset的長度,
* 因此查詢[0,-1],這裡會規正為[0,N-1]。同理,start也會被規正,如果我們查詢
* [-5,-3],即代表查詢有序集合倒數第5個節點至倒數第三個節點,前提是N>=5這個
* 查詢才有意義。如果我們的起始索引傳入的是一個絕對值>N的負數,那麼llen + start的
* 結果也為負數,如果判斷start<0,則start會被規正為0。
* */
llen = zsetLength(zobj);
if (start < 0) start = llen + start;
if (end < 0) end = llen + end;
if (start < 0) start = 0; /*
* Invariant: start >= 0, so this test will be true when end < 0.
* The range is empty when start > end or start >= length.
* 如果起始索引大於終止索引,或者起始索引大於等於有序集合節點數量,則直接
* 返回空陣列。
* */
if (start > end || start >= llen) {
addReply(c, shared.emptyarray);
return;
}
//如果判斷終止索引大於等於節點數,則規整為llen-1
if (end >= llen) end = llen - 1;
//計算要返回的節點數
rangelen = (end - start) + 1;
//……
if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
//壓縮列表邏輯……
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln;
sds ele; /*
* Check if starting point is trivial, before doing log(N) lookup.
* 這裡會根據給定的開始索引,查詢該索引對應的節點,並將ln指向該節點。
* 需要注意的一點是:平常我們都認為header.level[0].forward指向的節點,在跳躍表
* 中的索引為0,但有時候跳躍表的末尾節點zsl->tail的索引值也有可能為0,這裡就要提到
* 逆向查詢。
* 當我們使用ZRANGE key min max [WITHSCORES]命令查詢時,成員的位置是按照其分值
* 從小到大來排序,這時候header.level[0].forward的索引值為0,
* header.level[0].forward.level[0].forward的索引值為1。而zls->tail的索引值
* 為zls->length-1。
* 當我們使用ZREVRANGE key start stop [WITHSCORES]命令查詢時,成員的位置是按照
* 其分值從大到小來排序,這時候zls->tail的索引值為0,header.level[0].forward的
* 索引值為zls->length-1,header.level[0].forward.level[0].forward的索引值為
* zls->length-2。當reverse為1時,本次查詢即為逆向查詢。
* 我們注意到不管是if還是else分值,只要start>0,最終都會執行zslGetElementByRank()
* 將ln定位到起始節點。當start為0時,如果是逆向查詢,則索引0的位置是尾節點zsl->tail,
* 如果是正向查詢,索引0的位置則是zsl->header->level[0].forward。
* 那麼(llen - start)和(start + 1)又代表什麼含義呢?為什麼zslGetElementByRank()
* 可以根據這兩個公式的計算結果,定位到索引對應的節點呢?其實這兩個公式計算的是跨度,而
* zslGetElementByRank()則是根據給定的跳躍表和跨度查詢節點而已。
* 如果是正常查詢,假設起始索引為0,則跨度為start(0)+1=1,剛好為頭節點L0層到達第一個節點的
* 跨度為1;如果起始索引為1,則跨度為start(1)+1=2,剛好是頭節點到達索引值為1的節點的跨度。
* 如果是逆向查詢,索引值為0代表尾節點,而llen-start(0)=llen為頭節點到達尾節點的跨度;同理,
* 倒數第二個節點的索引值為1,頭節點到達倒數第二個節點的跨度為llen-start(1)=llen-1。
* */
if (reverse) {
ln = zsl->tail;
if (start > 0)
ln = zslGetElementByRank(zsl, llen - start);
} else {
ln = zsl->header->level[0].forward;
if (start > 0)
ln = zslGetElementByRank(zsl, start + 1);
}
//定位到起始節點後,根據逆向狀態,不為0時後退查詢(ln-backward),為0時遞進查詢(ln->level[0].forward)
while (rangelen--) {
serverAssertWithInfo(c, zobj, ln != NULL);
ele = ln->ele;
if (withscores && c->resp > 2) addReplyArrayLen(c, 2);
addReplyBulkCBuffer(c, ele, sdslen(ele));
if (withscores) addReplyDouble(c, ln->score);
ln = reverse ? ln->backward : ln->level[0].forward;
}
} else {
serverPanic("Unknown sorted set encoding");
}
}

  

查詢到跨度對應的節點,查詢到跨度對應的節點,則在<1>處返回,如果我們傳入的跨度大於頭節點到尾節點的跨度,則返回NULL。

/*
* Finds an element by its rank. The rank argument needs to be 1-based.
* */
zskiplistNode *zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;//累計跨度
int i; x = zsl->header;
/*
* 從頭節點的最高層出發,如果基於當前層能夠遞進到前一個節點,
* 則把當前節點的跨度加到traversed。
*/
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward && (traversed + x->level[i].span) <= rank) {
traversed += x->level[i].span;
x = x->level[i].forward;
}
/*
* 如果累計跨度與呼叫方傳入的跨度相等,則代表x已經前進到呼叫方
* 所要求達到的跨度的節點,返回x。
*/
if (traversed == rank) {
return x;//<1>
}
}
//如果傳入的跨度大於頭節點到尾節點的跨度,則返回NULL。
return NULL;
}

  

跳躍表除了可以根據索引區間來查詢,還可以根據分值區間來查詢,這裡我們又見到了結構體zrangespec。當我們需要判斷一個節點是否落在我們指定的分值區間內,需要呼叫zslValueGteMin()和zslValueLteMax(),當傳入一個指定的分值和區間,zslValueGteMin()和zslValueLteMax()的結果不為0,則表明節點落在分值區間內。此外,這兩個方法還可以判斷一個跳躍表是否和區間有交集,比如呼叫zslValueGteMin()時,傳入尾節點(跳躍表分值最大的節點)及一個指定區間,如果尾節點沒有落在指定區間,代表此區間都大於尾節點,此時我們不需要遍歷跳躍表即可返回一個空陣列,告訴客戶端在指定區間內找不到任何節點;同理,呼叫zslValueLteMax()時傳入頭節點L0層的前驅節點(跳躍表分值最小節點)沒有落在區間內,則表明區間小於跳躍表,同樣不需要遍歷跳躍表即可返回空陣列給客戶端,告訴客戶端在指定區間內找不到任何節點。

/*
* Struct to hold a inclusive/exclusive range spec by score comparison.
* 此結構體用於表示一個指定區間,minex為0時表示在進行最小值比較時,要包含最小值本身
* 同理maxex為0時表示進行最大值比較時,要包含最大值本身。
* 比如:min=2,max=9
* 當minex=0,maxex=0時,區間為:[2,9]
* 當minex=1,maxex=0時,區間為:(2,9]
* 當minex=0,maxex=1時,區間為:[2,9)
* 當minex=1,maxex=1時,區間為:(2,9)
* */
typedef struct {
double min, max;
int minex, maxex; /* are min or max exclusive? */
} zrangespec; /*
* 如果spec->minex不為0,返回分值是否大於區間最小值的比較結果,
* 為0則返回分值是否大於等於區間最小值的比較結果。
* 如果傳入一個跳躍表尾節點的分值zsl->tail.score(即:跳躍表最大分值)和區間返回結果為0,
* 則表示跳躍表和區間沒有交集。
* 這裡分兩種情況:
* spec->minex不為0:區間要查詢分值大於spec->min的元素,
* zsl->tail.score<=spec->min代表跳躍表最大分值小於等於min,返回結果為0。
* spec->minex為0:區間要查詢分值大於等於spec->min的元素,
* zsl->tail.score<spec->min代表跳躍表最大分值小於min,返回結果為0。
*/
int zslValueGteMin(double value, zrangespec *spec) {
return spec->minex ? (value > spec->min) : (value >= spec->min);
} /*
* 如果spec->maxex不為0,返回分值是否小於區間最大值的比較結果,為0則返回分值
* 是否小於等於區間最大值的比較結果。
* 如果傳入一個跳躍表頭節點L0層指向節點的分值
* zsl->header.level[0].forward.score(即:跳躍表最小分值)和
* 區間返回結果為0,則表示跳躍表和區間沒有交集。
* 這裡分兩種情況:
* spec->maxex不為0:區間要查詢分值小於spec->max的元素,
* zsl->header.level[0].forward.score>=spec->max代表跳躍表最小分值大於等於區間
* 最大分值,返回結果為0。
* spec->maxex為0:區間要查詢分值小於等於spec->max的元素,
* zsl->header.level[0].forward.score>spec->max代表跳躍表最小分值大於區間最大分值,
* 返回結果為0。
*/
int zslValueLteMax(double value, zrangespec *spec) {
return spec->maxex ? (value < spec->max) : (value <= spec->max);
}

    

在真正根據分值區間查詢跳躍表前,會校驗區間是否有效,如果我們輸入一個區間[a,b],但a>b,那麼這個區間肯定是無效區間,無須遍歷跳躍表;如果a=b,如果區間的開閉狀態出現:(a,b)、(a,b]、[a,b)這三種情況,也是無效區間,只有[a,b]才會去查詢節點,表示需要查詢分值為a(或者b)的節點。當校驗完區間是有效後,還會呼叫zslValueGteMin()和zslValueLteMax()判斷跳躍表和區間是否存在交集,即區間是否整體大於跳躍表或整體小於跳躍表,如果出現這兩種情況則表明區間和跳躍表無交集,也就不需要遍歷。

/*
* Returns if there is a part of the zset is in range.
* 判斷跳躍表和區間是否存在交集
* */
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x; /*
* Test for ranges that will always be empty.
* 校驗區間範圍是否有效,無效則返回0表示查詢結果為空:
* 1.如果最小值大於最大值,則無效。
* 2.如果最小值等於最大值,且區間為:(min,max)、(min,max]、[min,max)則無效。
* */
if (range->min > range->max ||
(range->min == range->max && (range->minex || range->maxex)))
return 0;
/*
* 如果尾節點不為NULL,則把跳躍表最大分值zsl->tail.score與區間比較,
* 如果range->minex不為0,則查詢分值大於range->min的元素,如果跳躍表
* 最大分值zsl->tail.score小於等於range->min,則表示跳躍表和區間沒有交集,
* 無須遍歷跳躍表查詢;同理如果range->minex為0,則查詢分值大於等於range->min
* 的元素,如果zsl->tail.score小於range->min,則表示跳躍表和區間沒有交集,
* 也無須遍歷跳躍表查詢。
*/
x = zsl->tail;
if (x == NULL || !zslValueGteMin(x->score, range))
return 0;
/*
* 如果頭節點L0層的前驅節點不為NULL,則把跳躍表最小分值zsl->header->level[0].forward.score
* 與區間比較,如果range->maxex不為0,則查詢分值小於range->maxex的元素,如果跳躍表最小
* 分值zsl->header->level[0].forward.score大於等於range->max,則表示跳躍表和區間沒有交集,
* 無須遍歷跳躍表查詢;同理如果range->maxex為0,則查詢分值小於等於range->maxex的元素,如果跳躍表
* 最小分值zsl->header->level[0].forward.score大於range->maxex,則表示跳躍表和區間沒有交集,
* 也無須遍歷跳躍表查詢。
*/
x = zsl->header->level[0].forward;
if (x == NULL || !zslValueLteMax(x->score, range))
return 0;
//跳躍表和區間存在交集,需要遍歷跳躍表查詢。
return 1;
}

  

跳躍表允許我們根據分值區間進行正向查詢(分值從小到大)或逆向查詢(分值從大到小),如果是正向查詢,則呼叫zslFirstInRange()方法,先判斷跳躍表和指定區間是否存在交集,如果存在則查詢指定區間內的分值最小的節點並返回。

/*
* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range.
* 查詢落在指定區間的第一個節點,如果沒有元素落在這個區間則返回NULL。
* */
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i; /*
* If everything is out of range, return early.
* 如果跳躍表和區間沒有交集則無須遍歷,直接返回NULL。
* */
if (!zslIsInRange(zsl, range)) return NULL;
//從頭節點的最高層開始遍歷
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
/*
* Go forward while *OUT* of range.
* 如果x->level[i].forward不為NULL,根據其分值x->level[i].forward->score和
* range->minex判斷前驅節點是否能前進。
* 這裡分兩種情況:
* range->minex不為0:判斷前驅節點的分值是否大於range->min,如果小於等於的話
* expression=zslValueGteMin(x->level[i].forward->score, range)為0,
* 代表需要前進,找到大於min的節點,而!expression為1,while條件成立,x會
* 前進到它的前驅節點。當x的前驅節點的分值大於min,就會停止迴圈,x會停留在區間內
* 第一個節點的後繼節點。
* range->minex為0:判斷前驅節點的分值是否大於等於range->min,如果小於的話,
* expression=zslValueGteMin(x->level[i].forward->score, range),expression為0,
* 需要前進,找到大於等於min的節點,而!expression為1,while條件成立,x會
* 前進到它的前驅節點。當x的前驅節點的分值大於等於min,就會停止迴圈,x會停留在區間內
* 第一個節點的後繼節點。
* */
while (x->level[i].forward &&
!zslValueGteMin(x->level[i].forward->score, range))
x = x->level[i].forward;
} /*
* This is an inner range, so the next node cannot be NULL.
* 上面的迴圈會讓x停留在區間內第一個節點的後繼節點,為了達到區間內的
* 第一個節點,x要在L0層前進到它的前驅節點。
* */
x = x->level[0].forward;
serverAssert(x != NULL); /*
* Check if score <= max.
* range->maxex不為0:如果區間內第一個節點的分值大於等於spec->max,
* expression=zslValueLteMax(x->score, range),expression結果為0
* !expression為1,表示查詢異常,返回NULL。
* range->maxex為0:如果區間內第一個節點的分值大於spec->max,
* 則expression=zslValueLteMax(x->score, range),expression結果為0
* !expression為1,表示查詢異常,返回NULL。
* */
if (!zslValueLteMax(x->score, range)) return NULL;
return x;
}

  

如果要進行逆向查詢,則呼叫zslLastInRange(),這裡同樣先判斷跳躍表是否和區間存在交集,只有存在交集才會進行下一步的判斷,查詢指定區間內分值最大的節點並返回。

/*
* Find the last node that is contained in the specified range.
* Returns NULL when no element is contained in the range.
* 查詢落在指定區間的最後一個節點,如果沒有元素落在這個區間則返回NULL。
* */
zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i; /*If everything is out of range, return early.*/
if (!zslIsInRange(zsl, range)) return NULL;

//從頭節點的最高層開始遍歷
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
/*
* Go forward while *IN* range.
* 根據區間range和前驅節點的分值判斷是否前進,如果x->level[i].forward
* 不為NULL,根據range->maxex判斷前驅節點是否能前進。
* 這裡分兩種情況:
* 如果range->maxex不為0,且前驅節點的分值小於range->max,則可以前進。
* 如果range->maxex為0,且前驅節點的分值小於等於range->max,則可以前進。
* */
while (x->level[i].forward &&
zslValueLteMax(x->level[i].forward->score, range))
x = x->level[i].forward;
} /* This is an inner range, so this node cannot be NULL. */
serverAssert(x != NULL); /*
* Check if score >= min.
* 如果range->minex不為0,x的分值小於或等於range->min,代表查詢出現異常,則返回NULL。
* 如果range->minex為0,x的分值小於range->min,代表查詢出現異常,則返回NULL。
* */
if (!zslValueGteMin(x->score, range)) return NULL;
return x;
}

  

在瞭解完上面的內容後,下面我們要步入正題:如何根據分值區間進行正向或逆向查詢節點。在下面程式碼<1>處,會根據逆向狀態選擇ln是指向區間分值最大的節點,或是分值最小的節點。在定位到起始節點後,會在<2>處的while迴圈對節點進行偏移,如果到達偏移位置後的ln不為NULL,則會進入<3>處的while迴圈,查詢分值落在區間內的節點,這裡會根據逆向狀態是否不為0,決定是用backward指標後退,還是向L0層的前驅節點遞進,一直到分值不落在區間內跳出while迴圈,或者ln為NULL,又或者limit為0結束while迴圈。如果我們沒有指定偏移(offset)和返回數量(limit),則不會進行偏移,limit預設值為-1,limit--永遠不為0,這裡會返回落在區間內的所有節點,能結束while迴圈只有遇到分值不落在區間內的節點,或者是ln為NULL。

/* This command implements ZRANGEBYSCORE, ZREVRANGEBYSCORE. */
void genericZrangebyscoreCommand(client *c, int reverse) {
zrangespec range;//指定區間
robj *key = c->argv[1];
robj *zobj;
long offset = 0, limit = -1;//偏移和結果返回數量
int withscores = 0;
unsigned long rangelen = 0;
void *replylen = NULL;
int minidx, maxidx; /*
* Parse the range arguments.
* 解析範圍引數
* ZRANGEBYSCORE key min max和ZREVRANGEBYSCORE key max min兩個命令
* 都是此函式實現的,如果客戶端輸入的命令為ZRANGEBYSCORE,則reverse為0,按
* 從小到大查詢分值及元素,分值小的在前,分值大的在後。如果客戶端輸入的命令為
* ZREVRANGEBYSCORE,則reverse不為0,按從大到小查詢分值及元素,分值大的在前,
* 分值小的在後。
*
* */
if (reverse) {
/* Range is given as [max,min] */
maxidx = 2;
minidx = 3;
} else {
/* Range is given as [min,max] */
minidx = 2;
maxidx = 3;
} if (zslParseRange(c->argv[minidx], c->argv[maxidx], &range) != C_OK) {
addReplyError(c, "min or max is not a float");
return;
} /*
* Parse optional extra arguments. Note that ZCOUNT will exactly have
* 4 arguments, so we'll never enter the following code path.
* 遍歷可選引數,這裡會判斷是否要返回分值(withscores),是否要對查詢結果進行偏移(offset)和數量(limit)的限制
* */
if (c->argc > 4) {
int remaining = c->argc - 4;
int pos = 4; while (remaining) {
if (remaining >= 1 && !strcasecmp(c->argv[pos]->ptr, "withscores")) {
pos++;
remaining--;
withscores = 1;
} else if (remaining >= 3 && !strcasecmp(c->argv[pos]->ptr, "limit")) {
if ((getLongFromObjectOrReply(c, c->argv[pos + 1], &offset, NULL)
!= C_OK) ||
(getLongFromObjectOrReply(c, c->argv[pos + 2], &limit, NULL)
!= C_OK)) {
return;
}
pos += 3;
remaining -= 3;
} else {
addReply(c, shared.syntaxerr);
return;
}
}
} /*
* Ok, lookup the key and get the range
* 如果key所對應的zobj不存在,或者zobj的型別不為zset,則退出。
* */
if ((zobj = lookupKeyReadOrReply(c, key, shared.emptyarray)) == NULL ||
checkType(c, zobj, OBJ_ZSET))
return; if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
//壓縮列表流程...
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {//zobj型別為跳躍表
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
zskiplistNode *ln; /*
* If reversed, get the last node in range as starting point.
* 如果是逆向查詢,ln會指向區間分值最大的節點,如果是正向查詢,ln則指向區間分值最小的節點。
* */
if (reverse) {
ln = zslLastInRange(zsl, &range);
} else {
ln = zslFirstInRange(zsl, &range);
} /*
* No "first" element in the specified interval.
* 如果沒有落在區間的開始節點則退出
* */
if (ln == NULL) {
addReply(c, shared.emptyarray);
return;
} /* We don't know in advance how many matching elements there are in the
* list, so we push this object that will represent the multi-bulk
* length in the output buffer, and will "fix" it later */
replylen = addReplyDeferredLen(c); /*
* If there is an offset, just traverse the number of elements without
* checking the score because that is done in the next loop.
* <2>如果有偏移量,則根據reverse狀態選擇是後退還是遞進,以達到偏移量。
* 如果客戶端有傳入偏移,則offset不為0,這裡會迴圈到offset為0或ln為NULL時跳出迴圈。
* 否則offset預設值為0,不會進入此迴圈。
* */
while (ln && offset--) {
if (reverse) {
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
}
/*
* <3>如果客戶端有傳入偏移和數量,則limit不為0,此時會根據reverse狀態後退或者前進至
* ln為NULL或者limit為0,否則limit預設值為-1,limit--永遠為true(注:只要limit
* 不為0,則永遠為true,即便是負數),這裡就會迴圈到ln為NULL時,獲取所有分值符合區間
* 節點的。
* 除了limit為0,或者ln為NULL會跳出while迴圈,在<4>處還會根據reverse狀態判斷分值是否在區間,
* 如果不在則跳出迴圈,如果分值符合區間,還會在<5>處根據reverse狀態選擇是後退到後一個節點(ln->backward),
* 還是前進到前一個節點(ln->level[0].forward)。
*/
while (ln && limit--) {
/*Abort when the node is no longer in range.*/
if (reverse) {//<4>
if (!zslValueGteMin(ln->score, &range)) break;
} else {
if (!zslValueLteMax(ln->score, &range)) break;
} rangelen++;
if (withscores && c->resp > 2) addReplyArrayLen(c, 2);
addReplyBulkCBuffer(c, ln->ele, sdslen(ln->ele));
if (withscores) addReplyDouble(c, ln->score); /* Move to next node */
if (reverse) {//<5>
ln = ln->backward;
} else {
ln = ln->level[0].forward;
}
}
} else {
serverPanic("Unknown sorted set encoding");
} if (withscores && c->resp == 2) rangelen *= 2;
setDeferredArrayLen(c, replylen, rangelen);
}

  

最後,我們要了解如何獲取一個元素在跳躍表中的索引,其實這裡面的邏輯也是非常的簡單,我們先從字典上獲取節點的分值,然後根據分值及元素獲取其在跳躍表中的索引,這裡依舊支援正向查詢或逆向查詢,如果是正向查詢,分值越小,索引越小,如果分值相等,則元素越小,索引越小;如果是逆向查詢,則分值越大,索引越小,如果分值相等,則元素越大,索引越小。

/* Given a sorted set object returns the 0-based rank of the object or
* -1 if the object does not exist.
* 返回元素在有序集合中的索引,如果返回-1則代表元素不在有序集合內。
*
* For rank we mean the position of the element in the sorted collection
* of elements. So the first element has rank 0, the second rank 1, and so
* forth up to length-1 elements.
* 在跳躍表中第一個元素的索引為0,第二個元素索引為1,以此類推,最後一個元素索引為length-1。
*
* If 'reverse' is false, the rank is returned considering as first element
* the one with the lowest score. Otherwise if 'reverse' is non-zero
* the rank is computed considering as element with rank 0 the one with
* the highest score.
* 如果reverse為0,跳躍表索引從分值最小的節點開始,即zsl->header.level[0].forward索引為0、
* zsl->header.level[0].forward.level[0].forward索引為1,zsl->tail索引為zsl-length-1;
* 如果reverse不為0,跳躍表索引從分值最大的節點開始,即zsl->tail索引為0,zsl->tail.backward索引
* 為1,zsl->header.level[0].forward索引為zsl->length-1
* */
long zsetRank(robj *zobj, sds ele, int reverse) {
unsigned long llen;
unsigned long rank; llen = zsetLength(zobj); if (zobj->encoding == OBJ_ENCODING_ZIPLIST) {
//壓縮列表邏輯……
} else if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
zset *zs = zobj->ptr;
zskiplist *zsl = zs->zsl;
dictEntry *de;
double score; de = dictFind(zs->dict, ele);
if (de != NULL) {
/*
* 如果元素存在在跳躍表上,則獲取元素的分支,並根據
* 分支判斷其在跳躍表中的跨度,根據跨度計算節點在跳躍表
* 中的索引。
* 如果是正向查詢(reverse為0),則索引為跨度(rank)-1。
* 如果是逆向查詢(reverse不為0),則索引為跳躍表長度(zsl->length)-跨度(rank)。
*/
score = *(double *) dictGetVal(de);
rank = zslGetRank(zsl, score, ele);
/* Existing elements always have a rank. */
serverAssert(rank != 0);
if (reverse)
return llen - rank;
else
return rank - 1;
} else {//如果元素不在跳躍表上,則返回-1
return -1;
}
} else {
serverPanic("Unknown sorted set encoding");
}
}

  

獲取完分值後,需要定位節點在跳躍表中的跨度,然後根據逆向狀態及跨度,計算節點在跳躍表中的索引。

/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
* Note that the rank is 1-based due to the span of zsl->header to the
* first element.
* 根據給定的分支和元素查詢其節點在跳躍表中的跨度,返回0代表節點不存在。
* */
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;
//從頭節點最高層遍歷,如果能前進到前一個節點,則把當前節點的跨度加到rank上
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele, ele) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
} /*
* x might be equal to zsl->header, so test if obj is non-NULL
* x可能停留在頭節點,此處判斷是保證節點的元素不為NULL。
* */
if (x->ele && sdscmp(x->ele, ele) == 0) {
return rank;
}
}
return 0;
}

  

至此,筆者和大家一起學習了Redis跳躍表的是如何插入節點、刪除節點、更新節點以及如何對跳躍表中的節點進行不同維度(索引、分值)的查詢。跳躍表是一種應用相當廣泛的資料結構,很多場景下人們都用跳躍表代替B-Tree,因為跳躍表和B-Tree有著一樣的查詢時間複雜度O(logN),但跳躍表的實現卻比B-Tree簡單很多。而Redis正是藉助了跳躍表的思路實現了有序集合,使得很多需要儲存、排序海量資料的業務得以實現,如:微博熱搜或者頭條新聞,都可以使用Redis有序集合來解決。

當然,由於筆者的時間精力有限,這裡並沒有完全介紹所有跳躍表命令的相關實現,但筆者相信能看到這裡的人,基本已經掌握了跳躍表的整體脈絡。如果對跳躍表其餘命令有興趣的朋友,可以自行翻閱Redis原始碼,或者評論私信筆者你們的疑問,如果問題多的話筆者還會針對大家共同的問題進行講解。