1. 程式人生 > >Memcached原始碼分析之增刪改查操作(5)

Memcached原始碼分析之增刪改查操作(5)

文章列表:

《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》

前言

在看Memcached的增刪改查操作前,我們先來看一下process_command方法。Memcached解析命令之後,就通過process_command方法將不同操作型別的命令進行分發。

/*
 * Dispatch one ASCII protocol command line to the matching handler.
 *
 * c       - connection the command was read from; must not be NULL.
 * command - the command line text, which is split into tokens below.
 *
 * Per the original author's note, the caller has already found the '\n' in
 * rbuf and replaced it with '\0' (that code is not part of this excerpt).
 *
 * NOTE: this listing is truncated at the "more code" marker; the remaining
 * branches of the dispatch chain and the end of the function are not shown.
 */
static void process_command(conn *c, char *command) {

	// Token array filled from the command line (c->rcurr / command), which
	// is split into space-separated elements.
	// Example: "set username zhuli" yields the elements set, username, zhuli.
	// Per the original note MAX_TOKENS is 8, i.e. at most 8 elements per
	// command line -- the definition is not visible here, TODO confirm.
	token_t tokens[MAX_TOKENS];
	size_t ntokens;
	int comm;

	assert(c != NULL);

	MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);

	if (settings.verbose > 1)
		fprintf(stderr, "<%d %s\n", c->sfd, command);

	/*
	 * for commands set/add/replace, we build an item and read the data
	 * directly into it, then continue in nread_complete().
	 */

	// Reset the outgoing message state and make sure a message header is
	// available before any handler starts building a response.
	c->msgcurr = 0;
	c->msgused = 0;
	c->iovused = 0;
	if (add_msghdr(c) != 0) {
		out_of_memory(c, "SERVER_ERROR out of memory preparing response");
		return;
	}

	// tokenize_command does the actual splitting: it stores the elements of
	// `command` into the tokens array and returns how many it produced.
	ntokens = tokenize_command(command, tokens, MAX_TOKENS);

	// tokens[COMMAND_TOKEN] (COMMAND_TOKEN is 0 per the original note) holds
	// the first element of the line, i.e. the operation name.
	if (ntokens >= 3
			&& ((strcmp(tokens[COMMAND_TOKEN].value, "get") == 0)
					|| (strcmp(tokens[COMMAND_TOKEN].value, "bget") == 0))) {

		// get / bget
		process_get_command(c, tokens, ntokens, false);

	// In the next two branches each `&& (comm = NREAD_x)` is an assignment,
	// not a comparison: it records which storage variant matched.
	// NOTE(review): this presumably relies on every NREAD_* constant being
	// non-zero -- verify against their definitions.
	} else if ((ntokens == 6 || ntokens == 7)
			&& ((strcmp(tokens[COMMAND_TOKEN].value, "add") == 0 && (comm =
					NREAD_ADD))
					|| (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0
							&& (comm = NREAD_SET))
					|| (strcmp(tokens[COMMAND_TOKEN].value, "replace") == 0
							&& (comm = NREAD_REPLACE))
					|| (strcmp(tokens[COMMAND_TOKEN].value, "prepend") == 0
							&& (comm = NREAD_PREPEND))
					|| (strcmp(tokens[COMMAND_TOKEN].value, "append") == 0
							&& (comm = NREAD_APPEND)))) {

		// Storage commands: add / set / replace / prepend / append
		process_update_command(c, tokens, ntokens, comm, false);

	} else if ((ntokens == 7 || ntokens == 8)
			&& (strcmp(tokens[COMMAND_TOKEN].value, "cas") == 0 && (comm =
					NREAD_CAS))) {

		// cas: same handler as the storage commands, with handle_cas = true
		process_update_command(c, tokens, ntokens, comm, true);

	} else if ((ntokens == 4 || ntokens == 5)
			&& (strcmp(tokens[COMMAND_TOKEN].value, "incr") == 0)) {

		process_arithmetic_command(c, tokens, ntokens, 1);

	} else if (ntokens >= 3
			&& (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0)) {

		// gets: same handler as get, with return_cas = true
		process_get_command(c, tokens, ntokens, true);

	} else if ((ntokens == 4 || ntokens == 5)
			&& (strcmp(tokens[COMMAND_TOKEN].value, "decr") == 0)) {

		process_arithmetic_command(c, tokens, ntokens, 0);

	} else if (ntokens >= 3 && ntokens <= 5
			&& (strcmp(tokens[COMMAND_TOKEN].value, "delete") == 0)) {

		// delete
		process_delete_command(c, tokens, ntokens);

	} else if ((ntokens == 4 || ntokens == 5)
			&& (strcmp(tokens[COMMAND_TOKEN].value, "touch") == 0)) {

		process_touch_command(c, tokens, ntokens);

	} else if (ntokens >= 2
			&& (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0)) {

		// stats
		process_stat(c, tokens, ntokens);
//.....more code
}

Memcached的增刪改查操作原始碼分析

增/改 set add replace 操作

我們看一個Memcached的命令列的set操作命令:

set key flags exptime vlen
value
set:操作方法名稱

key:快取的key

flags:快取標識

exptime:快取時間,0 - 不過期

vlen:快取value的長度

value:快取的值,一般會在第二行。

例子:

set username 0 10 9
woshishen

我們在第二章《Linux c 開發 - Memcached原始碼分析之命令解析(2)》中講解到了如何解析命令的。Memcached一般會通過\n符號去分隔每個命令列語句,然後通過空格將一行命令切割成N個元素,元素會放進一個tokens的陣列中。

這邊我們可以看到,set命令會分成兩部分:命令列部分和Value值部分。

1. Memcached會先去解析命令列部分,並且命令列部分中帶上了vlen,就可以知道value的長度,然後就會去初始化一個Item的資料結構,用於存放快取資料。

2. 命令列部分解析完畢,Memcached會去繼續讀取Socket中的剩餘資料報文,邊讀取邊複製到Item的資料結構中,直到讀取到的Value資料長度和命令列中的vlen長度一致的時候才會結束。然後會去儲存item,如果item儲存成功,則會將item掛到HashTable和LRU鏈上面;如果儲存失敗,則會刪除item。

