1. 程式人生 > >【redis原始碼分析】RDB持久化機制

【redis原始碼分析】RDB持久化機制

        rdb是redis儲存記憶體資料到磁碟資料的其中一種方式(另一種是AOF)。Rdb的主要原理就是在某個時間點把記憶體中的所有資料的快照儲存一份到磁碟上。在條件達到時通過fork一個子程序把記憶體中的資料寫到一個臨時檔案中來實現儲存資料快照。在所有資料寫完後再把這個臨時檔案用原子函式rename(2)重新命名為目標rdb檔案。這種實現方式充分利用fork的copy on write。

  另外一種是通過save命令主動觸發儲存資料快照,這種是阻塞式的,即不會通過生成子程序(就在當前程序完成)來進行資料集快照的儲存。

  相關配置

save <seconds> <changes>

  經過多少秒且多少個key有改變就進行,可以配置多個,只要有一個滿足就進行儲存資料快照到磁碟

rdbcompression yes

  儲存資料到rdb檔案時是否進行壓縮,如果不想可以配置成’no’,預設是’yes’,因為壓縮可以減少I/O,當然,壓縮需要消耗一些cpu資源。

dbfilename dump.rdb

  快照檔名

dir ./

  快照檔案所在的目錄,同時也是AOF檔案所在的目錄

  Rdb檔案格式

  [注:本節所說的型別,值在沒有特別標明的情況下都是針對rdb檔案來說的]

  Rdb檔案的整體格式

檔案簽名 | 版本號 | 型別 | 值 | 型別 | 值 | … | 型別 | 值

  [注:豎線和空格是為了便於閱讀而加入的,rdb檔案中是沒有豎線和空格分隔的]

  • 檔案簽名是字串:REDIS
  • 版本號是字串:0006
  • 型別是指值的型別,redis值的型別有很多種,下邊一一介紹
  • 值是對應的型別下的值,不同型別的值格式不一樣。這裡的值包含了redis中的key與val。而不是單指redis中val。

  REDIS_SELECTDB型別與REDIS_EOF型別

  • REDIS_SELECTDB型別:對應的值是redis db的編號,從0開始到比db數小1的數值。redis中可以配置db數,每個key只屬於一個db。
  • 儲存redis db的編號時使用的是儲存長度時使用的格式,為了儘量壓縮rdb檔案,儲存長度使用的位元組數是不一樣的,會進行重新編碼
  • REDIS_EOF型別:沒有對應的值。rdb檔案的結束符。

  把這REDIS_SELECTDB型別和REDIS_EOF型別代入到上邊的rdb檔案的格式中,那麼rdb檔案的整體格式變成為:

檔案簽名 | 版本號 | REDIS_SELECTDB型別 | db編號 | 型別 | 值 | … | REDIS_SELECTD 型別 | db編號 | 型別 | 值 | … | REDIS_EOF型別

  • 兩個REDIS_SELECTDB型別之間的資料都是該db下邊的key和value的資料

  相關程式碼

  Rdb.c

int rdbSave(char *filename) { 
    … 
      // 以 "temp-<pid>.rdb" 格式建立臨時檔名
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    // 初始化 rio 檔案
    rioInitWithFile(&rdb,fp);
    // 如果有需要的話,設定校驗和計算函式
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;
    // 以 "REDIS <VERSION>" 格式寫入檔案頭,以及 RDB 的版本
    snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
    if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;


    // 遍歷所有資料庫,儲存它們的資料
    for (j = 0; j < server.dbnum; j++) {
        // 指向資料庫
        redisDb *db = server.db+j;
        // 指向資料庫 key space
        dict *d = db->dict;
        // 資料庫為空, pass ,處理下個數據庫
        if (dictSize(d) == 0) continue;


        // 建立迭代器
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }


        /* Write the SELECT DB opcode */
        // 記錄正在使用的資料庫的號碼
        if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(&rdb,j) == -1) goto werr;


        /* Iterate this DB writing every entry */
        // 將資料庫中的所有節點儲存到 RDB 檔案
        while((de = dictNext(di)) != NULL) {
            // 取出鍵
            sds keystr = dictGetKey(de);
            // 取出值
            robj key, 
                 *o = dictGetVal(de);
            long long expire;
            
            initStaticStringObject(key,keystr);
            // 取出過期時間
            expire = getExpire(db,&key);
             //儲存所有的鍵值對
            if (rdbSaveKeyValuePair(&rdb,&key,o,expire,now) == -1) goto werr;
        }
        dictReleaseIterator(di);
    }
    … 
}

  Rdb中長度的儲存

  Redis為了儘量壓縮rdb檔案真是費盡心思,先來看看redis為了壓縮使用的長度儲存。長度主要用在字串長度,連結串列長度,hash表的大小儲存上。

  Redis把長度的儲存分為四種,最左邊位元組的從左到右的前兩位用於區分長度的儲存型別。

    相關程式碼   

    Rdb.c:31

