1. 程式人生 > >Raft協議實戰之Redis Sentinel的選舉Leader原始碼解析

Raft協議實戰之Redis Sentinel的選舉Leader原始碼解析

http://www.blogjava.net/jinfeng_wang/archive/2016/12/14/432108.html

Raft協議是用來解決分散式系統一致性問題的協議,在很長一段時間,Paxos被認為是解決分散式系統一致性的代名詞。但是Paxos難於理解,更難以實現,諸如Google大牛們開發的分散式鎖系統Chubby都遭遇了很多坑。Raft協議設計的初衷就是容易實現,保證對於普遍的人群都可以十分舒適容易的去理解。另外,它必須能夠讓人形成直觀的認識,這樣系統的構建者才能夠在現實中進行必然的擴充套件。

本文從Redis Sentinel叢集選擇Leader的具體流程和原始碼分析,描述Raft協議中的選舉Leader演算法。關於Redis Sentinel的介紹可以參看本人的另一篇文章

《redis sentinel設計與實現》

當Sentinel叢集有Sentinel發現master客觀下線了,就會開始故障轉移流程,故障轉移流程的第一步就是在Sentinel叢集選擇一個Leader,讓Leader完成故障轉移流程。

Raft協議選舉流程

描述Raft選舉流程之前需要了解一些概念。

節點的狀態

Raft協議描述的節點共有三種狀態:Leader, Follower, Candidate。在系統執行正常的時候只有Leader和Follower兩種狀態的節點。一個Leader節點,其他的節點都是Follower。Candidate是系統執行不穩定時期的中間狀態,當一個Follower對Leader的的心跳出現異常,就會轉變成Candidate,Candidate會去競選新的Leader,它會向其他節點發送競選投票,如果大多數節點都投票給它,它就會替代原來的Leader,變成新的Leader,原來的Leader會降級成Follower。

image

term

在分散式系統中,各個節點的時間同步是一個很大的難題,但是為了識別過期時間,時間資訊又必不可少。Raft協議為了解決這個問題,引入了term(任期)的概念。Raft協議將時間切分為一個個的Term,可以認為是一種“邏輯時間”。

image

RPC

Raft協議在選舉階段互動的RPC有兩類:RequestVote和AppendEntries。

  • RequestVote是用來向其他節點發送競選投票。
  • AppendEntries是當該節點得到更多的選票後,成為Leader,向其他節點確認訊息。

選舉流程

Raft採用心跳機制觸發Leader選舉。系統啟動後,全部節點初始化為Follower,term為0.節點如果收到了RequestVote或者AppendEntries,就會保持自己的Follower身份。如果一段時間內沒收到AppendEntries訊息直到選舉超時,說明在該節點的超時時間內還沒發現Leader,Follower就會轉換成Candidate,自己開始競選Leader。一旦轉化為Candidate,該節點立即開始下面幾件事情:

  • 1、增加自己的term。
  • 2、啟動一個新的定時器。
  • 3、給自己投一票。
  • 4、向所有其他節點發送RequestVote,並等待其他節點的回覆。

如果在這過程中收到了其他節點發送的AppendEntries,就說明已經有Leader產生,自己就轉換成Follower,選舉結束。

如果在計時器超時前,節點收到多數節點的同意投票,就轉換成Leader。同時向所有其他節點發送AppendEntries,告知自己成為了Leader。

每個節點在一個term內只能投一票,採取先到先得的策略,Candidate前面說到已經投給了自己,Follower會投給第一個收到RequestVote的節點。每個Follower有一個計時器,在計時器超時時仍然沒有接受到來自Leader的心跳RPC, 則自己轉換為Candidate, 開始請求投票,就是上面的的競選Leader步驟。

如果多個Candidate發起投票,每個Candidate都沒拿到多數的投票(Split Vote),那麼就會等到計時器超時後重新成為Candidate,重複前面競選Leader步驟。

Raft協議的定時器採取隨機超時時間,這是選舉Leader的關鍵。每個節點定時器的超時時間隨機設定,隨機選取配置時間的1倍到2倍之間。由於隨機配置,所以各個Follower同時轉成Candidate的時間一般不一樣,在同一個term內,先轉為Candidate的節點會先發起投票,從而獲得多數票。多個節點同時轉換為Candidate的可能性很小。即使幾個Candidate同時發起投票,在該term內有幾個節點獲得一樣高的票數,只是這個term無法選出Leader。由於各個節點定時器的超時時間隨機生成,那麼最先進入下一個term的節點,將更有機會成為Leader。連續多次發生在一個term內節點獲得一樣高票數在理論上機率很小,實際上可以認為完全不可能發生。一般1-2個term類,Leader就會被選出來。

Sentinel的選舉流程

Sentinel叢集正常執行的時候每個節點epoch相同,當需要故障轉移的時候會在叢集中選出Leader執行故障轉移操作。Sentinel採用了Raft協議實現了Sentinel間選舉Leader的演算法,不過也不完全跟論文描述的步驟一致。Sentinel叢集執行過程中故障轉移完成,所有Sentinel又會恢復平等。Leader僅僅是故障轉移操作出現的角色。

