1. 程式人生 > >Memcached原始碼分析之訊息迴應(3)

Memcached原始碼分析之訊息迴應(3)

文章列表:

《Memcached原始碼分析 - Memcached原始碼分析之總結篇(8)》

前言

上一章《Memcached原始碼分析 - Memcached原始碼分析之命令解析(2)》,我們花了很大的力氣去講解Memcached如何從客戶端讀取命令,並且解析命令,然後處理命令並且向客戶端迴應訊息。

這一章,我們主要來講解Memcached迴應訊息的技術細節

本章前,我們先需要了解幾個知識點(msghdr和iovc)。

msghdr結構:

struct msghdr {
     void *msg_name;
     socklen_t msg_namelen;
     struct iovec *msg_iov;
     size_t msg_iovlen;
     void *msg_control;
     size_t msg_controllen;
     int msg_flags;
};
iovc結構:
#include <sys/uio.h>
/* Structure for scatter/gather I/O. */
struct iovec {
    void *iov_base; /* Pointer to data. */
    size_t iov_len; /* Length of data. */
};
Memcached是通過sendmsg函式向客戶端傳送資料的,就會用到上面的結構,不瞭解這個結構的,建議先了解之後再繼續往下看。

Memcached訊息迴應原始碼分析

資料結構

我們繼續看一下conn這個結構。conn結構我們上一期說過,主要是儲存單個客戶端的連線詳情資訊。每一個客戶端連線到Memcached都會有這麼一個數據結構。

typedef struct conn conn;
struct conn {
    //....
    /* data for the mwrite state */
    //iov主要儲存iov的資料結構
    //iov資料結構會在conn_new中初始化,初始化的時候,系統會分配400個iovec的結構,最高水位600個
    struct iovec *iov;
    //iov的長度
    int    iovsize;   /* number of elements allocated in iov[] */
    //iovused 這個主要記錄iov使用了多少
    int    iovused;   /* number of elements used in iov[] */

    //msglist主要儲存msghdr的列表資料結構
    //msglist資料結構在conn_new中初始化的時候,系統會分配10個結構
    struct msghdr *msglist;
    //msglist的長度,初始化為10個,最高水位100,不夠用的時候會realloc,每次擴容都會擴容一倍
    int    msgsize;   /* number of elements allocated in msglist[] */
    //msglist已經使用的長度
    int    msgused;   /* number of elements used in msglist[] */
    //這個引數主要幫助記錄那些msglist已經發送過了,哪些沒有傳送過。
    int    msgcurr;   /* element in msglist[] being transmitted now */
    int    msgbytes;  /* number of bytes in current msg */
}

我們可以看一下conn_new這個方法,這個方法應該在第一章節的時候講到過。這邊主要看一下iov和msglist兩個引數初始化的過程。

conn *conn_new(const int sfd, enum conn_states init_state,
		const int event_flags, const int read_buffer_size,
		enum network_transport transport, struct event_base *base) {
//...
		c->rbuf = c->wbuf = 0;
		c->ilist = 0;
		c->suffixlist = 0;
		c->iov = 0;
		c->msglist = 0;
		c->hdrbuf = 0;

		c->rsize = read_buffer_size;
		c->wsize = DATA_BUFFER_SIZE;
		c->isize = ITEM_LIST_INITIAL;
		c->suffixsize = SUFFIX_LIST_INITIAL;
		c->iovsize = IOV_LIST_INITIAL; //初始化400
		c->msgsize = MSG_LIST_INITIAL; //初始化10
		c->hdrsize = 0;

		c->rbuf = (char *) malloc((size_t) c->rsize);
		c->wbuf = (char *) malloc((size_t) c->wsize);
		c->ilist = (item **) malloc(sizeof(item *) * c->isize);
		c->suffixlist = (char **) malloc(sizeof(char *) * c->suffixsize);
		c->iov = (struct iovec *) malloc(sizeof(struct iovec) * c->iovsize); //初始化iov
		c->msglist = (struct msghdr *) malloc(
				sizeof(struct msghdr) * c->msgsize); //初始化msglist
//...
}

資料結構關係圖(iov和msglist之間的關係):


從process_get_command開始

我們繼續從process_get_command,獲取memcached的快取資料這個方法開始。

在這個方法中,我們主要看add_iov這個方法。Memcached主要是通過add_iov方法,將需要傳送給客戶端的資料裝到iov和msglist結構中去的。