int rdbSaveLen(rio *rdb, uint32_t len) {
    unsigned char buf[2];
    size_t nwritten;

    if (len < (1<<6)) {
        /* Save a 6 bit len */
		//00 xxxxxx
        buf[0] = (len&0xFF)|(REDIS_RDB_6BITLEN<<6);
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
        nwritten = 1;
    } else if (len < (1<<14)) {
        /* Save a 14 bit len */
		//01 xxxxxx xxxxxxxx
        buf[0] = ((len>>8)&0xFF)|(REDIS_RDB_14BITLEN<<6);
        buf[1] = len&0xFF;
        if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
        nwritten = 2;
    } else {
        /* Save a 32 bit len */
		//10 xxxxxx
        buf[0] = (REDIS_RDB_32BITLEN<<6);
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
        len = htonl(len);
		//轉換為網路位元組序寫入,
		if (rdbWriteRaw(rdb,&len,4) == -4) return -1;
        nwritten = 1+4;
    }

    // 返回編碼所使用的長度
    return nwritten;
}

  也許你發現了,程式碼中只有上邊的表格中只有3種,還有一種哪去了呢?

  另外一種比較特殊,如下:

    是不是覺得這種長度型別很奇怪,為什麼要這樣做?

  Redis在兩種情況下需要對儲存的內容進行編碼

  1.把字串轉成整數儲存

  比如:‘-100’需要4個位元組儲存,轉換整數只需要一個位元組

  相關函式rdbTryIntegerEncoding(rdb.c:88)

  2.使用lzf演算法壓縮字串

  相關函式lzf_compress(lzf_c.c:99),lzf的演算法解釋見lzf字串壓縮演算法

  當redis使用這兩種編碼對字串進行編碼時,在讀取時需要區分該字串有沒有被編碼過,對編碼過的字串需要特別處理,因為長度資訊是儲存在字串的前面的,所以可以通過在儲存長度的位置上加入編碼型別的資訊。

  我們來看看相關程式碼

  Rdb.c:557

 //從rdb檔案中中讀取長度
uint32_t rdbLoadLen(rio *rdb, int *isencoded) {
    unsigned char buf[2];
    uint32_t len;
    int type;

    if (isencoded) *isencoded = 0;
    if (rioRead(rdb,buf,1) == 0) return REDIS_RDB_LENERR;
    type = (buf[0]&0xC0)>>6;
    if (type == REDIS_RDB_ENCVAL) {
        //進行了特殊的編碼,置編碼標誌,並返回編碼型別
        if (isencoded) *isencoded = 1;
        return buf[0]&0x3F;
    } else if (type == REDIS_RDB_6BITLEN) {
        /* Read a 6 bit len. */
        return buf[0]&0x3F;
    } else if (type == REDIS_RDB_14BITLEN) {
        /* Read a 14 bit len. */
        if (rioRead(rdb,buf+1,1) == 0) return REDIS_RDB_LENERR;
        return ((buf[0]&0x3F)<<8)|buf[1];
    } else {
        /* Read a 32 bit len. */
        if (rioRead(rdb,&len,4) == 0) return REDIS_RDB_LENERR;
        return ntohl(len);
    }
}


  我們可以看到,在讀取rdb檔案時,當發現長度型別是REDIS_RDB_ENCVAL,把編碼型別返回。

  我們來看看知道編碼型別後的處理

  Rdb.c

/*
 * 根據編碼 encode ,從 rdb 檔案中讀取字串,並返回字串物件
 */
robj *rdbGenericLoadStringObject(rio *rdb, int encode) {
    int isencoded;
    uint32_t len;
    sds val;

    // 獲取字串的長度
    len = rdbLoadLen(rdb,&isencoded);
    if (isencoded) {
    	//如果被編碼了,則返回的是編碼型別
        switch(len) {
        case REDIS_RDB_ENC_INT8:
        case REDIS_RDB_ENC_INT16:
        case REDIS_RDB_ENC_INT32:
            // 位元組串是整數,建立整數物件並返回
            return rdbLoadIntegerObject(rdb,len,encode);
        case REDIS_RDB_ENC_LZF:
            // 位元組串是被 lzf 演算法壓縮的字串
            return rdbLoadLzfStringObject(rdb);
        default:
            redisPanic("Unknown RDB encoding type");
        }
    }

    if (len == REDIS_RDB_LENERR) return NULL;

    // 建立一個指定長度的 sds
    val = sdsnewlen(NULL,len);
    if (len && rioRead(rdb,val,len) == 0) {
        sdsfree(val);
        return NULL;
    }
    // 根據 sds ,建立字串物件
    return createObject(REDIS_STRING,val);
}

整個流程是:

  • 讀取長度
  • 如果長度型別是有編碼資訊的,則根據編碼型別進行讀取
  • 如果長度型別是有效長度,則根據長度資訊讀取字串

  REDIS_EXPIRETIME型別

  • 如果一個key被expire設定過,那麼在該key與value的前面會有一個REDIS_EXPIRETIME型別與其對應的值。
  • REDIS_EXPIRETIME型別對應的值是過期時間點的timestamp
  • REDIS_EXPIRETIME型別與其值是可選的,不是必須的,只有被expire設定過的key才有這個值

  假設有一個key被expire命令設定過,把這REDIS_EXPIRETIME型別代入到上邊的rdb檔案的格式中,那麼rdb檔案的整體格式變成為:

檔案簽名 | 版本號 | REDIS_SELECTDB型別 | db編號 | REDIS_EXPIRETIME型別 | timestamp | 型別 | 值 | … | REDIS_SELECTD 型別 | db編號 | 型別 | 值 | … | REDIS_EOF型別

  資料型別

  資料型別主要有以下型別:

/* Dup object types to RDB object types. Only reason is readability (are we
 * dealing with RDB types or with in-memory object types?).
 *
 * 物件型別在 RDB 檔案中的型別
 */
#define REDIS_RDB_TYPE_STRING 0
#define REDIS_RDB_TYPE_LIST   1
#define REDIS_RDB_TYPE_SET    2
#define REDIS_RDB_TYPE_ZSET   3
#define REDIS_RDB_TYPE_HASH   4

/* Object types for encoded objects. */
/*
 * 編碼物件的方式
 */
#define REDIS_RDB_TYPE_HASH_ZIPMAP    9
#define REDIS_RDB_TYPE_LIST_ZIPLIST  10
#define REDIS_RDB_TYPE_SET_INTSET    11
#define REDIS_RDB_TYPE_ZSET_ZIPLIST  12
#define REDIS_RDB_TYPE_HASH_ZIPLIST  13


下邊以REDIS_RDB_TYPE_STRING和REDIS_RDB_TYPE_LIST型別為例進行詳解,其他的型別都類似

        型別

        REDIS_RDB_TYPE_STRING | 值

       假設rdb檔案由一個值是REDIS_RDB_TYPE_STRING型別,比如執行了一個set mykey mevalue的命令,則在rdb檔案中表示為

REDIS_RDB_TYPE_STRING型別 | 值

其中值包含了key的長度,key的值,val的長度和val的值,把REDIS_RDB_TYPE_STRING型別值的格式代入得:

REDIS_RDB_TYPE_STRING型別 | keylen | mykey | vallen | myval

  型別用一個位元組表示,長度的儲存格式見rdb中長度的儲存

相關的程式碼:

/* Save a key-value pair, with expire time, type, key, value.
 * 儲存鍵值對,值的型別,以及它的過期時間(如果有的話)。
 *
 * On error -1 is returned.
 * 出錯返回 -1 。
 *
 * On success if the key was actaully saved 1 is returned, otherwise 0
 * is returned (the key was already expired). 
 *
 * 如果 key 已經過期,放棄儲存,返回 0 。
 * 如果 key 儲存成功,返回 1 。
 */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val,
                        long long expiretime, long long now)
{
    /* Save the expire time */
    // 儲存過期時間
    if (expiretime != -1) {
        /* If this key is already expired skip it */
        // key 已過期,直接跳過
        if (expiretime < now) return 0;

        if (rdbSaveType(rdb,REDIS_RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
    }

    /* Save type, key, value */
    // 儲存值型別
    if (rdbSaveObjectType(rdb,val) == -1) return -1;
    // 儲存 key
    if (rdbSaveStringObject(rdb,key) == -1) return -1;
    // 儲存 value
    if (rdbSaveObject(rdb,val) == -1) return -1;

    return 1;
}

  REDIS_LIST型別

  1.List

如果以linkedlist實現list

REDIS_LIST | keylen | key | listlen | len | value | len | value

  Listlen是連結串列長度

  Len是連結串列結點的值value的長度

  Value是連結串列結點的值

  2.Ziplist

REDIS_ENCODING_ZIPLIST| keylen | key   | ziplist

  Ziplist就是通過字串來實現的,直接將其儲存於rdb檔案中即可

       相關程式碼:

/* Save a Redis object. Returns -1 on error, 0 on success. */
/*
 * 將 Redis 物件寫入到 rdb 。
 *
 * 出錯返回 -1 ,寫入成功返回 0 。
 */
int rdbSaveObject(rio *rdb, robj *o) {
    int n, nwritten = 0;

    if (o->type == REDIS_STRING) {
        /* Save a string value */
        // 字串直接儲存
        if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
        nwritten += n;
    } else if (o->type == REDIS_LIST) {
        /* Save a list value */
        if (o->encoding == REDIS_ENCODING_ZIPLIST) {
            // 儲存 ziplist 佔用的位元組數量
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);

            // 以字串形式儲存整個 ziplist
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;
        } else if (o->encoding == REDIS_ENCODING_LINKEDLIST) {
            list *list = o->ptr;
            listIter li;
            listNode *ln;

            // </span><strong><span style="color:#ff0000;">儲存入節點數量</span></strong><span style="color:#333333;">
            if ((n = rdbSaveLen(rdb,listLength(list))) == -1) return -1;
            nwritten += n;

            // </span><strong><span style="color:#ff0000;">遍歷所有連結串列節點,取出值,並以字元形式儲存它們</span></strong><span style="color:#333333;">
            listRewind(list,&li);
            while((ln = listNext(&li))) {
                robj *eleobj = listNodeValue(ln);
                if ((n = </span><strong><span style="color:#ff0000;">rdbSaveStringObject</span></strong><span style="color:#333333;">(rdb,eleobj)) == -1) return -1;
                nwritten += n;
            }
        } else {
            redisPanic("Unknown list encoding");
        }
    } else if (o->type == </span><strong><span style="color:#ff0000;">REDIS_SET</span></strong><span style="color:#333333;">) {
        /* Save a set value */
        if (o->encoding == </span><strong><span style="color:#ff0000;">REDIS_ENCODING_HT</span></strong><span style="color:#333333;">) {
            dict *set = o->ptr;
            dictIterator *di = dictGetIterator(set);
            dictEntry *de;

            //</span><strong><span style="color:#ff0000;"> 儲存集合的基數</span></strong><span style="color:#333333;">
            if ((n = rdbSaveLen(rdb,dictSize(set))) == -1) return -1;
            nwritten += n;

            // 取出所有成員的值,並以字串形式儲存它們
            while((de = dictNext(di)) != NULL) {
                robj *eleobj = dictGetKey(de);
                if ((n = </span><span style="color:#ff0000;"><strong>rdbSaveStringObjec</strong></span><span style="color:#333333;">t(rdb,eleobj)) == -1) return -1;
                nwritten += n;
            }
            dictReleaseIterator(di);
        } else if (o->encoding == </span><strong><span style="color:#ff0000;">REDIS_ENCODING_INTSET</span></strong><span style="color:#333333;">) {
            // 儲存 intset 佔用的位元組數量
            size_t l = intsetBlobLen((intset*)o->ptr);

            </span><strong><span style="color:#ff0000;">// 以字串形式儲存整個 intset</span></strong><span style="color:#333333;">
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;
        } else {
            redisPanic("Unknown set encoding");
        }
    } else if (o->type == </span><strong><span style="color:#ff0000;">REDIS_ZSET</span></strong><span style="color:#333333;">) {
        /* Save a sorted set value */
        if (o->encoding == </span><span style="color:#ff0000;"><strong>REDIS_ENCODING_ZIPLIST</strong></span><span style="color:#333333;">) {
            // 儲存 ziplist 佔用的位元組數
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);
            
            </span><span style="color:#ff0000;"><strong>// 將整個 ziplist 以字串形式儲存</strong></span><span style="color:#333333;">
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;
        } else if (o->encoding == </span><span style="color:#ff0000;"><strong>REDIS_ENCODING_SKIPLIST</strong></span><span style="color:#333333;">) {
            zset *zs = o->ptr;
            dictIterator *di = dictGetIterator(zs->dict);
            dictEntry *de;

            </span><span style="color:#ff0000;">// 儲存有序整合員的基數</span><span style="color:#333333;">
            if ((n = rdbSaveLen(rdb,dictSize(zs->dict))) == -1) return -1;
            nwritten += n;

            // 遍歷整個字典,儲存所有有序整合員
            while((de = dictNext(di)) != NULL) {
                robj *eleobj = dictGetKey(de);
                double *score = dictGetVal(de);

                </span><span style="color:#ff0000;"><strong>// 儲存 member</strong></span><span style="color:#333333;">
                if ((n = rdbSaveStringObject(rdb,eleobj)) == -1) return -1;
                nwritten += n;

                </span><span style="color:#ff0000;"><strong>// 儲存 score</strong></span><span style="color:#333333;">
                if ((n = rdbSaveDoubleValue(rdb,*score)) == -1) return -1;
                nwritten += n;
            }
            dictReleaseIterator(di);
        } else {
            redisPanic("Unknown sorted set encoding");
        }
    } else if (o->type == </span><strong><span style="color:#ff0000;">REDIS_HASH</span></strong><span style="color:#333333;">) {
        /* Save a hash value */
        if (o->encoding == </span><span style="color:#ff0000;"><strong>REDIS_ENCODING_ZIPLIST</strong></span><span style="color:#333333;">) {
            // 儲存 ziplist 佔用的位元組數
            size_t l = ziplistBlobLen((unsigned char*)o->ptr);
            
            </span><span style="color:#ff0000;"><strong>// 將整個 ziplist 儲存為字串</strong></span><span style="color:#333333;">
            if ((n = rdbSaveRawString(rdb,o->ptr,l)) == -1) return -1;
            nwritten += n;

        } else if (o->encoding == </span><span style="color:#ff0000;"><strong>REDIS_ENCODING_HT</strong></span><span style="color:#333333;">) {
            dictIterator *di = dictGetIterator(o->ptr);
            dictEntry *de;
            
            </span><span style="color:#ff0000;"><strong>// 記錄字典的鍵值對數量</strong></span><span style="color:#333333;">
            if ((n = rdbSaveLen(rdb,dictSize((dict*)o->ptr))) == -1) return -1;
            nwritten += n;

            // 遍歷整個字典,將所有鍵值對儲存到 rdb 檔案
            while((de = dictNext(di)) != NULL) {
                robj *key = dictGetKey(de);
                robj *val = dictGetVal(de);

                </span><span style="color:#ff0000;"><strong>// 儲存 key</strong></span><span style="color:#333333;">
                if ((n = rdbSaveStringObject(rdb,key)) == -1) return -1;
                nwritten += n;

                </span><span style="color:#ff0000;"><strong>// 儲存 value</strong></span><span style="color:#333333;">
                if ((n = rdbSaveStringObject(rdb,val)) == -1) return -1;
                nwritten += n;
            }
            dictReleaseIterator(di);

        } else {
            redisPanic("Unknown hash encoding");
        }

    } else {
        redisPanic("Unknown object type");
    }

    return nwritten;
}</span>

  快照儲存

  我們接下來看看具體實現細節

  不管是觸發條件滿足後通過fork子程序來儲存快照還是通過save命令來觸發,其實都是呼叫的同一個函式rdbSave()。

  先來看看觸發條件滿足後通過fork子程序的實現儲存快照的的實現