選舉流程

  • 1、某個Sentinel認定master客觀下線的節點後,該Sentinel會先看看自己有沒有投過票,如果自己已經投過票給其他Sentinel了,在2倍故障轉移的超時時間自己就不會成為Leader。相當於它是一個Follower。
  • 2、如果該Sentinel還沒投過票,那麼它就成為Candidate。
  • 3、和Raft協議描述的一樣,成為Candidate,Sentinel需要完成幾件事情
    • 1)更新故障轉移狀態為start
    • 2)當前epoch加1,相當於進入一個新term,在Sentinel中epoch就是Raft協議中的term。
    • 3)更新自己的超時時間為當前時間隨機加上一段時間,隨機時間為1s內的隨機毫秒數。
    • 4)向其他節點發送is-master-down-by-addr命令請求投票。命令會帶上自己的epoch。
    • 5)給自己投一票,在Sentinel中,投票的方式是把自己master結構體裡的leader和leader_epoch改成投給的Sentinel和它的epoch。
  • 4、其他Sentinel會收到Candidate的is-master-down-by-addr命令。如果Sentinel當前epoch和Candidate傳給他的epoch一樣,說明他已經把自己master結構體裡的leader和leader_epoch改成其他Candidate,相當於把票投給了其他Candidate。投過票給別的Sentinel後,在當前epoch內自己就只能成為Follower。
  • 5、Candidate會不斷的統計自己的票數,直到他發現認同他成為Leader的票數超過一半而且超過它配置的quorum(quorum可以參考《redis sentinel設計與實現》)。Sentinel比Raft協議增加了quorum,這樣一個Sentinel能否當選Leader還取決於它配置的quorum。
  • 6、如果在一個選舉時間內,Candidate沒有獲得超過一半且超過它配置的quorum的票數,自己的這次選舉就失敗了。
  • 7、如果在一個epoch內,沒有一個Candidate獲得更多的票數。那麼等待超過2倍故障轉移的超時時間後,Candidate增加epoch重新投票。
  • 8、如果某個Candidate獲得超過一半且超過它配置的quorum的票數,那麼它就成為了Leader。
  • 9、與Raft協議不同,Leader並不會把自己成為Leader的訊息發給其他Sentinel。其他Sentinel等待Leader從slave選出master後,檢測到新的master正常工作後,就會去掉客觀下線的標識,從而不需要進入故障轉移流程。

關於Sentinel超時時間的說明

Sentinel超時機制有幾個超時概念。

  • failover_start_time 下一選舉啟動的時間。預設是當前時間加上1s內的隨機毫秒數
  • failover_state_change_time 故障轉移中狀態變更的時間。
  • failover_timeout 故障轉移超時時間。預設是3分鐘。
  • election_timeout 選舉超時時間,是預設選舉超時時間和failover_timeout的最小值。預設是10s。

Follower成為Candidate後,會更新failover_start_time為當前時間加上1s內的隨機毫秒數。更新failover_state_change_time為當前時間。

Candidate的當前時間減去failover_start_time大於election_timeout,說明Candidate還沒獲得足夠的選票,此次epoch的選舉已經超時,那麼轉變成Follower。需要等到mstime() - failover_start_time < failover_timeout*2的時候才開始下一次獲得成為Candidate的機會。

如果一個Follower把某個Candidate設為自己認為的Leader,那麼它的failover_start_time會設定為當前時間加上1s內的隨機毫秒數。這樣它就進入了上面說的需要等到mstime() - failover_start_time < failover_timeout*2的時候才開始下一次獲得成為Candidate的機會。

因為每個Sentinel判斷節點客觀下線的時間不是同時開始的,一般都有先後,這樣先開始的Sentinel就更有機會贏得更多選票,另外failover_state_change_time為1s內的隨機毫秒數,這樣也把各個節點的超時時間分散開來。本人嘗試過很多次,Sentinel間的Leader選舉過程基本上一個epoch內就完成了。

Sentinel 選舉流程原始碼解析

Sentinel的選舉流程的程式碼基本都在sentinel.c檔案中,下面結合原始碼對Sentinel的選舉流程進行說明。

定時任務

    

/* ======================== SENTINEL timer handler ==========================
 * This is the "main" our Sentinel, being sentinel completely non blocking
 * in design. The function is called every second.
 * -------------------------------------------------------------------------- */


/* Perform scheduled operations for the specified Redis instance. */
// 對給定的例項執行定期操作
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {


    /* ========== MONITORING HALF ============ */
    /* ==========     監控操作    =========*/


    /* Every kind of instance */
    /* 對所有型別例項進行處理 */


    // 如果有需要的話,建立連向例項的網路連線
    sentinelReconnectInstance(ri);


    // 根據情況,向例項傳送 PING、 INFO 或者 PUBLISH 命令
    sentinelSendPeriodicCommands(ri);


    /* ============== ACTING HALF ============= */
    /* ==============  故障檢測   ============= */


    /* We don't proceed with the acting half if we are in TILT mode.
     * TILT happens when we find something odd with the time, like a
     * sudden change in the clock. */
    // 如果 Sentinel 處於 TILT 模式,那麼不執行故障檢測。
    if (sentinel.tilt) {


        // 如果 TILI 模式未解除,那麼不執行動作
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;


        // 時間已過,退出 TILT 模式
        sentinel.tilt = 0;
        sentinelEvent(REDIS_WARNING,"-tilt",NULL,"#tilt mode exited");
    }


    /* Every kind of instance */
    // 檢查給定例項是否進入 SDOWN 狀態
    sentinelCheckSubjectivelyDown(ri);


    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }


    /* Only masters */
    /* 對主伺服器進行處理 */
    if (ri->flags & SRI_MASTER) {


        // 判斷 master 是否進入 ODOWN 狀態
        sentinelCheckObjectivelyDown(ri);


        // 如果主伺服器進入了 ODOWN 狀態,那麼開始一次故障轉移操作
        if (sentinelStartFailoverIfNeeded(ri))
            // 強制向其他 Sentinel 傳送 SENTINEL is-master-down-by-addr 命令
            // 重新整理其他 Sentinel 關於主伺服器的狀態
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);


        // 執行故障轉移
        sentinelFailoverStateMachine(ri);


        // 如果有需要的話,向其他 Sentinel 傳送 SENTINEL is-master-down-by-addr 命令
        // 重新整理其他 Sentinel 關於主伺服器的狀態
        // 這一句是對那些沒有進入 if(sentinelStartFailoverIfNeeded(ri)) { /* ... */ }
        // 語句的主伺服器使用的
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}

