1. 程式人生 > >redis原始碼分析與思考(十七)——有序集合型別的命令實現(t_zset.c)

redis原始碼分析與思考(十七)——有序集合型別的命令實現(t_zset.c)

    有序集合是集合的延伸,它儲存著集合元素的不可重複性,但不同的是,它是有序的,它利用每一個元素的分數來作為有序集合的排序依據,現在列出有序集合的命令:

有序集合命令

命令 對應操作 時間複雜度
zadd key score member [score member…] 新增成員 O(n)
zcard key 計算成員個數 O(1)
zscore key member 計算成員的分數 O(1)
zrank key member 計算成員的排名 O(logn)
zrem key member [member…] 刪除成員 O(logn)
zincrby key increment member 增加成員的分數 O(logn)
zrange key start end [withscores] 返回指定排名的成員 O(logn)
zrangebyscore key min max [withscores] 返回指定分數範圍內的成員 O(logn)
zcount key min max 返回指定分數範圍內的成員個數 O(logn)
zremrangebyrank key start end 刪除第start到第end名的成員 O(logn)
zremrangebyscore key min max 刪除指定分數範圍內的成員 O(logn)
zinterstore destination numkeys key [key…] 計算交集儲存在目標鍵 O(logn)
zunionstore destination numkeys key [key…] 就算並集儲存在目標鍵 O(logn)

    與集合型別一樣,有序集合也只講交集與並集以及編碼的轉換,有序集合的其它操作可參照之前寫的壓縮列表與跳躍表的實現。

編碼的轉換

    當有序集合裡面的元素個數小於預設的128個,且每個元素的值都小於64位元組的時候,redis會採用壓縮列表來實現存貯,不滿足上述任意情況都將實行編碼轉換,因為那時壓縮列表的效率會變低。但是與集合不同的是,編碼轉換會迴轉,即有序集合滿足上述情況下,跳躍表編碼的有序集合會回到壓縮列表編碼的有序集合:

/*
 * 將跳躍表物件 zobj 的底層編碼轉換為 encoding 。
 */
void zsetConvert(robj *zobj, int encoding) {
    zset *zs;
    zskiplistNode *node, *next;
    robj *ele;
    double score;
    if (zobj->encoding == encoding) return;
    // 從 ZIPLIST 編碼轉換為 SKIPLIST 編碼
    if (zobj->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl = zobj->ptr;
        unsigned char *eptr, *sptr;
        unsigned char *vstr;
        unsigned int vlen;
        long long vlong;
        if (encoding != REDIS_ENCODING_SKIPLIST)
            redisPanic("Unknown target encoding");
        // 建立有序集合結構
        zs = zmalloc(sizeof(*zs));
        // 字典
        zs->dict = dictCreate(&zsetDictType,NULL);
        // 跳躍表
        zs->zsl = zslCreate();
        // 有序集合在 ziplist 中的排列:
        // 指向 ziplist 中的首個節點(儲存著元素成員)
        eptr = ziplistIndex(zl,0);
        redisAssertWithInfo(NULL,zobj,eptr != NULL);
        // 指向 ziplist 中的第二個節點(儲存著元素分值)
        sptr = ziplistNext(zl,eptr);
        redisAssertWithInfo(NULL,zobj,sptr != NULL);
        // 遍歷所有 ziplist 節點,並將元素的成員和分值新增到有序集合中
        while (eptr != NULL) {    
            // 取出分值
            score = zzlGetScore(sptr);
            // 取出成員
redisAssertWithInfo(NULL,zobj,ziplistGet(eptr,&vstr,&vlen,&vlong));
            if (vstr == NULL)
                ele = createStringObjectFromLongLong(vlong);
            else
                ele = createStringObject((char*)vstr,vlen);
            // 將成員和分值分別關聯到跳躍表和字典中
            node = zslInsert(zs->zsl,score,ele);
            redisAssertWithInfo(NULL,zobj,dictAdd(zs->dict,ele,&node->score) == DICT_OK);
            incrRefCount(ele); /* Added to dictionary. */
            // 移動指標,指向下個元素
            zzlNext(zl,&eptr,&sptr);
        }
        // 釋放原來的 ziplist
        zfree(zobj->ptr);
        // 更新物件的值,以及編碼方式
        zobj->ptr = zs;
        zobj->encoding = REDIS_ENCODING_SKIPLIST;
    // 從 SKIPLIST 轉換為 ZIPLIST 編碼
    } else if (zobj->encoding == REDIS_ENCODING_SKIPLIST) {
        // 新的 ziplist
        unsigned char *zl = ziplistNew();
        if (encoding != REDIS_ENCODING_ZIPLIST)
            redisPanic("Unknown target encoding");
        // 指向跳躍表
        zs = zobj->ptr;
        // 先釋放字典,因為只需要跳躍表就可以遍歷整個有序集合了
        dictRelease(zs->dict);
        // 指向跳躍表首個節點
        node = zs->zsl->header->level[0].forward;
        // 釋放跳躍表表頭
        zfree(zs->zsl->header);
        zfree(zs->zsl);
        // 遍歷跳躍表,取出裡面的元素,並將它們新增到 ziplist
        while (node) {
            // 取出解碼後的值物件
            ele = getDecodedObject(node->obj);
            // 新增元素到 ziplist
            zl = zzlInsertAt(zl,NULL,ele,node->score);
            decrRefCount(ele);
            // 沿著跳躍表的第 0 層前進
            next = node->level[0].forward;
            zslFreeNode(node);
            node = next;
        }
        // 釋放跳躍表
        zfree(zs);
        // 更新物件的值,以及物件的編碼方式
        zobj->ptr = zl;
        zobj->encoding = REDIS_ENCODING_ZIPLIST;
    } else {
        redisPanic("Unknown sorted set encoding");
    }
}