在每100ms呼叫一次的serverCron函式中會對快照儲存的條件進行檢查,如果滿足了則進行快照儲存

  如果後端有寫rdb的子程序或者寫aof的子程序,則檢查rdb子程序是否退出了,如果退出了則進行一些收尾處理,比如更新髒資料計數server.dirty和最近快照儲存時間server.lastsave。

  如果後端沒有寫rdb的子程序且沒有寫aof的子程序,則判斷下是否有觸發寫rdb的條件滿足了,如果有條件滿足,則通過呼叫rdbSaveBackground函式進行快照儲存。

  跟著進rdbSaveBackground函式裡邊看看

<span style="color:#333333;">/*
 * 使用子程序儲存資料庫資料,不阻塞主程序
 */
int rdbSaveBackground(char *filename) {
    pid_t childpid;
    long long start;
     //已經在執行
    if (server.rdb_child_pid != -1) return REDIS_ERR;
    
    // 修改伺服器狀態
    server.dirty_before_bgsave = server.dirty;

    // 開始時間
    start = ustime();
    // 建立子程序
    if ((childpid = fork()) == 0) {
        int retval;

        /* Child */
        // 子程序不接收網路資料
        if (server.ipfd > 0) close(server.ipfd);
        if (server.sofd > 0) close(server.sofd);

        // 儲存資料
       </span><strong><span style="color:#ff0000;"> retval = rdbSave(filename);</span></strong><span style="color:#333333;">
       // .......
        // 退出子程序
        exitFromChild((retval == REDIS_OK) ? 0 : 1);
    } else {
        /* Parent */
        // 記錄最後一次 fork 的時間
        server.stat_fork_time = ustime()-start;

        // 建立子程序失敗時進行錯誤報告
        if (childpid == -1) {..... }
        redisLog(REDIS_NOTICE,"Background saving started by pid %d",childpid);

        // 記錄儲存開始的時間
        server.rdb_save_time_start = time(NULL);
        // 記錄子程序的 id
        server.rdb_child_pid = childpid;
        // 在執行時關閉對資料庫的 rehash
        // 避免 copy-on-write
        updateDictResizePolicy();
        return REDIS_OK;
    }
    return REDIS_OK; /* unreached */
}</span>

  rdb的快照儲存是通過函式rdbSave函式(rdb.c:394)來實現的。其實save命令也是通過呼叫這個函式來實現的。我們來簡單看看

  rdb.c

/* Save the DB on disk. Return REDIS_ERR on error, REDIS_OK on success */
/*
 * 將資料庫儲存到磁碟上。成功返回 REDIS_OK ,失敗返回 REDIS_ERR 。
 */
int rdbSave(char *filename) {
    dictIterator *di = NULL;
    dictEntry *de;
    char tmpfile[256];
    char magic[10];
    int j;
    long long now = mstime();
    FILE *fp;
    rio rdb;
    uint64_t cksum;

    // 以 "temp-<pid>.rdb" 格式建立臨時檔名
    snprintf(tmpfile,256,"temp-%d.rdb", (int) getpid());
    fp = fopen(tmpfile,"w");
    if (!fp) {
        redisLog(REDIS_WARNING, "Failed opening .rdb for saving: %s",
            strerror(errno));
        return REDIS_ERR;
    }

    // 初始化 rio 檔案
    rioInitWithFile(&rdb,fp);
    // 如果有需要的話,設定校驗和計算函式
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;
    // 以 "REDIS <VERSION>" 格式寫入檔案頭,以及 RDB 的版本
    snprintf(magic,sizeof(magic),"REDIS%04d",REDIS_RDB_VERSION);
    if (rdbWriteRaw(&rdb,magic,9) == -1) goto werr;

    // 遍歷所有資料庫,儲存它們的資料
    for (j = 0; j < server.dbnum; j++) {
        // 指向資料庫
        redisDb *db = server.db+j;
        // 指向資料庫 key space
        dict *d = db->dict;
        // 資料庫為空, pass ,處理下個數據庫
        if (dictSize(d) == 0) continue;

        // 建立迭代器
        di = dictGetSafeIterator(d);
        if (!di) {
            fclose(fp);
            return REDIS_ERR;
        }

        /* Write the SELECT DB opcode */
        // 記錄正在使用的資料庫的號碼
        if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_SELECTDB) == -1) goto werr;
        if (rdbSaveLen(&rdb,j) == -1) goto werr;

        /* Iterate this DB writing every entry */
        // 將資料庫中的所有節點儲存到 RDB 檔案
        while((de = dictNext(di)) != NULL) {
            // 取出鍵
            sds keystr = dictGetKey(de);
            // 取出值
            robj key, 
                 *o = dictGetVal(de);
            long long expire;
            
            initStaticStringObject(key,keystr);
            // 取出過期時間
            expire = getExpire(db,&key);
            if (<span style="background-color: rgb(255, 0, 0);"><strong>rdbSaveKeyValuePair</strong></span>(&rdb,&key,o,expire,now) == -1) goto werr;
        }
        dictReleaseIterator(di);
    }
    di = NULL; /* So that we don't release it again on error. */

    /* EOF opcode */
    if (rdbSaveType(&rdb,REDIS_RDB_OPCODE_EOF) == -1) goto werr;

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the
     * loading code skips the check in this case. */
    cksum = rdb.cksum;
    memrev64ifbe(&cksum);
    rioWrite(&rdb,&cksum,8);

    /* Make sure data will not remain on the OS's output buffers */
    fflush(fp);
    fsync(fileno(fp));
    fclose(fp);

    /* Use RENAME to make sure the DB file is changed atomically only
     * if the generate DB file is ok. */
    // 將臨時檔案 tmpfile 改名為 filename 
    if (rename(tmpfile,filename) == -1) {
        redisLog(REDIS_WARNING,"Error moving temp DB file on the final destination: %s", strerror(errno));
        unlink(tmpfile);
        return REDIS_ERR;
    }

    redisLog(REDIS_NOTICE,"DB saved on disk");

    // 初始化資料庫資料
    server.dirty = 0;
    server.lastsave = time(NULL);
    server.lastbgsave_status = REDIS_OK;

    return REDIS_OK;

