
Redis Source Code Analysis: 28 Cluster (Part 4): Manual Failover and Slave Migration

1: Manual failover

Redis Cluster supports manual failover: sending the "CLUSTER FAILOVER" command to a slave makes it start the failover procedure while its master is still online, promoting itself to the new master while the original master is demoted to a slave.

To avoid losing data, after a slave receives "CLUSTER FAILOVER" the flow is as follows:

a: upon receiving the command, the slave sends a CLUSTERMSG_TYPE_MFSTART packet to its master;

b: upon receiving that packet, the master pauses all of its clients, that is, it stops processing client commands for the next 10 seconds, and sets the CLUSTERMSG_FLAG0_PAUSED flag in the heartbeat packets it sends;

c: when the slave receives a heartbeat from its master carrying the CLUSTERMSG_FLAG0_PAUSED flag, it reads the master's current replication offset from the packet. Only once its own replication offset has caught up with that value does it start the failover procedure: initiate an election, count the votes, win the election, promote itself to master, and update the configuration;

The "CLUSTER FAILOVER" command supports two options, FORCE and TAKEOVER, which change the flow above.

With the FORCE option, the slave does not interact with its master at all, and the master does not pause its clients; the slave immediately starts the failover procedure: initiate an election, count the votes, win the election, promote itself to master, and update the configuration.

With the TAKEOVER option things are even more blunt: the slave does not even hold an election; it directly promotes itself to master, takes over its master's slots, bumps its own configEpoch, and updates the configuration.

Consequently, with FORCE or TAKEOVER the master may already be down, whereas a bare "CLUSTER FAILOVER" requires the master to be online.

In the clusterCommand function, the code handling "CLUSTER FAILOVER" is as follows:

    else if (!strcasecmp(c->argv[1]->ptr,"failover") &&
               (c->argc == 2 || c->argc == 3))
    {
        /* CLUSTER FAILOVER [FORCE|TAKEOVER] */
        int force = 0, takeover = 0;

        if (c->argc == 3) {
            if (!strcasecmp(c->argv[2]->ptr,"force")) {
                force = 1;
            } else if (!strcasecmp(c->argv[2]->ptr,"takeover")) {
                takeover = 1;
                force = 1; /* Takeover also implies force. */
            } else {
                addReply(c,shared.syntaxerr);
                return;
            }
        }

        /* Check preconditions. */
        if (nodeIsMaster(myself)) {
            addReplyError(c,"You should send CLUSTER FAILOVER to a slave");
            return;
        } else if (myself->slaveof == NULL) {
            addReplyError(c,"I'm a slave but my master is unknown to me");
            return;
        } else if (!force &&
                   (nodeFailed(myself->slaveof) ||
                    myself->slaveof->link == NULL))
        {
            addReplyError(c,"Master is down or failed, "
                            "please use CLUSTER FAILOVER FORCE");
            return;
        }
        resetManualFailover();
        server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;

        if (takeover) {
            /* A takeover does not perform any initial check. It just
             * generates a new configuration epoch for this node without
             * consensus, claims the master's slots, and broadcast the new
             * configuration. */
            redisLog(REDIS_WARNING,"Taking over the master (user request).");
            clusterBumpConfigEpochWithoutConsensus();
            clusterFailoverReplaceYourMaster();
        } else if (force) {
            /* If this is a forced failover, we don't need to talk with our
             * master to agree about the offset. We just failover taking over
             * it without coordination. */
            redisLog(REDIS_WARNING,"Forced failover user request accepted.");
            server.cluster->mf_can_start = 1;
        } else {
            redisLog(REDIS_WARNING,"Manual failover user request accepted.");
            clusterSendMFStart(myself->slaveof);
        }
        addReply(c,shared.ok);
    }

It first checks whether the last argument of the command is FORCE or TAKEOVER;

If the current node is a master; or it is a slave whose master is unknown; or its master is failed or disconnected while the command carries neither FORCE nor TAKEOVER, it replies with an error and returns;

Then it calls resetManualFailover to reset any previous manual failover state;

It sets mf_end to the current time plus 5 seconds (REDIS_CLUSTER_MF_TIMEOUT); this attribute is both the deadline of the manual failover and the flag indicating that one is in progress;
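
For reference, resetManualFailover simply clears all of the manual failover state; a sketch consistent with the Redis 3.0 sources (if clients are currently paused, it also lifts the pause as a side effect):

void resetManualFailover(void) {
    if (server.cluster->mf_end && clientsArePaused()) {
        /* Force the pause to expire; calling clientsArePaused() again
         * triggers the unblocking of the queued clients. */
        server.clients_pause_end_time = 0;
        clientsArePaused();
    }
    server.cluster->mf_end = 0; /* No manual failover in progress. */
    server.cluster->mf_can_start = 0;
    server.cluster->mf_slave = NULL;
    server.cluster->mf_master_offset = 0;
}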

If the last argument is TAKEOVER, the slave skips the election entirely and directly takes over its master's slots to become the new master. It therefore first calls clusterBumpConfigEpochWithoutConsensus (sketched in part 3 below) to generate a new configEpoch, so that the subsequent configuration update will win; then it calls clusterFailoverReplaceYourMaster to turn itself into the new master and broadcast this change to every node in the cluster, as sketched below;
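
clusterFailoverReplaceYourMaster performs the actual role switch. Roughly, per the Redis 3.0 sources, it promotes the node, claims the old master's slots, saves the configuration, and broadcasts a PONG so the rest of the cluster learns about the change:

void clusterFailoverReplaceYourMaster(void) {
    int j;
    clusterNode *oldmaster = myself->slaveof;

    if (nodeIsMaster(myself) || oldmaster == NULL) return;

    /* 1) Turn this node into a master. */
    clusterSetNodeAsMaster(myself);
    replicationUnsetMaster();

    /* 2) Claim all the slots assigned to our master. */
    for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) {
        if (clusterNodeGetSlotBit(oldmaster,j)) {
            clusterDelSlot(j);
            clusterAddSlot(myself,j);
        }
    }

    /* 3) Update state and save config. */
    clusterUpdateState();
    clusterSaveConfigOrDie(1);

    /* 4) Pong all the other nodes so that they can update their state
     *    and detect that we switched to the master role. */
    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);

    /* 5) If there was a manual failover in progress, clear the state. */
    resetManualFailover();
}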

If the last argument is FORCE, the slave may begin the election right away, without first catching up with the master's replication offset. It therefore sets mf_can_start to 1, so that clusterHandleSlaveFailover will start the failover even though the master is not failed and the slave's data may be stale; see the excerpt below;
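
The relevant precondition check sits at the top of clusterHandleSlaveFailover; an abbreviated excerpt (based on the Redis 3.0 sources) showing how mf_can_start lets the failover proceed even though the master is not in FAIL state:

void clusterHandleSlaveFailover(void) {
    ...
    int manual_failover = server.cluster->mf_end != 0 &&
                          server.cluster->mf_can_start;
    ...
    /* Pre conditions to run the function:
     * 1) We are a slave.
     * 2) Our master is flagged as FAIL, or this is a manual failover.
     * 3) It is serving slots. */
    if (nodeIsMaster(myself) ||
        myself->slaveof == NULL ||
        (!nodeFailed(myself->slaveof) && !manual_failover) ||
        myself->slaveof->numslots == 0) return;
    ...
}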

If the last argument is neither FORCE nor TAKEOVER, the slave must first send a CLUSTERMSG_TYPE_MFSTART packet to its master, so it calls clusterSendMFStart (shown below) to do that;
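
clusterSendMFStart is a thin wrapper that builds a CLUSTERMSG_TYPE_MFSTART header and ships it over the master's link (per the Redis 3.0 sources):

void clusterSendMFStart(clusterNode *node) {
    unsigned char buf[sizeof(clusterMsg)];
    clusterMsg *hdr = (clusterMsg*) buf;

    if (!node->link) return;
    clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
    clusterSendMessage(node->link,buf,ntohl(hdr->totlen));
}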

When the master receives the CLUSTERMSG_TYPE_MFSTART packet, it handles it in clusterProcessPacket like this:

    else if (type == CLUSTERMSG_TYPE_MFSTART) {
        /* This message is acceptable only if I'm a master and the sender
         * is one of my slaves. */
        if (!sender || sender->slaveof != myself) return 1;
        /* Manual failover requested from slaves. Initialize the state
         * accordingly. */
        resetManualFailover();
        server.cluster->mf_end = mstime() + REDIS_CLUSTER_MF_TIMEOUT;
        server.cluster->mf_slave = sender;
        pauseClients(mstime()+(REDIS_CLUSTER_MF_TIMEOUT*2));
        redisLog(REDIS_WARNING,"Manual failover requested by slave %.40s.",
            sender->name);
    }

If the sending node cannot be found in the nodes dictionary, or the sender's master is not the current node, the handler returns immediately;

It calls resetManualFailover to reset any previous manual failover state;

Then it sets mf_end to the current time plus 5 seconds; this attribute is both the deadline of the manual failover and the flag indicating that one is in progress;

Then it sets mf_slave to sender; this attribute denotes the slave performing the manual failover;

Then it calls pauseClients, blocking all clients for the next 10 seconds;
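
pauseClients itself only records the deadline and raises a flag; incoming commands from normal clients are then queued instead of processed until the pause expires. A sketch consistent with the Redis 3.0 networking code:

void pauseClients(mstime_t end) {
    /* Extend the pause deadline if needed, never shorten it. */
    if (!server.clients_paused || end > server.clients_pause_end_time)
        server.clients_pause_end_time = end;
    server.clients_paused = 1;
}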

When the master builds the header of a heartbeat packet, if it finds a manual failover in progress it adds the CLUSTERMSG_FLAG0_PAUSED flag to the header:

void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
    ...
    /* Set the message flags. */
    if (nodeIsMaster(myself) && server.cluster->mf_end)
        hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
    ...
}   

The slave processes incoming packets in clusterProcessPacket; as soon as it sees a packet from its master carrying the CLUSTERMSG_FLAG0_PAUSED flag, it records the master's replication offset into server.cluster->mf_master_offset:

int clusterProcessPacket(clusterLink *link) {
    ...
    /* Check if the sender is a known node. */
    sender = clusterLookupNode(hdr->sender);
    if (sender && !nodeInHandshake(sender)) {
        ...
        /* Update the replication offset info for this node. */
        sender->repl_offset = ntohu64(hdr->offset);
        sender->repl_offset_time = mstime();
        /* If we are a slave performing a manual failover and our master
         * sent its offset while already paused, populate the MF state. */
        if (server.cluster->mf_end &&
            nodeIsSlave(myself) &&
            myself->slaveof == sender &&
            hdr->mflags[0] & CLUSTERMSG_FLAG0_PAUSED &&
            server.cluster->mf_master_offset == 0)
        {
            server.cluster->mf_master_offset = sender->repl_offset;
            redisLog(REDIS_WARNING,
                "Received replication offset for paused "
                "master manual failover: %lld",
                server.cluster->mf_master_offset);
        }
    }
}   

In the cluster cron function clusterCron, the slave calls clusterHandleManualFailover, which checks whether its replication offset has reached server.cluster->mf_master_offset; if so, it sets server.cluster->mf_can_start to 1, so that the upcoming call to clusterHandleSlaveFailover starts the failover procedure immediately.

The code of clusterHandleManualFailover is as follows:

void clusterHandleManualFailover(void) {
    /* Return ASAP if no manual failover is in progress. */
    if (server.cluster->mf_end == 0) return;

    /* If mf_can_start is non-zero, the failover was already triggered so the
     * next steps are performed by clusterHandleSlaveFailover(). */
    if (server.cluster->mf_can_start) return;

    if (server.cluster->mf_master_offset == 0) return; /* Wait for offset... */

    if (server.cluster->mf_master_offset == replicationGetSlaveOffset()) {
        /* Our replication offset matches the master replication offset
         * announced after clients were paused. We can start the failover. */
        server.cluster->mf_can_start = 1;
        redisLog(REDIS_WARNING,
            "All master replication stream processed, "
            "manual failover can start.");
    }
}
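
The slave-side offset used in the comparison comes from replicationGetSlaveOffset; a sketch consistent with the Redis 3.0 replication code (it falls back to the cached master when the link is down, and never returns a negative value):

long long replicationGetSlaveOffset(void) {
    long long offset = 0;

    if (server.masterhost != NULL) {
        if (server.master) {
            offset = server.master->reploff;
        } else if (server.cached_master) {
            offset = server.cached_master->reploff;
        }
    }
    /* The offset may be -1 when it is not yet known: return a
     * non-negative integer so callers can compare it safely. */
    if (offset < 0) offset = 0;
    return offset;
}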

Both masters and slaves call manualFailoverCheckTimeout from the cluster cron function clusterCron; once the manual failover deadline has passed, the manual failover state is reset, aborting the procedure. The code of manualFailoverCheckTimeout is:

/* If a manual failover timed out, abort it. */
void manualFailoverCheckTimeout(void) {
    if (server.cluster->mf_end && server.cluster->mf_end < mstime()) {
        redisLog(REDIS_WARNING,"Manual failover timed out.");
        resetManualFailover();
    }
}

2: Slave migration

In a Redis cluster, each master is normally given several slaves to improve availability. But if these master-slave relationships were fixed, then after some time a master could end up orphaned, that is, with no slave left to fail over to; if such a master went down, the whole cluster would become unavailable.

Redis Cluster therefore provides slave migration. In short: as soon as an orphaned master appears in the cluster, some slave A automatically becomes a slave of that orphaned master. Slave A is chosen so that its master has the largest number of attached slaves, and among those slaves A has the smallest node ID ("The acting slave is the slave among the masters with the maximum number of attached slaves, that is not in FAIL state and has the smallest node ID").

The feature is implemented in the cluster cron function clusterCron. The relevant code is:

void clusterCron(void) {
    ...
    orphaned_masters = 0;
    max_slaves = 0;
    this_slaves = 0;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        now = mstime(); /* Use an updated time at every iteration. */
        mstime_t delay;

        if (node->flags &
            (REDIS_NODE_MYSELF|REDIS_NODE_NOADDR|REDIS_NODE_HANDSHAKE))
                continue;

        /* Orphaned master check, useful only if the current instance
         * is a slave that may migrate to another master. */
        if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) {
            int okslaves = clusterCountNonFailingSlaves(node);

            /* A master is orphaned if it is serving a non-zero number of
             * slots, have no working slaves, but used to have at least one
             * slave. */
            if (okslaves == 0 && node->numslots > 0 && node->numslaves)
                orphaned_masters++;
            if (okslaves > max_slaves) max_slaves = okslaves;
            if (nodeIsSlave(myself) && myself->slaveof == node)
                this_slaves = okslaves;
        }
        ...
    }
    ...
    if (nodeIsSlave(myself)) {
        ...
        /* If there are orphaned slaves, and we are a slave among the masters
         * with the max number of non-failing slaves, consider migrating to
         * the orphaned masters. Note that it does not make sense to try
         * a migration if there is no master with at least *two* working
         * slaves. */
        if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves)
            clusterHandleSlaveMigration(max_slaves);
    }
    ...
}  

It iterates over the dictionary server.cluster->nodes; every node that is not the current node itself, does not carry the REDIS_NODE_NOADDR flag, and is not in the handshake state is processed as follows:

If the current node is a slave, and node is a master not flagged as failed, it first calls clusterCountNonFailingSlaves (sketched below) to compute okslaves, the number of node's slaves that are not failing. If okslaves is 0 while node serves a non-zero number of slots and used to have at least one slave, node is an orphaned master, so orphaned_masters is incremented. If okslaves exceeds max_slaves, max_slaves is updated; max_slaves thus ends up recording the largest number of non-failing slaves attached to any single master. If the current node happens to be one of node's slaves, okslaves is stored into this_slaves. All of this prepares for the slave migration step;
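
clusterCountNonFailingSlaves, used above, just counts the slaves of a node that are not flagged as failing (per the Redis 3.0 sources):

int clusterCountNonFailingSlaves(clusterNode *n) {
    int j, okslaves = 0;

    for (j = 0; j < n->numslaves; j++)
        if (!nodeFailed(n->slaves[j])) okslaves++;
    return okslaves;
}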

After the loop, if there is at least one orphaned master, max_slaves is at least 2, and the current node is one of the slaves of the master owning the most non-failing slaves, it calls clusterHandleSlaveMigration which, if the remaining conditions hold, performs the migration, turning the current slave into a slave of an orphaned master.

The code of clusterHandleSlaveMigration is as follows:

void clusterHandleSlaveMigration(int max_slaves) {
    int j, okslaves = 0;
    clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL;
    dictIterator *di;
    dictEntry *de;

    /* Step 1: Don't migrate if the cluster state is not ok. */
    if (server.cluster->state != REDIS_CLUSTER_OK) return;

    /* Step 2: Don't migrate if my master will not be left with at least
     *         'migration-barrier' slaves after my migration. */
    if (mymaster == NULL) return;
    for (j = 0; j < mymaster->numslaves; j++)
        if (!nodeFailed(mymaster->slaves[j]) &&
            !nodeTimedOut(mymaster->slaves[j])) okslaves++;
    if (okslaves <= server.cluster_migration_barrier) return;

    /* Step 3: Idenitfy a candidate for migration, and check if among the
     * masters with the greatest number of ok slaves, I'm the one with the
     * smaller node ID.
     *
     * Note that this means that eventually a replica migration will occurr
     * since slaves that are reachable again always have their FAIL flag
     * cleared. At the same time this does not mean that there are no
     * race conditions possible (two slaves migrating at the same time), but
     * this is extremely unlikely to happen, and harmless. */
    candidate = myself;
    di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int okslaves;

        /* Only iterate over working masters. */
        if (nodeIsSlave(node) || nodeFailed(node)) continue;
        /* If this master never had slaves so far, don't migrate. We want
         * to migrate to a master that remained orphaned, not masters that
         * were never configured to have slaves. */
        if (node->numslaves == 0) continue;
        okslaves = clusterCountNonFailingSlaves(node);

        if (okslaves == 0 && target == NULL && node->numslots > 0)
            target = node;

        if (okslaves == max_slaves) {
            for (j = 0; j < node->numslaves; j++) {
                if (memcmp(node->slaves[j]->name,
                           candidate->name,
                           REDIS_CLUSTER_NAMELEN) < 0)
                {
                    candidate = node->slaves[j];
                }
            }
        }
    }
    dictReleaseIterator(di);

    /* Step 4: perform the migration if there is a target, and if I'm the
     * candidate. */
    if (target && candidate == myself) {
        redisLog(REDIS_WARNING,"Migrating to orphaned master %.40s",
            target->name);
        clusterSetMaster(target);
    }
}

If the cluster state is not REDIS_CLUSTER_OK, return immediately; likewise if the current slave has no master;

Next it computes okslaves, the number of slaves of the current node's master that are neither failed nor timed out; if okslaves is less than or equal to the migration barrier server.cluster_migration_barrier, it returns immediately;

Then it iterates over the dictionary server.cluster->nodes, and for each node:

If node is a slave, or is in FAIL state, it skips to the next node; likewise if node has never had any slaves attached (the goal is to migrate to masters that became orphaned, not masters never configured with slaves);

It calls clusterCountNonFailingSlaves to compute okslaves, the number of node's non-failing slaves. If okslaves is 0, no target has been found yet, and node->numslots is greater than 0, this master used to have slaves but they all went down, so an orphaned master target has been found;

If okslaves equals the max_slaves argument, node is one of the masters with the most non-failing slaves, so the current candidate's node ID is compared against the IDs of all of node's slaves; whenever a slave with a smaller ID is found, candidate is updated to that slave. (In fact, once candidate is no longer myself the function could return early, since step 4 only acts when candidate == myself;)

After the loop, if an orphaned master was found and the current node holds the smallest node ID among the candidates, clusterSetMaster is called to make target the current node's master and start the replication process; a sketch follows.
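
clusterSetMaster reconfigures the current node as a slave of target and kicks off replication; a sketch consistent with the Redis 3.0 sources (note that it also resets any manual failover state):

void clusterSetMaster(clusterNode *n) {
    redisAssert(n != myself);
    redisAssert(myself->numslots == 0);

    if (nodeIsMaster(myself)) {
        /* A master switching role: drop master flags and slot ownership. */
        myself->flags &= ~REDIS_NODE_MASTER;
        myself->flags |= REDIS_NODE_SLAVE;
        clusterCloseAllSlots();
    } else {
        if (myself->slaveof)
            clusterNodeRemoveSlave(myself->slaveof,myself);
    }
    myself->slaveof = n;
    clusterNodeAddSlave(n,myself);
    replicationSetMaster(n->ip, n->port);
    resetManualFailover();
}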

3: The configEpoch collision problem

Within a cluster it is harmless for masters responsible for different slots to share the same configEpoch. However, human intervention or bugs can leave masters with the same configEpoch claiming the same slots, which is fatal in a distributed system. Redis therefore requires every node in the cluster to have a distinct configEpoch.

When a slave is promoted to a new master, it obtains a configEpoch greater than that of every other node, so elections never produce duplicates (two slaves cannot win the same election). However, at the end of an administrator-driven resharding, the node importing the slots bumps its own configEpoch without the agreement of the other nodes; a manual failover likewise lets a slave bump its configEpoch without consensus. Either situation can leave several masters with the same configEpoch.
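
Both of those paths go through clusterBumpConfigEpochWithoutConsensus, the function also used in the TAKEOVER branch of part 1; roughly, per the Redis 3.0 sources:

int clusterBumpConfigEpochWithoutConsensus(void) {
    uint64_t maxEpoch = clusterGetMaxEpoch();

    if (myself->configEpoch == 0 ||
        myself->configEpoch != maxEpoch)
    {
        /* Take the cluster-wide max epoch, incremented, as our own. */
        server.cluster->currentEpoch++;
        myself->configEpoch = server.cluster->currentEpoch;
        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                             CLUSTER_TODO_FSYNC_CONFIG);
        redisLog(REDIS_WARNING,
            "New configEpoch set to %llu",
            (unsigned long long) myself->configEpoch);
        return REDIS_OK;
    } else {
        return REDIS_ERR;
    }
}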

An algorithm is therefore needed to guarantee that all configEpochs in the cluster stay distinct. It works as follows: when a master receives a heartbeat packet from another master and finds that the packet carries the same configEpoch as its own, it calls clusterHandleConfigEpochCollision to resolve the conflict.

The code of clusterHandleConfigEpochCollision is as follows:

void clusterHandleConfigEpochCollision(clusterNode *sender) {
    /* Prerequisites: nodes have the same configEpoch and are both masters. */
    if (sender->configEpoch != myself->configEpoch ||
        !nodeIsMaster(sender) || !nodeIsMaster(myself)) return;
    /* Don't act if the colliding node has a smaller Node ID. */
    if (memcmp(sender->name,myself->name,REDIS_CLUSTER_NAMELEN) <= 0) return;
    /* Get the next ID available at the best of this node knowledge. */
    server.cluster->currentEpoch++;
    myself->configEpoch = server.cluster->currentEpoch;
    clusterSaveConfigOrDie(1);
    redisLog(REDIS_VERBOSE,
        "WARNING: configEpoch collision with node %.40s."
        " configEpoch set to %llu",
        sender->name,
        (unsigned long long) myself->configEpoch);
}

If the sender's configEpoch differs from the current node's, or the sender is not a master, or the current node is not a master, return immediately;

If the sender's node ID is smaller than the current node's, return immediately;

Hence only the node with the smaller node ID reacts, and it is the one that ends up with the larger configEpoch: it increments its own currentEpoch and assigns the result to its configEpoch.

This way, even if several nodes share the same configEpoch, eventually only the node with the largest node ID keeps its configEpoch unchanged; all the others bump theirs, each by a different amount, and the node with the smallest node ID ends up with the largest configEpoch.
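
As an illustration, suppose masters A, B and C (node IDs in increasing order) all end up claiming configEpoch 5. In every pairwise collision only the smaller-ID node reacts, so C never changes; B bumps once when it hears from C; and A, which loses every comparison, keeps bumping until its epoch no longer collides. The exact final values depend on how the gossip interleaves, but the process converges to distinct epochs, with A, the smallest ID, holding the largest one.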

References:

http://redis.io/topics/cluster-spec