1. 程式人生 > >redis cluster叢集的原始碼分析(2)

redis cluster叢集的原始碼分析(2)

        本文的分析主要介紹叢集中的槽和叢集中命令的執行。

一、叢集中的槽

1、槽的基本結構資訊

        redis叢集通過分片的方式來儲存資料庫中的鍵值對:叢集的整個資料庫被分為16384個槽,

資料庫中的每個鍵屬於這16384個槽的一個,每個節點可以處理0~16384個槽。當資料庫中只有

所有的槽都有節點處理是,叢集才是上線狀態;反之,只要有一個槽沒有節點處理,那叢集就處

於下線狀態。  

struct clusterNode{
	……
	unsigned char slots[CLUSTER_SLOTS/8]; 
	int numslots;
	……
};

       clusterNode的slots是一個bit array,共包含16384個二進位制位,以0為起始索引,16383為終止索

slots編碼,並根據索引i上的二進位制位判斷是否處理槽i。


typedef struct clusterState {
    ……
    clusterNode *slots[CLUSTER_SLOTS];
    ……
} clusterState;
       clusterState的slots記錄叢集中所有槽的指派資訊。

1)slots[i]為NULL,則該槽未進行指派

2)slots[i]為clusterNode指標,則該槽指派給clusterNode指標對應的節點

2、槽的指派

     槽的相關處理主要有槽指派和重新分片。命令如下所示:

CLUSTER ADDSLOTS <slot> [slot] ... 

void clusterCommand(client *c) {
    ……
    if ((!strcasecmp(c->argv[1]->ptr,"addslots") ||
               !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3)
    {
        int j, slot;
        unsigned char *slots = zmalloc(CLUSTER_SLOTS);
        int del = !strcasecmp(c->argv[1]->ptr,"delslots");
        memset(slots,0,CLUSTER_SLOTS);
        ……//判斷槽是否可處理,不可處理直接返回      
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            if (slots[j]) {
                int retval;
                retval = del ? clusterDelSlot(j) :clusterAddSlot(myself,j);
            }
        }
        zfree(slots);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    }
    ……
}//槽的新增
int clusterAddSlot(clusterNode *n, int slot) {
    if (server.cluster->slots[slot]) return C_ERR;
    clusterNodeSetSlotBit(n,slot);//更新clusterNode的slots
server.cluster->slots[slot] = n;//更新clusterState的slots return C_OK; }

二、叢集中命令的執行

        在叢集中執行客戶端傳送的命令時,接收的命令的節點先需要判斷叢集狀態,然後在去計算命令中要處理的key處於

哪個槽,並獲取槽相應的節點。如果槽對應的節點是當前節點,就直接執行命令,反之,將指引客戶端重定向到正確的

節點。


     叢集中的命令處理

int processCommand(client *c) {
	……
	if (server.cluster_enabled &&!(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0))
    {
        int hashslot;
        if (server.cluster->state != CLUSTER_OK) {//叢集狀態為下線狀態
            clusterRedirectClient(c,NULL,0,CLUSTER_REDIR_DOWN_STATE);
            return C_OK;
        } else {
            int error_code;//獲取key所在的節點
            clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);
            if (n == NULL || n != server.cluster->myself) {
                clusterRedirectClient(c,n,hashslot,error_code);
                return C_OK;
            }
        }
    }
    ……
}
    計算命令中要處理的key處於哪個槽,並獲取槽相應的節點
clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *error_code) {
    ……
    //檢查這些命令的key是否在同一slot,並且此節點關聯了這個slot
    for (i = 0; i < ms->count; i++) {
        ……
        mcmd = ms->commands[i].cmd;
        margc = ms->commands[i].argc;
        margv = ms->commands[i].argv;
        keyindex = getKeysFromCommand(mcmd,margv,margc,&numkeys);
        for (j = 0; j < numkeys; j++) {
            robj *thiskey = margv[keyindex[j]];
            int thisslot = keyHashSlot((char*)thiskey->ptr,
                                       sdslen(thiskey->ptr));
            if (firstkey == NULL) {
                //事務的第一個命令的key,獲取key的slot和node
                firstkey = thiskey;
                slot = thisslot;
                n = server.cluster->slots[slot];
                if (n == NULL) {//key對應的槽沒有節點指派
                    getKeysFreeResult(keyindex);
                    if (error_code)
                        *error_code = CLUSTER_REDIR_DOWN_UNBOUND;
                    return NULL;
                }//判斷是否在重新分片——key所在的槽要遷移到別的節點或者從別的節點遷移過來
                if (n == myself &&
                    server.cluster->migrating_slots_to[slot] != NULL)
                {
                    migrating_slot = 1;
                } else if (server.cluster->importing_slots_from[slot] != NULL) {
                    importing_slot = 1;
                }
            } else {//用來確認所有的key是否都在同一個slot
                if (!equalStringObjects(firstkey,thiskey)) {
                    if (slot != thisslot) {
                        getKeysFreeResult(keyindex);
                        if (error_code)
                            *error_code = CLUSTER_REDIR_CROSS_SLOT;
                        return NULL;
                    } else {
                        multiple_keys = 1;
                    }
                }
            }//key是否已經遷移到別的節點或者還沒從別的節點遷移過來
            if ((migrating_slot || importing_slot) &&
                lookupKeyRead(&server.db[0],thiskey) == NULL)
            {
                missing_keys++;
            }
        }
        getKeysFreeResult(keyindex);
    }
    if (n == NULL) return myself;
    if (hashslot) *hashslot = slot;
    if ((migrating_slot || importing_slot) && cmd->proc == migrateCommand)
        return myself;
    //如果slot已經遷移到別的節點,返回CLUSTER_REDIR_ASK
    if (migrating_slot && missing_keys) {
        if (error_code) *error_code = CLUSTER_REDIR_ASK;
        return server.cluster->migrating_slots_to[slot];
    }
    if (importing_slot &&
        (c->flags & CLIENT_ASKING || cmd->flags & CMD_ASKING))
    {
        if (multiple_keys && missing_keys) {
        	//如果slot還沒從別的節點遷移過來,返回CLUSTER_REDIR_DOWN_UNBOUND
            if (error_code) *error_code = CLUSTER_REDIR_UNSTABLE;
            return NULL;
        } else {
            return myself;
        }
    }
    //我是slave,slot是在我的master,並且cmd是隻讀操作
    if (c->flags & CLIENT_READONLY &&
        cmd->flags & CMD_READONLY &&
        nodeIsSlave(myself) &&
        myself->slaveof == n)
    {
        return myself;
    }
    //slot不在此節點,返回CLUSTER_REDIR_MOVED
    if (n != myself && error_code) *error_code = CLUSTER_REDIR_MOVED;
    return n;
}
   根據計算key對應的槽和節點時返回的錯誤,返回給客戶端進行重定向
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
    if (error_code == CLUSTER_REDIR_CROSS_SLOT) {
        addReplySds(c,sdsnew("-CROSSSLOT Keys in request don't hash to the same slot\r\n"));
    } else if (error_code == CLUSTER_REDIR_UNSTABLE) {
        addReplySds(c,sdsnew("-TRYAGAIN Multiple keys request during rehashing of slot\r\n"));
    } else if (error_code == CLUSTER_REDIR_DOWN_STATE) {
        addReplySds(c,sdsnew("-CLUSTERDOWN The cluster is down\r\n"));
    } else if (error_code == CLUSTER_REDIR_DOWN_UNBOUND) {
        addReplySds(c,sdsnew("-CLUSTERDOWN Hash slot not served\r\n"));
    } else if (error_code == CLUSTER_REDIR_MOVED ||
               error_code == CLUSTER_REDIR_ASK)
    {
        addReplySds(c,sdscatprintf(sdsempty(),
            "-%s %d %s:%d\r\n",(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
            hashslot,n->ip,n->port));
    } 
}