1. 程式人生 > >Redis原始碼解析:27叢集(三)主從複製、故障轉移

Redis原始碼解析:27叢集(三)主從複製、故障轉移

一:主從複製

         在叢集中,為了保證叢集的健壯性,通常設定一部分叢集節點為主節點,另一部分叢集節點為這些主節點的從節點。一般情況下,需要保證每個主節點至少有一個從節點。

         叢集初始化時,每個叢集節點都是以獨立的主節點角色而存在的,通過向叢集節點發送”CLUSTER  MEET     <ip> <port>”命令,可以使叢集節點間相互認識。節點間相互認識之後,可以通過向某些叢集節點發送"CLUSTER  REPLICATE  <nodeID>"命令,使收到命令的叢集節點成為<nodeID>節點的從節點。

         在函式clusterCommand中,處理這部分的程式碼如下:

    else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) {
        /* CLUSTER REPLICATE <NODE ID> */
        clusterNode *n = clusterLookupNode(c->argv[2]->ptr);

        /* Lookup the specified node in our table. */
        if (!n) {
            addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[2]->ptr);
            return;
        }

        /* I can't replicate myself. */
        if (n == myself) {
            addReplyError(c,"Can't replicate myself");
            return;
        }

        /* Can't replicate a slave. */
        if (nodeIsSlave(n)) {
            addReplyError(c,"I can only replicate a master, not a slave.");
            return;
        }

        /* If the instance is currently a master, it should have no assigned
         * slots nor keys to accept to replicate some other node.
         * Slaves can switch to another master without issues. */
        if (nodeIsMaster(myself) &&
            (myself->numslots != 0 || dictSize(server.db[0].dict) != 0)) {
            addReplyError(c,
                "To set a master the node must be empty and "
                "without assigned slots.");
            return;
        }

        /* Set the master. */
        clusterSetMaster(n);
        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG);
        addReply(c,shared.ok);
    } 

"CLUSTER  REPLICATE"命令的格式是"CLUSTER  REPLICATE  <nodeID>";

         首先,根據命令引數<nodeID>,從字典server.cluster->nodes中尋找對應的節點n;如果找不到n,或者,如果n就是當前節點,或者,n節點是個從節點,則回覆客戶端錯誤資訊後返回;

         如果當前節點為主節點,則當前節點不能有負責的槽位,當前節點的資料庫也必須為空,如果不滿足以上任一條件,則將不能置當前節點為從節點,因此回覆客戶端錯誤資訊後,直接返回;

         接下來,呼叫clusterSetMaster函式置當前節點為n節點的從節點,最後,回覆客戶端"OK";

         clusterSetMaster函式的程式碼如下:

void clusterSetMaster(clusterNode *n) {
    redisAssert(n != myself);
    redisAssert(myself->numslots == 0);

    if (nodeIsMaster(myself)) {
        myself->flags &= ~REDIS_NODE_MASTER;
        myself->flags |= REDIS_NODE_SLAVE;
        clusterCloseAllSlots();
    } else {
        if (myself->slaveof)
            clusterNodeRemoveSlave(myself->slaveof,myself);
    }
    myself->slaveof = n;
    clusterNodeAddSlave(n,myself);
    replicationSetMaster(n->ip, n->port);
    resetManualFailover();
}

首先,必須保證n不是當前節點,而且當前節點沒有負責任何槽位;

         如果當前節點已經是主節點了,則將節點標誌位中的REDIS_NODE_MASTER標記清除,並增加REDIS_NODE_SLAVE標記;然後呼叫clusterCloseAllSlots函式,置server.cluster->migrating_slots_to和server.cluster->importing_slots_from為空;

         如果當前節點為從節點,並且目前已有主節點,則呼叫clusterNodeRemoveSlave函式,將當前節點從其當前主節點的slaves陣列中刪除,解除當前節點與其當前主節點的關係;

         然後,置myself->slaveof為n,呼叫clusterNodeAddSlave函式,將當前節點插入到n->slaves中;

         然後,呼叫replicationSetMaster函式,這裡直接複用了主從複製部分的程式碼,相當於向當前節點發送了"SLAVE  OF"命令,開始主從複製流程;

         最後,呼叫resetManualFailover函式,清除手動故障轉移狀態;

二:故障轉移

1:紀元(epoch)

理解Redis叢集中的故障轉移,必須要理解紀元(epoch)在分散式Redis叢集中的作用,Redis叢集使用RAFT演算法中類似term的概念,在Redis叢集中這被稱之為紀元(epoch)。紀元的概念在介紹哨兵時已經介紹過了,在Redis叢集中,紀元的概念和作用與哨兵中的紀元類似。Redis叢集中的紀元主要是兩種:currentEpoch和configEpoch。

a、currentEpoch

         這是一個叢集狀態相關的概念,可以當做記錄叢集狀態變更的遞增版本號。每個叢集節點,都會通過server.cluster->currentEpoch記錄當前的currentEpoch。

叢集節點建立時,不管是主節點還是從節點,都置currentEpoch為0。當前節點接收到來自其他節點的包時,如果傳送者的currentEpoch(訊息頭部會包含傳送者的currentEpoch)大於當前節點的currentEpoch,那麼當前節點會更新currentEpoch為傳送者的currentEpoch。因此,叢集中所有節點的currentEpoch最終會達成一致,相當於對叢集狀態的認知達成了一致。

currentEpoch作用在於,當叢集的狀態發生改變,某個節點為了執行一些動作需要尋求其他節點的同意時,就會增加currentEpoch的值。目前currentEpoch只用於從節點的故障轉移流程,這就跟哨兵中的sentinel.current_epoch作用是一模一樣的。

