1. 程式人生 > >redis原始碼分析與思考(十五)——雜湊型別的命令實現(t_hash.c)

redis原始碼分析與思考(十五)——雜湊型別的命令實現(t_hash.c)

    雜湊型別又叫做字典,在redis中,雜湊型別本身是一個鍵值對,而雜湊型別裡面也存貯著鍵值對,其對應關係是,每個雜湊型別的值對應著一個鍵值對或多對鍵值對,如圖所示:
在這裡插入圖片描述

雜湊型別命令

命令 對應操作 時間複雜度
hset key field value 新增一個域值對 O(1)
hget key field 獲取域值 O(1)
hdel key field [field…] 刪除多個域值對 O(n)
hlen key 該雜湊表的長度 O(1)
hgetall key 獲取全部的域值對 O(n)
hmget field [field…] 獲取多個指定鍵的域值對 O(n)
hmset field value [field…] 設定多個域值對 O(n)
hexists key field 檢測其中是否有了field鍵 O(1)
hkeys key 列出所有的域名 O(n)
hvals key 列出所有的域值 O(n)
hsetnx key field value 增加一個之前必須不存在的域值 O(1)
hincrby key field increment 自增一段 O(1)
hincrbyfloat key field increment 浮點數自增 O(1)
hstrlen key field 獲取其對應域值的字串長度 O(1)

編碼的轉換

    當每個元素的值的位元組長都小於預設的64位元組時,以及總長度小於預設的512個時,雜湊物件會採用ziplist來儲存資料,在插入新的元素的時候都會檢查是否會滿足這兩個條件,不滿足則進行編碼的轉換:

# hash_max_ziplist_value 64
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
    int i;
    // 如果物件不是 ziplist 編碼,那麼直接返回
    if (o->encoding != REDIS_ENCODING_ZIPLIST) return;
    // 檢查所有輸入物件,看它們的字串值是否超過了指定長度
    for (i = start; i <= end; i++) {
        if (sdsEncodedObject(argv[i]) &&
            sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
        {
            // 將物件的編碼轉換成 REDIS_ENCODING_HT
            hashTypeConvert(o, REDIS_ENCODING_HT);
            break;
        }
    }
}

/*
 * 將一個 ziplist 編碼的雜湊物件 o 轉換成其他編碼
 */
void hashTypeConvertZiplist(robj *o, int enc) {
    redisAssert(o->encoding == REDIS_ENCODING_ZIPLIST);
    // 如果輸入是 ZIPLIST ,那麼不做動作
    if (enc == REDIS_ENCODING_ZIPLIST) {
        /* Nothing to do... */
    // 轉換成 HT 編碼
    } else if (enc == REDIS_ENCODING_HT) {
        hashTypeIterator *hi;
        dict *dict;
        int ret;
        // 建立雜湊迭代器
        hi = hashTypeInitIterator(o);
        // 建立空白的新字典
        dict = dictCreate(&hashDictType, NULL);
        // 遍歷整個 ziplist
        while (hashTypeNext(hi) != REDIS_ERR) {
            robj *field, *value;
            // 取出 ziplist 裡的鍵
            field = hashTypeCurrentObject(hi, REDIS_HASH_KEY);
            field = tryObjectEncoding(field);
            // 取出 ziplist 裡的值
            value = hashTypeCurrentObject(hi, REDIS_HASH_VALUE);
            value = tryObjectEncoding(value);

            // 將鍵值對新增到字典
            ret = dictAdd(dict, field, value);
            if (ret != DICT_OK) {
                redisLogHexDump(REDIS_WARNING,"ziplist with dup elements dump",
                    o->ptr,ziplistBlobLen(o->ptr));
                redisAssert(ret == DICT_OK);
            }
        }
       // 釋放 ziplist 的迭代器
        hashTypeReleaseIterator(hi);
        // 釋放物件原來的 ziplist
        zfree(o->ptr);
        // 更新雜湊的編碼和值物件
        o->encoding = REDIS_ENCODING_HT;
        o->ptr = dict;
    } else {
        redisPanic("Unknown hash encoding");
    }
}

/*
 * 對雜湊物件 o 的編碼方式進行轉換
 *
 * 目前只支援將 ZIPLIST 編碼轉換成 HT 編碼
 */
void hashTypeConvert(robj *o, int enc) {
    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        hashTypeConvertZiplist(o, enc);
    } else if (o->encoding == REDIS_ENCODING_HT) {
        redisPanic("Not implemented");
    } else {
        redisPanic("Unknown hash encoding");
    }
}