Sentinel會每隔100ms執行一次sentinelHandleRedisInstance函式。流程會檢查master是否進入SDOWN狀態,接著會檢查master是否進入ODOWN狀態,接著會檢視是否需要開始故障轉移,如果開始故障轉移就會向其他節點拉去投票,接下來有個故障轉移的狀態機,根據不同的failover_state,決定完成不同的操作,正常的時候failover_state為SENTINEL_FAILOVER_STATE_NONE。

向其他Sentinel獲取投票或者獲取對master存活狀態的判斷結果

 

/* Receive the SENTINEL is-master-down-by-addr reply, see the
 * sentinelAskMasterStateToOtherSentinels() function for more information. */
// 本回調函式用於處理SENTINEL 接收到其他 SENTINEL 
// 發回的 SENTINEL is-master-down-by-addr 命令的回覆
void sentinelReceiveIsMasterDownReply(redisAsyncContext *c, void *reply, void *privdata) {
    sentinelRedisInstance *ri = c->data;
    redisReply *r;


    if (ri) ri->pending_commands--;
    if (!reply || !ri) return;
    r = reply;


    /* Ignore every error or unexpected reply.
     * 忽略錯誤回覆
     * Note that if the command returns an error for any reason we'll
     * end clearing the SRI_MASTER_DOWN flag for timeout anyway. */
    if (r->type == REDIS_REPLY_ARRAY && r->elements == 3 &&
        r->element[0]->type == REDIS_REPLY_INTEGER &&
        r->element[1]->type == REDIS_REPLY_STRING &&
        r->element[2]->type == REDIS_REPLY_INTEGER)
    {
        // 更新最後一次回覆詢問的時間
        ri->last_master_down_reply_time = mstime();


        // 設定 SENTINEL 認為主伺服器的狀態
        if (r->element[0]->integer == 1) {
            // 已下線
            ri->flags |= SRI_MASTER_DOWN;
        } else {
            // 未下線
            ri->flags &= ~SRI_MASTER_DOWN;
        }


        // 如果執行 ID 不是 "*" 的話,那麼這是一個帶投票的回覆
        if (strcmp(r->element[1]->str,"*")) {
            /* If the runid in the reply is not "*" the Sentinel actually
             * replied with a vote. */
            sdsfree(ri->leader);
            // 列印日誌
            if (ri->leader_epoch != r->element[2]->integer)
                redisLog(REDIS_WARNING,
                    "%s voted for %s %llu", ri->name,
                    r->element[1]->str,
                    (unsigned long long) r->element[2]->integer);
            // 設定例項的領頭
            ri->leader = sdsnew(r->element[1]->str);
            ri->leader_epoch = r->element[2]->integer;
        }
    }
}

對於每個節點,Sentinel都會確認節點是否SDOWN,對於master,還需要確認ODOWN。sentinelAskMasterStateToOtherSentinels方法會在master進入SDOWN或者ODOWN呼叫sentinel is-master-down-by-addr命令,SDOWN時,該命令用來獲取其他Sentinel對於master的存活狀態,ODOWN是用來像其他節點投票的。SDOWN時,flags是SENTINEL_NO_FLAGS,ODOWN時,flags是SENTINEL_ASK_FORCED。

檢查是否開始故障轉移

 

/* This function checks if there are the conditions to start the failover,
 * that is:
 *
 * 這個函式檢查是否需要開始一次故障轉移操作:
 *
 * 1) Master must be in ODOWN condition.
 *    主伺服器已經計入 ODOWN 狀態。
 * 2) No failover already in progress.
 *    當前沒有針對同一主伺服器的故障轉移操作在執行。
 * 3) No failover already attempted recently.
 *    最近時間內,這個主伺服器沒有嘗試過執行故障轉移
 *    (應該是為了防止頻繁執行)。
 * 
 * We still don't know if we'll win the election so it is possible that we
 * start the failover but that we'll not be able to act.
 *
 * 雖然 Sentinel 可以發起一次故障轉移,但因為故障轉移操作是由領頭 Sentinel 執行的,
 * 所以發起故障轉移的 Sentinel 不一定就是執行故障轉移的 Sentinel 。
 *
 * Return non-zero if a failover was started. 
 *
 * 如果故障轉移操作成功開始,那麼函式返回非 0 值。
 */
int sentinelStartFailoverIfNeeded(sentinelRedisInstance *master) {


    /* We can't failover if the master is not in O_DOWN state. */
    if (!(master->flags & SRI_O_DOWN)) return 0;


    /* Failover already in progress? */
    if (master->flags & SRI_FAILOVER_IN_PROGRESS) return 0;


    /* Last failover attempt started too little time ago? */
    if (mstime() - master->failover_start_time <
        master->failover_timeout*2)
    {
        if (master->failover_delay_logged != master->failover_start_time) {
            time_t clock = (master->failover_start_time +
                            master->failover_timeout*2) / 1000;
            char ctimebuf[26];


            ctime_r(&clock,ctimebuf);
            ctimebuf[24] = '\0'; /* Remove newline. */
            master->failover_delay_logged = master->failover_start_time;
            redisLog(REDIS_WARNING,
                "Next failover delay: I will not start a failover before %s",
                ctimebuf);
        }
        return 0;
    }


    // 開始一次故障轉移
    sentinelStartFailover(master);


    return 1;
}