當從節點A發現其所屬的主節點下線時,就會試圖發起故障轉移流程。首先就是增加currentEpoch的值,這個增加後的currentEpoch是所有叢集節點中最大的。然後從節點A向所有節點發包用於拉票,請求其他主節點投票給自己,使自己能成為新的主節點。

其他節點收到包後,發現傳送者的currentEpoch比自己的currentEpoch大,就會更新自己的currentEpoch,並在尚未投票的情況下,投票給從節點A,表示同意使其成為新的主節點。

b、configepoch

         這是一個叢集節點配置相關的概念,每個叢集節點都有自己獨一無二的configepoch。所謂的節點配置,實際上是指節點所負責的槽位資訊。

每一個主節點在向其他節點發送包時,都會附帶其configEpoch資訊,以及一份表示它所負責槽位的位陣列資訊。而從節點向其他節點發送包時,包中的configEpoch和負責槽位資訊,是其主節點的configEpoch和負責槽位資訊。節點收到包之後,就會根據包中的configEpoch和負責槽位資訊,記錄到相應節點屬性中。

         configEpoch主要用於解決不同的節點的配置發生衝突的情況。舉個例子就明白了:節點A宣稱負責槽位1,其向外傳送的包中,包含了自己的configEpoch和負責槽位資訊。節點C收到A發來的包後,發現自己當前沒有記錄槽位1的負責節點(也就是server.cluster->slots[1]為NULL),就會將A置為槽位1的負責節點(server.cluster->slots[1]= A),並記錄節點A的configEpoch。後來,節點C又收到了B發來的包,它也宣稱負責槽位1,此時,如何判斷槽位1到底由誰負責呢?這就是configEpoch起作用的時候了,C在B發來的包中,發現它的configEpoch,要比A的大,說明B是更新的配置,因此,就將槽位1的負責節點設定為B(server.cluster->slots[1] = B)。

         在從節點發起選舉,獲得足夠多的選票之後,成功當選時,也就是從節點試圖替代其下線主節點,成為新的主節點時,會增加它自己的configEpoch,使其成為當前所有叢集節點的configEpoch中的最大值。這樣,該從節點成為主節點後,就會向所有節點發送廣播包,強制其他節點更新相關槽位的負責節點為自己。

2:故障轉移概述

叢集中,當某個從節點發現其主節點下線時,就會嘗試在未來某個時間點發起故障轉移流程。具體而言就是先向其他叢集節點發送CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST包用於拉票,叢集主節點收到這樣的包後,如果在當前選舉紀元中沒有投過票,就會向該從節點發送CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK包,表示投票給該從節點。從節點如果在一段時間內收到了大部分主節點的投票,則表示選舉成功,接下來就是升級為主節點,並接管原主節點所負責的槽位,並將這種變化廣播給其他所有叢集節點,使它們感知這種變化,並修改自己記錄的配置資訊。

接下來,就是故障轉移中各個環節的詳細描述:

3:從節點的選舉和升級

         3.1、從節點發起故障轉移的時間

         從節點在發現其主節點下線時,並不是立即發起故障轉移流程,而是要等待一段時間,在未來的某個時間點才發起選舉。這個時間點是這樣計算的:

mstime() + 500ms + random()%500ms + rank*1000ms

其中,固定延時500ms,是為了留出時間,使主節點下線的訊息能傳播到叢集中其他節點,這樣叢集中的主節點才有可能投票;隨機延時是為了避免兩個從節點同時開始故障轉移流程;rank表示從節點的排名,排名是指當前從節點在下線主節點的所有從節點中的排名,排名主要是根據複製資料量來定,複製資料量越多,排名越靠前,因此,具有較多複製資料量的從節點可以更早發起故障轉移流程,從而更可能成為新的主節點。

rank主要是通過呼叫clusterGetSlaveRank得到的,該函式的程式碼如下:

int clusterGetSlaveRank(void) {
    long long myoffset;
    int j, rank = 0;
    clusterNode *master;

    redisAssert(nodeIsSlave(myself));
    master = myself->slaveof;
    if (master == NULL) return 0; /* Never called by slaves without master. */

    myoffset = replicationGetSlaveOffset();
    for (j = 0; j < master->numslaves; j++)
        if (master->slaves[j] != myself &&
            master->slaves[j]->repl_offset > myoffset) rank++;
    return rank;
}

在該函式中,首先得到當前從節點的主節點master,如果master為NULL,則直接返回0;

然後呼叫replicationGetSlaveOffset函式,得到當前從節點的複製偏移量myoffset;接下來輪訓master->slaves陣列,只要其中從節點的複製偏移量大於myoffset,則增加排名rank的值;

在沒有開始故障轉移之前,每隔一段時間就會呼叫一次clusterGetSlaveRank函式,以更新當前從節點的排名。

         3.2、從節點發起故障轉移,開始拉票

從節點的故障轉移,是在函式clusterHandleSlaveFailover中處理的,該函式在叢集定時器函式clusterCron中呼叫。本函式用於處理從節點進行故障轉移的整個流程,包括:判斷是否可以發起選舉;發起選舉;判斷選舉是否超時;判斷自己是否拉到了足夠的選票;使自己升級為新的主節點這些所有流程。首先看一下升級流程之前的程式碼,如下:

void clusterHandleSlaveFailover(void) {
    mstime_t data_age;
    mstime_t auth_age = mstime() - server.cluster->failover_auth_time;
    int needed_quorum = (server.cluster->size / 2) + 1;
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    mstime_t auth_timeout, auth_retry_time;

    server.cluster->todo_before_sleep &= ~CLUSTER_TODO_HANDLE_FAILOVER;

    /* Compute the failover timeout (the max time we have to send votes
     * and wait for replies), and the failover retry time (the time to wait
     * before trying to get voted again).
     *
     * Timeout is MIN(NODE_TIMEOUT*2,2000) milliseconds.
     * Retry is two times the Timeout.
     */
    auth_timeout = server.cluster_node_timeout*2;
    if (auth_timeout < 2000) auth_timeout = 2000;
    auth_retry_time = auth_timeout*2;

    /* Pre conditions to run the function, that must be met both in case
     * of an automatic or manual failover:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) It is serving slots. */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        myself->slaveof->numslots == 0)
    {
        /* There are no reasons to failover, so we set the reason why we
         * are returning without failing over to NONE. */
        server.cluster->cant_failover_reason = REDIS_CLUSTER_CANT_FAILOVER_NONE;
        return;
    }

    /* Set data_age to the number of seconds we are disconnected from
     * the master. */
    if (server.repl_state == REDIS_REPL_CONNECTED) {
        data_age = (mstime_t)(server.unixtime - server.master->lastinteraction)
                   * 1000;
    } else {
        data_age = (mstime_t)(server.unixtime - server.repl_down_since) * 1000;
    }

    /* Remove the node timeout from the data age as it is fine that we are
     * disconnected from our master at least for the time it was down to be
     * flagged as FAIL, that's the baseline. */
    if (data_age > server.cluster_node_timeout)
        data_age -= server.cluster_node_timeout;

    /* Check if our data is recent enough according to the slave validity
     * factor configured by the user.
     *
     * Check bypassed for manual failovers. */
    if (server.cluster_slave_validity_factor &&
        data_age >
        (((mstime_t)server.repl_ping_slave_period * 1000) +
         (server.cluster_node_timeout * server.cluster_slave_validity_factor)))
    {
        if (!manual_failover) {
            clusterLogCantFailover(REDIS_CLUSTER_CANT_FAILOVER_DATA_AGE);
            return;
        }
    }

    /* If the previous failover attempt timedout and the retry time has
     * elapsed, we can setup a new one. */
    if (auth_age > auth_retry_time) {
        server.cluster->failover_auth_time = mstime() +
            500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */
            random() % 500; /* Random delay between 0 and 500 milliseconds. */
        server.cluster->failover_auth_count = 0;
        server.cluster->failover_auth_sent = 0;
        server.cluster->failover_auth_rank = clusterGetSlaveRank();
        /* We add another delay that is proportional to the slave rank.
         * Specifically 1 second * rank. This way slaves that have a probably
         * less updated replication offset, are penalized. */
        server.cluster->failover_auth_time +=
            server.cluster->failover_auth_rank * 1000;
        /* However if this is a manual failover, no delay is needed. */
        if (server.cluster->mf_end) {
            server.cluster->failover_auth_time = mstime();
            server.cluster->failover_auth_rank = 0;
        }
        redisLog(REDIS_WARNING,
            "Start of election delayed for %lld milliseconds "
            "(rank #%d, offset %lld).",
            server.cluster->failover_auth_time - mstime(),
            server.cluster->failover_auth_rank,
            replicationGetSlaveOffset());
        /* Now that we have a scheduled election, broadcast our offset
         * to all the other slaves so that they'll updated their offsets
         * if our offset is better. */
        clusterBroadcastPong(CLUSTER_BROADCAST_LOCAL_SLAVES);
        return;
    }

    /* It is possible that we received more updated offsets from other
     * slaves for the same master since we computed our election delay.
     * Update the delay if our rank changed.
     *
     * Not performed if this is a manual failover. */
    if (server.cluster->failover_auth_sent == 0 &&
        server.cluster->mf_end == 0)
    {
        int newrank = clusterGetSlaveRank();
        if (newrank > server.cluster->failover_auth_rank) {
            long long added_delay =
                (newrank - server.cluster->failover_auth_rank) * 1000;
            server.cluster->failover_auth_time += added_delay;
            server.cluster->failover_auth_rank = newrank;
            redisLog(REDIS_WARNING,
                "Slave rank updated to #%d, added %lld milliseconds of delay.",
                newrank, added_delay);
        }
    }

    /* Return ASAP if we can't still start the election. */
    if (mstime() < server.cluster->failover_auth_time) {
        clusterLogCantFailover(REDIS_CLUSTER_CANT_FAILOVER_WAITING_DELAY);
        return;
    }

    /* Return ASAP if the election is too old to be valid. */
    if (auth_age > auth_timeout) {
        clusterLogCantFailover(REDIS_CLUSTER_CANT_FAILOVER_EXPIRED);
        return;
    }

    /* Ask for votes if needed. */
    if (server.cluster->failover_auth_sent == 0) {
        server.cluster->currentEpoch++;
        server.cluster->failover_auth_epoch = server.cluster->currentEpoch;
        redisLog(REDIS_WARNING,"Starting a failover election for epoch %llu.",
            (unsigned long long) server.cluster->currentEpoch);
        clusterRequestFailoverAuth();
        server.cluster->failover_auth_sent = 1;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
        return; /* Wait for replies. */
    }

    ...
}

server.cluster->failover_auth_time屬性,表示從節點可以開始進行故障轉移的時間。叢集初始化時該屬性置為0,一旦滿足開始故障轉移的條件後,該屬性就置為未來的某個時間點,在該時間點,從節點才開始進行拉票。

函式中,首先計算auth_age,該變量表示距離發起故障轉移流程,已經過去了多少時間;然後計算needed_quorum,該變量表示當前從節點必須至少獲得多少選票,才能成為新的主節點;manual_failover表示是否是管理員手動觸發的故障轉移流程;

然後計算auth_timeout,該變量表示故障轉移流程(發起投票,等待迴應)的超時時間,超過該時間後還沒有獲得足夠的選票,則表示本次故障轉移失敗;