檢查域是否存在

    步驟是先判斷是哪種編碼型別,然後呼叫對應的API介面,返回一個值表示成功:

/* 
 * 引數:
 *  field   域
 *  vstr    值是字串時,將它儲存到這個指標
 *  vlen    儲存字串的長度
 *  ll      值是整數時,將它儲存到這個指標
 * 查詢失敗時,函式返回 -1 。
 * 查詢成功時,返回 0 。
 */
int hashTypeGetFromZiplist(robj *o, robj *field,
                           unsigned char **vstr,
                           unsigned int *vlen,
                           long long *vll)
{
    unsigned char *zl, *fptr = NULL, *vptr = NULL;
    int ret;
    // 確保編碼正確
    redisAssert(o->encoding == REDIS_ENCODING_ZIPLIST);
    // 取出未編碼的域
    field = getDecodedObject(field);
    // 遍歷 ziplist ,查詢域的位置
    zl = o->ptr;
    fptr = ziplistIndex(zl, ZIPLIST_HEAD);
    if (fptr != NULL) {
        // 定位包含域的節點
        fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
        if (fptr != NULL) {
            // 域已經找到,取出和它相對應的值的位置
            vptr = ziplistNext(zl, fptr);
            redisAssert(vptr != NULL);
        }
    }
    decrRefCount(field);
    // 從 ziplist 節點中取出值
    if (vptr != NULL) {
        ret = ziplistGet(vptr, vstr, vlen, vll);
        redisAssert(ret);
        return 0;
    }
    // 沒找到
    return -1;
}

/*
 * 從 REDIS_ENCODING_HT 編碼的 hash 中取出和 field 相對應的值。
 * 成功找到值時返回 0 ,沒找到返回 -1 。
 */
int hashTypeGetFromHashTable(robj *o, robj *field, robj **value) {
    dictEntry *de;
    // 確保編碼正確
    redisAssert(o->encoding == REDIS_ENCODING_HT);
    // 在字典中查詢域(鍵)
    de = dictFind(o->ptr, field);
    // 鍵不存在
    if (de == NULL) return -1;
    // 取出域(鍵)的值
    *value = dictGetVal(de);
    // 成功找到
    return 0;
}

/* 
 * 多型 GET 函式,從 hash 中取出域 field 的值,並返回一個值物件。
 * 找到返回值物件,沒找到返回 NULL 。
 */
robj *hashTypeGetObject(robj *o, robj *field) {
    robj *value = NULL;
    // 從 ziplist 中取出值
    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *vstr = NULL;
        unsigned int vlen = UINT_MAX;
        long long vll = LLONG_MAX;
        if (hashTypeGetFromZiplist(o, field, &vstr, &vlen, &vll) == 0) {
            // 建立值物件
            if (vstr) {
                value = createStringObject((char*)vstr, vlen);
            } else {
                value = createStringObjectFromLongLong(vll);
            }
        }
    // 從字典中取出值
    } else if (o->encoding == REDIS_ENCODING_HT) {
        robj *aux;
        if (hashTypeGetFromHashTable(o, field, &aux) == 0) {
            incrRefCount(aux);
            value = aux;
        }
    } else {
        redisPanic("Unknown hash encoding");
    }
    // 返回值物件,或者 NULL
    return value;
}

新增元素

    下面給出HSET系列命令的底層實現:

/* 
 * 將給定的 field-value 對新增到 hash 中,
 * 如果 field 已經存在,那麼刪除舊的值,並關聯新值。
 * 這個函式負責對 field 和 value 引數進行引用計數自增。
 * 返回 0 表示元素已經存在,這次函式呼叫執行的是更新操作。
 * 返回 1 則表示函式執行的是新新增操作。
 */
