1. 程式人生 > >Redis原始碼剖析和註釋(十四)---- Redis 資料庫及相關命令實現(db)

Redis原始碼剖析和註釋(十四)---- Redis 資料庫及相關命令實現(db)

Redis 資料庫及相關命令實現

1. 資料庫管理命令

命令 描述
FLUSHDB 清空當前資料庫的所有key
FLUSHALL 清空整個Redis伺服器的所有key
DBSIZE 返回當前資料庫的key的個數
DEL key [key …] 刪除一個或多個鍵
EXISTS key 檢查給定key是否存在
SELECT id 切換到指定的資料庫
RANDOMKEY 從當前資料庫中隨機返回(不刪除)一個 key 。
KEYS pattern 查詢所有符合給定模式pattern的key
SCAN cursor [MATCH pattern] [COUNT count] 增量式迭代當前資料庫鍵
LASTSAVE 返回最近一次成功將資料儲存到磁碟上的時間,以 UNIX 時間戳格式表示。
TYPE key 返回指定鍵的物件型別
SHUTDOWN 停止所有客戶端,關閉 redis 伺服器(server)
RENAME key newkey 重新命名指定的key,newkey存在時覆蓋
RENAMENX key newkey 重新命名指定的key,當且僅當newkey不存在時操作
MOVE key db 移動key到指定資料庫
EXPIREAT key timestamp 為 key 設定生存時間,EXPIREAT 命令接受的時間引數是 UNIX 時間戳
EXPIRE key seconds 以秒為單位設定 key 的生存時間
PEXPIRE key milliseconds 以毫秒為單位設定 key 的生存時間
PEXPIREAT key milliseconds-timestamp 以毫秒為單位設定 key 的過期 unix 時間戳
TTL key 以秒為單位返回 key 的剩餘生存時間
PTTL key 以毫秒為單位返回 key 的剩餘生存時間

2. 資料庫的實現

2.1資料庫的結構

typedef struct redisDb {
    // 鍵值對字典,儲存資料庫中所有的鍵值對
    dict *dict;                 /* The keyspace for this DB */
    // 過期字典,儲存著設定過期的鍵和鍵的過期時間
    dict *expires;              /* Timeout of keys with a timeout set */
    // 儲存著 所有造成客戶端阻塞的鍵和被阻塞的客戶端
    dict *blocking_keys;        /*Keys with clients waiting for data (BLPOP) */
    // 儲存著 處於阻塞狀態的鍵,value為NULL
    dict *ready_keys;           /* Blocked keys that received a PUSH */
    // 事物模組,用於儲存被WATCH命令所監控的鍵
    dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS */
    // 當記憶體不足時,Redis會根據LRU演算法回收一部分鍵所佔的空間,而該eviction_pool是一個長為16陣列,儲存可能被回收的鍵
    // eviction_pool中所有鍵按照idle空轉時間,從小到大排序,每次回收空轉時間最長的鍵
    struct evictionPoolEntry *eviction_pool;    /* Eviction pool of keys */
    // 資料庫ID
    int id;                     /* Database ID */
    // 鍵的平均過期時間
    long long avg_ttl;          /* Average TTL, just for stats */
} redisDb;
  • blocking_keys 和 ready_keys 使用於在列表型別的阻塞命令(BLPOP等),詳細內容看:Redis 列表鍵命令實現
  • watched_keys 是用於事物模組。
  • eviction_pool 是Redis在記憶體不足情況下,要回收記憶體時所使用。
  • dict 和 expires 和 id是本篇主要討論的。

Redis伺服器和客戶端也都儲存有資料庫的資訊,下面截取出來:

typedef struct client {
    redisDb *db;            /* Pointer to currently SELECTed DB. */
} client;

struct redisServer {
    redisDb *db;
    int dbnum;                      /* Total number of configured DBs */
};

