1. 程式人生 > >Redis(三):set/get 命令解析

Redis(三):set/get 命令解析

  經過前兩篇的介紹,我們對整個redis的動作流程已經有比較清晰的認識。

  接下來就是到具體的命令處理方式的理解了,想來我們用這些工具的意義也是在此。雖然沒有人覺得,一個set/get方法會有難度,但是我們畢竟不是很清楚,否則也不至於在談到深處就懵逼了。

  我覺得本文的一個重要意義就是: 讓set/get還原成它本來樣子,和寫"hello world"一樣簡單。

框架性質的東西,我們前面已經講解,就直接進入主題: set/get 的操作。

  set/get 對應的兩個處理函式 (redisCommand) 定義是這樣的: 

    // rF 代表 getCommand 是隻讀命令,又快又準,時間複雜度 O(1)或者O(log(n))
    // wm 代表 setCommand 是個寫命令,當心空間問題
    {"get",getCommand,2,"rF",0,NULL,1,1,1,0,0}
    {"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0}

  所以,我們只要理解了, setCommand,getCommand 之後,就可以完全自信的說,set/get 就是和 "hello world" 一樣簡單了。

 

零、hash 演算法 

  很顯然,kv型的儲存一定是hash相關演算法的實現。那麼redis中如何使用這個hash演算法的呢?

  redis 中許多不同場景的hash演算法,其原型是在 dictType 中定義的。

typedef struct dictType {
    // hash 演算法原型
    unsigned int (*hashFunction)(const void *key);
    void *(*keyDup)(void *privdata, const void *key);
    void *(*valDup)(void *privdata, const void *obj);
    int (*keyCompare)(void *privdata, const void *key1, const void *key2);
    void (*keyDestructor)(void *privdata, void *key);
    void (*valDestructor)(void *privdata, void *obj);
} dictType;

  針對大部分場景,我們的key一般都是 string 型別的,但是還是會稍微有不一樣的。這裡我們就兩個場景來說明下:

1. 命令集構建的hash演算法

  即是 server.commands 中的key的hash演算法,這裡元素是有限的。其定義如下:

/* Command table. sds string -> command struct pointer. */
dictType commandTableDictType = {
    // 即 dictSdsCaseHash 是 command 的hash演算法實現
    dictSdsCaseHash,            /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCaseCompare,      /* key compare */
    dictSdsDestructor,          /* key destructor */
    NULL                        /* val destructor */
};
// server.c, 不區分大小寫的key hash
unsigned int dictSdsCaseHash(const void *key) {
    return dictGenCaseHashFunction((unsigned char*)key, sdslen((char*)key));
}
// dict.c, 不區分大小寫的key-hash演算法: hash * 33 + c
/* And a case insensitive hash function (based on djb hash) */
unsigned int dictGenCaseHashFunction(const unsigned char *buf, int len) {
    // static uint32_t dict_hash_function_seed = 5381;
    unsigned int hash = (unsigned int)dict_hash_function_seed;

    while (len--)
        hash = ((hash << 5) + hash) + (tolower(*buf++)); /* hash * 33 + c */
    return hash;
}

  

2. 針對普通kv查詢的hash演算法

  整個nosql就是kv的增刪改查,所以這是個重要的演算法。

/* Db->dict, keys are sds strings, vals are Redis objects. */
dictType dbDictType = {
    // k 的hash演算法實現
    dictSdsHash,                /* hash function */
    NULL,                       /* key dup */
    NULL,                       /* val dup */
    dictSdsKeyCompare,          /* key compare */
    dictSdsDestructor,          /* key destructor */
    dictObjectDestructor   /* val destructor */
};
// server.c, 呼叫 dict 的實現
unsigned int dictSdsHash(const void *key) {
    return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
}
// dict.c, 資料key的hash演算法,或者通用的 string 的hashCode 演算法
/* MurmurHash2, by Austin Appleby
 * Note - This code makes a few assumptions about how your machine behaves -
 * 1. We can read a 4-byte value from any address without crashing
 * 2. sizeof(int) == 4
 *
 * And it has a few limitations -
 *
 * 1. It will not work incrementally.
 * 2. It will not produce the same results on little-endian and big-endian
 *    machines.
 */
unsigned int dictGenHashFunction(const void *key, int len) {
    /* 'm' and 'r' are mixing constants generated offline.
     They're not really 'magic', they just happen to work well.  */
    uint32_t seed = dict_hash_function_seed;
    const uint32_t m = 0x5bd1e995;
    const int r = 24;

    /* Initialize the hash to a 'random' value */
    uint32_t h = seed ^ len;

    /* Mix 4 bytes at a time into the hash */
    const unsigned char *data = (const unsigned char *)key;
    // 核心演算法: step1. *m, ^k>>r, *m, *m, ^k, 每4位做一次運算
    while(len >= 4) {
        uint32_t k = *(uint32_t*)data;

        k *= m;
        k ^= k >> r;
        k *= m;

        h *= m;
        h ^= k;

        data += 4;
        len -= 4;
    }

    /* Handle the last few bytes of the input array  */
    // step2. 倒數第三位 ^<<16, 第二位 ^<<8, 第一位 ^, 然後 *m
    switch(len) {
        case 3: h ^= data[2] << 16;
        case 2: h ^= data[1] << 8;
        case 1: h ^= data[0]; h *= m;
    };

    /* Do a few final mixes of the hash to ensure the last few
     * bytes are well-incorporated. */
    // step3. 再混合 ^>>13, *m, ^>>15
    h ^= h >> 13;
    h *= m;
    h ^= h >> 15;

    return (unsigned int)h;
}

  可以看到,針對普通的字串的hash可是要複雜許多呢,因為這裡資料遠比 command 的資料多,情況更復雜,這樣的演算法唯一的目標就是儘量避免hash衝突。(雖然不知道為啥這麼幹,但它就是牛逼)

  redis中還有其他的hash演算法,比如dictObjHash,dictEncObjHash, 後續有接觸我們再聊。

 

接下來,我們正式來看看 set/get 到底如何?

 

一、getCommand 解析

  很顯然,get 會是個最簡單的命令,自然要檢軟柿子捏了。

// t_string.c
void getCommand(client *c) {
    getGenericCommand(c);
}
int getGenericCommand(client *c) {
    robj *o;
    // 如果在kv裡找不到,則直接響應空,shared.nullbulk 作為全域性常量的優勢就體現出來了
    // shared.nullbulk = createObject(OBJ_STRING,sdsnew("$-1\r\n"));
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return C_OK;
    // 找到對應的資料,但是型別不匹配,說明不能使用 get 命令,響應錯誤資訊
    // shared.wrongtypeerr = "-WRONGTYPE Operation against a key holding the wrong kind of value\r\n"
    if (o->type != OBJ_STRING) {
        addReply(c,shared.wrongtypeerr);
        return C_ERR;
    } else {
        // 正常情況則直接響應結果即可
        addReplyBulk(c,o);
        return C_OK;
    }
}

  整個處理流程果然是異常簡單,感覺人生已經達到了巔峰!但是,我們還沒有看到關鍵,那就是查詢 key 的過程。我們通過之前的介紹,知道有個叫做 redisDb 的東西,看起來它是負責所有的資料管理。它應該不會因為簡單而不儲存某些資料吧。

// db.c, 查詢某個key對應的元素或者直接響應客戶端
robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
    // 使用 c->db 對應的資料庫進行查詢,所以要求客戶端必須針對某db進行操作,且不能跨庫操作是原理決定
    robj *o = lookupKeyRead(c->db, key);
    // 如果沒有查到資料就直接使用預設的 reply, 響應客戶端了
    if (!o) addReply(c,reply);
    return o;
}
// db.c, 讀取key 對應值
robj *lookupKeyRead(redisDb *db, robj *key) {
    robj *val;
    // 檢查過期情況,如果過期,則不用再查了
    if (expireIfNeeded(db,key) == 1) {
        /* Key expired. If we are in the context of a master, expireIfNeeded()
         * returns 0 only when the key does not exist at all, so it's save
         * to return NULL ASAP. */
        if (server.masterhost == NULL) return NULL;

        /* However if we are in the context of a slave, expireIfNeeded() will
         * not really try to expire the key, it only returns information
         * about the "logical" status of the key: key expiring is up to the
         * master in order to have a consistent view of master's data set.
         *
         * However, if the command caller is not the master, and as additional
         * safety measure, the command invoked is a read-only command, we can
         * safely return NULL here, and provide a more consistent behavior
         * to clients accessign expired values in a read-only fashion, that
         * will say the key as non exisitng.
         *
         * Notably this covers GETs when slaves are used to scale reads. */
        if (server.current_client &&
            server.current_client != server.master &&
            server.current_client->cmd &&
            server.current_client->cmd->flags & CMD_READONLY)
        {
            return NULL;
        }
    }
    // 然後從db中查詢對應的key值,其實就是一個 hash 查詢
    // 快取命中統計
    val = lookupKey(db,key);
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}
// db.c, 過期的處理有點複雜,我們稍後再看,先看 db 的查詢key過程
robj *lookupKey(redisDb *db, robj *key) {
    // 直接在 db->dict 中進行hash查詢即可,前面已經介紹完成,關鍵優化點在增量rehash
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);

        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        if (server.rdb_child_pid == -1 && server.aof_child_pid == -1)
            val->lru = LRU_CLOCK();
        return val;
    } else {
        return NULL;
    }
}