下面我們先看一下process_update_command這個方法,這個方法主要作用:

1.  幫助解析命令列部分

2.  分配一個Item資料結構用於儲存資料。

該方法結束後,會跳轉到狀態機drive_machine中conn_nread的程式碼塊。conn_nread主要是用於讀取value資料。

/*
 * Handle the command line of a storage command
 * (add / set / replace / prepend / append / cas).
 *
 * Wire format of such a command:
 *
 *     set key flags exptime vlen
 *     value
 *
 * e.g.
 *
 *     set username 0 10 9
 *     woshishen
 *
 * where flags is an opaque client flag word, exptime is the expiry time
 * (0 = never expires, per the original note), vlen is the length of the
 * value in bytes, and the value itself follows on the next line.
 *
 * This function only parses the command line and allocates an item big
 * enough for the announced value; the value bytes are read afterwards in
 * the conn_nread state, which this function switches to on success.
 *
 * c          - connection the command arrived on; must not be NULL.
 * tokens     - tokenized command line (tokens[0] is the command name).
 * ntokens    - number of tokens.
 * comm       - NREAD_* constant identifying the storage variant.
 * handle_cas - true for "cas": tokens[5] must then hold the CAS id.
 *
 * On malformed input a CLIENT_ERROR is sent and nothing is allocated.
 */
static void process_update_command(conn *c, token_t *tokens,
		const size_t ntokens, int comm, bool handle_cas) {
	char *key; /* key text */
	size_t nkey; /* key length */
	unsigned int flags; /* client flags from the command line */
	int32_t exptime_int = 0;
	time_t exptime; /* expiry time */
	int vlen; /* length of the value data */
	uint64_t req_cas_id = 0;
	/* Key, value and metadata are all stored inside an item. */
	item *it;

	assert(c != NULL);

	set_noreply_maybe(c, tokens, ntokens);

	/* Reject over-long keys (KEY_MAX_LENGTH is 250 per the original note). */
	if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {
		out_string(c, "CLIENT_ERROR bad command line format");
		return;
	}

	/* tokens[0] is the command name; the key follows it. */
	key = tokens[KEY_TOKEN].value;
	nkey = tokens[KEY_TOKEN].length;

	/* Parse and validate flags, exptime and vlen. */
	if (!(safe_strtoul(tokens[2].value, (uint32_t *) &flags)
			&& safe_strtol(tokens[3].value, &exptime_int)
			&& safe_strtol(tokens[4].value, (int32_t *) &vlen))) {
		out_string(c, "CLIENT_ERROR bad command line format");
		return;
	}

	/* Ubuntu 8.04 breaks when I pass exptime to safe_strtol */
	exptime = exptime_int;

	/* Negative exptimes can underflow and end up immortal. realtime() will
	 immediately expire values that are greater than REALTIME_MAXDELTA, but less
	 than process_started, so lets aim for that. */
	if (exptime < 0)
		exptime = REALTIME_MAXDELTA + 1;

	// does cas value exist?
	if (handle_cas) {
		if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
			out_string(c, "CLIENT_ERROR bad command line format");
			return;
		}
	}

	/*
	 * The stored value always carries a trailing "\r\n", so two extra bytes
	 * are needed. Validate the client-supplied length BEFORE adding them:
	 * the previous "vlen += 2; if (vlen < 0 ...)" order performed the
	 * addition first, which is signed overflow (undefined behaviour) for
	 * lengths within 2 of the maximum. vlen was parsed as an int32_t above,
	 * hence the INT32_MAX bound.
	 */
	if (vlen < 0 || vlen > INT32_MAX - 2) {
		out_string(c, "CLIENT_ERROR bad command line format");
		return;
	}
	vlen += 2;

	if (settings.detail_enabled) {
		stats_prefix_record_set(key, nkey);
	}

	/*
	 * Allocate the item that will hold this entry. Only the value LENGTH is
	 * passed, not the value: the command line and the data line arrive
	 * separately (and the data may arrive in several reads), so space is
	 * reserved now from the announced length and filled in later by the
	 * conn_nread state entered at the end of this function.
	 */
	it = item_alloc(key, nkey, flags, realtime(exptime), vlen);

	/* Allocation failed. */
	if (it == 0) {
		if (!item_size_ok(nkey, flags, vlen))
			out_string(c, "SERVER_ERROR object too large for cache");
		else
			out_of_memory(c, "SERVER_ERROR out of memory storing object");
		/* swallow the data line */
		c->write_and_go = conn_swallow;
		c->sbytes = vlen;

		/* Avoid stale data persisting in cache because we failed alloc.
		 * Unacceptable for SET. Anywhere else too? */
		if (comm == NREAD_SET) {
			it = item_get(key, nkey);
			if (it) {
				item_unlink(it);
				item_remove(it);
			}
		}

		return;
	}
	ITEM_set_cas(it, req_cas_id);

	c->item = it;
	c->ritem = ITEM_data(it); /* where the value bytes will be written */
	c->rlbytes = it->nbytes; /* how many value bytes are still to be read */
	c->cmd = comm;
	/* Continue in conn_nread, which reads the value data into the item. */
	conn_set_state(c, conn_nread);
}

看一下item_alloc方法,主要作用:

1. 分配一塊可以用的Item記憶體塊,用於儲存快取資料。

2. Memcached是通過儲存資料的長度選擇合適的slab class,然後在該slabs class上分配一塊item。

先看一下Item的資料結構。

/*
 * Layout of a cached item. The fixed header below is followed in memory by
 * the variable-length payload described in the trailing comments.
 */