Redis伺服器在初始化時,會建立一個長度為dbnum(預設為16)個 redisDb型別陣列,客戶端登入時,預設的資料庫為0號資料庫。當執行SELECT index命令後,就會切換資料庫。我們用兩個客戶端,表示如下圖:

這裡寫圖片描述

SELECT index命令非常簡單,原始碼如下:

// 切換資料庫
int selectDb(client *c, int id) {
    // id非法,返回錯誤
    if (id < 0 || id >= server.dbnum)
        return C_ERR;
    // 設定當前client的資料庫
    c->db = &server.db[id];
    return C_OK;
}

2.2 資料庫的鍵值對字典

Redis是一個key-value資料庫伺服器,它將所有的鍵值對都儲存在 redisDb 結構中的 dict 字典成員中(Redis 字典結構原始碼剖析)。

  • 鍵值對字典的鍵,就是資料庫的key,每一個key都是字串的物件。

  • 鍵值對字典的值,就是資料庫的value,每一個value可以是字串的物件,列表物件,雜湊表物件,集合物件和有序集合物件中的任意一種。

資料庫對鍵物件的刪除操作,會連帶值物件也一併刪除,因此再有一些操作中,例如RENAME等命令,中間步驟會使用刪除原有鍵,常常需要對值物件的引用計數加1,保護值物件不被刪除,當新的鍵被設定後,則對值物件的引用計數減1。

我們向一個數據庫中新增幾個鍵,並且用圖表示出來:

  • 紅色代表鍵物件,有 RAW編碼的字串物件,雜湊物件。將結構簡化表示,重點關注引用計數。
  • 藍色代表值物件,完成結構如圖所示。

這裡寫圖片描述

資料庫每次根據鍵名找到值物件時,是分為以讀操作 lookupKeyRead() 或寫操作 lookupKeyWrite() 的方式取出的,而這兩種有一定的區別,下面展示原始碼:

  • lookupKey()函式

讀操作 lookupKeyRead() 或寫操作 lookupKeyWrite()都會呼叫這個底層的函式,這個函式非常簡單,就是從鍵值對字典中先找到鍵名對應的鍵物件,然後取出值物件。

// 該函式被lookupKeyRead()和lookupKeyWrite()和lookupKeyReadWithFlags()呼叫
// 從資料庫db中取出key的值物件,如果存在返回該物件,否則返回NULL
// 返回key物件的值物件
robj *lookupKey(redisDb *db, robj *key, int flags) {
    // 在資料庫中查詢key物件,返回儲存該key的節點地址
    dictEntry *de = dictFind(db->dict,key->ptr);
    if (de) {   //如果找到
        robj *val = dictGetVal(de); //取出鍵對應的值物件

        /* Update the access time for the ageing algorithm.
         * Don't do it if we have a saving child, as this will trigger
         * a copy on write madness. */
        // 更新鍵的使用時間
        if (server.rdb_child_pid == -1 &&
            server.aof_child_pid == -1 &&
            !(flags & LOOKUP_NOTOUCH))
        {
            val->lru = LRU_CLOCK();
        }
        return val; //返回值物件
    } else {
        return NULL;
    }
  • lookupKeyRead()函式

lookupKeyRead()函式呼叫了lookupKeyReadWithFlags()函式,後者其實就判斷了一下當前鍵是否過期,如果沒有過期,更新 misses 和 hits 資訊,然後就返回值物件。

還有就是兩個巨集:

  1. define LOOKUP_NONE 0 //zero,沒有特殊意義
  2. define LOOKUP_NOTOUCH (1<<0) //不修改鍵的使用時間,如果只是想判斷key的值物件的編碼型別(TYPE命令)我們不希望改變鍵的使用時間。
// 以讀操作取出key的值物件,會更新是否命中的資訊
robj *lookupKeyRead(redisDb *db, robj *key) {
    return lookupKeyReadWithFlags(db,key,LOOKUP_NONE);
}

// 以讀操作取出key的值物件,沒找到返回NULL
// 呼叫該函式的副作用如下:
// 1.如果一個鍵的到達過期時間TTL,該鍵被設定為過期的
// 2.鍵的使用時間資訊被更新
// 3.全域性鍵 hits/misses 狀態被更新
// 注意:如果鍵在邏輯上已經過期但是仍然存在,函式返回NULL
robj *lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
    robj *val;

    // 如果鍵已經過期且被刪除
    if (expireIfNeeded(db,key) == 1) {
        /* Key expired. If we are in the context of a master, expireIfNeeded()
         * returns 0 only when the key does not exist at all, so it's save
         * to return NULL ASAP. */
        // 鍵已過期,如果是主節點環境,表示key已經絕對被刪除,如果是從節點,
        if (server.masterhost == NULL) return NULL;

        // 如果我們在從節點環境, expireIfNeeded()函式不會刪除過期的鍵,它返回的僅僅是鍵是否被刪除的邏輯值
        // 過期的鍵由主節點負責,為了保證主從節點資料的一致
        if (server.current_client &&
            server.current_client != server.master &&
            server.current_client->cmd &&
            server.current_client->cmd->flags & CMD_READONLY)
        {
            return NULL;
        }
    }
    // 鍵沒有過期,則返回鍵的值物件
    val = lookupKey(db,key,flags);
    // 更新 是否命中 的資訊
    if (val == NULL)
        server.stat_keyspace_misses++;
    else
        server.stat_keyspace_hits++;
    return val;
}
  • lookupKeyWrite()函式