sentinelStartFailoverIfNeeded方法會檢查master是否為ODOWN狀態。因為定時任務每次就會執行到該函式,所以還要確認故障轉移狀態SRI_FAILOVER_IN_PROGRESS是否已經開始。然後會看定時任務是否超時,只有以上條件都滿足才能開始故障轉移。關於定時任務是否超時,failover_start_time預設為0,它有2個地方會被修改,一個是開始故障轉移後,一個是收到其他Sentinel的投票請求。failover_start_time被修改的值為 mstime()+rand()%SENTINEL_MAX_DESYNC,這就是Raft協議說的隨機因子。SENTINEL_MAX_DESYNC是1000,相當於failover_start_time是當前時間加上1s內的隨機值,這個保證了,不同Sentinel在超時後,下次申請Leader時間的隨機。所以故障轉移開始,像Raft協議描述的“啟動一個新的定時器”,設定了failover_start_time。在投票的時候設定failover_start_time,那麼先投票,再通過ODOWN和SRI_FAILOVER_IN_PROGRESS的節點,在檢查定時任務是否超時的時候就無法通過,相當於是Raft協議中的Follower,它不會參與競爭Leader。

成為Candidate,開始競選Leader

/* Setup the master state to start a failover. */
// 設定主伺服器的狀態,開始一次故障轉移
void sentinelStartFailover(sentinelRedisInstance *master) {
    redisAssert(master->flags & SRI_MASTER);


    // 更新故障轉移狀態
    master->failover_state = SENTINEL_FAILOVER_STATE_WAIT_START;


    // 更新主伺服器狀態
    master->flags |= SRI_FAILOVER_IN_PROGRESS;


    // 更新紀元
    master->failover_epoch = ++sentinel.current_epoch;


    sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
        (unsigned long long) sentinel.current_epoch);


    sentinelEvent(REDIS_WARNING,"+try-failover",master,"%@");


    // 記錄故障轉移狀態的變更時間
    master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    master->failover_state_change_time = mstime();
}

如果Sentinel通過三重檢查,進入了sentinelStartFailover,相當於成為了Candidate,它會做以下幾件事情:

  • 1、把failover_state改成SENTINEL_FAILOVER_STATE_WAIT_START。
  • 2、把master的狀態改成故障轉移中SRI_FAILOVER_IN_PROGRESS。
  • 3、增加master的current_epoch,並賦值給failover_epoch。
  • 4、把failover_start_time改成mstime()+rand()%SENTINEL_MAX_DESYNC。
  • 5、把failover_state_change_time改成mstime()。

sentinelStartFailover完成了成為Candidate的前面兩步,接著要回到前面的定時任務sentinelHandleRedisInstance。因為sentinelStartFailoverIfNeeded返回了1,所以進入if流程,執行sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);,開始向其他Sentinel拉票。然後就進入sentinelFailoverStateMachine

Follower投票

這裡先來看下投票的原始碼。

/* =============================== FAILOVER ================================= */


/* Vote for the sentinel with 'req_runid' or return the old vote if already
 * voted for the specifed 'req_epoch' or one greater.
 *
 * 為執行 ID 為 req_runid 的 Sentinel 投上一票,有兩種額外情況可能出現:
 * 1) 如果 Sentinel 在 req_epoch 紀元已經投過票了,那麼返回之前投的票。
 * 2) 如果 Sentinel 已經為大於 req_epoch 的紀元投過票了,那麼返回更大紀元的投票。
 *
 * If a vote is not available returns NULL, otherwise return the Sentinel
 * runid and populate the leader_epoch with the epoch of the vote. 
 *
 * 如果投票暫時不可用,那麼返回 NULL 。
 * 否則返回 Sentinel 的執行 ID ,並將被投票的紀元儲存到 leader_epoch 指標的值裡面。
 */
char *sentinelVoteLeader(sentinelRedisInstance *master, uint64_t req_epoch, char *req_runid, uint64_t *leader_epoch) {
    if (req_epoch > sentinel.current_epoch) {
        sentinel.current_epoch = req_epoch;
        sentinelFlushConfig();
        sentinelEvent(REDIS_WARNING,"+new-epoch",master,"%llu",
            (unsigned long long) sentinel.current_epoch);
    }


    if (master->leader_epoch < req_epoch && sentinel.current_epoch <= req_epoch)
    {
        sdsfree(master->leader);
        master->leader = sdsnew(req_runid);
        master->leader_epoch = sentinel.current_epoch;
        sentinelFlushConfig();
        sentinelEvent(REDIS_WARNING,"+vote-for-leader",master,"%s %llu",
            master->leader, (unsigned long long) master->leader_epoch);
        /* If we did not voted for ourselves, set the master failover start
         * time to now, in order to force a delay before we can start a
         * failover for the same master. */
        if (strcasecmp(master->leader,server.runid))
            master->failover_start_time = mstime()+rand()%SENTINEL_MAX_DESYNC;
    }


    *leader_epoch = master->leader_epoch;
    return master->leader ? sdsnew(master->leader) : NULL;
}

前面說到Candidate開始競選後,會把當前epoch加1,這樣就比Follower大1,Follower收到第一個Candidate的投票後,因為自己當前的epoch比Candidate小,所以把當前的epoch改成第一個Candidate的epoch,然後把自己認為的Leader設定成該Candidate。然後其他Candidate再發起對該Follower的投票時,由於這些Candidate的epoch與自己選出Leader的epoch一樣,所以不會再改變自己認為的Leader。這樣,在一個epoch內,Follower就只能投出一票,給它第一個收到投票請求的Candidate。最後有個if (strcasecmp(master->leader,server.runid)),這個是為了設定failover_start_time,這樣Follower在當前epoch內,就無法成為Candidate了。