werr:
    fclose(fp);
    unlink(tmpfile);
    redisLog(REDIS_WARNING,"Write error saving DB on disk: %s", strerror(errno));
    if (di) dictReleaseIterator(di);
    return REDIS_ERR;
}

  建立並開啟臨時檔案

  寫入檔案簽名“REDIS”和版本號

  遍歷所有db中的所有key

  對每個key,先判斷是否設定了expireTime, 如果設定了,則儲存expireTime到rdb檔案中。

rdbSaveKeyValuePair函式儲存沒個鍵值對,這個函式在上面介紹過了

  不同型別有有不同的儲存格式,詳細見rdb檔案格式

  最後寫入rdb檔案的結束符

  關閉檔案並重命名臨時檔名到正式檔名

  更新髒資料計數server.dirty為0和最近寫rdb檔案的時間server.lastsave為當前時間,這個只是在通過save命令觸發的情況下有用。因為如果是通過fork一個子程序來寫rdb檔案的,更新無效,因為更新的是子程序的資料。

如果是通過fork一個子程序來寫rdb檔案(即不是通過save命令觸發的),在寫rdb檔案的過程中,可能又有一些資料被更改了,那此時的髒資料計數server.dirty怎麼更新呢? redis是怎樣處理的呢?

  我們來看看寫rdb的子程序退出時的處理

  Redis.c(serverCron())

// 如果 BGSAVE 或者 BGREWRITEAOF 正在進行
    // 那麼檢查它們是否已經執行完畢
    if (server.rdb_child_pid != -1 || server.aof_child_pid != -1) {
        int statloc;
        pid_t pid;

        if ((pid = wait3(&statloc,WNOHANG,NULL)) != 0) {
            int exitcode = WEXITSTATUS(statloc);
            int bysignal = 0;
            
            if (WIFSIGNALED(statloc)) bysignal = WTERMSIG(statloc);

            if (pid == server.rdb_child_pid) {
                backgroundSaveDoneHandler(exitcode,bysignal);
            } else if (pid == server.aof_child_pid) {
                backgroundRewriteDoneHandler(exitcode,bysignal);
            } else {
                redisLog(REDIS_WARNING,
                    "Warning, detected child with unmatched pid: %ld",
                    (long)pid);
            }
            // 如果 BGSAVE 和 BGREWRITEAOF 都已經完成,那麼重新開始 REHASH
            updateDictResizePolicy();
        }
    } 

 如果捕捉到寫rdb檔案的子程序退出,則呼叫backgroundSaveDoneHandler進行處理

  接著看看backgroundSaveDoneHandler函式

  Rdb.c

