1. 程式人生 > >redis原始碼分析與思考(十三)——字串型別的命令實現(t_string.c)

redis原始碼分析與思考(十三)——字串型別的命令實現(t_string.c)

    在對字串操作的命令中,主要有增加刪查該、批處理操作以及編碼的轉換命令,現在列出對字串物件操作的主要常用命令:

常用命令表

命令 對應操作 時間複雜度
set key value 增加鍵值對 O(1)
setex key seconds value 增加鍵值對,且設定秒級的過期時間 O(1)
psetex key milliseconds value 增加鍵值對,且設定毫秒級的過期時間 O(1)
setnx key value 增加鍵值對,鍵不存在才能成功 O(1)
set key value xx 修改鍵值,鍵必須存在 O(1)
mset key value [key value…] 批量設定鍵 O(n)
mget key [key…] 批量獲得鍵值 O(n)
get key 獲得鍵值 O(1)
del key 刪除鍵 O(1)
incr key 鍵值自增 O(1)
decr key 鍵值自減 O(1)
incrby key increment 自增指定數字 O(1)
decrby key decrement 自減指定數字 O(1)
incrybyfloat key increment 自增浮點數 O(1)
append key value 追加值 O(1)
strlen key 獲取鍵值的長度 O(1)
setrange key offset value 設定指定位置的字元 O(1)
getrange key start end 獲取指定偏移量的鍵值 O(n)

    在每次增加鍵的時候,都會檢查其大小是否超過了512M:

static int checkStringLength(redisClient *c, long long size) {
    if (size > 512*1024*1024) {
    //返回客戶端錯誤資訊
        addReplyError(c,"string exceeds maximum allowed size (512MB)");
        return REDIS_ERR;
    }
    return REDIS_OK;
}

鍵的增加

    鍵增加的策略是先檢查是否超了大小,再檢查是否有生存時間,最後檢查輸入命令的引數,下面列出SET命令的底層實現:

#define REDIS_SET_NO_FLAGS 0
#define REDIS_SET_NX (1<<0)     /* Set if key not exists. */
#define REDIS_SET_XX (1<<1)     /* Set if key exists. */

void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
    long long milliseconds = 0; /* initialized to avoid any harmness warning */
    // 取出過期時間
    if (expire) {
        // 將取 expire 引數的值取出來
        if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
            return;
        // expire 引數的值不正確時報錯
        if (milliseconds <= 0) {
            addReplyError(c,"invalid expire time in SETEX");
            return;
        }
        // 不論輸入的過期時間是秒還是毫秒
        // Redis 實際都以毫秒的形式儲存過期時間
        // 如果輸入的過期時間為秒,那麼將它轉換為毫秒
        if (unit == UNIT_SECONDS) milliseconds *= 1000;
    }
    // 如果設定了 NX 或者 XX 引數,那麼檢查條件是否不符合這兩個設定
    // 在條件不符合時報錯,報錯的內容由 abort_reply 引數決定
    if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
        (flags & REDIS_SET_XX && lookupKeyWrite(c->db,key) == NULL))
    {
        addReply(c, abort_reply ? abort_reply : shared.nullbulk);
        return;
    }
    // 將鍵值關聯到資料庫,新增或者修改
    setKey(c->db,key,val);
    // 將資料庫設為髒
    server.dirty++;
    // 為鍵設定過期時間
    if (expire) setExpire(c->db,key,mstime()+milliseconds);
    // 傳送事件通知,通知鍵的增加情況
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",key,c->db->id);
    // 傳送事件通知
    if (expire) notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
        "expire",key,c->db->id);
    // 設定成功,向客戶端傳送回覆
    // 回覆的內容由 ok_reply 決定
    addReply(c, ok_reply ? ok_reply : shared.ok);
}

    SET命令的實現,先判斷命令引數是哪個,再呼叫setGenericCommand函式:

void setCommand(redisClient *c) {
    int j;
    robj *expire = NULL;
    int unit = UNIT_SECONDS;
    int flags = REDIS_SET_NO_FLAGS;
    // 判斷選項引數
    for (j = 3; j < c->argc; j++) {
        char *a = c->argv[j]->ptr;
        robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
        if ((a[0] == 'n' || a[0] == 'N') &&
            (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
            flags |= REDIS_SET_NX;
        } else if ((a[0] == 'x' || a[0] == 'X') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
            flags |= REDIS_SET_XX;
        } else if ((a[0] == 'e' || a[0] == 'E') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
            unit = UNIT_SECONDS;
            expire = next;
            j++;
        } else if ((a[0] == 'p' || a[0] == 'P') &&
                   (a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
            unit = UNIT_MILLISECONDS;
            expire = next;
            j++;
        } else {
            addReply(c,shared.syntaxerr);
            return;
        }
    }
    // 嘗試對值物件進行編碼,如果是raw與embstr字串就嘗試將其裝換為int存貯
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}

    tryObjectEncoding函式的作用是對字串物件進行重新編碼,看是否能將其裝換為int編碼存貯,raw是否能轉換為emstr格式,需要物件是raw與embstr格式字串時,這樣可以節約記憶體:

robj *tryObjectEncoding(robj *o) {
    long value;
    sds s = o->ptr;
    size_t len;
    redisAssertWithInfo(NULL,o,o->type == REDIS_STRING)// 只在字串的編碼為 RAW 或者 EMBSTR 時嘗試進行編碼
    if (!sdsEncodedObject(o)) return o;
     // 不對共享物件進行編碼
     if (o->refcount > 1) return o;
    // 對字串進行檢查
    // 只對長度小於或等於 21 位元組,並且可以被解釋為整數的字串進行編碼
    len = sdslen(s);
    if (len <= 21 && string2l(s,len,&value)) {
        if (server.maxmemory == 0 &&
            value >= 0 &&
            value < REDIS_SHARED_INTEGERS)
        {
            decrRefCount(o);
            incrRefCount(shared.integers[value]);
            return shared.integers[value];
        } else {
            if (o->encoding == REDIS_ENCODING_RAW) sdsfree(o->ptr);
            o->encoding = REDIS_ENCODING_INT;
            o->ptr = (void*) value;
            return o;
        }
    }
    // 嘗試將 RAW 編碼的字串編碼為 EMBSTR 編碼
    if (len <= REDIS_ENCODING_EMBSTR_SIZE_LIMIT) {
        robj *emb;
        if (o->encoding == REDIS_ENCODING_EMBSTR) return o;
        emb = createEmbeddedStringObject(s,sdslen(s));
        decrRefCount(o);
        return emb;
    }
    // 這個物件沒辦法進行編碼,嘗試從 SDS 中移除所有空餘空間
    if (o->encoding == REDIS_ENCODING_RAW &&
        sdsavail(s) > len/10)
    {
        o->ptr = sdsRemoveFreeSpace(o->ptr);
    }
    /* Return the original object. */
    return o;
}

    其餘SET命令:

//SETNX命令
void setnxCommand(redisClient *c) {
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    setGenericCommand(c,REDIS_SET_NX,c->argv[1],c->argv[2],NULL,0,shared.cone,shared.czero);
}

//SETEX命令
void setexCommand(redisClient *c) {
    c->argv[3] = tryObjectEncoding(c->argv[3]);
    setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL);
}

//PSETEX命令
void psetexCommand(redisClient *c) {
    c->argv[3] = tryObjectEncoding(c->argv[3]);
    setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS,NULL,NULL);
}

獲取鍵

    鍵的獲取比較的簡單,在這裡直接貼出原始碼以及註釋:

//獲取鍵的底層實現
int getGenericCommand(redisClient *c) {
    robj *o;
    // 嘗試從資料庫中取出鍵 c->argv[1] 對應的值物件
    // 如果鍵不存在時,向客戶端傳送回覆資訊,並返回 NULL
    //嘗試取出鍵值
    if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
        return REDIS_OK;
    // 值物件存在,檢查它的型別
    if (o->type != REDIS_STRING) {
        // 型別錯誤
        addReply(c,shared.wrongtypeerr);
        return REDIS_ERR;
    } else {
        // 型別正確,向客戶端返回物件的值
        addReplyBulk(c,o);
        return REDIS_OK;
    }
}

//GET命令
void getCommand(redisClient *c) {
    getGenericCommand(c);
}

//GETSET命令,先返回值,再修改
void getsetCommand(redisClient *c) {
    // 取出並返回鍵的值物件
    if (getGenericCommand(c) == REDIS_ERR) return;
    // 編碼鍵的新值 c->argv[2]
    c->argv[2] = tryObjectEncoding(c->argv[2]);
    // 將資料庫中關聯鍵 c->argv[1] 和新值物件 c->argv[2]
    setKey(c->db,c->argv[1],c->argv[2]);
    // 傳送事件通知
    notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[1],c->db->id);
    // 將伺服器設為髒
    server.dirty++;
}

批處理操作

    既然有了SET與GET命令,那麼為什麼要加入批處理操作呢?其實主要是解決網路延遲的問題,假如你有10萬條資料需要插入到資料庫中,每條插入資料的命令從客戶端傳送到服務端的延遲是1ms,那麼也要花費100s的時間來傳輸。批處理的策略是將所有的命令通過一次傳輸到服務端,將網路延遲的影響降到了最少:

//MSET命令底層實現
void msetGenericCommand(redisClient *c, int nx) {
    int j, busykeys = 0;
    //檢查格式是否正確
    // 鍵值引數不是成相成對出現的,格式不正確
    if ((c->argc % 2) == 0) {
        //返回給客戶端錯誤資訊
        addReplyError(c,"wrong number of arguments for MSET");
        return;
    }
    /* Handle the NX flag. The MSETNX semantic is to return zero and don't
     * set nothing at all if at least one already key exists. */
    // 如果 nx 引數為真,那麼檢查所有輸入鍵在資料庫中是否存在
    // 只要有一個鍵是存在的,那麼就向客戶端傳送空回覆
    // 並放棄執行接下來的設定操作,保證了原子性
    if (nx) {
    //判斷其中是否有有鍵存在
        for (j = 1; j < c->argc; j += 2) {
            if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
                busykeys++;
            }
        }
        // 鍵存在
        // 傳送空白回覆,並放棄執行接下來的設定操作
        if (busykeys) {
            addReply(c, shared.czero);
            return;
        }
    }
    // 設定所有鍵值對
    for (j = 1; j < c->argc; j += 2) {
        // 對值物件進行解碼
        c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
        // 將鍵值對關聯到資料庫
        // c->argc[j] 為鍵
        // c->argc[j+1] 為值
        setKey(c->db,c->argv[j],c->argv[j+1]);
        // 傳送事件通知
        notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[j],c->db->id);
    }
    // 將伺服器設為髒
    server.dirty += (c->argc-1)/2;
    // 設定成功
    // MSET 返回 OK ,而 MSETNX 返回 1
    addReply(c, nx ? shared.cone : shared.ok);
}
//MSET命令
void msetCommand(redisClient *c) {
    msetGenericCommand(c,0);
}
//MSETNX命令
void msetnxCommand(redisClient *c) {
    msetGenericCommand(c,1);
}

    MGET命令的實現:

void mgetCommand(redisClient *c) {
    int j;
    addReplyMultiBulkLen(c,c->argc-1);
    // 查詢並返回所有輸入鍵的值
    for (j = 1; j < c->argc; j++) {
        // 查詢鍵 c->argc[j] 的值
        robj *o = lookupKeyRead(c->db,c->argv[j]);
        if (o == NULL) {
            // 值不存在,向客戶端傳送空回覆
            addReply(c,shared.nullbulk);
        } else {
            if (o->type != REDIS_STRING) {
                // 值存在,但不是字串型別
                addReply(c,shared.nullbulk);
            } else {
                // 值存在,並且是字串
                addReplyBulk(c,o);
            }
        }
    }
}

修改以及獲取值物件操作

    SETRANGE命令是修改值物件中指定字元,而GETRANGE是獲取部分字串,也就是值物件:

//SETRANGE命令
void setrangeCommand(redisClient *c) {
    robj *o;
    long offset;
    sds value = c->argv[3]->ptr;
    // 取出 offset 引數
    if (getLongFromObjectOrReply(c,c->argv[2],&offset,NULL) != REDIS_OK)
        return;
    // 檢查 offset 引數
    if (offset < 0) {
        addReplyError(c,"offset is out of range");
        return;
    }
    // 取出鍵現在的值物件
    o = lookupKeyWrite(c->db,c->argv[1]);
    if (o == NULL) {
        // 鍵不存在於資料庫中。。。
        // value 為空,沒有什麼可設定的,向客戶端返回 0
        if (sdslen(value) == 0) {
            addReply(c,shared.czero);
            return;
        }
        // 如果設定後的長度會超過 Redis 的限制的話
        // 那麼放棄設定,向客戶端傳送一個出錯回覆
        if (checkStringLength(c,offset+sdslen(value)) != REDIS_OK)
            return;
        // 如果 value 沒有問題,可以設定,那麼建立一個空字串值物件