/* ntokens is overwritten here... shrug.. */
//處理GET請求的命令
static inline void process_get_command(conn *c, token_t *tokens, size_t ntokens,
		bool return_cas) {
	//處理GET命令
	char *key;
	size_t nkey;
	int i = 0;
	item *it;
	//&tokens[0] 是操作的方法
	//&tokens[1] 為key
	//token_t 儲存了value和length
	token_t *key_token = &tokens[KEY_TOKEN];
	char *suffix;
	assert(c != NULL);

	do {
		//如果key的長度不為0
		while (key_token->length != 0) {

			key = key_token->value;
			nkey = key_token->length;

			//判斷key的長度是否超過了最大的長度,memcache key的最大長度為250
			//這個地方需要非常注意,我們在平常的使用中,還是要注意key的位元組長度的
			if (nkey > KEY_MAX_LENGTH) {
				//out_string 向外部輸出資料
				out_string(c, "CLIENT_ERROR bad command line format");
				while (i-- > 0) {
					item_remove(*(c->ilist + i));
				}
				return;
			}
			//這邊是從Memcached的記憶體儲存快中去取資料
			it = item_get(key, nkey);
			if (settings.detail_enabled) {
				//狀態記錄,key的記錄數的方法
				stats_prefix_record_get(key, nkey, NULL != it);
			}
			//如果獲取到了資料
			if (it) {
				//c->ilist 存放用於向外部寫資料的buf
				//如果ilist太小,則重新分配一塊記憶體
				if (i >= c->isize) {
					item **new_list = realloc(c->ilist,
							sizeof(item *) * c->isize * 2);
					if (new_list) {
						//存放需要向客戶端寫資料的item的列表的長度
						c->isize *= 2;
						//存放需要向客戶端寫資料的item的列表,這邊支援
						c->ilist = new_list;
					} else {
						STATS_LOCK();
						stats.malloc_fails++;
						STATS_UNLOCK();
						item_remove(it);
						break;
					}
				}

				/*
				 * Construct the response. Each hit adds three elements to the
				 * outgoing data list:
				 *   "VALUE "
				 *   key
				 *   " " + flags + " " + data length + "\r\n" + data (with \r\n)
				 */
				//初始化返回出去的資料結構
				if (return_cas) {
					//......
				} else {
					MEMCACHED_COMMAND_GET(c->sfd, ITEM_key(it), it->nkey,
							it->nbytes, ITEM_get_cas(it));
					//將需要返回的資料填充到IOV結構中
					//命令:get userId
					//返回的結構:
					//VALUE userId 0 5
					//55555
					//END
					if (<strong><span style="color:#FF0000;">add_iov</span></strong>(c, "VALUE ", 6) != 0
							|| <strong><span style="color:#FF0000;">add_iov</span></strong>(c, ITEM_key(it), it->nkey) != 0
							|| <strong><span style="color:#FF0000;">add_iov</span></strong>(c, ITEM_suffix(it),
									it->nsuffix + it->nbytes) != 0) {
						item_remove(it);
						break;
					}
				}

				if (settings.verbose > 1) {
					int ii;
					fprintf(stderr, ">%d sending key ", c->sfd);
					for (ii = 0; ii < it->nkey; ++ii) {
						fprintf(stderr, "%c", key[ii]);
					}
					fprintf(stderr, "\n");
				}

				/* item_get() has incremented it->refcount for us */
				pthread_mutex_lock(&c->thread->stats.mutex);
				c->thread->stats.slab_stats[it->slabs_clsid].get_hits++;
				c->thread->stats.get_cmds++;
				pthread_mutex_unlock(&c->thread->stats.mutex);
				item_update(it);
				*(c->ilist + i) = it;
				i++;

			} else {
				pthread_mutex_lock(&c->thread->stats.mutex);
				c->thread->stats.get_misses++;
				c->thread->stats.get_cmds++;
				pthread_mutex_unlock(&c->thread->stats.mutex);
				MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
			}

			key_token++;
		}

		/*
		 * If the command string hasn't been fully processed, get the next set
		 * of tokens.
		 */
		//如果命令列中的命令沒有全部被處理,則繼續下一個命令
		//一個命令列中,可以get多個元素
		if (key_token->value != NULL) {
			ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
			key_token = tokens;
		}

	} while (key_token->value != NULL);

	c->icurr = c->ilist;
	c->ileft = i;
	if (return_cas) {
		c->suffixcurr = c->suffixlist;
		c->suffixleft = i;
	}

	if (settings.verbose > 1)
		fprintf(stderr, ">%d END\n", c->sfd);

	/*
	 If the loop was terminated because of out-of-memory, it is not
	 reliable to add END\r\n to the buffer, because it might not end
	 in \r\n. So we send SERVER_ERROR instead.
	 */
	//新增結束標誌符號
	if (key_token->value != NULL || <strong><span style="color:#FF0000;">add_iov</span></strong>(c, "END\r\n", 5) != 0
			|| (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
		out_of_memory(c, "SERVER_ERROR out of memory writing get response");
	} else {
		//將狀態修改為寫,這邊讀取到item的資料後,又開始需要往客戶端寫資料了。
		conn_set_state(c, conn_mwrite);
		c->msgcurr = 0;
	}
}

add_iov 方法

add_iov方法,主要作用:

1. 將Memcached需要傳送的資料,分成N多個IOV的塊

2. 將IOV塊新增到msghdr的結構中去。

static int add_iov(conn *c, const void *buf, int len) {
	struct msghdr *m;
	int leftover;
	bool limit_to_mtu;

	assert(c != NULL);

	do {
		//訊息陣列 msglist 儲存msghdr結構
		//這邊是獲取最新的msghdr資料結構指標
		m = &c->msglist[c->msgused - 1];

		/*
		 * Limit UDP packets, and the first payloads of TCP replies, to
		 * UDP_MAX_PAYLOAD_SIZE bytes.
		 */
		limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);

		/* We may need to start a new msghdr if this one is full. */
		//如果msghdr結構中的iov滿了,則需要使用更新的msghdr資料結構
		if (m->msg_iovlen == IOV_MAX
				|| (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
			//新增msghdr,這個方法中回去判斷初始化的時候10個msghdr結構是否夠用,不夠用的話會擴容
			add_msghdr(c);
			//指向下一個新的msghdr資料結構
			m = &c->msglist[c->msgused - 1];
		}

		//確認IOV的空間大小,初始化預設是400個,水位600
		//如果IOV也不夠用了,就會去擴容
		if (ensure_iov_space(c) != 0)
			return -1;

		/* If the fragment is too big to fit in the datagram, split it up */
		if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
			leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
			len -= leftover;
		} else {
			leftover = 0;
		}

		m = &c->msglist[c->msgused - 1];
		//m->msg_iov引數指向c->iov這個結構。
		//具體m->msg_iov如何指向到c->iov這個結構的,需要看一下add_msghdr這個方法
		//向IOV中填充BUF
		m->msg_iov[m->msg_iovlen].iov_base = (void *) buf;
		//buf的長度
		m->msg_iov[m->msg_iovlen].iov_len = len; //填充長度

		c->msgbytes += len;
		c->iovused++;
		m->msg_iovlen++; //msg_iovlen + 1

		buf = ((char *) buf) + len;
		len = leftover;
	} while (leftover > 0);

	return 0;
}