typedef struct _stritem {
    /* Doubly linked list: next item */
    struct _stritem *next;
    /* Doubly linked list: previous item */
    struct _stritem *prev;
    struct _stritem *h_next; /* hash chain next */
    /* Time of the most recent access */
    rel_time_t      time;       /* least recent access */
    /* Expiry time */
    rel_time_t      exptime;    /* expire time */
    /* Size of the value data */
    int             nbytes;     /* size of data */
    unsigned short  refcount;
    uint8_t         nsuffix;    /* length of flags-and-length string */
    uint8_t         it_flags;   /* ITEM_* above */
    /* Id of the slab class this item lives in */
    uint8_t         slabs_clsid;/* which slab class we're in */
    uint8_t         nkey;       /* key length, w/terminating null and padding */
    /* this odd type prevents type-punning issues when we do
     * the little shuffle to save space when not using CAS. */
    /* Start of the variable-length payload */
    union {
        uint64_t cas;
        char end;
    } data[];
    /* if it_flags & ITEM_CAS we have 8 bytes CAS */
    /* then null-terminated key */
    /* then " flags length\r\n" (no terminating null) */
    /* then data with terminating \r\n (no terminating null; it's binary!) */
} item;
/*
 * Allocate a new item for the given key, flags, expiry time and value
 * length. Thin wrapper that forwards to do_item_alloc() with cur_hv = 0
 * and returns its result unchanged.
 */
item *item_alloc(char *key, size_t nkey, int flags, rel_time_t exptime, int nbytes) {
    /* do_item_alloc handles its own locks */
    return do_item_alloc(key, nkey, flags, exptime, nbytes, 0);
}
/*
 * Create a new item able to hold the given key and nbytes of value data.
 *
 * key, nkey - key text and its length (copied into the item).
 * flags     - client flags, encoded into the item suffix.
 * exptime   - expiry time stored in the item.
 * nbytes    - size of the value data stored in it->nbytes.
 * cur_hv    - hash value the caller is currently working on; an LRU
 *             candidate with the same hash is never taken.
 *
 * Returns 0 when no slab class fits the total size, NULL when no memory
 * could be obtained, otherwise an unlinked item with refcount 1 whose key
 * and suffix are filled in (the value area is left for the caller).
 *
 * Strategy, as visible below: look at a few items at the LRU tail of the
 * slab class; reuse one that is expired or flushed, otherwise try
 * slabs_alloc(), otherwise (if eviction is enabled) evict the tail item.
 */
item *do_item_alloc(char *key, const size_t nkey, const int flags,
                    const rel_time_t exptime, const int nbytes,
                    const uint32_t cur_hv) {
    uint8_t nsuffix;
    item *it = NULL; /* the item being allocated */
    char suffix[40];
    /* item_make_header computes the total length needed to store the item */
    size_t ntotal = item_make_header(nkey + 1, flags, nbytes, suffix, &nsuffix);
    if (settings.use_cas) {
        ntotal += sizeof(uint64_t);
    }

    /*
     * Map the total size to a slab class; 0 means no class can hold it.
     *
     * Background from the original author's notes (not verifiable from
     * this excerpt): memcached keeps many slab classes, one per chunk-size
     * range. Each class is made of slabs (1 MB each) that are cut into
     * equally sized chunks, and an item occupies one chunk. For example,
     * if class 1 holds items of up to 224 bytes, a 200-byte item is stored
     * in a 224-byte chunk of class 1, and the class grabs another slab
     * whenever it runs out of chunks.
     */
    unsigned int id = slabs_clsid(ntotal);
    if (id == 0)
        return 0;

    mutex_lock(&cache_lock);
    /* do a quick check if we have any expired items in the tail.. */
    int tries = 5;
    /* Avoid hangs if a slab has nothing but refcounted stuff in it. */
    int tries_lrutail_reflocked = 1000;
    int tried_alloc = 0;
    item *search;
    item *next_it;
    void *hold_lock = NULL;
    rel_time_t oldest_live = settings.oldest_live;

    /*
     * Start from the tail of this class's LRU list; the list is chained
     * through item->next / item->prev and is walked towards the head via
     * ->prev.
     */
    search = tails[id];
    /* We walk up *only* for locked items. Never searching for expired.
     * Waste of CPU for almost all deployments */
    for (; tries > 0 && search != NULL; tries--, search=next_it) {
        /* we might relink search mid-loop, so search->prev isn't reliable */
        next_it = search->prev;
        if (search->nbytes == 0 && search->nkey == 0 && search->it_flags == 1) {
            /* We are a crawler, ignore it. */
            tries++;
            continue;
        }
        uint32_t hv = hash(ITEM_key(search), search->nkey);
        /* Attempt to hash item lock the "search" item. If locked, no
         * other callers can incr the refcount
         */
        /* Don't accidentally grab ourselves, or bail if we can't quicklock */
        if (hv == cur_hv || (hold_lock = item_trylock(hv)) == NULL)
            continue;
        /* Now see if the item is refcount locked */
        if (refcount_incr(&search->refcount) != 2) {
            /* Avoid pathological case with ref'ed items in tail */
            do_item_update_nolock(search);
            tries_lrutail_reflocked--;
            tries++;
            refcount_decr(&search->refcount);
            itemstats[id].lrutail_reflocked++;
            /* Old rare bug could cause a refcount leak. We haven't seen
             * it in years, but we leave this code in to prevent failures
             * just in case */
            if (settings.tail_repair_time &&
                    search->time + settings.tail_repair_time < current_time) {
                itemstats[id].tailrepairs++;
                search->refcount = 1;
                do_item_unlink_nolock(search, hv);
            }
            if (hold_lock)
                item_trylock_unlock(hold_lock);

            if (tries_lrutail_reflocked < 1)
                break;

            continue;
        }

        /* Expired or flushed */
        if ((search->exptime != 0 && search->exptime < current_time)
            || (search->time <= oldest_live && oldest_live <= current_time)) {
            itemstats[id].reclaimed++;
            if ((search->it_flags & ITEM_FETCHED) == 0) {
                itemstats[id].expired_unfetched++;
            }
            it = search;
            slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
            do_item_unlink_nolock(it, hv);
            /* Initialize the item block: */
            it->slabs_clsid = 0;
        /* Tail item is still live: try to get a fresh chunk from slabs_alloc */
        } else if ((it = slabs_alloc(ntotal, id)) == NULL) {
            tried_alloc = 1;
            if (settings.evict_to_free == 0) {
                itemstats[id].outofmemory++;
            } else {
                itemstats[id].evicted++;
                itemstats[id].evicted_time = current_time - search->time;
                if (search->exptime != 0)
                    itemstats[id].evicted_nonzero++;
                if ((search->it_flags & ITEM_FETCHED) == 0) {
                    itemstats[id].evicted_unfetched++;
                }
                it = search;
                slabs_adjust_mem_requested(it->slabs_clsid, ITEM_ntotal(it), ntotal);
                do_item_unlink_nolock(it, hv);
                /* Initialize the item block: */
                it->slabs_clsid = 0;

                /* If we've just evicted an item, and the automover is set to
                 * angry bird mode, attempt to rip memory into this slab class.
                 * TODO: Move valid object detection into a function, and on a
                 * "successful" memory pull, look behind and see if the next alloc
                 * would be an eviction. Then kick off the slab mover before the
                 * eviction happens.
                 */
                if (settings.slab_automove == 2)
                    slabs_reassign(-1, id);
            }
        }

        refcount_decr(&search->refcount);
        /* If hash values were equal, we don't grab a second lock */
        if (hold_lock)
            item_trylock_unlock(hold_lock);
        break;
    }

    /* The loop ran out of candidates without ever calling slabs_alloc. */
    if (!tried_alloc && (tries == 0 || search == NULL))
        it = slabs_alloc(ntotal, id);

    if (it == NULL) {
        itemstats[id].outofmemory++;
        mutex_unlock(&cache_lock);
        return NULL;
    }

    assert(it->slabs_clsid == 0);
    assert(it != heads[id]);

    /* Item initialization can happen outside of the lock; the item's already
     * been removed from the slab LRU.
     */
    it->refcount = 1;     /* the caller will have a reference */
    mutex_unlock(&cache_lock);
    it->next = it->prev = it->h_next = 0;
    it->slabs_clsid = id;

    DEBUG_REFCNT(it, '*');
    it->it_flags = settings.use_cas ? ITEM_CAS : 0;
    it->nkey = nkey;
    it->nbytes = nbytes;
    /* Copy the key into the item's own memory */
    memcpy(ITEM_key(it), key, nkey);
    it->exptime = exptime;
    /* Copy the prepared suffix into the item as well */
    memcpy(ITEM_suffix(it), suffix, (size_t)nsuffix);
    it->nsuffix = nsuffix;
    return it;
}