// db.c, 接下來看下,檢查過期情況
int expireIfNeeded(redisDb *db, robj *key) {
    mstime_t when = getExpire(db,key);
    mstime_t now;

    if (when < 0) return 0; /* No expire for this key */

    /* Don't expire anything while loading. It will be done later. */
    if (server.loading) return 0;

    /* If we are in the context of a Lua script, we claim that time is
     * blocked to when the Lua script started. This way a key can expire
     * only the first time it is accessed and not in the middle of the
     * script execution, making propagation to slaves / AOF consistent.
     * See issue #1525 on Github for more information. */
    now = server.lua_caller ? server.lua_time_start : mstime();

    /* If we are running in the context of a slave, return ASAP:
     * the slave key expiration is controlled by the master that will
     * send us synthesized DEL operations for expired keys.
     *
     * Still we try to return the right information to the caller,
     * that is, 0 if we think the key should be still valid, 1 if
     * we think the key is expired at this time. */
    if (server.masterhost != NULL) return now > when;

    /* Return when this key has not expired */
    // 如果還沒到期就直接返回
    if (now <= when) return 0;

    /* Delete the key */
    server.stat_expiredkeys++;
    // key過期,是一個寫動作,需要傳播到 AOF 或者 slaves...
    propagateExpire(db,key,server.lazyfree_lazy_expire);
    // pub/sub 監控通知
    notifyKeyspaceEvent(NOTIFY_EXPIRED,
        "expired",key,db->id);
    // 同步刪除或者非同步刪除, 稍後討論
    return server.lazyfree_lazy_expire ? dbAsyncDelete(db,key) :
                                         dbSyncDelete(db,key);
}