add_msghdr 方法 msghdr擴容

在add_iov方法中,我們可以看到,當IOV塊新增滿了之後,會呼叫這個方法擴容msgdhr的個數。

這個方法主要兩個作用:

1. 檢查c->msglist列表長度是否夠用。

2. 使用最新的c->msglist中的一個msghdr元素,並且將msghdr->msg_iov指向c->iov最新未使用的那個iov的指標地址。

static int add_msghdr(conn *c) {
	//c->msglist 這個列表用來儲存msghdr結構
	struct msghdr *msg;

	assert(c != NULL);

	//如果msglist的長度和已經使用的長度相等的時候,說明msglist已經用完了,需要擴容
	if (c->msgsize == c->msgused) {
		//擴容兩倍
		msg = realloc(c->msglist, c->msgsize * 2 * sizeof(struct msghdr));
		if (!msg) {
			STATS_LOCK();
			stats.malloc_fails++;
			STATS_UNLOCK();
			return -1;
		}
		c->msglist = msg; //將c->msglist指向當前新的列表
		c->msgsize *= 2; //size也會跟著增加
	}

	//msg重新指向未使用的msghdr指標位置
	msg = c->msglist + c->msgused;

	/* this wipes msg_iovlen, msg_control, msg_controllen, and
	 msg_flags, the last 3 of which aren't defined on solaris: */
	//將新的msghdr塊初始化設定為0
	memset(msg, 0, sizeof(struct msghdr));

	//新的msghdr的msg_iov指向 struct iovec *iov結構
	msg->msg_iov = &c->iov[c->iovused];

	if (IS_UDP(c->transport) && c->request_addr_size > 0) {
		msg->msg_name = &c->request_addr;
		msg->msg_namelen = c->request_addr_size;
	}

	c->msgbytes = 0;
	c->msgused++;

	if (IS_UDP(c->transport)) {
		/* Leave room for the UDP header, which we'll fill in later. */
		return add_iov(c, NULL, UDP_HEADER_SIZE);
	}

	return 0;
}

