redis原始碼分析與思考(十五)——雜湊型別的命令實現(t_hash.c)
阿新 • • 發佈:2018-11-12
雜湊型別又叫做字典,在redis中,雜湊型別本身是一個鍵值對,而雜湊型別裡面也存貯著鍵值對,其對應關係是,每個雜湊型別的值對應著一個鍵值對或多對鍵值對,如圖所示:
雜湊型別命令
命令 | 對應操作 | 時間複雜度 |
---|---|---|
hset key field value | 新增一個域值對 | O(1) |
hget key field | 獲取域值 | O(1) |
hdel key field [field…] | 刪除多個域值對 | O(n) |
hlen key | 該雜湊表的長度 | O(1) |
hgetall key | 獲取全部的域值對 | O(n) |
hmget field [field…] | 獲取多個指定鍵的域值對 | O(n) |
hmset field value [field…] | 設定多個域值對 | O(n) |
hexists key field | 檢測其中是否有了field鍵 | O(1) |
hkeys key | 列出所有的域名 | O(n) |
hvals key | 列出所有的域值 | O(n) |
hsetnx key field value | 增加一個之前必須不存在的域值 | O(1) |
hincrby key field increment | 自增一段 | O(1) |
hincrbyfloat key field increment | 浮點數自增 | O(1) |
hstrlen key field | 獲取其對應域值的字串長度 | O(1) |
編碼的轉換
當每個元素的值的位元組長都小於預設的64位元組時,以及總長度小於預設的512個時,雜湊物件會採用ziplist來儲存資料,在插入新的元素的時候都會檢查是否會滿足這兩個條件,不滿足則進行編碼的轉換:
# hash_max_ziplist_value 64
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
int i;
// 如果物件不是 ziplist 編碼,那麼直接返回
if (o->encoding != REDIS_ENCODING_ZIPLIST) return;
// 檢查所有輸入物件,看它們的字串值是否超過了指定長度
for (i = start; i <= end; i++) {
if (sdsEncodedObject(argv[i]) &&
sdslen(argv[i]->ptr) > server.hash_max_ziplist_value)
{
// 將物件的編碼轉換成 REDIS_ENCODING_HT
hashTypeConvert(o, REDIS_ENCODING_HT);
break;
}
}
}
/*
* 將一個 ziplist 編碼的雜湊物件 o 轉換成其他編碼
*/
void hashTypeConvertZiplist(robj *o, int enc) {
redisAssert(o->encoding == REDIS_ENCODING_ZIPLIST);
// 如果輸入是 ZIPLIST ,那麼不做動作
if (enc == REDIS_ENCODING_ZIPLIST) {
/* Nothing to do... */
// 轉換成 HT 編碼
} else if (enc == REDIS_ENCODING_HT) {
hashTypeIterator *hi;
dict *dict;
int ret;
// 建立雜湊迭代器
hi = hashTypeInitIterator(o);
// 建立空白的新字典
dict = dictCreate(&hashDictType, NULL);
// 遍歷整個 ziplist
while (hashTypeNext(hi) != REDIS_ERR) {
robj *field, *value;
// 取出 ziplist 裡的鍵
field = hashTypeCurrentObject(hi, REDIS_HASH_KEY);
field = tryObjectEncoding(field);
// 取出 ziplist 裡的值
value = hashTypeCurrentObject(hi, REDIS_HASH_VALUE);
value = tryObjectEncoding(value);
// 將鍵值對新增到字典
ret = dictAdd(dict, field, value);
if (ret != DICT_OK) {
redisLogHexDump(REDIS_WARNING,"ziplist with dup elements dump",
o->ptr,ziplistBlobLen(o->ptr));
redisAssert(ret == DICT_OK);
}
}
// 釋放 ziplist 的迭代器
hashTypeReleaseIterator(hi);
// 釋放物件原來的 ziplist
zfree(o->ptr);
// 更新雜湊的編碼和值物件
o->encoding = REDIS_ENCODING_HT;
o->ptr = dict;
} else {
redisPanic("Unknown hash encoding");
}
}
/*
* 對雜湊物件 o 的編碼方式進行轉換
*
* 目前只支援將 ZIPLIST 編碼轉換成 HT 編碼
*/
void hashTypeConvert(robj *o, int enc) {
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
hashTypeConvertZiplist(o, enc);
} else if (o->encoding == REDIS_ENCODING_HT) {
redisPanic("Not implemented");
} else {
redisPanic("Unknown hash encoding");
}
}
檢查域是否存在
步驟是先判斷是哪種編碼型別,然後呼叫對應的API介面,返回一個值表示成功:
/*
* 引數:
* field 域
* vstr 值是字串時,將它儲存到這個指標
* vlen 儲存字串的長度
* ll 值是整數時,將它儲存到這個指標
* 查詢失敗時,函式返回 -1 。
* 查詢成功時,返回 0 。
*/
int hashTypeGetFromZiplist(robj *o, robj *field,
unsigned char **vstr,
unsigned int *vlen,
long long *vll)
{
unsigned char *zl, *fptr = NULL, *vptr = NULL;
int ret;
// 確保編碼正確
redisAssert(o->encoding == REDIS_ENCODING_ZIPLIST);
// 取出未編碼的域
field = getDecodedObject(field);
// 遍歷 ziplist ,查詢域的位置
zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
// 定位包含域的節點
fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
if (fptr != NULL) {
// 域已經找到,取出和它相對應的值的位置
vptr = ziplistNext(zl, fptr);
redisAssert(vptr != NULL);
}
}
decrRefCount(field);
// 從 ziplist 節點中取出值
if (vptr != NULL) {
ret = ziplistGet(vptr, vstr, vlen, vll);
redisAssert(ret);
return 0;
}
// 沒找到
return -1;
}
/*
* 從 REDIS_ENCODING_HT 編碼的 hash 中取出和 field 相對應的值。
* 成功找到值時返回 0 ,沒找到返回 -1 。
*/
int hashTypeGetFromHashTable(robj *o, robj *field, robj **value) {
dictEntry *de;
// 確保編碼正確
redisAssert(o->encoding == REDIS_ENCODING_HT);
// 在字典中查詢域(鍵)
de = dictFind(o->ptr, field);
// 鍵不存在
if (de == NULL) return -1;
// 取出域(鍵)的值
*value = dictGetVal(de);
// 成功找到
return 0;
}
/*
* 多型 GET 函式,從 hash 中取出域 field 的值,並返回一個值物件。
* 找到返回值物件,沒找到返回 NULL 。
*/
robj *hashTypeGetObject(robj *o, robj *field) {
robj *value = NULL;
// 從 ziplist 中取出值
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *vstr = NULL;
unsigned int vlen = UINT_MAX;
long long vll = LLONG_MAX;
if (hashTypeGetFromZiplist(o, field, &vstr, &vlen, &vll) == 0) {
// 建立值物件
if (vstr) {
value = createStringObject((char*)vstr, vlen);
} else {
value = createStringObjectFromLongLong(vll);
}
}
// 從字典中取出值
} else if (o->encoding == REDIS_ENCODING_HT) {
robj *aux;
if (hashTypeGetFromHashTable(o, field, &aux) == 0) {
incrRefCount(aux);
value = aux;
}
} else {
redisPanic("Unknown hash encoding");
}
// 返回值物件,或者 NULL
return value;
}
新增元素
下面給出HSET系列命令的底層實現:
/*
* 將給定的 field-value 對新增到 hash 中,
* 如果 field 已經存在,那麼刪除舊的值,並關聯新值。
* 這個函式負責對 field 和 value 引數進行引用計數自增。
* 返回 0 表示元素已經存在,這次函式呼叫執行的是更新操作。
* 返回 1 則表示函式執行的是新新增操作。
*/
int hashTypeSet(robj *o, robj *field, robj *value) {
int update = 0;
// 新增到 ziplist
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr, *vptr;
// 解碼成字串或者數字,ziplist需要編碼與解碼操作
field = getDecodedObject(field);
value = getDecodedObject(value);
// 遍歷整個 ziplist ,嘗試查詢並更新 field (如果它已經存在的話)
zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
// 定位到域 field
fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
if (fptr != NULL) {
/* Grab pointer to the value (fptr points to the field) */
// 定位到域的值
vptr = ziplistNext(zl, fptr);
redisAssert(vptr != NULL);
// 標識這次操作為更新操作
update = 1;
/* Delete value */
// 刪除舊的鍵值對
zl = ziplistDelete(zl, &vptr);
/* Insert new value */
// 新增新的鍵值對
zl = ziplistInsert(zl, vptr, value->ptr, sdslen(value->ptr));
}
}
// 如果這不是更新操作,那麼這就是一個新增操作
if (!update) {
/* Push new field/value pair onto the tail of the ziplist */
// 將新的 field-value 對推入到 ziplist 的末尾
zl = ziplistPush(zl, field->ptr, sdslen(field->ptr), ZIPLIST_TAIL);
zl = ziplistPush(zl, value->ptr, sdslen(value->ptr), ZIPLIST_TAIL);
}
// 更新物件指標
o->ptr = zl;
// 釋放臨時物件
decrRefCount(field);
decrRefCount(value);
/* Check if the ziplist needs to be converted to a hash table */
// 檢查在新增操作完成之後,是否需要將 ZIPLIST 編碼轉換成 HT 編碼
if (hashTypeLength(o) > server.hash_max_ziplist_entries)
hashTypeConvert(o, REDIS_ENCODING_HT);
// 新增到字典
} else if (o->encoding == REDIS_ENCODING_HT) {
// 新增或替換鍵值對到字典
// 新增返回 1 ,替換返回 0
if (dictReplace(o->ptr, field, value)) { /* Insert */
incrRefCount(field);
} else { /* Update */
update = 1;
}
incrRefCount(value);
} else {
redisPanic("Unknown hash encoding");
}
// 更新/新增指示變數
return update;
}
刪除元素
/*
* 刪除成功返回 1 ,因為域不存在而造成的刪除失敗返回 0 。
*/
int hashTypeDelete(robj *o, robj *field) {
int deleted = 0;
// 從 ziplist 中刪除
if (o->encoding == REDIS_ENCODING_ZIPLIST) {
unsigned char *zl, *fptr;
field = getDecodedObject(field);
zl = o->ptr;
fptr = ziplistIndex(zl, ZIPLIST_HEAD);
if (fptr != NULL) {
// 定位到域
fptr = ziplistFind(fptr, field->ptr, sdslen(field->ptr), 1);
if (fptr != NULL) {
// 刪除域和值
zl = ziplistDelete(zl,&fptr);
zl = ziplistDelete(zl,&fptr);
o->ptr = zl;
deleted = 1;
}
}
decrRefCount(field);
// 從字典中刪除
} else if (o->encoding == REDIS_ENCODING_HT) {
if (dictDelete((dict*)o->ptr, field) == REDIS_OK) {
deleted = 1;
/* Always check if the dictionary needs a resize after a delete. */
// 刪除成功時,看字典是否需要收縮
if (htNeedsResize(o->ptr)) dictResize(o->ptr);
}
} else {
redisPanic("Unknown hash encoding");
}
return deleted;
}
獲取所有域
在講解資料庫遍歷時談到redis採取了兩種不同的遍歷方式,全量遍歷與漸進式遍歷,先來看看全量遍歷KEYS命令的底層實現:
void genericHgetallCommand(redisClient *c, int flags) {
robj *o;
hashTypeIterator *hi;
int multiplier = 0;
int length, count = 0;
// 取出雜湊物件
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,REDIS_HASH)) return;
// 計算要取出的元素數量
if (flags & REDIS_HASH_KEY) multiplier++;
if (flags & REDIS_HASH_VALUE) multiplier++;
length = hashTypeLength(o) * multiplier;
addReplyMultiBulkLen(c, length);
// 迭代節點,並取出元素
hi = hashTypeInitIterator(o);
while (hashTypeNext(hi) != REDIS_ERR) {
// 取出鍵
if (flags & REDIS_HASH_KEY) {
addHashIteratorCursorToReply(c, hi, REDIS_HASH_KEY);
count++;
}
// 取出值
if (flags & REDIS_HASH_VALUE) {
addHashIteratorCursorToReply(c, hi, REDIS_HASH_VALUE);
count++;
}
}
// 釋放迭代器
hashTypeReleaseIterator(hi);
redisAssert(count == length);
}
漸進式遍歷,直接呼叫資料庫中scanGenericCommand方法:
void hscanCommand(redisClient *c) {
robj *o;
unsigned long cursor;
//表示開始漸進式遍歷
if (parseScanCursorOrReply(c,c->argv[2],&cursor) == REDIS_ERR) return;
//讀取物件,檢查型別
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptyscan)) == NULL ||
checkType(c,o,REDIS_HASH)) return;
scanGenericCommand(c,o,cursor);
}
關於自增、獲取長度等操作與列表型別相似,在此不再重複。