Sentinel執行任務的狀態機

// 執行故障轉移
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
    redisAssert(ri->flags & SRI_MASTER);


    // master 未進入故障轉移狀態,直接返回
    if (!(ri->flags & SRI_FAILOVER_IN_PROGRESS)) return;


    switch(ri->failover_state) {


        // 等待故障轉移開始
        case SENTINEL_FAILOVER_STATE_WAIT_START:
            sentinelFailoverWaitStart(ri);
            break;


        // 選擇新主伺服器
        case SENTINEL_FAILOVER_STATE_SELECT_SLAVE:
            sentinelFailoverSelectSlave(ri);
            break;
        
        // 升級被選中的從伺服器為新主伺服器
        case SENTINEL_FAILOVER_STATE_SEND_SLAVEOF_NOONE:
            sentinelFailoverSendSlaveOfNoOne(ri);
            break;


        // 等待升級生效,如果升級超時,那麼重新選擇新主伺服器
        // 具體情況請看 sentinelRefreshInstanceInfo 函式
        case SENTINEL_FAILOVER_STATE_WAIT_PROMOTION:
            sentinelFailoverWaitPromotion(ri);
            break;


        // 向從伺服器傳送 SLAVEOF 命令,讓它們同步新主伺服器
        case SENTINEL_FAILOVER_STATE_RECONF_SLAVES:
            sentinelFailoverReconfNextSlave(ri);
            break;
    }
}

Sentinel處理故障轉移流程是採用狀態處理的模式,不同狀態處理不同任務,任務完成後更新狀態到下一個狀態。sentinelFailoverStateMachine函式根據failover_state決定進入什麼流程。在sentinelFailoverWaitStart函式裡面,Leader就被選出了,其他幾個狀態是Leader進行故障轉移的流程。

確認自己是否成為Leader

// 準備執行故障轉移
void sentinelFailoverWaitStart(sentinelRedisInstance *ri) {
    char *leader;
    int isleader;


    /* Check if we are the leader for the failover epoch. */
    // 獲取給定紀元的領頭 Sentinel
    leader = sentinelGetLeader(ri, ri->failover_epoch);
    // 本 Sentinel 是否為領頭 Sentinel ?
    isleader = leader && strcasecmp(leader,server.runid) == 0;
    sdsfree(leader);


    /* If I'm not the leader, and it is not a forced failover via
     * SENTINEL FAILOVER, then I can't continue with the failover. */
    // 如果本 Sentinel 不是領頭,並且這次故障遷移不是一次強制故障遷移操作
    // 那麼本 Sentinel 不做動作
    if (!isleader && !(ri->flags & SRI_FORCE_FAILOVER)) {
        int election_timeout = SENTINEL_ELECTION_TIMEOUT;


        /* The election timeout is the MIN between SENTINEL_ELECTION_TIMEOUT
         * and the configured failover timeout. */
        // 當選的時長(類似於任期)是 SENTINEL_ELECTION_TIMEOUT
        // 和 Sentinel 設定的故障遷移時長之間的較小那個值
        if (election_timeout > ri->failover_timeout)
            election_timeout = ri->failover_timeout;


        /* Abort the failover if I'm not the leader after some time. */
        // Sentinel 的當選時間已過,取消故障轉移計劃
        if (mstime() - ri->failover_start_time > election_timeout) {
            sentinelEvent(REDIS_WARNING,"-failover-abort-not-elected",ri,"%@");
            // 取消故障轉移
            sentinelAbortFailover(ri);
        }
        return;
    }


    // 本 Sentinel 作為領頭,開始執行故障遷移操作...


    sentinelEvent(REDIS_WARNING,"+elected-leader",ri,"%@");


    // 進入選擇從伺服器狀態
    ri->failover_state = SENTINEL_FAILOVER_STATE_SELECT_SLAVE;
    ri->failover_state_change_time = mstime();


    sentinelEvent(REDIS_WARNING,"+failover-state-select-slave",ri,"%@");
}

前面說到的sentinelStartFailover把failover_state設定成SENTINEL_FAILOVER_STATE_WAIT_START,於是進入sentinelFailoverWaitStart

sentinelFailoverWaitStart會先檢視leader是否已經選出。如果Leader是自己或者這是一次強制故障轉移,failover_state就設定為SENTINEL_FAILOVER_STATE_SELECT_SLAVE。強制故障轉移是通過Sentinel的SENTINEL FAILOVER <master-name>命令設定的,這裡不做討論。

如果自己當選Leader,就會進入下一個任務處理狀態,開始故障轉移流程。如果在election_timeout內還沒當選為Leader,那麼本次epoch內,Candidate就沒有當選,需要等待failover_timeout超時,進入下一次競選,或者本次epoch內,有Leader被選出,自己變會Follower。

統計投票

/* Scan all the Sentinels attached to this master to check if there
 * is a leader for the specified epoch.
 *
 * 掃描所有監視 master 的 Sentinels ,檢視是否有 Sentinels 是這個紀元的領頭。
 *
 * To be a leader for a given epoch, we should have the majorify of
 * the Sentinels we know that reported the same instance as
 * leader for the same epoch. 
 *
 * 要讓一個 Sentinel 成為本紀元的領頭,
 * 這個 Sentinel 必須讓大多數其他 Sentinel 承認它是該紀元的領頭才行。
 */