lookupKeyWrite() 函式則先判斷鍵是否過期,然後直接呼叫最底層的 lookupKey() 函式,和 lookupKeyRead()函式 相比,少了一步更新 misses 和 hits 資訊的過程。

// 以寫操作取出key的值物件,不更新是否命中的資訊
robj *lookupKeyWrite(redisDb *db, robj *key) {
    expireIfNeeded(db,key);
    return lookupKey(db,key,LOOKUP_NONE);
}

2.3 鍵的過期時間

redisBb結構中的 expires 字典儲存這設定了過期時間的鍵和過期的時間。通過 EXPIRE 、 PEXPIRE、 EXPIREAT 和 PEXPIREAT四個命令,客戶端可以給某個存在的鍵設定過期時間,當鍵的過期時間到達時,鍵就不再可用。

我們先用圖展示一下資料庫中的過期字典,用剛才的鍵值對字典中的物件。

這裡寫圖片描述

  • 很明顯,鍵值對字典和過期字典中的相同物件只佔一份空間,只是增加引用計數

我們重點討論過期鍵的刪除策略:

  1. 惰性刪除:當客戶度讀出帶有超時屬性的鍵時,如果已經超過鍵設定的過期時間,會執行刪除並返回空。
  2. 定時刪除:Redis內部維護一個定時任務,預設每秒執行10次。

我們給出惰性刪除的程式碼,這個函式 expireIfNeeded(),所有讀寫資料庫的Redis命令在執行前都會呼叫,刪除過期鍵。

