redis cluster叢集的原始碼分析(2)
阿新 • • 發佈:2018-12-25
本文的分析主要介紹叢集中的槽和叢集中命令的執行。
一、叢集中的槽
1、槽的基本結構資訊
redis叢集通過分片的方式來儲存資料庫中的鍵值對:叢集的整個資料庫被分為16384個槽,
資料庫中的每個鍵屬於這16384個槽的一個,每個節點可以處理0~16384個槽。當資料庫中只有
所有的槽都有節點處理是,叢集才是上線狀態;反之,只要有一個槽沒有節點處理,那叢集就處
於下線狀態。
struct clusterNode{
……
unsigned char slots[CLUSTER_SLOTS/8];
int numslots;
……
};
clusterNode的slots是一個bit array,共包含16384個二進位制位,以0為起始索引,16383為終止索
對slots編碼,並根據索引i上的二進位制位判斷是否處理槽i。
typedef struct clusterState {
……
clusterNode *slots[CLUSTER_SLOTS];
……
} clusterState;
clusterState的slots記錄叢集中所有槽的指派資訊。
1)slots[i]為NULL,則該槽未進行指派
2)slots[i]為clusterNode指標,則該槽指派給clusterNode指標對應的節點
2、槽的指派
槽的相關處理主要有槽指派和重新分片。命令如下所示:
CLUSTER ADDSLOTS <slot> [slot] ...
void clusterCommand(client *c) {
……
if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
!strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
{
int j, slot;
unsigned char *slots = zmalloc(CLUSTER_SLOTS);
int del = !strcasecmp(c->argv[1]->ptr,"delslots");
memset(slots,0,CLUSTER_SLOTS);
……//判斷槽是否可處理,不可處理直接返回
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (slots[j]) {
int retval;
retval = del ? clusterDelSlot(j) :clusterAddSlot(myself,j);
}
}
zfree(slots);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
addReply(c,shared.ok);
}
……
}//槽的新增
int clusterAddSlot(clusterNode *n, int slot) {
if (server.cluster->slots[slot]) return C_ERR;
clusterNodeSetSlotBit(n,slot);//更新clusterNode的slots
server.cluster->slots[slot] = n;//更新clusterState的slots
return C_OK;
}
二、叢集中命令的執行
在叢集中執行客戶端傳送的命令時,接收的命令的節點先需要判斷叢集狀態,然後在去計算命令中要處理的key處於
哪個槽,並獲取槽相應的節點。如果槽對應的節點是當前節點,就直接執行命令,反之,將指引客戶端重定向到正確的
節點。
叢集中的命令處理
int processCommand(client *c) {
……
if (server.cluster_enabled &&!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
{
int hashslot;
if (server.cluster->state != CLUSTER_OK) {//叢集狀態為下線狀態
clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
return C_OK;
} else {
int error_code;//獲取key所在的節點
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
if (n == NULL || n != server.cluster->myself) {
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
}
……
}
計算命令中要處理的key處於哪個槽,並獲取槽相應的節點
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
……
//檢查這些命令的key是否在同一slot,並且此節點關聯了這個slot
for (i = 0; i < ms->count; i++) {
……
mcmd = ms->commands[i].cmd;
margc = ms->commands[i].argc;
margv = ms->commands[i].argv;
keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
for (j = 0; j < numkeys; j++) {
robj *thiskey = margv[keyindex[j]];
int thisslot = keyHashSlot((char*)thiskey->ptr,
sdslen(thiskey->ptr));
if (firstkey == NULL) {
//事務的第一個命令的key,獲取key的slot和node
firstkey = thiskey;
slot = thisslot;
n = server.cluster->slots[slot];
if (n == NULL) {//key對應的槽沒有節點指派
getKeysFreeResult(keyindex);
if (error_code)
*error_code = CLUSTER_REDIR_DOWN_UNBOUND;
return NULL;
}//判斷是否在重新分片——key所在的槽要遷移到別的節點或者從別的節點遷移過來
if (n == myself &&
server.cluster->migrating_slots_to[slot] != NULL)
{
migrating_slot = 1;
} else if (server.cluster->importing_slots_from[slot] != NULL) {
importing_slot = 1;
}
} else {//用來確認所有的key是否都在同一個slot
if (!equalStringObjects(firstkey,thiskey)) {
if (slot != thisslot) {
getKeysFreeResult(keyindex);
if (error_code)
*error_code = CLUSTER_REDIR_CROSS_SLOT;
return NULL;
} else {
multiple_keys = 1;
}
}
}//key是否已經遷移到別的節點或者還沒從別的節點遷移過來
if ((migrating_slot || importing_slot) &&
lookupKeyRead(&server.db[0],thiskey) == NULL)
{
missing_keys++;
}
}
getKeysFreeResult(keyindex);
}
if (n == NULL) return myself;
if (hashslot) *hashslot = slot;
if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
return myself;
//如果slot已經遷移到別的節點,返回CLUSTER_REDIR_ASK
if (migrating_slot && missing_keys) {
if (error_code) *error_code = CLUSTER_REDIR_ASK;
return server.cluster->migrating_slots_to[slot];
}
if (importing_slot &&
(c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
{
if (multiple_keys && missing_keys) {
//如果slot還沒從別的節點遷移過來,返回CLUSTER_REDIR_DOWN_UNBOUND
if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
return NULL;
} else {
return myself;
}
}
//我是slave,slot是在我的master,並且cmd是隻讀操作
if (c->flags & CLIENT_READONLY &&
cmd->flags & CMD_READONLY &&
nodeIsSlave(myself) &&
myself->slaveof == n)
{
return myself;
}
//slot不在此節點,返回CLUSTER_REDIR_MOVED
if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
return n;
}
根據計算key對應的槽和節點時返回的錯誤,返回給客戶端進行重定向
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
} else if (error_code == CLUSTER_REDIR_UNSTABLE) {
addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
} else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
} else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
} else if (error_code == CLUSTER_REDIR_MOVED ||
error_code == CLUSTER_REDIR_ASK)
{
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d\r\n",(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));
}
}