redis原始碼分析與思考(十三)——字串型別的命令實現(t_string.c)
阿新 • • 發佈:2018-11-12
在對字串操作的命令中,主要有增加刪查該、批處理操作以及編碼的轉換命令,現在列出對字串物件操作的主要常用命令:
常用命令表
命令 | 對應操作 | 時間複雜度 |
---|---|---|
set key value | 增加鍵值對 | O(1) |
setex key seconds value | 增加鍵值對,且設定秒級的過期時間 | O(1) |
psetex key milliseconds value | 增加鍵值對,且設定毫秒級的過期時間 | O(1) |
setnx key value | 增加鍵值對,鍵不存在才能成功 | O(1) |
set key value xx | 修改鍵值,鍵必須存在 | O(1) |
mset key value [key value…] | 批量設定鍵 | O(n) |
mget key [key…] | 批量獲得鍵值 | O(n) |
get key | 獲得鍵值 | O(1) |
del key | 刪除鍵 | O(1) |
incr key | 鍵值自增 | O(1) |
decr key | 鍵值自減 | O(1) |
incrby key increment | 自增指定數字 | O(1) |
decrby key decrement | 自減指定數字 | O(1) |
incrybyfloat key increment | 自增浮點數 | O(1) |
append key value | 追加值 | O(1) |
strlen key | 獲取鍵值的長度 | O(1) |
setrange key offset value | 設定指定位置的字元 | O(1) |
getrange key start end | 獲取指定偏移量的鍵值 | O(n) |
在每次增加鍵的時候,都會檢查其大小是否超過了512M:
static int checkStringLength(redisClient *c, long long size) {
if (size > 512*1024*1024) {
//返回客戶端錯誤資訊
addReplyError(c,"string exceeds maximum allowed size (512MB)");
return REDIS_ERR;
}
return REDIS_OK;
}
鍵的增加
鍵增加的策略是先檢查是否超了大小,再檢查是否有生存時間,最後檢查輸入命令的引數,下面列出SET命令的底層實現:
#define REDIS_SET_NO_FLAGS 0
#define REDIS_SET_NX (1<<0) /* Set if key not exists. */
#define REDIS_SET_XX (1<<1) /* Set if key exists. */
void setGenericCommand(redisClient *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply) {
long long milliseconds = 0; /* initialized to avoid any harmness warning */
// 取出過期時間
if (expire) {
// 將取 expire 引數的值取出來
if (getLongLongFromObjectOrReply(c, expire, &milliseconds, NULL) != REDIS_OK)
return;
// expire 引數的值不正確時報錯
if (milliseconds <= 0) {
addReplyError(c,"invalid expire time in SETEX");
return;
}
// 不論輸入的過期時間是秒還是毫秒
// Redis 實際都以毫秒的形式儲存過期時間
// 如果輸入的過期時間為秒,那麼將它轉換為毫秒
if (unit == UNIT_SECONDS) milliseconds *= 1000;
}
// 如果設定了 NX 或者 XX 引數,那麼檢查條件是否不符合這兩個設定
// 在條件不符合時報錯,報錯的內容由 abort_reply 引數決定
if ((flags & REDIS_SET_NX && lookupKeyWrite(c->db,key) != NULL) ||
(flags & REDIS_SET_XX && lookupKeyWrite(c->db,key) == NULL))
{
addReply(c, abort_reply ? abort_reply : shared.nullbulk);
return;
}
// 將鍵值關聯到資料庫,新增或者修改
setKey(c->db,key,val);
// 將資料庫設為髒
server.dirty++;
// 為鍵設定過期時間
if (expire) setExpire(c->db,key,mstime()+milliseconds);
// 傳送事件通知,通知鍵的增加情況
notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",key,c->db->id);
// 傳送事件通知
if (expire) notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,
"expire",key,c->db->id);
// 設定成功,向客戶端傳送回覆
// 回覆的內容由 ok_reply 決定
addReply(c, ok_reply ? ok_reply : shared.ok);
}
SET命令的實現,先判斷命令引數是哪個,再呼叫setGenericCommand函式:
void setCommand(redisClient *c) {
int j;
robj *expire = NULL;
int unit = UNIT_SECONDS;
int flags = REDIS_SET_NO_FLAGS;
// 判斷選項引數
for (j = 3; j < c->argc; j++) {
char *a = c->argv[j]->ptr;
robj *next = (j == c->argc-1) ? NULL : c->argv[j+1];
if ((a[0] == 'n' || a[0] == 'N') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
flags |= REDIS_SET_NX;
} else if ((a[0] == 'x' || a[0] == 'X') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0') {
flags |= REDIS_SET_XX;
} else if ((a[0] == 'e' || a[0] == 'E') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
unit = UNIT_SECONDS;
expire = next;
j++;
} else if ((a[0] == 'p' || a[0] == 'P') &&
(a[1] == 'x' || a[1] == 'X') && a[2] == '\0' && next) {
unit = UNIT_MILLISECONDS;
expire = next;
j++;
} else {
addReply(c,shared.syntaxerr);
return;
}
}
// 嘗試對值物件進行編碼,如果是raw與embstr字串就嘗試將其裝換為int存貯
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,flags,c->argv[1],c->argv[2],expire,unit,NULL,NULL);
}
tryObjectEncoding函式的作用是對字串物件進行重新編碼,看是否能將其裝換為int編碼存貯,raw是否能轉換為emstr格式,需要物件是raw與embstr格式字串時,這樣可以節約記憶體:
robj *tryObjectEncoding(robj *o) {
long value;
sds s = o->ptr;
size_t len;
redisAssertWithInfo(NULL,o,o->type == REDIS_STRING);
// 只在字串的編碼為 RAW 或者 EMBSTR 時嘗試進行編碼
if (!sdsEncodedObject(o)) return o;
// 不對共享物件進行編碼
if (o->refcount > 1) return o;
// 對字串進行檢查
// 只對長度小於或等於 21 位元組,並且可以被解釋為整數的字串進行編碼
len = sdslen(s);
if (len <= 21 && string2l(s,len,&value)) {
if (server.maxmemory == 0 &&
value >= 0 &&
value < REDIS_SHARED_INTEGERS)
{
decrRefCount(o);
incrRefCount(shared.integers[value]);
return shared.integers[value];
} else {
if (o->encoding == REDIS_ENCODING_RAW) sdsfree(o->ptr);
o->encoding = REDIS_ENCODING_INT;
o->ptr = (void*) value;
return o;
}
}
// 嘗試將 RAW 編碼的字串編碼為 EMBSTR 編碼
if (len <= REDIS_ENCODING_EMBSTR_SIZE_LIMIT) {
robj *emb;
if (o->encoding == REDIS_ENCODING_EMBSTR) return o;
emb = createEmbeddedStringObject(s,sdslen(s));
decrRefCount(o);
return emb;
}
// 這個物件沒辦法進行編碼,嘗試從 SDS 中移除所有空餘空間
if (o->encoding == REDIS_ENCODING_RAW &&
sdsavail(s) > len/10)
{
o->ptr = sdsRemoveFreeSpace(o->ptr);
}
/* Return the original object. */
return o;
}
其餘SET命令:
//SETNX命令
void setnxCommand(redisClient *c) {
c->argv[2] = tryObjectEncoding(c->argv[2]);
setGenericCommand(c,REDIS_SET_NX,c->argv[1],c->argv[2],NULL,0,shared.cone,shared.czero);
}
//SETEX命令
void setexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_SECONDS,NULL,NULL);
}
//PSETEX命令
void psetexCommand(redisClient *c) {
c->argv[3] = tryObjectEncoding(c->argv[3]);
setGenericCommand(c,REDIS_SET_NO_FLAGS,c->argv[1],c->argv[3],c->argv[2],UNIT_MILLISECONDS,NULL,NULL);
}
獲取鍵
鍵的獲取比較的簡單,在這裡直接貼出原始碼以及註釋:
//獲取鍵的底層實現
int getGenericCommand(redisClient *c) {
robj *o;
// 嘗試從資料庫中取出鍵 c->argv[1] 對應的值物件
// 如果鍵不存在時,向客戶端傳送回覆資訊,並返回 NULL
//嘗試取出鍵值
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL)
return REDIS_OK;
// 值物件存在,檢查它的型別
if (o->type != REDIS_STRING) {
// 型別錯誤
addReply(c,shared.wrongtypeerr);
return REDIS_ERR;
} else {
// 型別正確,向客戶端返回物件的值
addReplyBulk(c,o);
return REDIS_OK;
}
}
//GET命令
void getCommand(redisClient *c) {
getGenericCommand(c);
}
//GETSET命令,先返回值,再修改
void getsetCommand(redisClient *c) {
// 取出並返回鍵的值物件
if (getGenericCommand(c) == REDIS_ERR) return;
// 編碼鍵的新值 c->argv[2]
c->argv[2] = tryObjectEncoding(c->argv[2]);
// 將資料庫中關聯鍵 c->argv[1] 和新值物件 c->argv[2]
setKey(c->db,c->argv[1],c->argv[2]);
// 傳送事件通知
notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[1],c->db->id);
// 將伺服器設為髒
server.dirty++;
}
批處理操作
既然有了SET與GET命令,那麼為什麼要加入批處理操作呢?其實主要是解決網路延遲的問題,假如你有10萬條資料需要插入到資料庫中,每條插入資料的命令從客戶端傳送到服務端的延遲是1ms,那麼也要花費100s的時間來傳輸。批處理的策略是將所有的命令通過一次傳輸到服務端,將網路延遲的影響降到了最少:
//MSET命令底層實現
void msetGenericCommand(redisClient *c, int nx) {
int j, busykeys = 0;
//檢查格式是否正確
// 鍵值引數不是成相成對出現的,格式不正確
if ((c->argc % 2) == 0) {
//返回給客戶端錯誤資訊
addReplyError(c,"wrong number of arguments for MSET");
return;
}
/* Handle the NX flag. The MSETNX semantic is to return zero and don't
* set nothing at all if at least one already key exists. */
// 如果 nx 引數為真,那麼檢查所有輸入鍵在資料庫中是否存在
// 只要有一個鍵是存在的,那麼就向客戶端傳送空回覆
// 並放棄執行接下來的設定操作,保證了原子性
if (nx) {
//判斷其中是否有有鍵存在
for (j = 1; j < c->argc; j += 2) {
if (lookupKeyWrite(c->db,c->argv[j]) != NULL) {
busykeys++;
}
}
// 鍵存在
// 傳送空白回覆,並放棄執行接下來的設定操作
if (busykeys) {
addReply(c, shared.czero);
return;
}
}
// 設定所有鍵值對
for (j = 1; j < c->argc; j += 2) {
// 對值物件進行解碼
c->argv[j+1] = tryObjectEncoding(c->argv[j+1]);
// 將鍵值對關聯到資料庫
// c->argc[j] 為鍵
// c->argc[j+1] 為值
setKey(c->db,c->argv[j],c->argv[j+1]);
// 傳送事件通知
notifyKeyspaceEvent(REDIS_NOTIFY_STRING,"set",c->argv[j],c->db->id);
}
// 將伺服器設為髒
server.dirty += (c->argc-1)/2;
// 設定成功
// MSET 返回 OK ,而 MSETNX 返回 1
addReply(c, nx ? shared.cone : shared.ok);
}
//MSET命令
void msetCommand(redisClient *c) {
msetGenericCommand(c,0);
}
//MSETNX命令
void msetnxCommand(redisClient *c) {
msetGenericCommand(c,1);
}
MGET命令的實現:
void mgetCommand(redisClient *c) {
int j;
addReplyMultiBulkLen(c,c->argc-1);
// 查詢並返回所有輸入鍵的值
for (j = 1; j < c->argc; j++) {
// 查詢鍵 c->argc[j] 的值
robj *o = lookupKeyRead(c->db,c->argv[j]);
if (o == NULL) {
// 值不存在,向客戶端傳送空回覆
addReply(c,shared.nullbulk);
} else {
if (o->type != REDIS_STRING) {
// 值存在,但不是字串型別
addReply(c,shared.nullbulk);
} else {
// 值存在,並且是字串
addReplyBulk(c,o);
}
}
}
}
修改以及獲取值物件操作
SETRANGE命令是修改值物件中指定字元,而GETRANGE是獲取部分字串,也就是值物件:
//SETRANGE命令
void setrangeCommand(redisClient *c) {
robj *o;
long offset;
sds value = c->argv[3]->ptr;
// 取出 offset 引數
if (getLongFromObjectOrReply(c,c->argv[2],&offset,NULL) != REDIS_OK)
return;
// 檢查 offset 引數
if (offset < 0) {
addReplyError(c,"offset is out of range");
return;
}
// 取出鍵現在的值物件
o = lookupKeyWrite(c->db,c->argv[1]);
if (o == NULL) {
// 鍵不存在於資料庫中。。。
// value 為空,沒有什麼可設定的,向客戶端返回 0
if (sdslen(value) == 0) {
addReply(c,shared.czero);
return;
}
// 如果設定後的長度會超過 Redis 的限制的話
// 那麼放棄設定,向客戶端傳送一個出錯回覆
if (checkStringLength(c,offset+sdslen(value)) != REDIS_OK)
return;
// 如果 value 沒有問題,可以設定,那麼建立一個空字串值物件