// 選舉出 master 在指定 epoch 上的領頭
char *sentinelGetLeader(sentinelRedisInstance *master, uint64_t epoch) {
    dict *counters;
    dictIterator *di;
    dictEntry *de;
    unsigned int voters = 0, voters_quorum;
    char *myvote;
    char *winner = NULL;
    uint64_t leader_epoch;
    uint64_t max_votes = 0;


    redisAssert(master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS));


    // 統計器
    counters = dictCreate(&leaderVotesDictType,NULL);


    /* Count other sentinels votes */
    // 統計其他 sentinel 的主觀 leader 投票
    di = dictGetIterator(master->sentinels);
    while((de = dictNext(di)) != NULL) {
        sentinelRedisInstance *ri = dictGetVal(de);


        // 為目標 Sentinel 選出的領頭 Sentinel 增加一票
        if (ri->leader != NULL && ri->leader_epoch == sentinel.current_epoch)
            sentinelLeaderIncr(counters,ri->leader);


        // 統計投票數量
        voters++;
    }
    dictReleaseIterator(di);


    /* Check what's the winner. For the winner to win, it needs two conditions:
     *
     * 選出領頭 leader ,它必須滿足以下兩個條件:
     *
     * 1) Absolute majority between voters (50% + 1).
     *    有多於一般的 Sentinel 支援
     * 2) And anyway at least master->quorum votes. 
     *    投票數至少要有 master->quorum 那麼多
     */
    di = dictGetIterator(counters);
    while((de = dictNext(di)) != NULL) {


        // 取出票數
        uint64_t votes = dictGetUnsignedIntegerVal(de);


        // 選出票數最大的人
        if (votes > max_votes) {
            max_votes = votes;
            winner = dictGetKey(de);
        }
    }
    dictReleaseIterator(di);


    /* Count this Sentinel vote:
     * if this Sentinel did not voted yet, either vote for the most
     * common voted sentinel, or for itself if no vote exists at all. */
    // 本 Sentinel 進行投票
    // 如果 Sentinel 之前還沒有進行投票,那麼有兩種選擇:
    // 1)如果選出了 winner (最多票數支援的 Sentinel ),那麼這個 Sentinel 也投 winner 一票
    // 2)如果沒有選出 winner ,那麼 Sentinel 投自己一票
    if (winner)
        myvote = sentinelVoteLeader(master,epoch,winner,&leader_epoch);
    else
        myvote = sentinelVoteLeader(master,epoch,server.runid,&leader_epoch);


    // 領頭 Sentinel 已選出,並且領頭的紀元和給定的紀元一樣
    if (myvote && leader_epoch == epoch) {


        // 為領頭 Sentinel 增加一票(這一票來自本 Sentinel )
        uint64_t votes = sentinelLeaderIncr(counters,myvote);


        // 如果投票之後的票數比最大票數要大,那麼更換領頭 Sentinel
        if (votes > max_votes) {
            max_votes = votes;
            winner = myvote;
        }
    }
    voters++; /* Anyway, count me as one of the voters. */


    // 如果支援領頭的投票數量不超過半數
    // 並且支援票數不超過 master 配置指定的投票數量
    // 那麼這次領頭選舉無效
    voters_quorum = voters/2+1;
    if (winner && (max_votes < voters_quorum || max_votes < master->quorum))
        winner = NULL;


    // 返回領頭 Sentinel ,或者 NULL
    winner = winner ? sdsnew(winner) : NULL;
    sdsfree(myvote);
    dictRelease(counters);
    return winner;
}

sentinelGetLeader會統計所有其他Sentinel的投票結果,如果投票結果中有個Sentinel獲得了超過半數且超過master的quorum,那麼Leader就被選出了。

Candidate第一次進入sentinelGetLeader函式的時候是還沒向其他Sentinel發起投票,winner為NULL,於是就會給自己投上一票,這就是前面Raft協議說到的,在開始競選前“3、給自己投一票“,這樣競選前的4個步驟就全部完成了。以後再進入sentinelGetLeader就可以統計其他Sentinel的投票數目。當發現有個Sentinel的投票資料超過半數且超過quorum,就會返回該Sentinel,sentinelFailoverWaitStart會判斷該Sentinel是否是自己,如果是自己,那麼自己就成為了Leader,開始進行故障轉移,不是自己,那麼等待競選超時,成為Follower。

關於Leader通知其他Sentinel自己成為Leader的說明

在Sentinel的實現裡面。關於Leader傳送競選成功的訊息給其他Sentinel,並沒有專門的邏輯。某個Sentinel成為Leader後,他就默默的幹起活。故障轉移中Leader通過獲取選出的slave的INFO資訊,發現其確認了master身份,Leader就會修改config_epoch為最新的epoch。