然後我們看一下狀態機drive_machine中conn_nread的程式碼塊,這段程式碼主要作用:

1. 讀取快取的value值

2. 將資料拷貝到item資料結構。

/*
 * conn_nread state (fragment of the drive_machine switch): read the value
 * data of a storage command into the item, c->rlbytes bytes in total.
 * `res` and `stop` are locals of the enclosing function, not shown here.
 */
		case conn_nread:
			/* Nothing left to read: the value is complete. */
			if (c->rlbytes == 0) {
				complete_nread(c);
				break;
			}

			/* Check if rlbytes < 0, to prevent crash */
			/* Invalid length: close the connection. */
			if (c->rlbytes < 0) {
				if (settings.verbose) {
					fprintf(stderr, "Invalid rlbytes to read: len %d\n",
							c->rlbytes);
				}
				conn_set_state(c, conn_closing);
				break;
			}

			/* first check if we have leftovers in the conn_read buffer */
			/* c->rbytes:  bytes already received but not yet consumed */
			/* c->rlbytes: value bytes still to be read */
			if (c->rbytes > 0) {
				/*
				 * Copy at most c->rlbytes bytes. If the buffer holds more
				 * than that, only the value part is taken; if it holds
				 * less, only part of the value has arrived so far and the
				 * rest must still be read.
				 */
				int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;
				/* c->ritem is where this command's value is being stored */
				if (c->ritem != c->rcurr) {
					memmove(c->ritem, c->rcurr, tocopy);
				}
				c->ritem += tocopy; /* advance the write position */
				c->rlbytes -= tocopy; /* fewer value bytes left to read */
				c->rcurr += tocopy; /* advance the read position */
				c->rbytes -= tocopy; /* fewer unconsumed bytes in the buffer */

				/* Value fully read: leave this pass of the state. */
				if (c->rlbytes == 0) {
					break;
				}
			}

			/*  now try reading from the socket */
			/*
			 * Read straight from the socket into the item's value area, up
			 * to c->rlbytes bytes. Since rlbytes is only decremented here,
			 * this state is presumably re-entered until it reaches 0.
			 */
			res = read(c->sfd, c->ritem, c->rlbytes);
			if (res > 0) {
				pthread_mutex_lock(&c->thread->stats.mutex);
				c->thread->stats.bytes_read += res;
				pthread_mutex_unlock(&c->thread->stats.mutex);
				if (c->rcurr == c->ritem) {
					c->rcurr += res;
				}
				c->ritem += res;
				c->rlbytes -= res;
				break;
			}
			/* Peer closed the stream: close the connection. */
			if (res == 0) { /* end of stream */
				conn_set_state(c, conn_closing);
				break;
			}
			/*
			 * No data available right now (EAGAIN / EWOULDBLOCK): re-arm
			 * the read event and stop; close only if re-arming fails.
			 */
			if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
				if (!update_event(c, EV_READ | EV_PERSIST)) {
					if (settings.verbose > 0)
						fprintf(stderr, "Couldn't update event\n");
					conn_set_state(c, conn_closing);
					break;
				}
				stop = true;
				break;
			}
			/* otherwise we have a real error, on which we close the connection */
			if (settings.verbose > 0) {
				fprintf(stderr, "Failed to read, and not due to blocking:\n"
						"errno: %d %s \n"
						"rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n", errno,
						strerror(errno), (long) c->rcurr, (long) c->ritem,
						(long) c->rbuf, (int) c->rlbytes, (int) c->rsize);
			}
			/* Close the connection. */
			conn_set_state(c, conn_closing);
			break;

這邊如果讀取完成了,會呼叫complete_nread(c)這個方法。這個方法往下一直看,我們找到complete_nread_ascii,這個方法主要作用:

1. 呼叫儲存資料store_item的方法