ensure_iov_space 方法 IOV擴容

這個方法主要檢查c->iov是否還有剩餘空間,如果不夠用了,則擴容2倍。

static int ensure_iov_space(conn *c) {
	assert(c != NULL);

	//如果IOV也使用完了....IOV,分配新的IOV
	if (c->iovused >= c->iovsize) {
		int i, iovnum;
		struct iovec *new_iov = (struct iovec *) realloc(c->iov,
				(c->iovsize * 2) * sizeof(struct iovec));
		if (!new_iov) {
			STATS_LOCK();
			stats.malloc_fails++;
			STATS_UNLOCK();
			return -1;
		}
		c->iov = new_iov;
		c->iovsize *= 2; //擴容兩倍

		/* Point all the msghdr structures at the new list. */
		for (i = 0, iovnum = 0; i < c->msgused; i++) {
			c->msglist[i].msg_iov = &c->iov[iovnum];
			iovnum += c->msglist[i].msg_iovlen;
		}
	}

	return 0;
}

conn_mwrite

conn_mwrite狀態在drive_machine這個方法中。主要就是向客戶端寫資料了。

從上面的add_iov方法中,我們知道Memcached會將需要待發送的資料寫入c->msglist結構中。

真正寫資料的方法是transmit

//drive_machine方法
		//這個conn_mwrite是向客戶端寫資料
		case conn_mwrite:
			if (IS_UDP(c->transport) && c->msgcurr == 0
					&& build_udp_headers(c) != 0) {
				if (settings.verbose > 0)
					fprintf(stderr, "Failed to build UDP headers\n");
				conn_set_state(c, conn_closing);
				break;
			}
			//transmit這個方法非常重要,主要向客戶端寫資料的操作都在這個方法中進行
			//返回transmit_result列舉型別,用於判斷是否寫成功,如果失敗,則關閉連線
			switch (transmit(c)) {

			//如果向客戶端傳送資料成功
			case TRANSMIT_COMPLETE:
				if (c->state == conn_mwrite) {
					conn_release_items(c);
					/* XXX:  I don't know why this wasn't the general case */
					if (c->protocol == binary_prot) {
						conn_set_state(c, c->write_and_go);
					} else {
						//這邊是TCP的狀態
						//狀態又會切回到conn_new_cmd這個狀態
						//conn_new_cmd主要是繼續解析c->rbuf容器中剩餘的命令引數
						conn_set_state(c, conn_new_cmd);
					}
				} else if (c->state == conn_write) {
					if (c->write_and_free) {
						free(c->write_and_free);
						c->write_and_free = 0;
					}
					conn_set_state(c, c->write_and_go);
				} else {
					if (settings.verbose > 0)
						fprintf(stderr, "Unexpected state %d\n", c->state);
					conn_set_state(c, conn_closing);
				}
				break;

transmit 方法

這個方法主要作用:向客戶端傳送資料

//這個方法主要向客戶端寫資料
//如果資料沒有傳送完,則會一直迴圈conn_mwrite這個狀態,直到資料傳送完成為止
static enum transmit_result transmit(conn *c) {
	assert(c != NULL);

	//每次傳送之前,都會來校驗前一次的資料是否傳送完了
	//如果前一次的msghdr結構體內的資料已經發送完了,則c->msgcurr指標就會往後移動一位,
	//移動到下一個等待發送的msghdr結構體指標上
	//c->msgcurr初始值為:0
	if (c->msgcurr < c->msgused && c->msglist[c->msgcurr].msg_iovlen == 0) {
		/* Finished writing the current msg; advance to the next. */
		c->msgcurr++;
	}

	//如果c->msgcurr(已傳送)小於c->msgused(已使用),則就可以知道還沒傳送完,則需要繼續傳送
	//如果c->msgcurr(已傳送)等於c->msgused(已使用),則說明已經發送完了,返回TRANSMIT_COMPLETE狀態
	if (c->msgcurr < c->msgused) {
		ssize_t res;

		//從c->msglist取出一個待發送的msghdr結構
		struct msghdr *m = &c->msglist[c->msgcurr];
		//向客戶端傳送資料
		res = sendmsg(c->sfd, m, 0);
		//傳送成功的情況
		if (res > 0) {
			pthread_mutex_lock(&c->thread->stats.mutex);
			c->thread->stats.bytes_written += res;
			pthread_mutex_unlock(&c->thread->stats.mutex);

			/* We've written some of the data. Remove the completed
			 iovec entries from the list of pending writes. */
			//這邊會檢查傳送了多少
			while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
				res -= m->msg_iov->iov_len;
				m->msg_iovlen--;
				m->msg_iov++;
			}

			/* Might have written just part of the last iovec entry;
			 adjust it so the next write will do the rest. */
			if (res > 0) {
				m->msg_iov->iov_base = (caddr_t) m->msg_iov->iov_base + res;
				m->msg_iov->iov_len -= res;
			}
			return TRANSMIT_INCOMPLETE;
		}
		//傳送失敗的情況
		if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
			if (!update_event(c, EV_WRITE | EV_PERSIST)) {
				if (settings.verbose > 0)
					fprintf(stderr, "Couldn't update event\n");
				conn_set_state(c, conn_closing);
				return TRANSMIT_HARD_ERROR;
			}
			return TRANSMIT_SOFT_ERROR;
		}
		/* if res == 0 or res == -1 and error is not EAGAIN or EWOULDBLOCK,
		 we have a real error, on which we close the connection */
		if (settings.verbose > 0)
			perror("Failed to write, and not due to blocking");

		if (IS_UDP(c->transport))
			conn_set_state(c, conn_read);
		else
			conn_set_state(c, conn_closing);
		return TRANSMIT_HARD_ERROR;
	} else {
		return TRANSMIT_COMPLETE;
	}
}