/* Process the INFO output from masters. */
// 從主伺服器或者從伺服器所返回的 INFO 命令的回覆中分析相關資訊
// (上面的英文註釋錯了,這個函式不僅處理主伺服器的 INFO 回覆,還處理從伺服器的 INFO 回覆)
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
    sds *lines;
    int numlines, j;
    int role = 0;


    /* The following fields must be reset to a given value in the case they
     * are not found at all in the INFO output. */
    // 將該變數重置為 0 ,避免 INFO 回覆中無該值的情況
    ri->master_link_down_time = 0;


    /* Process line by line. */
    // 對 INFO 命令的回覆進行逐行分析
    lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
    for (j = 0; j < numlines; j++) {
        sentinelRedisInstance *slave;
        sds l = lines[j];


        /* run_id:<40 hex chars>*/
        // 讀取並分析 runid
        if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {


            // 新設定 runid
            if (ri->runid == NULL) {
                ri->runid = sdsnewlen(l+7,40);
            } else {
                // RUNID 不同,說明伺服器已重啟
                if (strncmp(ri->runid,l+7,40) != 0) {
                    sentinelEvent(REDIS_NOTICE,"+reboot",ri,"%@");


                    // 釋放舊 ID ,設定新 ID
                    sdsfree(ri->runid);
                    ri->runid = sdsnewlen(l+7,40);
                }
            }
        }


        // 讀取從伺服器的 ip 和埠號
        /* old versions: slave0:<ip>,<port>,<state>
         * new versions: slave0:ip=127.0.0.1,port=9999,... */
        if ((ri->flags & SRI_MASTER) &&
            sdslen(l) >= 7 &&
            !memcmp(l,"slave",5) && isdigit(l[5]))
        {
            char *ip, *port, *end;


            if (strstr(l,"ip=") == NULL) {
                /* Old format. */
                ip = strchr(l,':'); if (!ip) continue;
                ip++; /* Now ip points to start of ip address. */
                port = strchr(ip,','); if (!port) continue;
                *port = '\0'; /* nul term for easy access. */
                port++; /* Now port points to start of port number. */
                end = strchr(port,','); if (!end) continue;
                *end = '\0'; /* nul term for easy access. */
            } else {
                /* New format. */
                ip = strstr(l,"ip="); if (!ip) continue;
                ip += 3; /* Now ip points to start of ip address. */
                port = strstr(l,"port="); if (!port) continue;
                port += 5; /* Now port points to start of port number. */
                /* Nul term both fields for easy access. */
                end = strchr(ip,','); if (end) *end = '\0';
                end = strchr(port,','); if (end) *end = '\0';
            }


            /* Check if we already have this slave into our table,
             * otherwise add it. */
            // 如果發現有新的從伺服器出現,那麼為它新增例項
            if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
                if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
                            atoi(port), ri->quorum, ri)) != NULL)
                {
                    sentinelEvent(REDIS_NOTICE,"+slave",slave,"%@");
                }
            }
        }


        /* master_link_down_since_seconds:<seconds> */
        // 讀取主從伺服器的斷線時長
        // 這個只會在例項是從伺服器,並且主從連線斷開的情況下出現
        if (sdslen(l) >= 32 &&
            !memcmp(l,"master_link_down_since_seconds",30))
        {
            ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
        }


        /* role:<role> */
        // 讀取例項的角色
        if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
        else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;


        // 處理從伺服器
        if (role == SRI_SLAVE) {


            /* master_host:<host> */
            // 讀入主伺服器的 IP
            if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
                if (ri->slave_master_host == NULL ||
                    strcasecmp(l+12,ri->slave_master_host))
                {
                    sdsfree(ri->slave_master_host);
                    ri->slave_master_host = sdsnew(l+12);
                    ri->slave_conf_change_time = mstime();
                }
            }


            /* master_port:<port> */
            // 讀入主伺服器的埠號
            if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {
                int slave_master_port = atoi(l+12);


                if (ri->slave_master_port != slave_master_port) {
                    ri->slave_master_port = slave_master_port;
                    ri->slave_conf_change_time = mstime();
                }
            }
            
            /* master_link_status:<status> */
            // 讀入主伺服器的狀態
            if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
                ri->slave_master_link_status =
                    (strcasecmp(l+19,"up") == 0) ?
                    SENTINEL_MASTER_LINK_STATUS_UP :
                    SENTINEL_MASTER_LINK_STATUS_DOWN;
            }


            /* slave_priority:<priority> */
            // 讀入從伺服器的優先順序
            if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
                ri->slave_priority = atoi(l+15);


            /* slave_repl_offset:<offset> */
            // 讀入從伺服器的複製偏移量
            if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))
                ri->slave_repl_offset = strtoull(l+18,NULL,10);
        }
    }


    // 更新重新整理 INFO 命令回覆的時間
    ri->info_refresh = mstime();
    sdsfreesplitres(lines,numlines);


    /* ---------------------------- Acting half -----------------------------
     * Some things will not happen if sentinel.tilt is true, but some will
     * still be processed. 
     *
     * 如果 sentinel 進入了 TILT 模式,那麼可能只有一部分動作會被執行
     */


    /* Remember when the role changed. */
    if (role != ri->role_reported) {
        ri->role_reported_time = mstime();
        ri->role_reported = role;
        if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
        /* Log the event with +role-change if the new role is coherent or
         * with -role-change if there is a mismatch with the current config. */
        sentinelEvent(REDIS_VERBOSE,
            ((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
            "+role-change" : "-role-change",
            ri, "%@ new reported role is %s",
            role == SRI_MASTER ? "master" : "slave",
            ri->flags & SRI_MASTER ? "master" : "slave");
    }


    /* None of the following conditions are processed when in tilt mode, so
     * return asap. */
    // 如果 Sentinel 正處於 TILT 模式,那麼它不能執行以下的語句。
    if (sentinel.tilt) return;


    /* Handle master -> slave role switch. */
    // 例項被 Sentinel 標識為主伺服器,但根據 INFO 命令的回覆
    // 這個例項的身份為從伺服器
    if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
        /* Nothing to do, but masters claiming to be slaves are
         * considered to be unreachable by Sentinel, so eventually
         * a failover will be triggered. */
        // 如果一個主伺服器變為從伺服器,那麼 Sentinel 將這個主伺服器看作是不可用的
    }


    /* Handle slave -> master role switch. */
    // 處理從伺服器轉變為主伺服器的情況
    if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
        /* If this is a promoted slave we can change state to the
         * failover state machine. */


        // 如果這是被選中升級為新主伺服器的從伺服器
        // 那麼更新相關的故障轉移屬性
        if ((ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
            (ri->master->failover_state ==
                SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
        {
            /* Now that we are sure the slave was reconfigured as a master
             * set the master configuration epoch to the epoch we won the
             * election to perform this failover. This will force the other
             * Sentinels to update their config (assuming there is not
             * a newer one already available). */
            // 這是一個被 Sentinel 傳送 SLAVEOF no one 之後由從伺服器變為主伺服器的例項
            // 將這個新主伺服器的配置紀元設定為 Sentinel 贏得領頭選舉的紀元
            // 這一操作會強制其他 Sentinel 更新它們自己的配置
            // (假設沒有一個更新的紀元存在的話)
            // 更新從伺服器的主伺服器(已下線)的配置紀元
            ri->master->config_epoch = ri->master->failover_epoch;
            // 設定從伺服器的主伺服器(已下線)的故障轉移狀態
            // 這個狀態會讓從伺服器開始同步新的主伺服器
            ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
            // 更新從伺服器的主伺服器(已下線)的故障轉移狀態變更時間
            ri->master->failover_state_change_time = mstime();
            // 將當前 Sentinel 狀態儲存到配置檔案裡面
            sentinelFlushConfig();
            // 傳送事件
            sentinelEvent(REDIS_WARNING,"+promoted-slave",ri,"%@");
            sentinelEvent(REDIS_WARNING,"+failover-state-reconf-slaves",
                ri->master,"%@");
            // 執行指令碼
            sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
                "start",ri->master->addr,ri->addr);


        // 這個例項由從伺服器變為了主伺服器,並且沒有進入 TILT 模式
        // (可能是因為重啟造成的,或者之前的下線主伺服器重新上線了)
        } else {
            /* A slave turned into a master. We want to force our view and
             * reconfigure as slave. Wait some time after the change before
             * going forward, to receive new configs if any. */
            // 如果一個從伺服器變為了主伺服器,那麼我們會考慮將它變回一個從伺服器


            // 將 PUBLISH 命令的傳送時間乘以 4 ,給於一定緩衝時間
            mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;


            // 如果這個例項的主伺服器運作正常
            // 並且例項在一段時間內沒有進入過 SDOWN 狀態或者 ODOWN 狀態
            // 並且例項報告它是主伺服器的時間已經超過 wait_time
            if (sentinelMasterLooksSane(ri->master) &&
               sentinelRedisInstanceNoDownFor(ri,wait_time) &&
               mstime() - ri->role_reported_time > wait_time)
            {
                // 重新將例項設定為從伺服器
                int retval = sentinelSendSlaveOf(ri,
                        ri->master->addr->ip,
                        ri->master->addr->port);
                
                // 傳送事件
                if (retval == REDIS_OK)
                    sentinelEvent(REDIS_NOTICE,"+convert-to-slave",ri,"%@");
            }
        }
    }


    /* Handle slaves replicating to a different master address. */
    // 讓從伺服器重新複製回正確的主伺服器
    if ((ri->flags & SRI_SLAVE) &&
        role == SRI_SLAVE &&
        // 從伺服器現在的主伺服器地址和 Sentinel 儲存的資訊不一致
        (ri->slave_master_port != ri->master->addr->port ||
         strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
    {
        mstime_t wait_time = ri->master->failover_timeout;


        /* Make sure the master is sane before reconfiguring this instance
         * into a slave. */
        // 1) 檢查例項的主伺服器狀態是否正常
        // 2) 檢查例項在給定時間內是否進入過 SDOWN 或者 ODOWN 狀態
        // 3) 檢查例項身份變更的時長是否已經超過了指定時長
        // 如果是的話,執行程式碼。。。
        if (sentinelMasterLooksSane(ri->master) &&
            sentinelRedisInstanceNoDownFor(ri,wait_time) &&
            mstime() - ri->slave_conf_change_time > wait_time)
        {
            // 重新將例項指向原本的主伺服器
            int retval = sentinelSendSlaveOf(ri,
                    ri->master->addr->ip,
                    ri->master->addr->port);


            if (retval == REDIS_OK)
                sentinelEvent(REDIS_NOTICE,"+fix-slave-config",ri,"%@");
        }
    }


    /* Detect if the slave that is in the process of being reconfigured
     * changed state. */
    // Sentinel 監視的例項為從伺服器,並且已經向它傳送 SLAVEOF 命令
    if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
        (ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
    {
        /* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
        // 將 SENT 狀態改為 INPROG 狀態,表示同步正在進行
        if ((ri->flags & SRI_RECONF_SENT) &&
            ri->slave_master_host &&
            strcmp(ri->slave_master_host,
                    ri->master->promoted_slave->addr->ip) == 0 &&
            ri->slave_master_port == ri->master->promoted_slave->addr->port)
        {
            ri->flags &= ~SRI_RECONF_SENT;
            ri->flags |= SRI_RECONF_INPROG;
            sentinelEvent(REDIS_NOTICE,"+slave-reconf-inprog",ri,"%@");
        }


        /* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
        // 將 INPROG 狀態改為 DONE 狀態,表示同步已完成
        if ((ri->flags & SRI_RECONF_INPROG) &&
            ri->slave_master_link_status == SENTINEL_MASTER_LINK_STATUS_UP)
        {
            ri->flags &= ~SRI_RECONF_INPROG;
            ri->flags |= SRI_RECONF_DONE;
            sentinelEvent(REDIS_NOTICE,"+slave-reconf-done",ri,"%@");
        }
    }
}

config_epoch會通過hello頻道傳送給其他Sentinel。其他Sentinel發現config_epoch更新了,就會更新最新的master地址和config_epoch。這相當於Leader把當選訊息告知了其他Sentinel。

/* Process an hello message received via Pub/Sub