2. 呼叫item_remove刪除item的方法。

/*
 * Finish an ASCII storage command once the whole value has been read into
 * c->item: verify the trailing "\r\n", store the item, report the result
 * to the client, and drop the connection's reference to the item.
 * (Part of the original function is elided in this listing.)
 */
static void complete_nread_ascii(conn *c) {
	assert(c != NULL);

	item *it = c->item;
	int comm = c->cmd;
	enum store_item_type ret;

	pthread_mutex_lock(&c->thread->stats.mutex);
	c->thread->stats.slab_stats[it->slabs_clsid].set_cmds++;
	pthread_mutex_unlock(&c->thread->stats.mutex);

	/* The value must end with "\r\n"; nbytes includes those two bytes. */
	if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) != 0) {
		out_string(c, "CLIENT_ERROR bad data chunk");
	} else {
		/* Store the item. */
		ret = store_item(it, comm, c);
//....

		switch (ret) {
		case STORED:
			out_string(c, "STORED");
			break;
		case EXISTS:
			out_string(c, "EXISTS");
			break;
		case NOT_FOUND:
			out_string(c, "NOT_FOUND");
			break;
		case NOT_STORED:
			out_string(c, "NOT_STORED");
			break;
		default:
			out_string(c, "SERVER_ERROR Unhandled storage type.");
		}

	}

	/*
	 * Why "remove" the item right after storing it? The call below only
	 * releases the reference this connection holds.
	 *
	 * Original author's explanation (item_remove itself is not shown in
	 * this function): an item starts with refcount 1 when it is allocated.
	 * If store_item succeeded, the item was linked and its refcount was
	 * incremented, so dropping one reference here leaves it alive. If
	 * storing failed, this was the only reference, the count reaches 0 and
	 * the item is actually freed -- which is what we want.
	 */
	item_remove(c->item); /* release the c->item reference */
	c->item = 0;
}

然後我們看一下非常重要的do_store_item方法。這個方法主要是用來儲存資料。基本包括兩種狀態:儲存成功和儲存失敗。

1. add/replace命令,會先判斷item是否存在:如果已經存在,則add命令操作失敗;如果不存在,則replace命令操作失敗。

2. set命令,item存在或者不存在,都會建立新的item,替換老的item。

/*
 * Store item `it` according to the command `comm` (an NREAD_* constant).
 *
 * it   - freshly allocated item already holding key and value.
 * comm - which storage command is being executed.
 * c    - connection, used for per-thread stats and to record c->cas.
 * hv   - hash value of the key.
 *
 * Returns STORED, NOT_STORED, EXISTS or NOT_FOUND as set below.
 */
enum store_item_type do_store_item(item *it, int comm, conn *c,
		const uint32_t hv) {
	char *key = ITEM_key(it);

	/*
	 * Look up the existing item for this key. All storage commands have
	 * already created the new item `it` before getting here.
	 */
	item *old_it = do_item_get(key, it->nkey, hv);
	enum store_item_type stored = NOT_STORED;

	item *new_it = NULL;
	int flags;

	/*
	 * add succeeds only when the key does not exist yet; if it exists the
	 * result stays NOT_STORED.
	 */
	if (old_it != NULL && comm == NREAD_ADD) {
		/* add only adds a nonexistent item, but promote to head of LRU */
		/*
		 * Per the original author's notes, the update refreshes the
		 * existing item's access time and LRU position. old_it itself is
		 * kept: only the reference taken by do_item_get above is released
		 * at the end of this function.
		 */
		do_item_update(old_it);

	/*
	 * replace / prepend / append require an existing item; without one
	 * the result stays NOT_STORED.
	 */
	} else if (!old_it
			&& (comm == NREAD_REPLACE || comm == NREAD_APPEND
					|| comm == NREAD_PREPEND)) {
		/* replace only replaces an existing value; don't store */
	} else if (comm == NREAD_CAS) {
		/* validate cas operation */
		if (old_it == NULL) {
			// LRU expired
			stored = NOT_FOUND;
			pthread_mutex_lock(&c->thread->stats.mutex);
			c->thread->stats.cas_misses++;
			pthread_mutex_unlock(&c->thread->stats.mutex);
		} else if (ITEM_get_cas(it) == ITEM_get_cas(old_it)) {
			// cas validates
			// it and old_it may belong to different classes.
			// I'm updating the stats for the one that's getting pushed out
			pthread_mutex_lock(&c->thread->stats.mutex);
			c->thread->stats.slab_stats[old_it->slabs_clsid].cas_hits++;
			pthread_mutex_unlock(&c->thread->stats.mutex);

			item_replace(old_it, it, hv);
			stored = STORED;
		} else {
			pthread_mutex_lock(&c->thread->stats.mutex);
			c->thread->stats.slab_stats[old_it->slabs_clsid].cas_badval++;
			pthread_mutex_unlock(&c->thread->stats.mutex);

			if (settings.verbose > 1) {
				fprintf(stderr, "CAS:  failure: expected %llu, got %llu\n",
						(unsigned long long) ITEM_get_cas(old_it),
						(unsigned long long) ITEM_get_cas(it));
			}
			stored = EXISTS;
		}
	} else {
		/*
		 * Append - combine new and old record into single one. Here it's
		 * atomic and thread-safe.
		 */
		/*
		 * append / prepend: build a combined item from the old and the new
		 * data. old_it is non-NULL here because the branch above already
		 * handled append/prepend without an existing item.
		 */
		if (comm == NREAD_APPEND || comm == NREAD_PREPEND) {
			/*
			 * Validate CAS
			 */
			if (ITEM_get_cas(it) != 0) {
				// CAS much be equal
				if (ITEM_get_cas(it) != ITEM_get_cas(old_it)) {
					stored = EXISTS;
				}
			}

			if (stored == NOT_STORED) {
				/* we have it and old_it here - alloc memory to hold both */
				/* flags was already lost - so recover them from ITEM_suffix(it) */

				flags = (int) strtol(ITEM_suffix(old_it), (char **) NULL, 10);

				new_it = do_item_alloc(key, it->nkey, flags, old_it->exptime,
						it->nbytes + old_it->nbytes - 2 /* CRLF */, hv);

				if (new_it == NULL) {
					/* SERVER_ERROR out of memory */
					if (old_it != NULL)
						do_item_remove(old_it);

					return NOT_STORED;
				}

				/* copy data from it and old_it to new_it */

				if (comm == NREAD_APPEND) {
					memcpy(ITEM_data(new_it), ITEM_data(old_it),
							old_it->nbytes);
					memcpy(ITEM_data(new_it) + old_it->nbytes - 2 /* CRLF */,
							ITEM_data(it), it->nbytes);
				} else {
					/* NREAD_PREPEND */
					memcpy(ITEM_data(new_it), ITEM_data(it), it->nbytes);
					memcpy(ITEM_data(new_it) + it->nbytes - 2 /* CRLF */,
							ITEM_data(old_it), old_it->nbytes);
				}

				it = new_it;
			}
		}

		/* Common tail for add / set / replace / prepend / append. */
		if (stored == NOT_STORED) {
			if (old_it != NULL)
				/* Replace old_it with it in the hash table and LRU. */
				item_replace(old_it, it, hv);
			else
				/* No previous item: link the new one into hash table and LRU. */
				do_item_link(it, hv);

			c->cas = ITEM_get_cas(it);

			stored = STORED;
		}
		/*
		 * Why the comments keep mentioning it->refcount (original author's
		 * explanation): refcount is the number of outstanding references
		 * and keeps an item from being freed while someone still uses it;
		 * do_item_remove frees the item only when the count drops to 0.
		 *
		 * After do_store_item returns, the caller releases its own
		 * reference to the new item. If storing failed, that was the only
		 * reference and the pre-allocated item is freed. If storing
		 * succeeded, linking the item took an additional reference, so the
		 * caller's release does not free it.
		 */
	}

	/* Release the reference to the old item taken by do_item_get above. */
	if (old_it != NULL)
		do_item_remove(old_it); /* release our reference */
	/* new_it is only set on the prepend/append path. */
	if (new_it != NULL)
		do_item_remove(new_it);

	if (stored == STORED) {
		c->cas = ITEM_get_cas(it);
	}

	return stored;
}