/* Return the expire time of the specified key, or -1 if no expire
 * is associated with this key (i.e. the key is non volatile) */
long long getExpire(redisDb *db, robj *key) {
    dictEntry *de;
    /* No expire? return ASAP */
    // 查詢過期佇列, 資料量小
    if (dictSize(db->expires) == 0 ||
       (de = dictFind(db->expires,key->ptr)) == NULL) return -1;

    /* The entry was found in the expire dict, this means it should also
     * be present in the main dict (safety check). */
    serverAssertWithInfo(NULL,key,dictFind(db->dict,key->ptr) != NULL);
    // 返回到期時間戳, union 的應用
    return dictGetSignedIntegerVal(de);
}

// 刪除過期資料key的兩種方式,同步+非同步
// db.c, 同步刪除, 刪除 expires 佇列和 dict 資料
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}
// lazyfree.c, 非同步刪除過期資料, 一看就很複雜
/* Delete a key, value, and associated expiration entry if any, from the DB.
 * If there are enough allocations to free the value object may be put into
 * a lazy free list instead of being freed synchronously. The lazy free list
 * will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
    /* Deleting an entry from the expires dict will not free the sds of
     * the key, because it is shared with the main dictionary. */
    if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

    /* If the value is composed of a few allocations, to free in a lazy way
     * is actually just slower... So under a certain limit we just free
     * the object synchronously. */
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {
        robj *val = dictGetVal(de);
        // 判斷刪除的資料的影響範圍,與 資料型別有關,string為1,hash/set則計算count,list計算length
        size_t free_effort = lazyfreeGetFreeEffort(val);

        /* If releasing the object is too much work, let's put it into the
         * lazy free list. */
        if (free_effort > LAZYFREE_THRESHOLD) {
            // 將相關的資料放入佇列中,後臺任務慢慢刪除
            atomicIncr(lazyfree_objects,1,&lazyfree_objects_mutex);
            bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
            // 自身則立即設定為 NULL
            dictSetVal(db->dict,de,NULL);
        }
    }

    /* Release the key-val pair, or just the key if we set the val
     * field to NULL in order to lazy free it later. */
    if (dictDelete(db->dict,key->ptr) == DICT_OK) {
        if (server.cluster_enabled) slotToKeyDel(key);
        return 1;
    } else {
        return 0;
    }
}

  怎麼樣?是不是有一首歌叫涼涼~

  可以說,get操作本身是相當簡單的,在無hash衝突前提下,O(1)的複雜度搞定。然而它還要處理過期的資料問題,就不那麼簡單了。

  我們用一個時序圖整體體會下get的流程:

 

 