// 檢查鍵是否過期,如果過期,從資料庫中刪除
// 返回0表示沒有過期或沒有過期時間,返回1 表示鍵被刪除
int expireIfNeeded(redisDb *db, robj *key) {
    //得到過期時間,單位毫秒
    mstime_t when = getExpire(db,key);
    mstime_t now;

    // 沒有過期時間,直接返回
    if (when < 0) return 0; /* No expire for this key */

    /* Don't expire anything while loading. It will be done later. */
    // 伺服器正在載入,那麼不進行過期檢查
    if (server.loading) return 0;

    /* If we are in the context of a Lua script, we claim that time is
     * blocked to when the Lua script started. This way a key can expire
     * only the first time it is accessed and not in the middle of the
     * script execution, making propagation to slaves / AOF consistent.
     * See issue #1525 on Github for more information. */
    // 返回一個Unix時間,單位毫秒
    now = server.lua_caller ? server.lua_time_start : mstime();

    /* If we are running in the context of a slave, return ASAP:
     * the slave key expiration is controlled by the master that will
     * send us synthesized DEL operations for expired keys.
     *
     * Still we try to return the right information to the caller,
     * that is, 0 if we think the key should be still valid, 1 if
     * we think the key is expired at this time. */
    // 如果伺服器正在進行主從節點的複製,從節點的過期鍵應該被 主節點發送同步刪除的操作 刪除,而自己不主動刪除
    // 從節點只返回正確的邏輯資訊,0表示key仍然沒有過期,1表示key過期。
    if (server.masterhost != NULL) return now > when;

    /* Return when this key has not expired */
    // 當鍵還沒有過期時,直接返回0
    if (now <= when) return 0;

    /* Delete the key */
    // 鍵已經過期,刪除鍵
    server.stat_expiredkeys++;              //過期鍵的數量加1
    propagateExpire(db,key);                //將過期鍵key傳播給AOF檔案和從節點
    notifyKeyspaceEvent(NOTIFY_EXPIRED,     //傳送"expired"事件通知
        "expired",key,db->id);
    return dbDelete(db,key);                //從資料庫中刪除key
}

3. 資料庫相關命令實現

我們只列舉部分命令實現,所有程式碼註釋可以上github檢視:Redis 資料庫實現(db.c)

3.1 鍵空間命令

  • SCAN 一類命令的底層實現