在do_store_item方法中,我們最終會找到do_item_link這個方法,這個方法主要作用:

1. 將item掛到Hashtable上面

2. 將item掛到LRU鏈上面

HashTable:把Item掛到HashTable上去後,使用者就可以通過快取的key到HashTable上查詢這個Item資料了。

LRU:是一個清除快取的策略,一般會清理最不常用的元素。LRU的鏈,會放在下面兩個Item指標地址的陣列連結串列上面。

static item *heads[LARGEST_ID]; /* head of each LRU list, indexed by slab class id */
static item *tails[LARGEST_ID]; /* tail of each LRU list, indexed by slab class id */
看一下do_item_link這個方法
/*
 * Link an item into the cache: mark it ITEM_LINKED, stamp its access time,
 * update the global stats, assign its CAS id, insert it into the hash
 * table and the LRU list, and take one extra reference on it.
 * All of this happens under cache_lock. Always returns 1.
 */
int do_item_link(item *it, const uint32_t hv) {
    MEMCACHED_ITEM_LINK(ITEM_key(it), it->nkey, it->nbytes);
    /* The item must not already be linked or sitting on a slab free list. */
    assert((it->it_flags & (ITEM_LINKED|ITEM_SLABBED)) == 0);
    mutex_lock(&cache_lock);
    it->it_flags |= ITEM_LINKED;
    it->time = current_time;

    STATS_LOCK();
    stats.curr_bytes += ITEM_ntotal(it);
    stats.curr_items += 1;
    stats.total_items += 1;
    STATS_UNLOCK();

    /* Allocate a new CAS ID on link. */
    ITEM_set_cas(it, (settings.use_cas) ? get_cas_id() : 0);
    /* Insert into the hash table bucket for hv */
    assoc_insert(it, hv);
    /* Insert into the LRU list */
    item_link_q(it);
    refcount_incr(&it->refcount); /* one more reference, held by the link */
    mutex_unlock(&cache_lock);

    return 1;
}

查詢 get 操作

查詢操作主要看下process_get_command方法,該方法主要作用:

1. 分解get命令。

2. 通過key去HashTable上找到item的地址值。

3. 返回找到的item資料值。