計算auth_retry_time,該變量表示判斷是否可以開始下一次故障轉移流程的時間,只有距離上一次發起故障轉移時,已經超過auth_retry_time之後,才表示可以開始下一次故障轉移了(auth_age > auth_retry_time);

接下來判斷當前節點是否可以進行故障轉移:當前節點是主節點;當前節點是從節點但是沒有主節點;當前節點的主節點不處於下線狀態並且不是手動強制進行故障轉移;當前節點的主節點沒有負責的槽位。滿足以上任一條件,則不能進行故障轉移,直接返回即可;

接下來計算,現在距離當前從節點與主節點最後互動的時間data_age,也就是當前從節點與主節點已經斷鏈了多長時間。如果data_age大於server.cluster_node_timeout,則從data_age中減去server.cluster_node_timeout,因為經過server.cluster_node_timeout時間沒有收到主節點的PING回覆,才會將其標記為PFAIL,因此data_age實際上表示:在主節點下線之前,當前從節點有多長時間沒有與其互動過了。data_age主要用於判斷當前從節點的資料新鮮度;如果data_age超過了一定時間,表示當前從節點的資料已經太老了,不能替換掉下線主節點,因此在不是手動強制故障轉移的情況下,直接返回;

如果auth_age大於auth_retry_time,表示可以開始進行下一次故障轉移了。如果之前沒有進行過故障轉移,則auth_age等於mstime,肯定大於auth_retry_time;如果之前進行過故障轉移,則只有距離上一次發起故障轉移時,已經超過auth_retry_time之後,才表示可以開始下一次故障轉移。滿足該條件後,設定故障轉移流程的開始時間:server.cluster->failover_auth_time為mstime() + 500 +random()%500 + rank*1000,該屬性的計算原理之前已經講過,不再贅述;

注意如果是管理員發起的手動強制執行故障轉移,則設定server.cluster->failover_auth_time為當前時間,表示會立即開始故障轉移流程;最後,呼叫clusterBroadcastPong,向該下線主節點的所有從節點發送PONG包,包頭部分帶有當前從節點的複製資料量,因此其他從節點收到之後,可以更新自己的排名;最後直接返回;

如果還沒有開始故障轉移,則呼叫clusterGetSlaveRank,取得當前從節點的最新排名。因為在開始故障轉移之前,可能會收到其他從節點發來的心跳包,因而可以根據心跳包中的複製偏移量更新本節點的排名,獲得新排名newrank,如果newrank比之前的排名靠後,則需要增加故障轉移開始時間的延遲,然後將newrank記錄到server.cluster->failover_auth_rank中;

如果當前時間還不到開始故障轉移的時候,則直接返回即可;

如果auth_age大於auth_timeout,說明之前的故障轉移超時了,因此直接返回;

走到這裡,說明可以開始故障轉移了。因此,首先增加當前節點的currentEpoch的值,表示要開始新一輪選舉了。此時該從節點的currentEpoch就是所有叢集節點中最大的;然後將該currentEpoch記錄到server.cluster->failover_auth_epoch中;

然後呼叫clusterRequestFailoverAuth,向所有叢集節點發送CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST包用於拉票;然後,置server.cluster->failover_auth_sent為1,表示已發起了故障轉移流程;最後直接返回;

         3.3、主節點投票

叢集中所有節點收到用於拉票的CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST包後,只有負責一定槽位的主節點能投票,其他沒資格的節點直接忽略掉該包。

在clusterProcessPacket中,判斷收到的是CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST包後,就會呼叫clusterSendFailoverAuthIfNeeded函式,在滿足條件的基礎上,給傳送者投票。該函式的程式碼如下:

void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    clusterNode *master = node->slaveof;
    uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch);
    uint64_t requestConfigEpoch = ntohu64(request->configEpoch);
    unsigned char *claimed_slots = request->myslots;
    int force_ack = request->mflags[0] & CLUSTERMSG_FLAG0_FORCEACK;
    int j;

    /* IF we are not a master serving at least 1 slot, we don't have the
     * right to vote, as the cluster size in Redis Cluster is the number
     * of masters serving at least one slot, and quorum is the cluster
     * size + 1 */
    if (nodeIsSlave(myself) || myself->numslots == 0) return;

    /* Request epoch must be >= our currentEpoch.
     * Note that it is impossible for it to actually be greater since
     * our currentEpoch was updated as a side effect of receiving this
     * request, if the request epoch was greater. */
    if (requestCurrentEpoch < server.cluster->currentEpoch) {
        redisLog(REDIS_WARNING,
            "Failover auth denied to %.40s: reqEpoch (%llu) < curEpoch(%llu)",
            node->name,
            (unsigned long long) requestCurrentEpoch,
            (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* I already voted for this epoch? Return ASAP. */
    if (server.cluster->lastVoteEpoch == server.cluster->currentEpoch) {
        redisLog(REDIS_WARNING,
                "Failover auth denied to %.40s: already voted for epoch %llu",
                node->name,
                (unsigned long long) server.cluster->currentEpoch);
        return;
    }

    /* Node must be a slave and its master down.
     * The master can be non failing if the request is flagged
     * with CLUSTERMSG_FLAG0_FORCEACK (manual failover). */
    if (nodeIsMaster(node) || master == NULL ||
        (!nodeFailed(master) && !force_ack))
    {
        if (nodeIsMaster(node)) {
            redisLog(REDIS_WARNING,
                    "Failover auth denied to %.40s: it is a master node",
                    node->name);
        } else if (master == NULL) {
            redisLog(REDIS_WARNING,
                    "Failover auth denied to %.40s: I don't know its master",
                    node->name);
        } else if (!nodeFailed(master)) {
            redisLog(REDIS_WARNING,
                    "Failover auth denied to %.40s: its master is up",
                    node->name);
        }
        return;
    }

    /* We did not voted for a slave about this master for two
     * times the node timeout. This is not strictly needed for correctness
     * of the algorithm but makes the base case more linear. */
    if (mstime() - node->slaveof->voted_time < server.cluster_node_timeout * 2)
    {
        redisLog(REDIS_WARNING,
                "Failover auth denied to %.40s: "
                "can't vote about this master before %lld milliseconds",
                node->name,
                (long long) ((server.cluster_node_timeout*2)-
                             (mstime() - node->slaveof->voted_time)));
        return;
    }

    /* The slave requesting the vote must have a configEpoch for the claimed
     * slots that is >= the one of the masters currently serving the same
     * slots in the current configuration. */
    for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(claimed_slots, j) == 0) continue;
        if (server.cluster->slots[j] == NULL ||
            server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
        {
            continue;
        }
        /* If we reached this point we found a slot that in our current slots
         * is served by a master with a greater configEpoch than the one claimed
         * by the slave requesting our vote. Refuse to vote for this slave. */
        redisLog(REDIS_WARNING,
                "Failover auth denied to %.40s: "
                "slot %d epoch (%llu) > reqEpoch (%llu)",
                node->name, j,
                (unsigned long long) server.cluster->slots[j]->configEpoch,
                (unsigned long long) requestConfigEpoch);
        return;
    }

    /* We can vote for this slave. */
    clusterSendFailoverAuth(node);
    server.cluster->lastVoteEpoch = server.cluster->currentEpoch;
    node->slaveof->voted_time = mstime();
    redisLog(REDIS_WARNING, "Failover auth granted to %.40s for epoch %llu",
        node->name, (unsigned long long) server.cluster->currentEpoch);
}

首先得到包頭中,傳送節點的currentEpoch和configEpoch;注意,如果傳送節點為從節點,則該configEpoch是其主節點的configEpoch;

如果當前節點為從節點,或者當前節點雖然為主節點,但是沒有負責的槽位,則沒有投票資格,因此直接返回;

如果傳送者的currentEpoch小於當前節點的currentEpoch,則拒絕為其投票。因為傳送者的狀態與當前叢集狀態不一致,可能是長時間下線的節點剛剛上線,這種情況下,直接返回即可;

如果當前節點lastVoteEpoch,與當前節點的currentEpoch相等,說明本界選舉中,當前節點已經投過票了,不在重複投票,直接返回(因此,如果有兩個從節點同時發起拉票,則當前節點先收到哪個節點的包,就只給那個節點投票。注意,即使這兩個從節點分屬不同主節點,也只能有一個從節點獲得選票);

如果傳送節點是主節點;或者傳送節點雖然是從節點,但是找不到其主節點;或者傳送節點的主節點並未下線並且這不是手動強制開始的故障轉移流程,則根據不同的條件,記錄日誌後直接返回;

針對同一個下線主節點,在2*server.cluster_node_timeout時間內,只會投一次票,這並非必須的限制條件(因為之前的lastVoteEpoch判斷,已經可以避免兩個從節點同時贏得本界選舉了),但是這可以使得獲勝從節點有時間將其成為新主節點的訊息通知給其他從節點,從而避免另一個從節點發起新一輪選舉又進行一次沒必要的故障轉移;

接下來,判斷髮送節點,對其宣稱要負責的槽位,是否比之前負責這些槽位的節點,具有相等或更新的配置紀元configEpoch:針對16384個槽位,只要傳送節點宣稱要負責該槽位,就判斷當前節點記錄的,該槽位當前的負責節點的configEpoch,是否比傳送節點的configEpoch要大,若是,說明發送節點的配置資訊不是最新的,可能是一個長時間下線的節點又重新上線了,這種情況下,不能給他投票,因此直接返回;

走到這裡,說明當前節點可以給傳送節點投票了。因此,呼叫clusterSendFailoverAuth函式向傳送節點發送CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK包表示投票;然後將server.cluster->currentEpoch記錄到server.cluster->lastVoteEpoch,表示本界選舉,當前節點已經投過票了;最後記錄當前投票時間到node->slaveof->voted_time中;

         3.4、從節點統計投票、贏得選舉

         從節點收到CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK包後,就會統計投票。這部分邏輯是在函式clusterProcessPacket中處理的。這部分的程式碼如下:

    else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) {
        if (!sender) return 1;  /* We don't know that node. */
        /* We consider this vote only if the sender is a master serving
         * a non zero number of slots, and its currentEpoch is greater or
         * equal to epoch where this node started the election. */
        if (nodeIsMaster(sender) && sender->numslots > 0 &&
            senderCurrentEpoch >= server.cluster->failover_auth_epoch)
        {
            server.cluster->failover_auth_count++;
            /* Maybe we reached a quorum here, set a flag to make sure
             * we check ASAP. */
            clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER);
        }
    }

如果傳送節點是主節點,並且該主節點有負責的槽位,並且傳送節點的CurrentEpoch,大於等於當前節點發起選舉時的CurrentEpoch(否則,可能是當前節點之前發起過一輪選舉,失敗後,又發起了新一輪選舉;而現在收到的包,是針對之前那一輪選舉的投票(有可能在網路中迷路了一段時間)),滿足以上條件,表示選票有效,因此增加server.cluster->failover_auth_count的值;

         在clusterHandleSlaveFailover函式的最後一部分,從節點判斷收到了大部分主節點的投票之後,就會開始升級為主節點。這部分的程式碼如下:

void clusterHandleSlaveFailover(void) {
    
    int needed_quorum = (server.cluster->size / 2) + 1;
    ...
    /* Check if we reached the quorum. */
    if (server.cluster->failover_auth_count >= needed_quorum) {
        /* We have the quorum, we can finally failover the master. */

        redisLog(REDIS_WARNING,
            "Failover election won: I'm the new master.");

        /* Update my configEpoch to the epoch of the election. */
        if (myself->configEpoch < server.cluster->failover_auth_epoch) {
            myself->configEpoch = server.cluster->failover_auth_epoch;
            redisLog(REDIS_WARNING,
                "configEpoch set to %llu after successful failover",
                (unsigned long long) myself->configEpoch);
        }

        /* Take responsability for the cluster slots. */
        clusterFailoverReplaceYourMaster();
    } else {
        clusterLogCantFailover(REDIS_CLUSTER_CANT_FAILOVER_WAITING_VOTES);
    }
}

         計算needed_quorum,該變量表示當前從節點必須至少獲得多少選票,才能成為新的主節點;        

如果server.cluster->failover_auth_count的值大於needed_quorum,表明當前從節點已經受到了大部分節點的支援,可以成為新的主節點了。

因此,首先更新myself->configEpoch為server.cluster->failover_auth_epoch,這樣當前節點的configEpoch就成為所有叢集節點中最大的了,方便後續更新配置。這種產生新configEpoch的方式是經過協商過的,因為只有從節點贏得大部分主節點投票的時候,才會產生新的configEpoch;最後,呼叫clusterFailoverReplaceYourMaster函式,取代下線主節點,成為新的主節點,並向其他節點廣播這種變化。

clusterFailoverReplaceYourMaster函式的程式碼如下:

void clusterFailoverReplaceYourMaster(void) {
    int j;
    clusterNode *oldmaster = myself->slaveof;

    if (nodeIsMaster(myself) || oldmaster == NULL) return;

    /* 1) Turn this node into a master. */
    clusterSetNodeAsMaster(myself);
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(oldmaster,j)) {
            clusterDelSlot(j);
            clusterAddSlot(myself,j);
        }
    }

    /* 3) Update state and save config. */
    clusterUpdateState();
    clusterSaveConfigOrDie(1);

    /* 4) Pong all the other nodes so that they can update the state
     *    accordingly and detect that we switched to master role. */
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 5) If there was a manual failover in progress, clear the state. */
    resetManualFailover();
}

首先呼叫clusterSetNodeAsMaster,將當前從節點從其主節點的slaves陣列中刪除,將當前節點的標誌位中,清除REDIS_NODE_SLAVE標記,增加REDIS_NODE_MASTER標記,並置當前節點的主節點為NULL,因此,呼叫該函式之後,當前節點在叢集中的的角色就是主節點了;

然後呼叫replicationUnsetMaster,取消主從複製過程,將當前節點升級為主節點;

然後輪訓16384個槽位,當前節點接手老的主節點負責的槽位;

然後呼叫clusterUpdateState和clusterSaveConfigOrDie,看是否需要修改叢集狀態(由下線轉為上線),然後將配置儲存到本地配置檔案中;

然後,向所有叢集節點廣播PONG包,使得"當前節點成為新主節點並接手相應槽位"的訊息,儘快通知給其他節點;

最後,呼叫resetManualFailover,重置手動強制故障轉移的狀態。

4:更新配置

         經過故障轉移之後,某個從節點升級成了主節點,並接手原主節點所負責的槽位。接下來就需要更新配置資訊,使得其他節點能感知到,這些槽位現在由新的節點負責了。此時就是configEpoch發揮作用的時候了。

         在上一節中,從節點成為主節點,接手下線主節點的槽位後,會向所有叢集節點廣播PONG包,使得所有叢集節點能夠更新關於這幾個槽位負責節點的配置資訊。更新配置這部分邏輯是在clusterProcessPacket函式中處理的,這部分的程式碼如下:

    if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG ||
        type == CLUSTERMSG_TYPE_MEET)
    {
        ...
        /* Check for role switch: slave -> master or master -> slave. */
        if (sender) {
            if (!memcmp(hdr->slaveof,REDIS_NODE_NULL_NAME,
                sizeof(hdr->slaveof)))
            {
                /* Node is a master. */
                clusterSetNodeAsMaster(sender);
            } else {
                /* Node is a slave. */
                ...
            }
        }

        /* Update our info about served slots.
         *
         * Note: this MUST happen after we update the master/slave state
         * so that REDIS_NODE_MASTER flag will be set. */

        /* Many checks are only needed if the set of served slots this
         * instance claims is different compared to the set of slots we have
         * for it. Check this ASAP to avoid other computational expansive
         * checks later. */
        clusterNode *sender_master = NULL; /* Sender or its master if slave. */
        int dirty_slots = 0; /* Sender claimed slots don't match my view? */

        if (sender) {
            sender_master = nodeIsMaster(sender) ? sender : sender->slaveof;
            if (sender_master) {
                dirty_slots = memcmp(sender_master->slots,
                        hdr->myslots,sizeof(hdr->myslots)) != 0;
            }
        }

        /* 1) If the sender of the message is a master, and we detected that
         *    the set of slots it claims changed, scan the slots to see if we
         *    need to update our configuration. */
        if (sender && nodeIsMaster(sender) && dirty_slots)
            clusterUpdateSlotsConfigWith(sender,senderConfigEpoch,hdr->myslots);

        /* 2) We also check for the reverse condition, that is, the sender
         *    claims to serve slots we know are served by a master with a
         *    greater configEpoch. If this happens we inform the sender.
         *
         * This is useful because sometimes after a partition heals, a
         * reappearing master may be the last one to claim a given set of
         * hash slots, but with a configuration that other instances know to
         * be deprecated. Example:
         *
         * A and B are master and slave for slots 1,2,3.
         * A is partitioned away, B gets promoted.
         * B is partitioned away, and A returns available.
         *
         * Usually B would PING A publishing its set of served slots and its
         * configEpoch, but because of the partition B can't inform A of the
         * new configuration, so other nodes that have an updated table must
         * do it. In this way A will stop to act as a master (or can try to
         * failover if there are the conditions to win the election). */
        if (sender && dirty_slots) {
            int j;

            for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
                if (bitmapTestBit(hdr->myslots,j)) {
                    if (server.cluster->slots[j] == sender ||
                        server.cluster->slots[j] == NULL) continue;
                    if (server.cluster->slots[j]->configEpoch >
                        senderConfigEpoch)
                    {
                        redisLog(REDIS_VERBOSE,
                            "Node %.40s has old slots configuration, sending "
                            "an UPDATE message about %.40s",
                                sender->name, server.cluster->slots[j]->name);
                        clusterSendUpdate(sender->link,
                            server.cluster->slots[j]);

                        /* TODO: instead of exiting the loop send every other
                         * UPDATE packet for other nodes that are the new owner
                         * of sender's slots. */
                        break;
                    }
                }
            }
        }
    }   