int hashTypeSet(robj *o, robj *field, robj *value) {
    int update = 0;
    // 新增到 ziplist
    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl, *fptr, *vptr;
        // 解碼成字串或者數字,ziplist需要編碼與解碼操作
        field = getDecodedObject(field);
        value = getDecodedObject(value);
        // 遍歷整個 ziplist ,嘗試查詢並更新 field (如果它已經存在的話)
        zl = o->ptr;
        fptr = ziplistIndex(zl, ZIPLIST_HEAD);
        if (fptr != NULL) {
            // 定位到域 field
            fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
            if (fptr != NULL) {
                /* Grab pointer to the value (fptr points to the field) */
                // 定位到域的值
                vptr = ziplistNext(zl, fptr);
                redisAssert(vptr != NULL);
                // 標識這次操作為更新操作
                update = 1;
                /* Delete value */
                // 刪除舊的鍵值對
                zl = ziplistDelete(zl, &vptr);
                /* Insert new value */
                // 新增新的鍵值對
                zl = ziplistInsert(zl, vptr, value->ptr, sdslen(value->ptr));
            }
        }
        // 如果這不是更新操作,那麼這就是一個新增操作
        if (!update) {
            /* Push new field/value pair onto the tail of the ziplist */
            // 將新的 field-value 對推入到 ziplist 的末尾
            zl = ziplistPush(zl, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL);
            zl = ziplistPush(zl, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL);
        }
        // 更新物件指標
        o->ptr = zl;
        // 釋放臨時物件
        decrRefCount(field);
        decrRefCount(value);
        /* Check if the ziplist needs to be converted to a hash table */
        // 檢查在新增操作完成之後,是否需要將 ZIPLIST 編碼轉換成 HT 編碼
        if (hashTypeLength(o) > server.hash_max_ziplist_entries)
            hashTypeConvert(o, REDIS_ENCODING_HT);
    // 新增到字典
    } else if (o->encoding == REDIS_ENCODING_HT) {
        // 新增或替換鍵值對到字典
        // 新增返回 1 ,替換返回 0
        if (dictReplace(o->ptr, field, value)) { /* Insert */
            incrRefCount(field);
        } else { /* Update */
            update = 1;
        }
        incrRefCount(value);
    } else {
        redisPanic("Unknown hash encoding");
    }
    // 更新/新增指示變數
    return update;
}

刪除元素

/* 
 * 刪除成功返回 1 ,因為域不存在而造成的刪除失敗返回 0 。
 */
int hashTypeDelete(robj *o, robj *field) {
    int deleted = 0;
    // 從 ziplist 中刪除
    if (o->encoding == REDIS_ENCODING_ZIPLIST) {
        unsigned char *zl, *fptr;
        field = getDecodedObject(field);
        zl = o->ptr;
        fptr = ziplistIndex(zl, ZIPLIST_HEAD);
        if (fptr != NULL) {
            // 定位到域
            fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
            if (fptr != NULL) {
                // 刪除域和值
                zl = ziplistDelete(zl,&fptr);
                zl = ziplistDelete(zl,&fptr);
                o->ptr = zl;
                deleted = 1;
            }
        }
        decrRefCount(field);
    // 從字典中刪除
    } else if (o->encoding == REDIS_ENCODING_HT) {
        if (dictDelete((dict*)o->ptr, field) == REDIS_OK) {
            deleted = 1;
            /* Always check if the dictionary needs a resize after a delete. */
            // 刪除成功時,看字典是否需要收縮
            if (htNeedsResize(o->ptr)) dictResize(o->ptr);
        }
    } else {
        redisPanic("Unknown hash encoding");
    }
    return deleted;
}

獲取所有域

    在講解資料庫遍歷時談到redis採取了兩種不同的遍歷方式,全量遍歷與漸進式遍歷,先來看看全量遍歷KEYS命令的底層實現:

void genericHgetallCommand(redisClient *c, int flags) {
    robj *o;
    hashTypeIterator *hi;
    int multiplier = 0;
    int length, count = 0;
    // 取出雜湊物件
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
        || checkType(c,o,REDIS_HASH)) return;
    // 計算要取出的元素數量
    if (flags & REDIS_HASH_KEY) multiplier++;
    if (flags & REDIS_HASH_VALUE) multiplier++;
    length = hashTypeLength(o) * multiplier;
    addReplyMultiBulkLen(c, length);
    // 迭代節點,並取出元素
    hi = hashTypeInitIterator(o);
    while (hashTypeNext(hi) != REDIS_ERR) {
        // 取出鍵
        if (flags & REDIS_HASH_KEY) {
            addHashIteratorCursorToReply(c, hi, REDIS_HASH_KEY);
            count++;
        }
        // 取出值
        if (flags & REDIS_HASH_VALUE) {
            addHashIteratorCursorToReply(c, hi, REDIS_HASH_VALUE);
            count++;
        }
    }
    // 釋放迭代器
    hashTypeReleaseIterator(hi);
    redisAssert(count == length);
}

    漸進式遍歷,直接呼叫資料庫中scanGenericCommand方法:

void hscanCommand(redisClient *c) {
    robj *o;
    unsigned long cursor;
    //表示開始漸進式遍歷
    if (parseScanCursorOrReply(c,c->argv[2],&cursor) == REDIS_ERR) return;
    //讀取物件,檢查型別
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
        checkType(c,o,REDIS_HASH)) return;
    scanGenericCommand(c,o,cursor);
}

    關於自增、獲取長度等操作與列表型別相似,在此不再重複。