/* ntokens is overwritten here... shrug.. */
/*
 * Handle a get-style command: look up every key on the command line and
 * queue a "VALUE ..." response for each hit, followed by "END".
 *
 * c          - connection; hits are recorded in c->ilist (and CAS suffixes
 *              in c->suffixlist) while the response iovecs are built.
 * tokens     - tokenized command line; tokens[KEY_TOKEN] is the first key.
 * ntokens    - number of tokens (overwritten when more keys are tokenized).
 * return_cas - true to append each item's CAS id to its VALUE line.
 */
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
		bool return_cas) {
	char *key;
	size_t nkey;
	int i = 0; /* number of items found so far */
	item *it;
	/*
	 * tokens[0] is the command name and tokens[1] the first key; each
	 * token_t carries a value pointer and a length.
	 */
	token_t *key_token = &tokens[KEY_TOKEN];
	char *suffix;
	assert(c != NULL);

	do {
		/* A token of length 0 ends the current batch of keys. */
		while (key_token->length != 0) {

			key = key_token->value;
			nkey = key_token->length;

			/*
			 * Reject keys longer than KEY_MAX_LENGTH (250 per the original
			 * note) and release the references collected so far.
			 */
			if (nkey > KEY_MAX_LENGTH) {
				/* out_string queues a response line for the client */
				out_string(c, "CLIENT_ERROR bad command line format");
				while (i-- > 0) {
					item_remove(*(c->ilist + i));
				}
				return;
			}
			/* Look the key up in the cache. */
			it = item_get(key, nkey);
			if (settings.detail_enabled) {
				/* Per-prefix statistics: record this get and whether it hit. */
				stats_prefix_record_get(key, nkey, NULL != it);
			}
			/* Hit */
			if (it) {
				/*
				 * c->ilist keeps the items referenced by the pending
				 * response; double it when it is full.
				 */
				if (i >= c->isize) {
					item **new_list = realloc(c->ilist,
							sizeof(item *) * c->isize * 2);
					if (new_list) {
						/* capacity of the item list */
						c->isize *= 2;
						/* the (possibly moved) item list */
						c->ilist = new_list;
					} else {
						STATS_LOCK();
						stats.malloc_fails++;
						STATS_UNLOCK();
						item_remove(it);
						break;
					}
				}

				/*
				 * Construct the response. Each hit adds three elements to the
				 * outgoing data list:
				 *   "VALUE "
				 *   key
				 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
				 */
				/* Build the outgoing iovec entries for this hit. */
				if (return_cas) {
					MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
							it->nbytes, ITEM_get_cas(it));
					/* Goofy mid-flight realloc. */
					if (i >= c->suffixsize) {
						char **new_suffix_list = realloc(c->suffixlist,
								sizeof(char *) * c->suffixsize * 2);
						if (new_suffix_list) {
							c->suffixsize *= 2;
							c->suffixlist = new_suffix_list;
						} else {
							STATS_LOCK();
							stats.malloc_fails++;
							STATS_UNLOCK();
							item_remove(it);
							break;
						}
					}

					suffix = cache_alloc(c->thread->suffix_cache);
					if (suffix == NULL) {
						STATS_LOCK();
						stats.malloc_fails++;
						STATS_UNLOCK();
						out_of_memory(c,
								"SERVER_ERROR out of memory making CAS suffix");
						item_remove(it);
						while (i-- > 0) {
							item_remove(*(c->ilist + i));
						}
						return;
					}
					*(c->suffixlist + i) = suffix;
					int suffix_len = snprintf(suffix, SUFFIX_SIZE, " %llu\r\n",
							(unsigned long long) ITEM_get_cas(it));
					/*
					 * The item's own suffix ends in "\r\n"; it is sent
					 * without those two bytes so the CAS suffix (which
					 * carries its own "\r\n") can follow on the same line.
					 */
					if (add_iov(c, "VALUE ", 6) != 0
							|| add_iov(c, ITEM_key(it), it->nkey) != 0
							|| add_iov(c, ITEM_suffix(it), it->nsuffix - 2) != 0
							|| add_iov(c, suffix, suffix_len) != 0
							|| add_iov(c, ITEM_data(it), it->nbytes) != 0) {
						item_remove(it);
						break;
					}
				} else {
					MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
							it->nbytes, ITEM_get_cas(it));
					/*
					 * Fill the iovecs with the response. For "get userId"
					 * the client receives:
					 *   VALUE userId 0 5
					 *   55555
					 *   END
					 * Suffix and data are sent as one span of
					 * nsuffix + nbytes bytes starting at ITEM_suffix(it).
					 */
					if (add_iov(c, "VALUE ", 6) != 0
							|| add_iov(c, ITEM_key(it), it->nkey) != 0
							|| add_iov(c, ITEM_suffix(it),
									it->nsuffix + it->nbytes) != 0) {
						item_remove(it);
						break;
					}
				}

				if (settings.verbose > 1) {
					int ii;
					fprintf(stderr, ">%d sending key ", c->sfd);
					for (ii = 0; ii < it->nkey; ++ii) {
						fprintf(stderr, "%c", key[ii]);
					}
					fprintf(stderr, "\n");
				}

				/* item_get() has incremented it->refcount for us */
				pthread_mutex_lock(&c->thread->stats.mutex);
				c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
				c->thread->stats.get_cmds++;
				pthread_mutex_unlock(&c->thread->stats.mutex);
				item_update(it);
				/* Remember the item; its reference is kept for the response. */
				*(c->ilist + i) = it;
				i++;

			} else {
				/* Miss */
				pthread_mutex_lock(&c->thread->stats.mutex);
				c->thread->stats.get_misses++;
				c->thread->stats.get_cmds++;
				pthread_mutex_unlock(&c->thread->stats.mutex);
				MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
			}

			key_token++;
		}

		/*
		 * If the command string hasn't been fully processed, get the next set
		 * of tokens.
		 */
		/*
		 * A single command line may name many keys; when the terminating
		 * token still carries unparsed text, tokenize that remainder and
		 * continue with the next batch of keys.
		 */
		if (key_token->value != NULL) {
			ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
			key_token = tokens;
		}

	} while (key_token->value != NULL);

	c->icurr = c->ilist;
	c->ileft = i;
	if (return_cas) {
		c->suffixcurr = c->suffixlist;
		c->suffixleft = i;
	}

	if (settings.verbose > 1)
		fprintf(stderr, ">%d END\n", c->sfd);

	/*
	 If the loop was terminated because of out-of-memory, it is not
	 reliable to add END\r\n to the buffer, because it might not end
	 in \r\n. So we send SERVER_ERROR instead.
	 */
	/* Append the END terminator. */
	if (key_token->value != NULL || add_iov(c, "END\r\n", 5) != 0
			|| (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
		out_of_memory(c, "SERVER_ERROR out of memory writing get response");
	} else {
		/* The response is ready: switch to the write state to send it. */
		conn_set_state(c, conn_mwrite);
		c->msgcurr = 0;
	}
}

Memcached的查詢主要是通過HashTable來查詢快取資料的。

HashTable我們在上一章已經講過。前面也講過,當快取資料SET操作完成後,Memcached會將item資料結構關聯到HashTable和它的LRU的鏈上面。

/*
 * Locked wrapper around do_item_get(): hashes the key, takes the item lock
 * for that hash value, performs the lookup, releases the lock and returns
 * whatever the lookup returned. The item_* functions form the interface to
 * the cache's storage layer.
 */
item *item_get(const char *key, const size_t nkey) {
    /* Hash of the key; selects the item lock guarding the lookup. */
    const uint32_t hv = hash(key, nkey);

    item_lock(hv);
    item *found = do_item_get(key, nkey, hv);
    item_unlock(hv);

    return found;
}