// SCAN cursor [MATCH pattern] [COUNT count]
// SCAN、HSCAN、SSCAN、ZSCAN一類命令底層實現
// o物件必須是雜湊物件或集合物件,否則命令將操作當前資料庫
// 如果o不是NULL,那麼說明他是一個雜湊或集合物件,函式將跳過這些鍵物件,對引數進行分析
// 如果是雜湊物件,返回返回的是鍵值對
void scanGenericCommand(client *c, robj *o, unsigned long cursor) {
    int i, j;
    list *keys = listCreate();  //建立一個列表
    listNode *node, *nextnode;
    long count = 10;
    sds pat = NULL;
    int patlen = 0, use_pattern = 0;
    dict *ht;

    /* Object must be NULL (to iterate keys names), or the type of the object
     * must be Set, Sorted Set, or Hash. */
    // 輸入型別的檢查,要麼迭代鍵名,要麼當前集合物件,要麼迭代雜湊物件,要麼迭代有序集合物件
    serverAssert(o == NULL || o->type == OBJ_SET || o->type == OBJ_HASH ||
                o->type == OBJ_ZSET);

    /* Set i to the first option argument. The previous one is the cursor. */
    // 計算第一個引數的下標,如果是鍵名,要條跳過該鍵
    i = (o == NULL) ? 2 : 3; /* Skip the key argument if needed. */

    /* Step 1: Parse options. */
    // 1. 解析選項
    while (i < c->argc) {
        j = c->argc - i;
        // 設定COUNT引數,COUNT 選項的作用就是讓使用者告知迭代命令, 在每次迭代中應該返回多少元素。
        if (!strcasecmp(c->argv[i]->ptr, "count") && j >= 2) {
            //儲存個數到count
            if (getLongFromObjectOrReply(c, c->argv[i+1], &count, NULL)
                != C_OK)
            {
                goto cleanup;
            }

            // 如果個數小於1,語法錯誤
            if (count < 1) {
                addReply(c,shared.syntaxerr);
                goto cleanup;
            }

            i += 2; //引數跳過兩個已經解析過的
        // 設定MATCH引數,讓命令只返回和給定模式相匹配的元素。
        } else if (!strcasecmp(c->argv[i]->ptr, "match") && j >= 2) {
            pat = c->argv[i+1]->ptr;    //pattern字串
            patlen = sdslen(pat);       //pattern字串長度

            /* The pattern always matches if it is exactly "*", so it is
             * equivalent to disabling it. */
            // 如果pattern是"*",就不用匹配,全部返回,設定為0
            use_pattern = !(pat[0] == '*' && patlen == 1);

            i += 2;
        } else {
            addReply(c,shared.syntaxerr);
            goto cleanup;
        }
    }

    /* Step 2: Iterate the collection.
     *
     * Note that if the object is encoded with a ziplist, intset, or any other
     * representation that is not a hash table, we are sure that it is also
     * composed of a small number of elements. So to avoid taking state we
     * just return everything inside the object in a single call, setting the
     * cursor to zero to signal the end of the iteration. */

    /* Handle the case of a hash table. */
    // 2.如果物件是ziplist、intset或其他而不是雜湊表,那麼這些型別只是包含少量的元素
    // 我們一次將其所有的元素全部返回給呼叫者,並設定遊標cursor為0,標示迭代完成
    ht = NULL;
    // 迭代目標是資料庫
    if (o == NULL) {
        ht = c->db->dict;
    // 迭代目標是HT編碼的集合物件
    } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
        ht = o->ptr;
    // 迭代目標是HT編碼的雜湊物件
    } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
        ht = o->ptr;
        count *= 2; /* We return key / value for this type. */
    // 迭代目標是skiplist編碼的有序集合物件
    } else if (o->type == OBJ_ZSET && o->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = o->ptr;
        ht = zs->dict;
        count *= 2; /* We return key / value for this type. */
    }

    if (ht) {
        void *privdata[2];
        /* We set the max number of iterations to ten times the specified
         * COUNT, so if the hash table is in a pathological state (very
         * sparsely populated) we avoid to block too much time at the cost
         * of returning no or very few elements. */
        // 設定最大的迭代長度為10*count次
        long maxiterations = count*10;

        /* We pass two pointers to the callback: the list to which it will
         * add new elements, and the object containing the dictionary so that
         * it is possible to fetch more data in a type-dependent way. */
        // 回撥函式scanCallback的引數privdata是一個數組,儲存的是被迭代物件的鍵和值
        // 回撥函式scanCallback的另一個引數,是一個字典物件
        // 回撥函式scanCallback的作用,從字典物件中將鍵值對提取出來,不用管字典物件是什麼資料型別
        privdata[0] = keys;
        privdata[1] = o;
        // 迴圈掃描ht,從遊標cursor開始,呼叫指定的scanCallback函式,提出ht中的資料到剛開始建立的列表keys中
        do {
            cursor = dictScan(ht, cursor, scanCallback, privdata);
        } while (cursor &&
              maxiterations-- &&
              listLength(keys) < (unsigned long)count);//沒迭代完,或沒迭代夠count,就繼續迴圈

    // 如果是集合物件但編碼不是HT是整數集合
    } else if (o->type == OBJ_SET) {
        int pos = 0;
        int64_t ll;
        // 將整數值取出來,構建成字串物件加入到keys列表中,遊標設定為0,表示迭代完成
        while(intsetGet(o->ptr,pos++,&ll))
            listAddNodeTail(keys,createStringObjectFromLongLong(ll));
        cursor = 0;
    // 如果是雜湊物件,或有序集合物件,但是編碼都不是HT,是ziplist
    } else if (o->type == OBJ_HASH || o->type == OBJ_ZSET) {
        unsigned char *p = ziplistIndex(o->ptr,0);
        unsigned char *vstr;
        unsigned int vlen;
        long long vll;

        while(p) {
            // 將值取出來,根據不同型別的值,構建成相同的字串物件,加入到keys列表中
            ziplistGet(p,&vstr,&vlen,&vll);
            listAddNodeTail(keys,
                (vstr != NULL) ? createStringObject((char*)vstr,vlen) :
                                 createStringObjectFromLongLong(vll));
            p = ziplistNext(o->ptr,p);
        }
        cursor = 0;
    } else {
        serverPanic("Not handled encoding in SCAN.");
    }

    /* Step 3: Filter elements. */
    // 3. 如果設定MATCH引數,要進行過濾
    node = listFirst(keys); //連結串列首節點地址
    while (node) {
        robj *kobj = listNodeValue(node);   //key物件
        nextnode = listNextNode(node);      //下一個節點地址
        int filter = 0; //預設為不過濾

        /* Filter element if it does not match the pattern. */
        //pattern不是"*"因此要過濾
        if (!filter && use_pattern) {
            // 如果kobj是字串物件
            if (sdsEncodedObject(kobj)) {
                // kobj的值不匹配pattern,設定過濾標誌
                if (!stringmatchlen(pat, patlen, kobj->ptr, sdslen(kobj->ptr), 0))
                    filter = 1;
            // 如果kobj是整數物件
            } else {
                char buf[LONG_STR_SIZE];
                int len;

                serverAssert(kobj->encoding == OBJ_ENCODING_INT);
                // 將整數轉換為字串型別,儲存到buf中
                len = ll2string(buf,sizeof(buf),(long)kobj->ptr);
                //buf的值不匹配pattern,設定過濾標誌
                if (!stringmatchlen(pat, patlen, buf, len, 0)) filter = 1;
            }
        }

        /* Filter element if it is an expired key. */
        // 迭代目標是資料庫,如果kobj是過期鍵,則過濾
        if (!filter && o == NULL && expireIfNeeded(c->db, kobj)) filter = 1;

        /* Remove the element and its associted value if needed. */
        // 如果該鍵滿足了上述的過濾條件,那麼將其從keys列表刪除並釋放
        if (filter) {
            decrRefCount(kobj);
            listDelNode(keys, node);
        }

        /* If this is a hash or a sorted set, we have a flat list of
         * key-value elements, so if this element was filtered, remove the
         * value, or skip it if it was not filtered: we only match keys. */
        // 如果當前迭代目標是有序集合或雜湊物件,因此keys列表中儲存的是鍵值對,如果key鍵物件被過濾,值物件也應當被過濾
        if (o && (o->type == OBJ_ZSET || o->type == OBJ_HASH)) {
            node = nextnode;
            nextnode = listNextNode(node);  //值物件的節點地址
            // 如果該鍵滿足了上述的過濾條件,那麼將其從keys列表刪除並釋放
            if (filter) {
                kobj = listNodeValue(node); //取出值物件
                decrRefCount(kobj);
                listDelNode(keys, node);    //刪除
            }
        }
        node = nextnode;
    }

    /* Step 4: Reply to the client. */
    // 4. 回覆資訊給client
    addReplyMultiBulkLen(c, 2);     //2部分,一個是遊標,一個是列表
    addReplyBulkLongLong(c,cursor); //回覆遊標

    addReplyMultiBulkLen(c, listLength(keys));  //回覆列表長度

    //迴圈回覆列表中的元素,並釋放
    while ((node = listFirst(keys)) != NULL) {
        robj *kobj = listNodeValue(node);
        addReplyBulk(c, kobj);
        decrRefCount(kobj);
        listDelNode(keys, node);
    }