<span style="color:#333333;">/* A background saving child (BGSAVE) terminated its work. Handle this. */
/*
 * 根據 BGSAVE 子程序的返回值,對伺服器狀態進行更新
 */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {
    // 儲存成功
    if (!bysignal && exitcode == 0) {
        redisLog(REDIS_NOTICE, "Background saving terminated with success");
       </span><strong><span style="color:#ff0000;"> server.dirty = server.dirty - server.dirty_before_bgsave;</span></strong><span style="color:#333333;">
        </span><strong><span style="color:#ff0000;">server.lastsave = time(NULL);</span></strong><span style="color:#333333;">
        server.lastbgsave_status = REDIS_OK;
    // 儲存失敗
    } else if (!bysignal && exitcode != 0) {
        redisLog(REDIS_WARNING, "Background saving error");
        server.lastbgsave_status = REDIS_ERR;
    // 子程序被終結
    } else {
        redisLog(REDIS_WARNING,
            "Background saving terminated by signal %d", bysignal);
        rdbRemoveTempFile(server.rdb_child_pid);
        server.lastbgsave_status = REDIS_ERR;
    }

    // 更新伺服器狀態
    server.rdb_child_pid = -1;
    server.rdb_save_time_last = time(NULL)-server.rdb_save_time_start;
    server.rdb_save_time_start = -1;

    /* Possibly there are slaves waiting for a BGSAVE in order to be served
     * (the first stage of SYNC is a bulk transfer of dump.rdb) */
    // 將 rdb 檔案儲存完畢的訊息報告可能正在等待複製的附屬節點
    updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}</span>

  更新髒資料計數server.dirty和最近寫rdb檔案的時間server.lastsave為當前時間

       注意這裡AOF的處理方式的不同,對於AOF,他是將快照期間的資料同時寫入了一個buf和AOF日誌檔案,當rewrite結束後,將buf中的資料追加到AOF中就可以保證資料的完整,但是在RDB中,沒有必要這樣做,因為RDB本來就不需要保證資料的完全完整,所以只需要更新髒資料,將髒資料設定為這期間被修改的數量就可以了。

  快照匯入

  當redis因為停電或者某些原因掛掉了,此時重啟redis時,我們就需要從rdb檔案中讀取快照檔案,把儲存到rdb檔案中的資料重新匯入到記憶體中。

  先來看看啟動時對快照匯入的處理

  Redis.c

<span style="color:#333333;">// 從 RDB 檔案或 AOF 檔案中載入資料到記憶體
void loadDataFromDisk(void) {
    long long start = ustime();

    // 如果開啟了 AOF 功能,那麼優先使用 AOF 檔案來還原資料
    if (server.aof_state == REDIS_AOF_ON) {
        if (</span><strong><span style="color:#ff0000;">loadAppendOnlyFile</span></strong><span style="color:#333333;">(server.aof_filename) == REDIS_OK)
            redisLog(REDIS_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);
    } else {
        // 在沒有開啟 AOF 功能時,才使用 RDB 來還原
        if (</span><strong><span style="color:#ff0000;">rdbLoad</span></strong><span style="color:#333333;">(server.rdb_filename) == REDIS_OK) {
            redisLog(REDIS_NOTICE,"DB loaded from disk: %.3f seconds",
                (float)(ustime()-start)/1000000);
        } else if (errno != ENOENT) {
            redisLog(REDIS_WARNING,"Fatal error loading the DB. Exiting.");
            exit(1);
        }
    }
}</span>

  如果儲存了AOF檔案,則使用AOF檔案來恢復資料,AOF的具體內容見AOF

  如果沒有AOF,則使用rdb檔案恢復資料,呼叫rdbLoad函式

  接著看看rdbLoad函式

  Rdb.c

<span style="color:#333333;">/*
 * 讀取 rdb 檔案,並將其中的物件儲存到記憶體中
 */
int rdbLoad(char *filename) {
    uint32_t dbid;
    int type, rdbver;
    redisDb *db = server.db+0;
    char buf[1024];
    long long expiretime, now = mstime();
    long loops = 0;
    FILE *fp;
    rio rdb;

    // 開啟檔案
    fp = fopen(filename,"r");
    if (!fp) {
        errno = ENOENT;
        return REDIS_ERR;
    }

    // 初始化 rdb 檔案
    rioInitWithFile(&rdb,fp);
    if (server.rdb_checksum)
        rdb.update_cksum = rioGenericUpdateChecksum;

    // 檢查 rdb 檔案頭(“REDIS”字串,以及版本號)
    if (rioRead(&rdb,buf,9) == 0) goto eoferr;
    buf[9] = '\0';
    if (memcmp(buf,"REDIS",5) != 0) {   // "REDIS"
        fclose(fp);
        redisLog(REDIS_WARNING,"Wrong signature trying to load DB from file");
        errno = EINVAL;
        return REDIS_ERR;
    }
    rdbver = atoi(buf+5);   // 版本號
    if (rdbver < 1 || rdbver > REDIS_RDB_VERSION) {
        fclose(fp);
        redisLog(REDIS_WARNING,"Can't handle RDB format version %d",rdbver);
        errno = EINVAL;
        return REDIS_ERR;
    }

    startLoading(fp);
    while(1) {
        robj *key, *val;
        expiretime = -1;

        /* Serve the clients from time to time */
        // 間隔性服務客戶端
        if (!(loops++ % 1000)) {
            // 重新整理載入程序資訊
            loadingProgress(rioTell(&rdb));
            // 處理事件
            aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT);
        }

        /* Read type. */
        // 讀入型別識別符號
        if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;

        // 接下來的值是一個過期時間
        if (type == REDIS_RDB_OPCODE_EXPIRETIME) {
            // 讀取毫秒計數的過期時間
            if ((expiretime = rdbLoadTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            // 讀取下一個值(一個字串 key )的型別識別符號
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
            /* the EXPIRETIME opcode specifies time in seconds, so convert
             * into milliesconds. */
             // 將毫秒轉換為秒
            expiretime *= 1000;
        } else if (type == REDIS_RDB_OPCODE_EXPIRETIME_MS) {
            /* Milliseconds precision expire times introduced with RDB
             * version 3. */
            // 讀取毫秒計數的過期時間
            if ((expiretime = rdbLoadMillisecondTime(&rdb)) == -1) goto eoferr;
            /* We read the time so we need to read the object type again. */
            // 讀取下一個值(一個字串 key )的型別識別符號
            if ((type = rdbLoadType(&rdb)) == -1) goto eoferr;
        }
    
        // 到達 EOF ,跳出
        if (type == REDIS_RDB_OPCODE_EOF)
            break;

        /* Handle SELECT DB opcode as a special case */
        // 資料庫號碼識別符號
        if (type == REDIS_RDB_OPCODE_SELECTDB) {
            // 讀取資料庫號
            if ((dbid = rdbLoadLen(&rdb,NULL)) == REDIS_RDB_LENERR)
                goto eoferr;
            // 檢查資料庫號是否合法
            if (dbid >= (unsigned)server.dbnum) {
                redisLog(REDIS_WARNING,"FATAL: Data file was created with a Redis server configured to handle more than %d databases. Exiting\n", server.dbnum);
                exit(1);
            }
            db = server.db+dbid;
            continue;
        }

        /* Read key */
        // 讀入 key
        if ((key = </span><strong><span style="color:#ff0000;">rdbLoadStringObject(&rdb)</span></strong><span style="color:#333333;">) == NULL) goto eoferr;

        /* Read value */
        // 讀入 value
        if ((val = </span><strong><span style="color:#ff0000;">rdbLoadObject(type,&rdb)</span></strong><span style="color:#333333;">) == NULL) goto eoferr;

        /* Check if the key already expired. This function is used when loading
         * an RDB file from disk, either at startup, or when an RDB was
         * received from the master. In the latter case, the master is
         * responsible for key expiry. If we would expire keys here, the
         * snapshot taken by the master may not be reflected on the slave. */
        // 如果 key 已經過期,那麼釋放 key 和 value
        if (server.masterhost == NULL && expiretime != -1 && expiretime < now) {
            decrRefCount(key);
            decrRefCount(val);
            continue;
        }

        /* Add the new object in the hash table */
        // 將物件新增到資料庫
        </span><strong><span style="color:#ff0000;">dbAdd(db,key,val);</span></strong><span style="color:#333333;">

        /* Set the expire time if needed */
        // 如果有過期時間,設定過期時間
        if (expiretime != -1) setExpire(db,key,expiretime);

        decrRefCount(key);
    }

    /* Verify the checksum if RDB version is >= 5 */
    // 檢查校驗和
    if (rdbver >= 5 && server.rdb_checksum) {
        uint64_t cksum, expected = rdb.cksum;

        if (rioRead(&rdb,&cksum,8) == 0) goto eoferr;
        memrev64ifbe(&cksum);
        if (cksum == 0) {
            redisLog(REDIS_WARNING,"RDB file was saved with checksum disabled: no check performed.");
        } else if (cksum != expected) {
            redisLog(REDIS_WARNING,"Wrong RDB checksum. Aborting now.");
            exit(1);
        }
    }

    fclose(fp);
    stopLoading();
    return REDIS_OK;

eoferr: /* unexpected end of file is handled here with a fatal exit */
    redisLog(REDIS_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now.");
    exit(1);
    return REDIS_ERR; /* Just to avoid warning */
}</span>

  開啟rdb檔案

  讀取rdb檔案的簽名和版本號

  開始進入 型別 | 值 | 型別 | 值 的迴圈讀取,可參考rdb檔案格式

  作者還做了匯入的進度條,是有人反饋說rdb檔案很大時匯入時要很久,但又不知道進度,所以作者就加了匯入的進度條,改善使用者體驗

  讀取型別

  如果型別是過期時間型別REDIS_EXPIRETIME,則讀取過期時間

  如果型別是檔案結束型別REDIS_EOF,則跳出 型別 | 值 | 型別 | 值 的迴圈讀取

  如果型別是選擇db型別REDIS_SELECTDB,則讀取db索引並把當前db轉成該db,然後繼續 型別 | 值 | 型別 | 值 的迴圈讀取。

  如果不是以上型別,則表明該型別是資料型別,讀取作為key的字串,即讀取字串型別的值,然後接著讀取作為value的字串。不同型別的編碼不一樣,根據寫入時得規則解釋讀取到的值即可

  讀取到key和value後,判斷下該key是否過期,如果過期則丟棄,不再匯入,然後繼續 型別 | 值 | 型別 | 值 的迴圈讀取。

  如果讀取成功,則匯入到記憶體,如果有過期時間則設定過期時間

  總結

  落地儲存是資料設計的一大重點也是難點。原理很簡單,定義某種協議,然後按照某種協議寫入讀出。Redis為了節省空間和讀寫時的I/O操作,做了很多很細緻的工作來壓縮資料。另外redis的豐富的資料型別也加大了落地的實現難度。作者也曾經在他的部落格說過,redis的豐富的資料型別導致了很多經典的優化辦法無法在redis上實現。

參考:

http://www.searchdatabase.com.cn/showcontent_62814.htm