如果傳送節點的slaveof為空,說明發送節點為主節點。則呼叫clusterSetNodeAsMaster函式。函式中,如果傳送節點已經是主節點則直接返回,如果傳送節點之前是從節點,則該函式會將其置為主節點;

         接下來,判斷髮送節點宣稱負責的槽位,是否與當前節點記錄的不同,若不同,則置dirty_slots為1。

一種情況是,如果該節點之前是從節點,則其負責的槽位為空,因此像剛剛完成故障轉移的新主節點,它在當前節點的視角中,負責的槽位就為空,但是該節點在其傳送的PONG包中,會宣稱其負責原主節點的槽位,所以這裡的dirty_slots會被置為1;

         如果傳送節點宣稱負責的槽位與當前節點記錄的不同,並且傳送節點是主節點,則說明該傳送節點可能是剛剛完成故障轉移,新上任的主節點。這種情況下,呼叫函式clusterUpdateSlotsConfigWith,更新當前節點關於傳送節點的配置資訊(這是處理senderConfigEpoch大於server.cluster->slots[j]->configEpoch的情況)。

         還有一種情況就是,之前下線的主節點,經過一段時間之後又重新上線了。而此時在其他節點眼中,它所負責的槽位已經被其他節點所接手了。因此,在重新上線的主節點發來的心跳包中,所宣稱負責的槽位與當前節點記錄的不同,所以這裡的dirty_slots也會被置為1。

這種情況下,輪訓16384個槽位,只要傳送節點宣稱負責的一個槽位,與當前節點記錄的負責該槽位的節點不一致,並且傳送節點的配置紀元configepoch更小,說明發送節點的配置資訊需要更新,因此向該節點發送UPDATE包,包中帶有最新負責該槽位的節點資訊;(這是處理senderConfigEpoch小於server.cluster->slots[j]->configEpoch的情況)

         節點在收到UPDATE包後,在clusterProcessPacket函式中,相應的處理邏輯是:

    else if (type == CLUSTERMSG_TYPE_UPDATE) {
        clusterNode *n; /* The node the update is about. */
        uint64_t reportedConfigEpoch =
                    ntohu64(hdr->data.update.nodecfg.configEpoch);

        if (!sender) return 1;  /* We don't know the sender. */
        n = clusterLookupNode(hdr->data.update.nodecfg.nodename);
        if (!n) return 1;   /* We don't know the reported node. */
        if (n->configEpoch >= reportedConfigEpoch) return 1; /* Nothing new. */

        /* If in our current config the node is a slave, set it as a master. */
        if (nodeIsSlave(n)) clusterSetNodeAsMaster(n);

        /* Update the node's configEpoch. */
        n->configEpoch = reportedConfigEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);

        /* Check the bitmap of served slots and update our
         * config accordingly. */
        clusterUpdateSlotsConfigWith(n,reportedConfigEpoch,
            hdr->data.update.nodecfg.slots);
    } 

         首先取得包中,宣稱負責槽位的節點的configEpoch:reportedConfigEpoch;

         如果在字典server.cluster->nodes中找不到傳送節點,說明還不認識傳送節點,則直接返回;然後在字典server.cluster->nodes中尋找宣稱負責槽位的節點n;如果找不到n,則直接返回;如果當前節點記錄的n的configEpoch,比reportedConfigEpoch大,則不能更新配置,直接返回;

         如果當前節點記錄的n為從節點,則呼叫clusterSetNodeAsMaster,將其標記為主節點;然後更新n節點的configEpoch;

         最後,呼叫clusterUpdateSlotsConfigWith函式,更新當前節點關於槽位的負責節點資訊,並在滿足條件的情況下,使當前節點成為節點n的從節點;

         因此,更新配置資訊,不管哪種情況,最終都是通過clusterUpdateSlotsConfigWith函式實現的。該函式的程式碼如下:

void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoch, unsigned char *slots) {
    int j;
    clusterNode *curmaster, *newmaster = NULL;
    /* The dirty slots list is a list of slots for which we lose the ownership
     * while having still keys inside. This usually happens after a failover
     * or after a manual cluster reconfiguration operated by the admin.
     *
     * If the update message is not able to demote a master to slave (in this
     * case we'll resync with the master updating the whole key space), we
     * need to delete all the keys in the slots we lost ownership. */
    uint16_t dirty_slots[REDIS_CLUSTER_SLOTS];
    int dirty_slots_count = 0;

    /* Here we set curmaster to this node or the node this node
     * replicates to if it's a slave. In the for loop we are
     * interested to check if slots are taken away from curmaster. */
    curmaster = nodeIsMaster(myself) ? myself : myself->slaveof;

    if (sender == myself) {
        redisLog(REDIS_WARNING,"Discarding UPDATE message about myself.");
        return;
    }

    for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
        if (bitmapTestBit(slots,j)) {
            /* The slot is already bound to the sender of this message. */
            if (server.cluster->slots[j] == sender) continue;

            /* The slot is in importing state, it should be modified only
             * manually via redis-trib (example: a resharding is in progress
             * and the migrating side slot was already closed and is advertising
             * a new config. We still want the slot to be closed manually). */
            if (server.cluster->importing_slots_from[j]) continue;

            /* We rebind the slot to the new node claiming it if:
             * 1) The slot was unassigned or the new node claims it with a
             *    greater configEpoch.
             * 2) We are not currently importing the slot. */
            if (server.cluster->slots[j] == NULL ||
                server.cluster->slots[j]->configEpoch < senderConfigEpoch)
            {
                /* Was this slot mine, and still contains keys? Mark it as
                 * a dirty slot. */
                if (server.cluster->slots[j] == myself &&
                    countKeysInSlot(j) &&
                    sender != myself)
                {
                    dirty_slots[dirty_slots_count] = j;
                    dirty_slots_count++;
                }

                if (server.cluster->slots[j] == curmaster)
                    newmaster = sender;
                clusterDelSlot(j);
                clusterAddSlot(sender,j);
                clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                     CLUSTER_TODO_UPDATE_STATE|
                                     CLUSTER_TODO_FSYNC_CONFIG);
            }
        }
    }

    /* If at least one slot was reassigned from a node to another node
     * with a greater configEpoch, it is possible that:
     * 1) We are a master left without slots. This means that we were
     *    failed over and we should turn into a replica of the new
     *    master.
     * 2) We are a slave and our master is left without slots. We need
     *    to replicate to the new slots owner. */
    if (newmaster && curmaster->numslots == 0) {
        redisLog(REDIS_WARNING,
            "Configuration change detected. Reconfiguring myself "
            "as a replica of %.40s", sender->name);
        clusterSetMaster(sender);
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_UPDATE_STATE|
                             CLUSTER_TODO_FSYNC_CONFIG);
    } else if (dirty_slots_count) {
        /* If we are here, we received an update message which removed
         * ownership for certain slots we still have keys about, but still
         * we are serving some slots, so this master node was not demoted to
         * a slave.
         *
         * In order to maintain a consistent state between keys and slots
         * we need to remove all the keys from the slots we lost. */
        for (j = 0; j < dirty_slots_count; j++)
            delKeysInSlot(dirty_slots[j]);
    }
}

本函式處理這樣的場景:節點sender宣稱自己的configEpoch和負責的槽位資訊slots,但是這些槽位之前由其他節點負責,而sender節點的configEpoch更大,說明需要更新這些槽位的負責節點為sender。

         本函式會由多種角色的節點執行,比如是之前下線的主節點,經過一段時間後,又重新上線了,該重新上線主節點會收到其他節點發來的UPDATE包,會通過該函式更新自己的配置資訊,併成為其他節點的從節點;也可以是已下線主節點的其他從節點,收到新主節點發來的心跳包之後,通過該函式更新自己的配置資訊,併成為新上任主節點的從節點;又可以是叢集中的其他節點,收到新主節點發來的心跳包後,僅僅更新自己的配置資訊;或者是叢集剛建立時,當前節點收到其他節點宣稱負責某些槽位的包後,更新自己的配置資訊;

         函式中,首先得到當前節點的主節點curmaster,如果當前節點是主節點,則curmaster就是myself,否則,curmaster是myself->slaveof;

         如果sender就是當前節點,則直接返回;

         接下來輪訓16384個槽位,只要sender宣稱負責該槽位,則進行處理:

         如果該槽位的負責節點已經是sender,則直接處理下一個槽位;如果當前節點正在遷入該槽位,則直接處理下一個槽位;

         如果該槽位尚未有負責節點(可能是叢集剛建立時),或者該槽位的負責節點的configEpoch小於sender節點的configEpoch,則需要將該槽位改為由sender負責:如果該槽位目前是由當前節點負責,並且槽位中尚有key,這種情況,說明當前節點是下線後又重新上線的舊主節點。因此,將該槽位記錄到dirty_slots陣列中;如果該槽位現在由curmaster負責,說明當前節點要麼是下線後又重新上線的節點,要麼是下線主節點的其他從節點,兩種情況,都需要當前節點成為新主節點的從節點,因此置newmaster為sender;接下來就是呼叫clusterDelSlot和clusterAddSlot,將該槽位的負責節點改為sender;

         輪訓完所有槽位之後,如果設定了newmaster,並且curmaster負責的槽位已清空,則可以將當前節點置為sender的從節點了,因此呼叫clusterSetMaster置sender為當前節點的主節點;

         如果不滿足上面的條件,並且dirty_slots_count不為0,則輪訓陣列dirty_slots,將其中每個槽位的所有key,從資料庫中刪除。

         這種情況的場景是:下線主節點A,原來負責槽位1,2,3,經過很長一段時間,現在A又重新上線了,但是現在槽位1,2由B節點負責,而槽位3由C節點負責。A現在收到的UPDATE包,其中只有節點B負責槽位1,2的資訊(因為其他節點D收到A的包之後,發現A宣稱負責的1,2,3槽位,現在由其他節點負責了。節點D輪訓16384個槽位,只要發現槽位1的負責節點B的configEpoch大於A的configEpoch,它就只會發出一個UPDATE包,其中只帶有節點B的資訊(函式clusterProcessPacket中傳送UPDATE包的邏輯)),因此節點A收到該UPDATE包之後,只能先將槽位1和2刪除,並將其中的KEY從資料庫中刪除,只有下次收到關於節點C負責槽位3的UPDATE包之後,把槽位3也清除了,此時就符合curmaster->numslots == 0的條件了,才能把自己置為C的從節點。

參考:

http://redis.io/topics/cluster-spec