conn_shrink 方法

當資料傳送成功後,會跳轉到conn_new_cmd這個狀態繼續處理,然後進入reset_cmd_handler方法,然後進入conn_shrink方法。

conn_shrink主要是用於檢查讀取和傳送的buf的大小,是否超過了預定的水位,如果超過了,則需要重新realloc。

//重新設定命令handler
static void reset_cmd_handler(conn *c) {
	c->cmd = -1;
	c->substate = bin_no_state;
	if (c->item != NULL) {
		item_remove(c->item);
		c->item = NULL;
	}
	conn_shrink(c); //這個方法是檢查c->rbuf容器的大小
	//如果剩餘未解析的命令 > 0的話,繼續跳轉到conn_parse_cmd解析命令
	if (c->rbytes > 0) {
		conn_set_state(c, conn_parse_cmd);
	} else {
		//如果命令都解析完成了,則繼續等待新的資料到來
		conn_set_state(c, conn_waiting);
	}
}

//檢查rbuf的大小
static void conn_shrink(conn *c) {
	assert(c != NULL);

	if (IS_UDP(c->transport))
		return;

	//如果bufsize大於READ_BUFFER_HIGHWAT(8192)的時候需要重新處理
	//DATA_BUFFER_SIZE等於2048,所以我們可以看到之前的程式碼中對rbuf最多隻能進行4次recalloc
	if (c->rsize > READ_BUFFER_HIGHWAT && c->rbytes < DATA_BUFFER_SIZE) {
		char *newbuf;

		if (c->rcurr != c->rbuf)
			memmove(c->rbuf, c->rcurr, (size_t) c->rbytes); //記憶體移動

		newbuf = (char *) realloc((void *) c->rbuf, DATA_BUFFER_SIZE);

		if (newbuf) {
			c->rbuf = newbuf;
			c->rsize = DATA_BUFFER_SIZE;
		}
		/* TODO check other branch... */
		c->rcurr = c->rbuf;
	}

	if (c->isize > ITEM_LIST_HIGHWAT) {
		item **newbuf = (item**) realloc((void *) c->ilist,
				ITEM_LIST_INITIAL * sizeof(c->ilist[0]));
		if (newbuf) {
			c->ilist = newbuf;
			c->isize = ITEM_LIST_INITIAL;
		}
		/* TODO check error condition? */
	}

	//如果大於c->msglist的水位了,則重新realloc
	if (c->msgsize > MSG_LIST_HIGHWAT) {
		struct msghdr *newbuf = (struct msghdr *) realloc((void *) c->msglist,
				MSG_LIST_INITIAL * sizeof(c->msglist[0]));
		if (newbuf) {
			c->msglist = newbuf;
			c->msgsize = MSG_LIST_INITIAL;
		}
		/* TODO check error condition? */
	}

	//如果大於c->iovsize的水位了,則重新realloc
	if (c->iovsize > IOV_LIST_HIGHWAT) {
		struct iovec *newbuf = (struct iovec *) realloc((void *) c->iov,
				IOV_LIST_INITIAL * sizeof(c->iov[0]));
		if (newbuf) {
			c->iov = newbuf;
			c->iovsize = IOV_LIST_INITIAL;
		}
		/* TODO check return value */
	}
}