二、setCommand

  setCommand 是個寫操作,就不是 get 那麼簡單了。

// t_string.c, set 的所有用法都統一 setCommand, 多個引數共同解析為 flags
/* SET key value [NX] [XX] [EX <seconds>] [PX <milliseconds>] */
void setCommand(client *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = OBJ_SET_NO_FLAGS;

    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
        // NX 與 XX 互斥
        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
            !(flags & OBJ_SET_XX))
        {
            flags |= OBJ_SET_NX;
        } else if ((a[0] == 'x' || a[0] == 'X') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_NX))
        {
            flags |= OBJ_SET_XX;
        } 
        // PX 與 EX 互斥
        else if ((a[0] == 'e' || a[0] == 'E') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_PX) && next)
        {
            flags |= OBJ_SET_EX;
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        } else if ((a[0] == 'p' || a[0] == 'P') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' &&
                   !(flags & OBJ_SET_EX) && next)
        {
            flags |= OBJ_SET_PX;
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }
    // 嘗試壓縮 value 值以節省空間 (原始命令: set key value)
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
// object.c, 壓縮字串
/* Try to encode a string object in order to save space */
robj *tryObjectEncoding(robj *o) {
    long value;
    sds s = o->ptr;
    size_t len;

    /* Make sure this is a string object, the only type we encode
     * in this function. Other types use encoded memory efficient
     * representations but are handled by the commands implementing
     * the type. */
    serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);

    /* We try some specialized encoding only for objects that are
     * RAW or EMBSTR encoded, in other words objects that are still
     * in represented by an actually array of chars. */
    if (!sdsEncodedObject(o)) return o;

    /* It's not safe to encode shared objects: shared objects can be shared
     * everywhere in the "object space" of Redis and may end in places where
     * they are not handled. We handle them only as values in the keyspace. */
     if (o->refcount > 1) return o;

    /* Check if we can represent this string as a long integer.
     * Note that we are sure that a string larger than 21 chars is not
     * representable as a 32 nor 64 bit integer. */
    len = sdslen(s);
    // 針對小於21個字串的字元,嘗試轉為 long 型
    if (len <= 21 && string2l(s,len,&value)) {
        /* This object is encodable as a long. Try to use a shared object.
         * Note that we avoid using shared integers when maxmemory is used
         * because every object needs to have a private LRU field for the LRU
         * algorithm to work well. */
        if ((server.maxmemory == 0 ||
             (server.maxmemory_policy != MAXMEMORY_VOLATILE_LRU &&
              server.maxmemory_policy != MAXMEMORY_ALLKEYS_LRU)) &&
            value >= 0 &&
            value < OBJ_SHARED_INTEGERS)
        {
            decrRefCount(o);
            incrRefCount(shared.integers[value]);
            return shared.integers[value];
        } else {
            if (o->encoding == OBJ_ENCODING_RAW) sdsfree(o->ptr);
            o->encoding = OBJ_ENCODING_INT;
            o->ptr = (void*) value;
            return o;
        }
    }

    /* If the string is small and is still RAW encoded,
     * try the EMBSTR encoding which is more efficient.
     * In this representation the object and the SDS string are allocated
     * in the same chunk of memory to save space and cache misses. */
    // 44
    if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) {
        robj *emb;

        if (o->encoding == OBJ_ENCODING_EMBSTR) return o;
        // 使用 EMBSTR 編碼轉換,實際就是同一個 s 返回
        emb = createEmbeddedStringObject(s,sdslen(s));
        decrRefCount(o);
        return emb;
    }

    /* We can't encode the object...
     *
     * Do the last try, and at least optimize the SDS string inside
     * the string object to require little space, in case there
     * is more than 10% of free space at the end of the SDS string.
     *
     * We do that only for relatively large strings as this branch
     * is only entered if the length of the string is greater than
     * OBJ_ENCODING_EMBSTR_SIZE_LIMIT. */
    if (o->encoding == OBJ_ENCODING_RAW &&
        sdsavail(s) > len/10)
    {
        o->ptr = sdsRemoveFreeSpace(o->ptr);
    }

    /* Return the original object. */
    return o;
}
// sds.c, 去除無用空間佔用
/* Reallocate the sds string so that it has no free space at the end. The
 * contained string remains not altered, but next concatenation operations
 * will require a reallocation.
 *
 * After the call, the passed sds string is no longer valid and all the
 * references must be substituted with the new pointer returned by the call. */