// 清理程式碼
cleanup:
    listSetFreeMethod(keys,decrRefCountVoid);   //設定特定的釋放列表的方式decrRefCountVoid
    listRelease(keys);                          //釋放
}
  • RENAME、RENAMENX命令底層實現
// RENAME key newkey
// RENAMENX key newkey
// RENAME、RENAMENX命令底層實現
void renameGenericCommand(client *c, int nx) {
    robj *o;
    long long expire;
    int samekey = 0;

    /* When source and dest key is the same, no operation is performed,
     * if the key exists, however we still return an error on unexisting key. */
    // key和newkey相同的話,設定samekey標誌
    if (sdscmp(c->argv[1]->ptr,c->argv[2]->ptr) == 0) samekey = 1;

    // 以寫操作讀取key的值物件
    if ((o = lookupKeyWriteOrReply(c,c->argv[1],shared.nokeyerr)) == NULL)
        return;

    // 如果key和newkey相同,nx為1傳送0,否則為ok
    if (samekey) {
        addReply(c,nx ? shared.czero : shared.ok);
        return;
    }

    // 增加值物件的引用計數,保護起來,用於關聯newkey,以防刪除了key順帶將值物件也刪除
    incrRefCount(o);
    // 備份key的過期時間,將來作為newkey的過期時間
    expire = getExpire(c->db,c->argv[1]);
    // 判斷newkey的值物件是否存在
    if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
        // 設定nx標誌,則不符合已存在的條件,傳送0
        if (nx) {
            decrRefCount(o);
            addReply(c,shared.czero);
            return;
        }
        /* Overwrite: delete the old key before creating the new one
         * with the same name. */
        dbDelete(c->db,c->argv[2]); //將舊的newkey物件刪除
    }
    // 將newkey和key的值物件關聯
    dbAdd(c->db,c->argv[2],o);
    // 如果newkey設定過過期時間,則為newkey設定過期時間
    if (expire != -1) setExpire(c->db,c->argv[2],expire);
    // 刪除key
    dbDelete(c->db,c->argv[1]);
    // 傳送這兩個鍵被修改的訊號
    signalModifiedKey(c->db,c->argv[1]);
    signalModifiedKey(c->db,c->argv[2]);
    // 傳送不同命令的事件通知
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
        c->argv[1],c->db->id);
    notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_to",
        c->argv[2],c->db->id);
    server.dirty++;     //更新髒鍵
    addReply(c,nx ? shared.cone : shared.ok);
}
  • MOVE 命令
// MOVE key db 將當前資料庫的 key 移動到給定的資料庫 db 當中。
// MOVE 命令實現
void moveCommand(client *c) {
    robj *o;
    redisDb *src, *dst;
    int srcid;
    long long dbid, expire;

    // 伺服器處於叢集模式,不支援多資料庫
    if (server.cluster_enabled) {
        addReplyError(c,"MOVE is not allowed in cluster mode");
        return;
    }

    /* Obtain source and target DB pointers */
    // 獲得源資料庫和源資料庫的id
    src = c->db;
    srcid = c->db->id;

    // 將引數db的值儲存到dbid,並且切換到該資料庫中
    if (getLongLongFromObject(c->argv[2],&dbid) == C_ERR ||
        dbid < INT_MIN || dbid > INT_MAX ||
        selectDb(c,dbid) == C_ERR)
    {
        addReply(c,shared.outofrangeerr);
        return;
    }
    // 目標資料庫
    dst = c->db;
    // 切換回源資料庫
    selectDb(c,srcid); /* Back to the source DB */

    /* If the user is moving using as target the same
     * DB as the source DB it is probably an error. */
    // 如果前後切換的資料庫相同,則返回有關錯誤
    if (src == dst) {
        addReply(c,shared.sameobjecterr);
        return;
    }

    /* Check if the element exists and get a reference */
    // 以寫操作取出源資料庫的物件
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (!o) {
        addReply(c,shared.czero);   //不存在傳送0
        return;
    }
    // 備份key的過期時間
    expire = getExpire(c->db,c->argv[1]);

    /* Return zero if the key already exists in the target DB */
    // 判斷當前key是否存在於目標資料庫,存在直接返回,傳送0
    if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
        addReply(c,shared.czero);
        return;
    }
    // 將key-value物件新增到目標資料庫中
    dbAdd(dst,c->argv[1],o);
    // 設定移動後key的過期時間
    if (expire != -1) setExpire(dst,c->argv[1],expire);
    incrRefCount(o);    //增加引用計數

    /* OK! key moved, free the entry in the source DB */
    // 從源資料庫中將key和關聯的值物件刪除
    dbDelete(src,c->argv[1]);
    server.dirty++; //更新髒鍵
    addReply(c,shared.cone);    //回覆1
}

3.2 過期命令

  • EXPIRE, PEXPIRE, EXPIREAT,PEXPIREAT命令的底層實現