交集、並集

    有序集合的交集與並集的底層實現也在一個函式裡面,與集合的交併集計算原理差不多,多了權值的計算,並集的時候沒有演算法的選擇:

/**
 *
 * @param c 客戶端
 * @param dstkey  儲存的目標有序集合鍵
 * @param op 判斷交集還是並集
 */
void zunionInterGenericCommand(redisClient *c, robj *dstkey, int op) {
    int i, j;
    long setnum;
    int aggregate = REDIS_AGGR_SUM;
    zsetopsrc *src;
    zsetopval zval;
    robj *tmp;
    unsigned int maxelelen = 0;
    robj *dstobj;
    zset *dstzset;
    zskiplistNode *znode;
    int touched = 0;
    /* expect setnum input keys to be given */
    // 取出要處理的有序集合的個數 setnum
    if ((getLongFromObjectOrReply(c, c->argv[2], &setnum, NULL) != REDIS_OK))
        return;
    if (setnum < 1) {
        //向客戶端返回錯誤資訊
        addReplyError(c,
            "at least 1 input key is needed for ZUNIONSTORE/ZINTERSTORE");
        return;
    }
    /* test if the expected number of keys would overflow */
    // setnum 引數和傳入的 key 數量不相同,出錯
    if (setnum > c->argc-3) {
        addReply(c,shared.syntaxerr);
        return;
    }
    /* read keys to be used for input */
    // 為每個輸入 key 建立一個迭代器
    src = zcalloc(sizeof(zsetopsrc) * setnum);
    for (i = 0, j = 3; i < setnum; i++, j++) {
        // 取出 key 物件
        robj *obj = lookupKeyWrite(c->db,c->argv[j]);
        // 建立迭代器
        if (obj != NULL) {
            if (obj->type != REDIS_ZSET && obj->type != REDIS_SET) {
                zfree(src);
                addReply(c,shared.wrongtypeerr);
                return;
            }
            src[i].subject = obj;
            src[i].type = obj->type;
            src[i].encoding = obj->encoding;
        // 不存在的物件設為 NULL
        } else {
            src[i].subject = NULL;
        }
        /* Default all weights to 1. */
        // 預設權重為 1.0
        src[i].weight = 1.0;
    }
    /* parse optional extra arguments */
    // 分析並讀入可選引數
    if (j < c->argc) {
        int remaining = c->argc - j;
        while (remaining) {
            if (remaining >= (setnum + 1) && !strcasecmp(c->argv[j]->ptr,"weights")) {
                j++; remaining--;
                // 權重引數
                for (i = 0; i < setnum; i++, j++, remaining--) {
                    if (getDoubleFromObjectOrReply(c,c->argv[j],&src[i].weight,
                            "weight value is not a float") != REDIS_OK)
                    {
                        zfree(src);
                        return;
                    }
                }
            } else if (remaining >= 2 && !strcasecmp(c->argv[j]->ptr,"aggregate")) {
                j++; remaining--;
                // 判斷聚合方式
                if (!strcasecmp(c->argv[j]->ptr,"sum")) {
                    aggregate = REDIS_AGGR_SUM;
                } else if (!strcasecmp(c->argv[j]->ptr,"min")) {
                    aggregate = REDIS_AGGR_MIN;
                } else if (!strcasecmp(c->argv[j]->ptr,"max")) {
                    aggregate = REDIS_AGGR_MAX;
                } else {
                    zfree(src);
                    addReply(c,shared.syntaxerr);
                    return;
                }
                j++; remaining--;
            } else {
                zfree(src);
                addReply(c,shared.syntaxerr);
                return;
            }
        }
    }
    /* sort sets from the smallest to largest, this will improve our
     * algorithm's performance */
    // 對所有集合進行排序,以減少演算法的常數項
    qsort(src,setnum,sizeof(zsetopsrc),zuiCompareByCardinality);
    // 建立結果集物件
    dstobj = createZsetObject();
    dstzset = dstobj->ptr;
    memset(&zval, 0, sizeof(zval));
    // ZINTERSTORE 命令
    if (op == REDIS_OP_INTER) {
        /* Skip everything if the smallest input is empty. */
        // 只處理非空集合
        if (zuiLength(&src[0]) > 0) {
            /* Precondition: as src[0] is non-empty and the inputs are ordered
             * by size, all src[i > 0] are non-empty too. */
            // 遍歷基數最小的 src[0] 集合
            zuiInitIterator(&src[0]);
            while (zuiNext(&src[0],&zval)) {
                double score, value;
                // 計算加權分值
                score = src[0].weight * zval.score;
                if (isnan(score)) score = 0;
                // 將 src[0] 集合中的元素和其他集合中的元素做加權聚合計算
                for (j = 1; j < setnum; j++) {
                    /* It is not safe to access the zset we are
                     * iterating, so explicitly check for equal object. */
                    // 如果當前迭代到的 src[j] 的物件和 src[0] 的物件一樣,
                    // 那麼 src[0] 出現的元素必然也出現在 src[j]
                    // 那麼我們可以直接計算聚合值,
                    // 不必進行 zuiFind 去確保元素是否出現
                    // 這種情況在某個 key 輸入了兩次,
                    // 並且這個 key 是所有輸入集合中基數最小的集合時會出現
                    if (src[j].subject == src[0].subject) {
                        value = zval.score*src[j].weight;
   zunionInterAggregate(&score,value,aggregate);
                    // 如果能在其他集合找到當前迭代到的元素的話
                    // 那麼進行聚合計算
                    } else if (zuiFind(&src[j],&zval,&value)) {
                        value *= src[j].weight;
      zunionInterAggregate(&score,value,aggregate);
                    // 如果當前元素沒出現在某個集合,那麼跳出 for 迴圈
                    // 處理下個元素
                    } else {
                        break;
                    }
                }
                /* Only continue when present in every input. */
                // 只在交集元素出現時,才執行以下程式碼
                if (j == setnum) {
                    // 取出值物件
                    tmp = zuiObjectFromValue(&zval);
                    // 加入到有序集合中
                    znode = zslInsert(dstzset->zsl,score,tmp);
                    incrRefCount(tmp); /* added to skiplist */
                    // 加入到字典中
                    dictAdd(dstzset->dict,tmp,&znode->score);
                    incrRefCount(tmp); /* added to dictionary */
                    // 更新字串物件的最大長度
                    if (sdsEncodedObject(tmp)) {
                        if (sdslen(tmp->ptr) > maxelelen)
                            maxelelen = sdslen(tmp->ptr);
                    }
                }
            }
            zuiClearIterator(&src[0]);
        }
    // ZUNIONSTORE
    } else if (op == REDIS_OP_UNION) {
        // 遍歷所有輸入集合
        for (i = 0; i < setnum; i++) {
            // 跳過空集合
            if (zuiLength(&src[i]) == 0)
                continue;
            // 遍歷所有集合元素
            zuiInitIterator(&src[i]);
            while (zuiNext(&src[i],&zval)) {
                double score, value;
                /* Skip an element that when already processed */
                // 跳過已處理元素
                if (dictFind(dstzset->dict,zuiObjectFromValue(&zval)) != NULL)
                    continue;
                /* Initialize score */
                // 初始化分值
                score = src[i].weight * zval.score;
                // 溢位時設為 0
                if (isnan(score)) score = 0;
                /* We need to check only next sets to see if this element
                 * exists, since we process every element just one time so
                 * it can't exist in a previous set (otherwise it would be
                 * already processed). */
                for (j = (i+1); j < setnum; j++) {
                    /* It is not safe to access the zset we are
                     * iterating, so explicitly check for equal object. */
                    // 當前元素的集合和被迭代集合一樣
                    // 所以同一個元素必然出現在 src[j] 和 src[i]
                    // 程式直接計算它們的聚合值
                    // 而不必使用 zuiFind 來檢查元素是否存在
                    if(src[j].subject == src[i].subject) {
                        value = zval.score*src[j].weight;
                        zunionInterAggregate(&score,value,aggregate);

                    // 檢查成員是否存在
                    } else if (zuiFind(&src[j],&zval,&value)) {
                        value *= src[j].weight;
                        zunionInterAggregate(&score,value,aggregate);
                    }
                }
                // 取出成員
                tmp = zuiObjectFromValue(&zval);
                // 插入並集元素到跳躍表
                znode = zslInsert(dstzset->zsl,score,tmp);
                incrRefCount(zval.ele); /* added to skiplist */
                // 新增元素到字典
                dictAdd(dstzset->dict,tmp,&znode->score);
                incrRefCount(zval.ele); /* added to dictionary */
                // 更新字串最大長度
                if (sdsEncodedObject(tmp)) {
                    if (sdslen(tmp->ptr) > maxelelen)
                        maxelelen = sdslen(tmp->ptr);
                }
            }
            zuiClearIterator(&src[i]);
        }
    } else {
        redisPanic("Unknown operator");
    }
    // 刪除已存在的 dstkey ,等待後面用新物件代替它
    if (dbDelete(c->db,dstkey)) {
        signalModifiedKey(c->db,dstkey);
        touched = 1;
        server.dirty++;
    }
    // 如果結果集合的長度不為 0 
    if (dstzset->zsl->length) {
        /* Convert to ziplist when in limits. */
        // 看是否需要對結果集合進行編碼轉換
        if (dstzset->zsl->length <= server.zset_max_ziplist_entries &&