sds sdsRemoveFreeSpace(sds s) {
    void *sh, *newsh;
    // s[-1] 指標不越界, 它是在新建一個 sds 物件時,在該指標前一位寫入的值,確定sds型別
    char type, oldtype = s[-1] & SDS_TYPE_MASK;
    int hdrlen;
    size_t len = sdslen(s);
    // sdsHdrSize: sds頭部大小
    sh = (char*)s-sdsHdrSize(oldtype);

    type = sdsReqType(len);
    hdrlen = sdsHdrSize(type);
    if (oldtype==type) {
        newsh = s_realloc(sh, hdrlen+len+1);
        if (newsh == NULL) return NULL;
        s = (char*)newsh+hdrlen;
    } else {
        newsh = s_malloc(hdrlen+len+1);
        if (newsh == NULL) return NULL;
        memcpy((char*)newsh+hdrlen, s, len+1);
        s_free(sh);
        s = (char*)newsh+hdrlen;
        s[-1] = type;
        sdssetlen(s, len);
    }
    sdssetalloc(s, len);
    return s;
}

// t_string.c, expire 為超時時間設定
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid any harmness warning */

    if (expire) {
        // 解析 expire 到 milliseconds 中
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != C_OK)
            return;
        if (milliseconds <= 0) {
            addReplyErrorFormat(c,"invalid expire time in %s",c->cmd->name);
            return;
        }
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }
    // 語法限制檢測, NX 要求不存在, XX 要求存在
    if ((flags & OBJ_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & OBJ_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }
    // 切實儲存 kv
    setKey(c->db,key,val);
    server.dirty++;
    // 設定超時
    if (expire) setExpire(c->db,key,mstime()+milliseconds);
    // 通知pub/sub變更
    notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
    // 通知expire事件
    if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
        "expire",key,c->db->id);
    addReply(c, ok_reply ? ok_reply : shared.ok);
}
// object.c, 
int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg) {
    long long value;
    if (getLongLongFromObject(o, &value) != C_OK) {
        if (msg != NULL) {
            addReplyError(c,(char*)msg);
        } else {
            addReplyError(c,"value is not an integer or out of range");
        }
        return C_ERR;
    }
    *target = value;
    return C_OK;
}
// object.c, 
int getLongLongFromObject(robj *o, long long *target) {
    long long value;

    if (o == NULL) {
        value = 0;
    } else {
        serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
        // 將字串轉換為 long 型,得到超時時間
        if (sdsEncodedObject(o)) {
            if (strict_strtoll(o->ptr,&value) == C_ERR) return C_ERR;
        } else if (o->encoding == OBJ_ENCODING_INT) {
            value = (long)o->ptr;
        } else {
            serverPanic("Unknown string encoding");
        }
    }
    if (target) *target = value;
    return C_OK;
}

  看完了超時及各標識位的解析,及set框架流程,我們來看下具體核心的kv儲存: setKey(), setExpire();

// db.c, set kv
/* High level Set operation. This function can be used in order to set
 * a key, whatever it was existing or not, to a new object.
 *
 * 1) The ref count of the value object is incremented.
 * 2) clients WATCHing for the destination key notified.
 * 3) The expire time of the key is reset (the key is made persistent). */