// EXPIRE key seconds
// EXPIREAT key timestamp
// PEXPIRE key milliseconds
// PEXPIREAT key milliseconds-timestamp
// EXPIRE, PEXPIRE, EXPIREAT,PEXPIREAT命令的底層實現
// basetime引數可能是絕對值,可能是相對值。執行AT命令時basetime為0,否則儲存的是當前的絕對時間
// unit 是UNIT_SECONDS 或者 UNIT_MILLISECONDS,但是basetime總是以毫秒為單位的。
void expireGenericCommand(client *c, long long basetime, int unit) {
    robj *key = c->argv[1], *param = c->argv[2];
    long long when; /* unix time in milliseconds when the key will expire. */

    // 取出時間引數儲存到when中
    if (getLongLongFromObjectOrReply(c, param, &when, NULL) != C_OK)
        return;

    // 如果過期時間是以秒為單位,則轉換為毫秒值
    if (unit == UNIT_SECONDS) when *= 1000;
    // 絕對時間
    when += basetime;

    /* No key, return zero. */
    // 判斷key是否在資料庫中,不在返回0
    if (lookupKeyWrite(c->db,key) == NULL) {
        addReply(c,shared.czero);
        return;
    }

    /* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
     * should never be executed as a DEL when load the AOF or in the context
     * of a slave instance.
     *
     * Instead we take the other branch of the IF statement setting an expire
     * (possibly in the past) and wait for an explicit DEL from the master. */
    // 如果當前正在載入AOF資料或者在從節點環境中,即使EXPIRE的TTL為負數,或者EXPIREAT的時間戳已經過期
    // 伺服器都不會執行DEL命令,且將過期TTL設定為鍵的過期時間,等待主節點發來的DEL命令

    // 如果when已經過時,伺服器為主節點且沒有載入AOF資料
    if (when <= mstime() && !server.loading && !server.masterhost) {
        robj *aux;

        // 將key從資料庫中刪除
        serverAssertWithInfo(c,key,dbDelete(c->db,key));
        server.dirty++; //更新髒鍵

        /* Replicate/AOF this as an explicit DEL. */
        // 建立一個"DEL"命令
        aux = createStringObject("DEL",3);
        rewriteClientCommandVector(c,2,aux,key);    //修改客戶端的引數列表為DEL命令
        decrRefCount(aux);
        // 傳送鍵被修改的訊號
        signalModifiedKey(c->db,key);
        // 傳送"del"的事件通知
        notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
        addReply(c, shared.cone);
        return;

    // 如果當前伺服器是從節點,或者伺服器正在載入AOF資料
    // 不管when有沒有過時,都設定為過期時間
    } else {
        // 設定過期時間
        setExpire(c->db,key,when);
        addReply(c,shared.cone);
        signalModifiedKey(c->db,key);   //傳送鍵被修改的訊號
        notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); //傳送"expire"的事件通知
        server.dirty++; //更新髒鍵
        return;
    }
}
  • TTL、PTTL 命令底層實現
// TTL key
// PTTL key
// TTL、PTTL命令底層實現,output_ms為1,返回毫秒,為0返回秒
void ttlGenericCommand(client *c, int output_ms) {
    long long expire, ttl = -1;

    /* If the key does not exist at all, return -2 */
    // 判斷key是否存在於資料庫,並且不修改鍵的使用時間
    if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == NULL) {
        addReplyLongLong(c,-2);
        return;
    }
    /* The key exists. Return -1 if it has no expire, or the actual
     * TTL value otherwise. */
    // 如果key存在,則備份當前key的過期時間
    expire = getExpire(c->db,c->argv[1]);

    // 如果設定了過期時間
    if (expire != -1) {
        ttl = expire-mstime();  //計算生存時間
        if (ttl < 0) ttl = 0;
    }
    // 如果鍵是永久的
    if (ttl == -1) {
        addReplyLongLong(c,-1); //傳送-1
    } else {
        addReplyLongLong(c,output_ms ? ttl : ((ttl+500)/1000)); //傳送生存時間
    }
}