這邊著重看do_item_get這個方法,它會呼叫assoc_find,主要作用:從HashTable上找到對應的Item地址值,並順便處理過期(lazy expiration)的邏輯。

/** wrapper around assoc_find which does the lazy expiration logic */
/*
 * key, nkey - key to look up.
 * hv        - hash value of the key.
 *
 * Returns the item with its refcount incremented, or NULL when the key is
 * not present, the item lies in a slab page being rebalanced, or the item
 * turned out to be flushed or expired (in which case it is unlinked here).
 */
item *do_item_get(const char *key, const size_t nkey, const uint32_t hv) {
    //mutex_lock(&cache_lock);
    /* Look the item up in the hash table */
    item *it = assoc_find(key, nkey, hv);
    if (it != NULL) {
        refcount_incr(&it->refcount);
        /* Optimization for slab reassignment. prevents popular items from
         * jamming in busy wait. Can only do this here to satisfy lock order
         * of item_lock, cache_lock, slabs_lock. */
        if (slab_rebalance_signal &&
            ((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {
            do_item_unlink_nolock(it, hv);
            do_item_remove(it);
            it = NULL;
        }
    }
    //mutex_unlock(&cache_lock);
    /* Only ever set when verbose > 2, so the "-nuked" messages below are
     * printed only at that verbosity. */
    int was_found = 0;

    if (settings.verbose > 2) {
        int ii;
        if (it == NULL) {
            fprintf(stderr, "> NOT FOUND ");
        } else {
            fprintf(stderr, "> FOUND KEY ");
            was_found++;
        }
        for (ii = 0; ii < nkey; ++ii) {
            fprintf(stderr, "%c", key[ii]);
        }
    }

    if (it != NULL) {
        /* Flushed: last touched at or before an oldest_live that has
         * already passed. */
        if (settings.oldest_live != 0 && settings.oldest_live <= current_time &&
            it->time <= settings.oldest_live) {
            do_item_unlink(it, hv);
            do_item_remove(it);
            it = NULL;
            if (was_found) {
                fprintf(stderr, " -nuked by flush");
            }
        /* Expired */
        } else if (it->exptime != 0 && it->exptime <= current_time) {
            do_item_unlink(it, hv);
            do_item_remove(it);
            it = NULL;
            if (was_found) {
                fprintf(stderr, " -nuked by expire");
            }
        } else {
            /* Live item: mark it as fetched. */
            it->it_flags |= ITEM_FETCHED;
            DEBUG_REFCNT(it, '+');
        }
    }

    if (settings.verbose > 2)
        fprintf(stderr, "\n");

    return it;
}

刪除 delete 操作

刪除操作主要看process_delete_command方法:

1. 先查詢item是否存在

2. 如果存在則刪除item,不存在,則返回NOT FOUND

/*
 * Handle the "delete" command: validate the optional arguments, look the
 * key up, and either unlink it and answer DELETED or answer NOT_FOUND.
 *
 * c       - connection the command arrived on; must not be NULL.
 * tokens  - tokenized command line; tokens[KEY_TOKEN] is the key.
 * ntokens - number of tokens.
 */
static void process_delete_command(conn *c, token_t *tokens,
		const size_t ntokens) {
	assert(c != NULL);

	/* Extra arguments are only accepted in a few legacy-compatible shapes. */
	if (ntokens > 3) {
		const bool zero_hold = (strcmp(tokens[KEY_TOKEN + 1].value, "0") == 0);
		const bool noreply = set_noreply_maybe(c, tokens, ntokens);
		bool acceptable = false;

		if (ntokens == 4) {
			acceptable = zero_hold || noreply;
		} else if (ntokens == 5) {
			acceptable = zero_hold && noreply;
		}

		if (!acceptable) {
			out_string(c, "CLIENT_ERROR bad command line format.  "
					"Usage: delete <key> [noreply]");
			return;
		}
	}

	/* Key text and length. */
	char *key = tokens[KEY_TOKEN].value;
	size_t nkey = tokens[KEY_TOKEN].length;

	if (nkey > KEY_MAX_LENGTH) {
		out_string(c, "CLIENT_ERROR bad command line format");
		return;
	}

	if (settings.detail_enabled) {
		stats_prefix_record_delete(key, nkey);
	}

	/* Look the key up first; a miss is reported as NOT_FOUND. */
	item *target = item_get(key, nkey);
	if (target == NULL) {
		pthread_mutex_lock(&c->thread->stats.mutex);
		c->thread->stats.delete_misses++;
		pthread_mutex_unlock(&c->thread->stats.mutex);

		out_string(c, "NOT_FOUND");
		return;
	}

	MEMCACHED_COMMAND_DELETE(c->sfd, ITEM_key(target), target->nkey);

	pthread_mutex_lock(&c->thread->stats.mutex);
	c->thread->stats.slab_stats[target->slabs_clsid].delete_hits++;
	pthread_mutex_unlock(&c->thread->stats.mutex);

	/* Found: unlink the item, then drop the reference taken by item_get. */
	item_unlink(target);
	item_remove(target); /* release our reference */
	out_string(c, "DELETED");
}

item_unlink和do_item_unlink方法主要兩個作用:

1. 從HashTable上將Item的地址值刪除

2. 從LRU的連結串列上,將Item的地址值刪除(LRU連結串列只要處理頭部和尾部就行了)

/*
 * Locked wrapper around do_item_unlink(): hashes the item's key, takes the
 * item lock for that hash value, unlinks the item and releases the lock.
 * Per the surrounding article, unlinking detaches the item from the LRU
 * list and the hash table.
 */
void item_unlink(item *item) {
    const uint32_t hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_unlink(item, hv);
    item_unlock(hv);
}

item_remove主要是釋放item

/*
 * Locked wrapper around do_item_remove(): hashes the item's key, takes the
 * item lock for that hash value, calls do_item_remove() on the item and
 * releases the lock.
 */
void item_remove(item *item) {
    /* Hash of the item's key; selects which item lock to take. */
    const uint32_t hv = hash(ITEM_key(item), item->nkey);

    item_lock(hv);
    do_item_remove(item);
    item_unlock(hv);
}