void setKey(redisDb *db, robj *key, robj *val) {
    // 先查詢,再更新
    if (lookupKeyWrite(db,key) == NULL) {
        // 新增 kv
        dbAdd(db,key,val);
    } else {
        // 覆蓋 kv
        dbOverwrite(db,key,val);
    }
    // 增加 value 的引用計數
    incrRefCount(val);
    // 新增的元素,移出過期佇列
    removeExpire(db,key);
    signalModifiedKey(db,key);
}
robj *lookupKeyWrite(redisDb *db, robj *key) {
    // 先嚐試過期處理,再查詢db (hash 查詢 db->dic)
    expireIfNeeded(db,key);
    return lookupKey(db,key);
}

// db.c, 新增kv
/* Add the key to the DB. It's up to the caller to increment the reference
 * counter of the value if needed.
 *
 * The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val) {
    sds copy = sdsdup(key->ptr);
    // 新增到 db->dict 中
    int retval = dictAdd(db->dict, copy, val);

    serverAssertWithInfo(NULL,key,retval == C_OK);
    // list 型別的資料,進行特殊處理(阻塞)
    if (val->type == OBJ_LIST) signalListAsReady(db, key);
    // 叢集新增
    if (server.cluster_enabled) slotToKeyAdd(key);
 }
// db.c 
/* Slot to Key API. This is used by Redis Cluster in order to obtain in
 * a fast way a key that belongs to a specified hash slot. This is useful
 * while rehashing the cluster. */
void slotToKeyAdd(robj *key) {
    // hash 定位 slot, 下面我們簡單看下該演算法
    unsigned int hashslot = keyHashSlot(key->ptr,sdslen(key->ptr));

    sds sdskey = sdsdup(key->ptr);
    // 新增key 到 server.cluster->slots_to_keys 的 跳錶中
    zslInsert(server.cluster->slots_to_keys,hashslot,sdskey);
}
// cluster.c, slot 定位演算法, 其實就是 crc16演算法與上 0x3FFF,該演算法決定了 slot 最多隻能有 16383 個
/* We have 16384 hash slots. The hash slot of a given key is obtained
 * as the least significant 14 bits of the crc16 of the key.
 *
 * However if the key contains the {...} pattern, only the part between
 * { and } is hashed. This may be useful in the future to force certain
 * keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
    int s, e; /* start-end indexes of { and } */

    for (s = 0; s < keylen; s++)
        if (key[s] == '{') break;

    /* No '{' ? Hash the whole key. This is the base case. */
    if (s == keylen) return crc16(key,keylen) & 0x3FFF;

    /* '{' found? Check if we have the corresponding '}'. */
    for (e = s+1; e < keylen; e++)
        if (key[e] == '}') break;

    /* No '}' or nothing betweeen {} ? Hash the whole key. */
    if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;

    /* If we are here there is both a { and a } on its right. Hash
     * what is in the middle between { and }. */
    return crc16(key+s+1,e-s-1) & 0x3FFF;
}

// db.c, 設定key的超時標識
void setExpire(redisDb *db, robj *key, long long when) {
    dictEntry *kde, *de;

    /* Reuse the sds from the main dict in the expire dict */
    kde = dictFind(db->dict,key->ptr);
    serverAssertWithInfo(NULL,key,kde != NULL);
    // 將需要超時檢測的 key 新增到 db->expires 佇列中
    de = dictReplaceRaw(db->expires,dictGetKey(kde));
    // 設定超時時間為 when
    dictSetSignedIntegerVal(de,when);
}
// dict.h
#define dictSetSignedIntegerVal(entry, _val_) \
    do { entry->v.s64 = _val_; } while(0)

  總體來說,set操作會分為幾步:

    1. 判斷出多重引數,如是否是NX/EX/PX/XX, 是否超時設定;
    2. 編碼轉換資料, 如將字串轉換為long型;
    3. 解析超時欄位;
    4. set kv, 新增或者覆蓋資料庫值, 同時清理過期佇列;
    5. 設定超時時間;
    6. 觸發事件監聽;
    7. 響應客戶端;

  最後,我們以set的整個時序圖作為結尾,也讓我們明白一點,不是每個hello world 都很簡單: