Memcached原始碼分析之增刪改查操作(5)
文章列表:
《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》前言
在看Memcached的增刪改查操作前,我們先來看一下process_command方法。Memcached解析命令之後,就通過process_command方法將不同操作型別的命令進行分發。
//命令處理函式 //前一個方法中,我們找到了rbuf中\n的字元,然後將其替換成\0 static void process_command(conn *c, char *command) { //tokens結構,這邊會將c->rcurr(command)命令拆分出來 //並且將命令通過空格符號來分隔成多個元素 //例如:set username zhuli,則會拆分成3個元素,分別是set和username和zhuli //MAX_TOKENS最大值為8,說明memcached的命令列,最多可以拆分成8個元素 token_t tokens[MAX_TOKENS]; size_t ntokens; int comm; assert(c != NULL); MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes); if (settings.verbose > 1) fprintf(stderr, "<%d %s\n", c->sfd, command); /* * for commands set/add/replace, we build an item and read the data * directly into it, then continue in nread_complete(). */ c->msgcurr = 0; c->msgused = 0; c->iovused = 0; if (add_msghdr(c) != 0) { out_of_memory(c, "SERVER_ERROR out of memory preparing response"); return; } //tokenize_command非常重要,主要就是拆分命令的 //並且將拆分出來的命令元素放進tokens的陣列中 //引數:command為命令 ntokens = tokenize_command(command, tokens, MAX_TOKENS); //tokens[COMMAND_TOKEN] COMMAND_TOKEN=0 //分解出來的命令的第一個引數為操作方法 if (ntokens >= 3 && ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) || (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) { //處理get命令 process_get_command(c, tokens, ntokens, false); } else if ((ntokens == 6 || ntokens == 7) && ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm = NREAD_ADD)) || (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) || (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0 && (comm = NREAD_REPLACE)) || (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0 && (comm = NREAD_PREPEND)) || (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0 && (comm = NREAD_APPEND)))) { //處理更新命令 add/set/replace/prepend/append process_update_command(c, tokens, ntokens, comm, false); } else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm = NREAD_CAS))) { process_update_command(c, tokens, ntokens, comm, true); } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) { process_arithmetic_command(c, tokens, ntokens, 1); } else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) { process_get_command(c, tokens, ntokens, true); } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) { process_arithmetic_command(c, tokens, ntokens, 0); } else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) { //處理刪除命令 delete process_delete_command(c, tokens, ntokens); } else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value, "touch") == 0)) { process_touch_command(c, tokens, ntokens); } else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) { //獲取狀態的命令 process_stat(c, tokens, ntokens); //.....more code }
Memcached的增刪改查操作原始碼分析
增/改 set add replace 操作
我們看一個Memcached的命令列的set操作命令:
set key flags exptime vlen
value
set:操作方法名稱
key:快取的key
flags:快取標識
exptime:快取時間,0 - 不過期
vlen:快取value的長度
value:快取的值,一般會在第二行。
例子:
set username 0 10 9
woshishen
我們在第二章《Linux c 開發 - Memcached原始碼分析之命令解析(2)》中講解到了如何解析命令的。Memcached一般會通過\n
這邊我們可以看到,set命令會分層兩部分:命令列部分和Value值部分:
1. Memcached會先去解析命令列部分,並且命令列部分中帶上了vlen,就可以知道value的長度,然後就會去初始化一個Item的資料結構,用於存放快取資料。
2. 命令列部分解析完畢,Memcached會去繼續讀取Socket中的剩餘資料報文,邊讀取邊複製到Item的資料結構中,直到讀取到的Value資料長度和命令列中的vlen長度一致的時候才會結束。然後會去儲存item,如果item儲存成功,則會將item掛到HashTable和LRU鏈上面;如果儲存失敗,則會刪除item。
下面我們先看一下process_update_command這個方法,這個方法主要作用:
1. 幫助解析命令列部分
2. 分配一個Item資料結構用於儲存資料。
該方法結束後,會跳轉到狀態機drive_machine中conn_nread的程式碼塊。conn_nread主要是用於讀取value資料。
/*********************************
新增、編輯操作
看一個set操作的命令
命令:
set key flags exptime vlen
value
其中vlen為快取資料長度
flages 為標誌
exptime為過期時間,0 不過期
value 為需要快取的資料,value一般都會在第二行
例如
set username 0 10 9
woshishen
**************************************/
static void process_update_command(conn *c, token_t *tokens,
const size_t ntokens, int comm, bool handle_cas) {
char *key; //key
size_t nkey; //key的長度
unsigned int flags; //命令標誌
int32_t exptime_int = 0;
time_t exptime; //有效期
int vlen; //value的快取資料長度
uint64_t req_cas_id = 0;
//item結構,Memcached的key/value等值都是儲存在item的資料結構中
//item的分配在slabclass上的
item *it;
assert(c != NULL);
set_noreply_maybe(c, tokens, ntokens);
//檢查 key的長度,key最大長度250個位元組
if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
//獲取key的值和key的長度
//tokens[0]為操作命令
key = tokens[KEY_TOKEN].value; //獲取key的值
nkey = tokens[KEY_TOKEN].length; //key的長度
//檢查引數的合法性
if (!(safe_strtoul(tokens[2].value, (uint32_t *) &flags)
&& safe_strtol(tokens[3].value, &exptime_int)
&& safe_strtol(tokens[4].value, (int32_t *) &vlen))) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
/* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
exptime = exptime_int;
/* Negative exptimes can underflow and end up immortal. realtime() will
immediately expire values that are greater than REALTIME_MAXDELTA, but less
than process_started, so lets aim for that. */
if (exptime < 0)
exptime = REALTIME_MAXDELTA + 1;
// does cas value exist?
if (handle_cas) {
if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
}
//這邊為何vlen要+2呢?
//因為value儲存的時候,每次在資料結尾都會加上/r/n
//加上/r/n後,客戶端獲取資料就可以通過\r\n來分割 資料報文
vlen += 2;
if (vlen < 0 || vlen - 2 < 0) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
if (settings.detail_enabled) {
stats_prefix_record_set(key, nkey);
}
//item_alloc是最核心的方法,item_alloc主要就是去分配一個item
//結構用於儲存需要快取的資訊
//key:快取的key
//nkey:快取的長度
//flags:標識
//exptime:過期時間
//vlen:快取value的長度
//這邊你可能有疑問了?為何這邊只傳遞了vlen,快取資料的位元組長度,而沒有value的值呢?
//1. 因為set/add/replace等這些命令,會將命令列和資料行分為兩行傳輸
//2. 而我們首選會去解析命令列,命令列中需要包括快取資料value的長度,這樣我們就可以根據長度去預先分配記憶體空間
//3. 然後我們繼續取解析資料行。因為快取的資料一般都比較長,TCP傳送會有粘包和拆包的情況,需要接收多次後才能接收到
//完整的資料,所以會在命令列中先傳遞一個value的長度值,這樣就可以在解析命令列的過程中預先分配儲存的空間,等接收完
//value的資料後,儲存到記憶體空間即可。
//4. 此函式最後一行:conn_set_state(c, conn_nread); 就是跳轉到conn_nread這個狀態中,而conn_nread
//就是用來讀取value的快取資料的
it = item_alloc(key, nkey, flags, realtime(exptime), vlen);
//分配失敗的情況
if (it == 0) {
if (!item_size_ok(nkey, flags, vlen))
out_string(c, "SERVER_ERROR object too large for cache");
else
out_of_memory(c, "SERVER_ERROR out of memory storing object");
/* swallow the data line */
c->write_and_go = conn_swallow;
c->sbytes = vlen;
/* Avoid stale data persisting in cache because we failed alloc.
* Unacceptable for SET. Anywhere else too? */
if (comm == NREAD_SET) {
it = item_get(key, nkey);
if (it) {
item_unlink(it);
item_remove(it);
}
}
return;
}
ITEM_set_cas(it, req_cas_id);
c->item = it;
c->ritem = ITEM_data(it); //value儲存的指標地址
c->rlbytes = it->nbytes; //value的長度
c->cmd = comm;
//狀態跳轉到conn_nread,繼續迴圈讀取快取的value資料
conn_set_state(c, conn_nread);
}
看一下item_alloc方法,主要作用:
1. 分配一塊可以用的Item記憶體塊,用於儲存快取資料。
2. Memcached是通過儲存資料的長度選擇合適的slab class,然後在該slabs class上分配一塊item。
先看一下Item的資料結構。
//item的具體結構
typedef struct _stritem {
//連結串列結構:記錄下一個item的地址
struct _stritem *next; //下一個結構
//連結串列結構:記錄前一個Item的地址
struct _stritem *prev; //前一個結構
struct _stritem *h_next; //hashtable的list /* hash chain next */
//最近一次的訪問時間
rel_time_t time; /* least recent access */
//過期時間
rel_time_t exptime; /* expire time */
//value資料大小
int nbytes; /* size of data */
unsigned short refcount;
uint8_t nsuffix; /* length of flags-and-length string */
uint8_t it_flags; /* ITEM_* above */
//slab class的ID,在哪個slab class上
uint8_t slabs_clsid;/* which slab class we're in */
uint8_t nkey; /* key length, w/terminating null and padding */
/* this odd type prevents type-punning issues when we do
* the little shuffle to save space when not using CAS. */
//儲存資料的
union {
uint64_t cas;
char end;
} data[];
/* if it_flags & ITEM_CAS we have 8 bytes CAS */
/* then null-terminated key */
/* then " flags length\r\n" (no terminating null) */
/* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
/*
* Allocates a new item.
*/
//分配一個新的Item
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
item *it;
/* do_item_alloc handles its own locks */
it = do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
return it;
}
//建立一個新的Item
item *do_item_alloc(char *key, const size_t nkey, const int flags,
const rel_time_t exptime, const int nbytes,
const uint32_t cur_hv) {
uint8_t nsuffix;
item *it = NULL; //item結構
char suffix[40];
//item_make_header 計算儲存資料的總長度
size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
if (settings.use_cas) {
ntotal += sizeof(uint64_t);
}
//通過ntotal 查詢在哪個slabs_class上面
//Memcached會根據儲存資料長度的不同,分為N多個slabs_class
//使用者儲存資料的時候,根據需要儲存資料的長度,就可以查詢到需要儲存到哪個slabs_class中。
//每個slabs_class都由諾幹個slabs組成,slabs每個大小為1M,我們的item結構的資料就會被分配在slabs上
//每個slabs都會根據自己slabs_class儲存的資料塊的大小,會被分割為諾幹個chunk
//
//舉個例子:
//如果id=1的slabs_class為儲存 最大為224個位元組的快取資料
//當用戶的設定的快取資料總資料長度為200個位元組,則這個item結構就會儲存到id=1的slabs_class上。
//當第一次或者slabs_class中的slabs不夠用的時候,slabs_class就會去分配一個1M的slabs給儲存item使用
//因為id=1的slabs_class儲存小於224個位元組的資料,所以slabs會被分割為諾幹個大小為224位元組的chunk塊
//我們的item結構資料,就會儲存在這個chunk塊上面
unsigned int id = slabs_clsid(ntotal);
if (id == 0)
return 0;
mutex_lock(&cache_lock);
/* do a quick check if we have any expired items in the tail.. */
int tries = 5;
/* Avoid hangs if a slab has nothing but refcounted stuff in it. */
int tries_lrutail_reflocked = 1000;
int tried_alloc = 0;
item *search;
item *next_it;
void *hold_lock = NULL;
rel_time_t oldest_live = settings.oldest_live;
//這邊就可以得到slabs_class上第一個item的地址
//item資料結構通過item->next和item->prev 來記錄連結串列結構
search = tails[id];
/* We walk up *only* for locked items. Never searching for expired.
* Waste of CPU for almost all deployments */
for (; tries > 0 && search != NULL; tries--, search=next_it) {
/* we might relink search mid-loop, so search->prev isn't reliable */
next_it = search->prev;
if (search->nbytes == 0 && search->nkey == 0 && search->it_flags == 1) {
/* We are a crawler, ignore it. */
tries++;
continue;
}
uint32_t hv = hash(ITEM_key(search), search->nkey);
/* Attempt to hash item lock the "search" item. If locked, no
* other callers can incr the refcount
*/
/* Don't accidentally grab ourselves, or bail if we can't quicklock */
if (hv == cur_hv || (hold_lock = item_trylock(hv)) == NULL)
continue;
/* Now see if the item is refcount locked */
if (refcount_incr(&search->refcount) != 2) {
/* Avoid pathological case with ref'ed items in tail */
do_item_update_nolock(search);
tries_lrutail_reflocked--;
tries++;
refcount_decr(&search->refcount);
itemstats[id].lrutail_reflocked++;
/* Old rare bug could cause a refcount leak. We haven't seen
* it in years, but we leave this code in to prevent failures
* just in case */
if (settings.tail_repair_time &&
search->time + settings.tail_repair_time < current_time) {
itemstats[id].tailrepairs++;
search->refcount = 1;
do_item_unlink_nolock(search, hv);
}
if (hold_lock)
item_trylock_unlock(hold_lock);
if (tries_lrutail_reflocked < 1)
break;
continue;
}
/* Expired or flushed */
if ((search->exptime != 0 && search->exptime < current_time)
|| (search->time <= oldest_live && oldest_live <= current_time)) {
itemstats[id].reclaimed++;
if ((search->it_flags & ITEM_FETCHED) == 0) {
itemstats[id].expired_unfetched++;
}
it = search;
slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
do_item_unlink_nolock(it, hv);
/* Initialize the item block: */
it->slabs_clsid = 0;
//slabs_alloc方法是去分配一個新的記憶體塊
} else if ((it = slabs_alloc(ntotal, id)) == NULL) {
tried_alloc = 1;
if (settings.evict_to_free == 0) {
itemstats[id].outofmemory++;
} else {
itemstats[id].evicted++;
itemstats[id].evicted_time = current_time - search->time;
if (search->exptime != 0)
itemstats[id].evicted_nonzero++;
if ((search->it_flags & ITEM_FETCHED) == 0) {
itemstats[id].evicted_unfetched++;
}
it = search;
slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
do_item_unlink_nolock(it, hv);
/* Initialize the item block: */
it->slabs_clsid = 0;
/* If we've just evicted an item, and the automover is set to
* angry bird mode, attempt to rip memory into this slab class.
* TODO: Move valid object detection into a function, and on a
* "successful" memory pull, look behind and see if the next alloc
* would be an eviction. Then kick off the slab mover before the
* eviction happens.
*/
if (settings.slab_automove == 2)
slabs_reassign(-1, id);
}
}
refcount_decr(&search->refcount);
/* If hash values were equal, we don't grab a second lock */
if (hold_lock)
item_trylock_unlock(hold_lock);
break;
}
if (!tried_alloc && (tries == 0 || search == NULL))
it = slabs_alloc(ntotal, id);
if (it == NULL) {
itemstats[id].outofmemory++;
mutex_unlock(&cache_lock);
return NULL;
}
assert(it->slabs_clsid == 0);
assert(it != heads[id]);
/* Item initialization can happen outside of the lock; the item's already
* been removed from the slab LRU.
*/
it->refcount = 1; /* the caller will have a reference */
mutex_unlock(&cache_lock);
it->next = it->prev = it->h_next = 0;
it->slabs_clsid = id;
DEBUG_REFCNT(it, '*');
it->it_flags = settings.use_cas ? ITEM_CAS : 0;
it->nkey = nkey;
it->nbytes = nbytes;
//這邊是記憶體拷貝,拷貝到item結構地址的記憶體塊上
memcpy(ITEM_key(it), key, nkey);
it->exptime = exptime;
//這邊也是記憶體拷貝
memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
it->nsuffix = nsuffix;
return it;
}
然後我們看一下狀態機drive_machine中conn_nread的程式碼塊,這段程式碼主要作用:
1. 讀取快取的value值
2. 將資料拷貝到item資料結構。
//conn_nread 主要用於讀取快取的value資料報文
case conn_nread:
//快取 value資料報文的長度為0的時候,說明已經讀取完成了
if (c->rlbytes == 0) {
complete_nread(c);
break;
}
/* Check if rbytes < 0, to prevent crash */
//失敗的情況,關閉連線
if (c->rlbytes < 0) {
if (settings.verbose) {
fprintf(stderr, "Invalid rlbytes to read: len %d\n",
c->rlbytes);
}
conn_set_state(c, conn_closing);
break;
}
/* first check if we have leftovers in the conn_read buffer */
//c->rbytes 未解析的資料報文長度
//c->rlbytes 快取value資料報文長度
//如果有為解析的資料報文,則處理
if (c->rbytes > 0) {
//總共需要拷貝的資料,我們的目的是拷貝c->rlbytes長度的資料
//如果c->rbytes 大於 c->rlbytes 說明命令列未解析容器中待處理的資料大於value資料報文的長度
//如果c->rbytes 小於 c->rlbytes 說明我們只接收到了一部分的value資料,另外一部分資料報文還在路上
int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
//c->ritem 就是這次set/add/replace操作的資料儲存value的指標地址
if (c->ritem != c->rcurr) {
memmove(c->ritem, c->rcurr, tocopy);
}
c->ritem += tocopy; //指標地址往上加
c->rlbytes -= tocopy; //總的需要讀取的value值的資料報文長度 減去已經拷貝的長度
c->rcurr += tocopy; //改變指標地址
c->rbytes -= tocopy; //未解析的資料報文 減去 已經處理的資料報文
//如果c->rlbytes為0,說明value值已經讀取完了,則跳出
if (c->rlbytes == 0) {
break;
}
}
/* now try reading from the socket */
//這邊是真正的讀取方法
//從socket中讀取資料,讀取到c->ritem資料value儲存的指標,並且讀取長度為c->rlbytes
//這邊就會進入迴圈讀取,直到value的資料報文讀取完整為止
res = read(c->sfd, c->ritem, c->rlbytes);
if (res > 0) {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.bytes_read += res;
pthread_mutex_unlock(&c->thread->stats.mutex);
if (c->rcurr == c->ritem) {
c->rcurr += res;
}
c->ritem += res;
c->rlbytes -= res;
break;
}
//如果流關閉,則關閉連線
if (res == 0) { /* end of stream */
conn_set_state(c, conn_closing);
break;
}
//如果連線被關閉,或者出現錯誤
if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
if (!update_event(c, EV_READ | EV_PERSIST)) {
if (settings.verbose > 0)
fprintf(stderr, "Couldn't update event\n");
conn_set_state(c, conn_closing);
break;
}
stop = true;
break;
}
/* otherwise we have a real error, on which we close the connection */
if (settings.verbose > 0) {
fprintf(stderr, "Failed to read, and not due to blocking:\n"
"errno: %d %s \n"
"rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n", errno,
strerror(errno), (long) c->rcurr, (long) c->ritem,
(long) c->rbuf, (int) c->rlbytes, (int) c->rsize);
}
//呼叫Socket關閉
conn_set_state(c, conn_closing);
break;
這邊如果讀取完成了,會呼叫complete_nread(c)這個方法。這個方法往下一直看,我們找到complete_nread_ascii,這個方法主要作用:
1. 呼叫儲存資料store_item的方法
2. 呼叫item_remove刪除item的方法。
static void complete_nread_ascii(conn *c) {
assert(c != NULL);
item *it = c->item;
int comm = c->cmd;
enum store_item_type ret;
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
pthread_mutex_unlock(&c->thread->stats.mutex);
if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
out_string(c, "CLIENT_ERROR bad data chunk");
} else {
//這邊呼叫儲存Item的方法
ret = store_item(it, comm, c);
//....
switch (ret) {
case STORED:
out_string(c, "STORED");
break;
case EXISTS:
out_string(c, "EXISTS");
break;
case NOT_FOUND:
out_string(c, "NOT_FOUND");
break;
case NOT_STORED:
out_string(c, "NOT_STORED");
break;
default:
out_string(c, "SERVER_ERROR Unhandled storage type.");
}
}
//這邊竟然刪除這個Item?你不覺得奇怪麼?
//我們知道刪除item是需要通過判斷item->refcount,引用的次數
//我們在alloc一個item的時候,refcount會預設設定為1
//
//當我們呼叫store_item,add/set/replace/prepend/append等操作成功的時候,會呼叫do_item_link
//這個方法,這個方法會將refcount設定為2,則再次去刪除item的時候判斷引用次數
//if (refcount_decr(&it->refcount) == 0) 就不會被刪除
//
//如果我們呼叫store_item,發現儲存失敗了,這個時候因為引用次數為1,而且我們的確需要刪除這個item,則刪除這個item
//
//很繞的邏輯,但是很巧妙
item_remove(c->item); /* release the c->item reference */
c->item = 0;
}
然後我們看一下非常重要的do_store_item方法。這個方法主要是用來儲存資料。基本包括兩種狀態:儲存成功和儲存失敗。
1. add/replace命令,會判斷item是否存在,如果已經存在,則add命令操作失敗
2. set命令,item存在或者不存在,都會建立新的item,替換老的item。
//儲存Item操作
enum store_item_type do_store_item(item *it, int comm, conn *c,
const uint32_t hv) {
char *key = ITEM_key(it);
//通過KEY找到舊的item
//add/set/replace/prepend/append等都會先建立一個新的item
item *old_it = do_item_get(key, it->nkey, hv);
enum store_item_type stored = NOT_STORED;
item *new_it = NULL;
int flags;
//ADD操作,要保證ITEM不存在的情況下才能成功
//如果ADD操作,發現item已經存在,則返回NOT_STORED
if (old_it != NULL && comm == NREAD_ADD) {
/* add only adds a nonexistent item, but promote to head of LRU */
//這邊為何要更新item,有兩個原因:
//1.更新當前item的it->time時間,並且重建LRU鏈的順序
//2.這邊程式碼後邊會去執行do_item_remove操作,每次remove操作都會判斷it->refcount
//如果引用次數減去1,則需要被刪除。這邊重建LRU鏈之後,it->refcount=2,所有old_it不會被刪除
do_item_update(old_it);
//replace/prepend/append 等操作,是需要item已經存在的情況下操作做處理
//如果item不存在,則返回NOT_STORED
} else if (!old_it
&& (comm == NREAD_REPLACE || comm == NREAD_APPEND
|| comm == NREAD_PREPEND)) {
/* replace only replaces an existing value; don't store */
} else if (comm == NREAD_CAS) {
/* validate cas operation */
if (old_it == NULL) {
// LRU expired
stored = NOT_FOUND;
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.cas_misses++;
pthread_mutex_unlock(&c->thread->stats.mutex);
} else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
// cas validates
// it and old_it may belong to different classes.
// I'm updating the stats for the one that's getting pushed out
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
pthread_mutex_unlock(&c->thread->stats.mutex);
item_replace(old_it, it, hv);
stored = STORED;
} else {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
pthread_mutex_unlock(&c->thread->stats.mutex);
if (settings.verbose > 1) {
fprintf(stderr, "CAS: failure: expected %llu, got %llu\n",
(unsigned long long) ITEM_get_cas(old_it),
(unsigned long long) ITEM_get_cas(it));
}
stored = EXISTS;
}
} else {
/*
* Append - combine new and old record into single one. Here it's
* atomic and thread-safe.
*/
//這邊是在老的item結構上面追加資料 append和prepend操作
if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
/*
* Validate CAS
*/
if (ITEM_get_cas(it) != 0) {
// CAS much be equal
if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
stored = EXISTS;
}
}
if (stored == NOT_STORED) {
/* we have it and old_it here - alloc memory to hold both */
/* flags was already lost - so recover them from ITEM_suffix(it) */
flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);
new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime,
it->nbytes + old_it->nbytes - 2 /* CRLF */, hv);
if (new_it == NULL) {
/* SERVER_ERROR out of memory */
if (old_it != NULL)
do_item_remove(old_it);
return NOT_STORED;
}
/* copy data from it and old_it to new_it */
if (comm == NREAD_APPEND) {
memcpy(ITEM_data(new_it), ITEM_data(old_it),
old_it->nbytes);
memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */,
ITEM_data(it), it->nbytes);
} else {
/* NREAD_PREPEND */
memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */,
ITEM_data(old_it), old_it->nbytes);
}
it = new_it;
}
}
//這邊是add/set/replace/prepend/append等操作
if (stored == NOT_STORED) {
if (old_it != NULL)
//替換操作,old_it會被刪除
//it會被新增到LRU鏈和HASHTABLE上面,並且it->refcount=2
item_replace(old_it, it, hv);
else
//將新的item新增的LRU連結串列和HASHTABLE上面,it->refcount=2
do_item_link(it, hv);
c->cas = ITEM_get_cas(it);
stored = STORED;
}
//說明:
//這邊程式碼註解中為何一次又一次提到it->refcount?
//1. 因為it->refcount代表的是引用次數,防止不同執行緒刪除item
//2. do_item_remove操作前會去判斷it->refcount減一後,變成0,則會刪除這個ITEM
//
//在呼叫do_store_item方法之後,memcached會去呼叫do_item_remove(it);的操作。
//do_item_remove操作主要是將item生成後,結果SET/ADD等操作失敗的情況,會去將已經分配好的item刪除
//如果SET和ADD操作成功,一般都會呼叫do_item_link這個方法會將item的refcount值加上1,變成2,當
//再次呼叫do_item_remove(it);操作的時候,因為引用次數大於0而不會被刪除
//這邊的程式碼塊,真心很繞.....
}
//如果老的item存在,則需要刪除
if (old_it != NULL)
do_item_remove(old_it); /* release our reference */
//new_it主要用於prepend/append操作
if (new_it != NULL)
do_item_remove(new_it);
if (stored == STORED) {
c->cas = ITEM_get_cas(it);
}
return stored;
}
在do_store_item方法中,我們最終會找到do_item_link這個方法,這個方法主要作用:
1. 將item掛到Hashtable上面
2. 將item掛到LRU鏈上面
HashTable:把Item掛到HashTable上去後,使用者就可以通過快取的key到HashTable上查詢這個Item資料了。
LRU:是一個清除快取的策略,一般會清理最不常用的元素。LRU的鏈,會放在下面兩個Item指標地址的陣列連結串列上面。
static item *heads[LARGEST_ID]; //儲存連結串列頭部地址
static item *tails[LARGEST_ID]; //儲存連結串列尾部地址
看一下do_item_link這個方法//新增一個Item的連線關係
int do_item_link(item *it, const uint32_t hv) {
MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
mutex_lock(&cache_lock);
it->it_flags |= ITEM_LINKED;
it->time = current_time;
STATS_LOCK();
stats.curr_bytes += ITEM_ntotal(it);
stats.curr_items += 1;
stats.total_items += 1;
STATS_UNLOCK();
/* Allocate a new CAS ID on link. */
ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
//分配到HashTable的桶上
assoc_insert(it, hv);
//LRU鏈
item_link_q(it);
refcount_incr(&it->refcount); //引用次數+1
mutex_unlock(&cache_lock);
return 1;
}
查詢 get 操作
查詢操作主要看下process_get_command方法,該方法主要作用:
1. 分解get命令。
2. 通過key去HashTable上找到item的地址值。
3. 返回找到的item資料值。
/* ntokens is overwritten here... shrug.. */
//處理GET請求的命令
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
bool return_cas) {
//處理GET命令
char *key;
size_t nkey;
int i = 0;
item *it;
//&tokens[0] 是操作的方法
//&tokens[1] 為key
//token_t 儲存了value和length
token_t *key_token = &tokens[KEY_TOKEN];
char *suffix;
assert(c != NULL);
do {
//如果key的長度不為0
while (key_token->length != 0) {
key = key_token->value;
nkey = key_token->length;
//判斷key的長度是否超過了最大的長度,memcache key的最大長度為250
//這個地方需要非常注意,我們在平常的使用中,還是要注意key的位元組長度的
if (nkey > KEY_MAX_LENGTH) {
//out_string 向外部輸出資料
out_string(c, "CLIENT_ERROR bad command line format");
while (i-- > 0) {
item_remove(*(c->ilist + i));
}
return;
}
//這邊是從Memcached的記憶體儲存快中去取資料
it = item_get(key, nkey);
if (settings.detail_enabled) {
//狀態記錄,key的記錄數的方法
stats_prefix_record_get(key, nkey, NULL != it);
}
//如果獲取到了資料
if (it) {
//c->ilist 存放用於向外部寫資料的buf
//如果ilist太小,則重新分配一塊記憶體
if (i >= c->isize) {
item **new_list = realloc(c->ilist,
sizeof(item *) * c->isize * 2);
if (new_list) {
//存放需要向客戶端寫資料的item的列表的長度
c->isize *= 2;
//存放需要向客戶端寫資料的item的列表,這邊支援
c->ilist = new_list;
} else {
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
item_remove(it);
break;
}
}
/*
* Construct the response. Each hit adds three elements to the
* outgoing data list:
* "VALUE "
* key
* " " + flags + " " + data length + "\r\n" + data (with \r\n)
*/
//初始化返回出去的資料結構
if (return_cas) {
MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
it->nbytes, ITEM_get_cas(it));
/* Goofy mid-flight realloc. */
if (i >= c->suffixsize) {
char **new_suffix_list = realloc(c->suffixlist,
sizeof(char *) * c->suffixsize * 2);
if (new_suffix_list) {
c->suffixsize *= 2;
c->suffixlist = new_suffix_list;
} else {
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
item_remove(it);
break;
}
}
suffix = cache_alloc(c->thread->suffix_cache);
if (suffix == NULL) {
STATS_LOCK();
stats.malloc_fails++;
STATS_UNLOCK();
out_of_memory(c,
"SERVER_ERROR out of memory making CAS suffix");
item_remove(it);
while (i-- > 0) {
item_remove(*(c->ilist + i));
}
return;
}
*(c->suffixlist + i) = suffix;
int suffix_len = snprintf(suffix, SUFFIX_SIZE, " %llu\r\n",
(unsigned long long) ITEM_get_cas(it));
if (add_iov(c, "VALUE ", 6) != 0
|| add_iov(c, ITEM_key(it), it->nkey) != 0
|| add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0
|| add_iov(c, suffix, suffix_len) != 0
|| add_iov(c, ITEM_data(it), it->nbytes) != 0) {
item_remove(it);
break;
}
} else {
MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
it->nbytes, ITEM_get_cas(it));
//將需要返回的資料填充到IOV結構中
//命令:get userId
//返回的結構:
//VALUE userId 0 5
//55555
//END
if (add_iov(c, "VALUE ", 6) != 0
|| add_iov(c, ITEM_key(it), it->nkey) != 0
|| add_iov(c, ITEM_suffix(it),
it->nsuffix + it->nbytes) != 0) {
item_remove(it);
break;
}
}
if (settings.verbose > 1) {
int ii;
fprintf(stderr, ">%d sending key ", c->sfd);
for (ii = 0; ii < it->nkey; ++ii) {
fprintf(stderr, "%c", key[ii]);
}
fprintf(stderr, "\n");
}
/* item_get() has incremented it->refcount for us */
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
c->thread->stats.get_cmds++;
pthread_mutex_unlock(&c->thread->stats.mutex);
item_update(it);
*(c->ilist + i) = it;
i++;
} else {
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.get_misses++;
c->thread->stats.get_cmds++;
pthread_mutex_unlock(&c->thread->stats.mutex);
MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
}
key_token++;
}
/*
* If the command string hasn't been fully processed, get the next set
* of tokens.
*/
//如果命令列中的命令沒有全部被處理,則繼續下一個命令
//一個命令列中,可以get多個元素
if (key_token->value != NULL) {
ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
key_token = tokens;
}
} while (key_token->value != NULL);
c->icurr = c->ilist;
c->ileft = i;
if (return_cas) {
c->suffixcurr = c->suffixlist;
c->suffixleft = i;
}
if (settings.verbose > 1)
fprintf(stderr, ">%d END\n", c->sfd);
/*
If the loop was terminated because of out-of-memory, it is not
reliable to add END\r\n to the buffer, because it might not end
in \r\n. So we send SERVER_ERROR instead.
*/
//新增結束標誌符號
if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
|| (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
out_of_memory(c, "SERVER_ERROR out of memory writing get response");
} else {
//將狀態修改為寫,這邊讀取到item的資料後,又開始需要往客戶端寫資料了。
conn_set_state(c, conn_mwrite);
c->msgcurr = 0;
}
}
Memcached的查詢主要是通過HashTable來查詢快取資料的。
HashTable我們在上一章已經講過。前面也講過,當快取資料SET操作完成後,Memcached會將item資料結構關聯到HashTable和它的LRU的鏈上面。
//這邊的item_*系列的方法,就是Memcached核心儲存塊的介面
item *item_get(const char *key, const size_t nkey) {
item *it;
uint32_t hv;
hv = hash(key, nkey); //對key進行hash,返回一個uint32_t型別的值
item_lock(hv); //塊鎖,當取資料的時候,不允許其他的操作,保證取資料的原子性
it = do_item_get(key, nkey, hv);
item_unlock(hv);
return it;
}
這邊著重看assoc_find這個方法,主要作用:從HashTable上找到對應的Item地址值。
/** wrapper around assoc_find which does the lazy expiration logic */
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
//mutex_lock(&cache_lock);
//在HashTable上找Item
item *it = assoc_find(key, nkey, hv);
if (it != NULL) {
refcount_incr(&it->refcount);
/* Optimization for slab reassignment. prevents popular items from
* jamming in busy wait. Can only do this here to satisfy lock order
* of item_lock, cache_lock, slabs_lock. */
if (slab_rebalance_signal &&
((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
do_item_unlink_nolock(it, hv);
do_item_remove(it);
it = NULL;
}
}
//mutex_unlock(&cache_lock);
int was_found = 0;
if (settings.verbose > 2) {
int ii;
if (it == NULL) {
fprintf(stderr, "> NOT FOUND ");
} else {
fprintf(stderr, "> FOUND KEY ");
was_found++;
}
for (ii = 0; ii < nkey; ++ii) {
fprintf(stderr, "%c", key[ii]);
}
}
if (it != NULL) {
if (settings.oldest_live != 0 && settings.oldest_live <= current_time &&
it->time <= settings.oldest_live) {
do_item_unlink(it, hv);
do_item_remove(it);
it = NULL;
if (was_found) {
fprintf(stderr, " -nuked by flush");
}
//檢查是否過期
} else if (it->exptime != 0 && it->exptime <= current_time) {
do_item_unlink(it, hv);
do_item_remove(it);
it = NULL;
if (was_found) {
fprintf(stderr, " -nuked by expire");
}
} else {
it->it_flags |= ITEM_FETCHED;
DEBUG_REFCNT(it, '+');
}
}
if (settings.verbose > 2)
fprintf(stderr, "\n");
return it;
}
刪除 delete 操作
刪除操作主要看process_delete_command方法:
1. 先查詢item是否存在
2. 如果存在則刪除item,不存在,則返回NOT FOUND
static void process_delete_command(conn *c, token_t *tokens,
const size_t ntokens) {
char *key;
size_t nkey;
item *it;
assert(c != NULL);
//檢查命令合法性
if (ntokens > 3) {
bool hold_is_zero = strcmp(tokens[KEY_TOKEN + 1].value, "0") == 0;
bool sets_noreply = set_noreply_maybe(c, tokens, ntokens);
bool valid = (ntokens == 4 && (hold_is_zero || sets_noreply))
|| (ntokens == 5 && hold_is_zero && sets_noreply);
if (!valid) {
out_string(c, "CLIENT_ERROR bad command line format. "
"Usage: delete <key> [noreply]");
return;
}
}
//獲取key的值和長度
key = tokens[KEY_TOKEN].value;
nkey = tokens[KEY_TOKEN].length;
if (nkey > KEY_MAX_LENGTH) {
out_string(c, "CLIENT_ERROR bad command line format");
return;
}
if (settings.detail_enabled) {
stats_prefix_record_delete(key, nkey);
}
//先去查詢一次,如果查詢到了,則刪除,否則返回NOT FOUND
it = item_get(key, nkey);
if (it) {
MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(it), it->nkey);
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.slab_stats[it->slabs_clsid].delete_hits++;
pthread_mutex_unlock(&c->thread->stats.mutex);
//如果找到了Item,則刪除Item
item_unlink(it);
item_remove(it); /* release our reference */
out_string(c, "DELETED");
} else {
//否則就是不能找到
pthread_mutex_lock(&c->thread->stats.mutex);
c->thread->stats.delete_misses++;
pthread_mutex_unlock(&c->thread->stats.mutex);
out_string(c, "NOT_FOUND");
}
}
item_unlink和do_item_unlink方法主要兩個作用:
1. 從HashTable上將Item的地址值刪除
2. 從LRU的連結串列上,將Item的地址值刪除(LRU連結串列只要處理頭部和尾部就行了)
//從LRU和HashTable解綁
void item_unlink(item *item) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey);
item_lock(hv);
do_item_unlink(item, hv);
item_unlock(hv);
}
item_remove主要是釋放item
//刪除Item
void item_remove(item *item) {
uint32_t hv;
hv = hash(ITEM_key(item), item->nkey); //Hash值
item_lock(hv);
do_item_remove(item);
item_unlock(hv);
}