1. 程式人生 > >3. SOFAJRaft原始碼分析— 是如何進行選舉的?

3. SOFAJRaft原始碼分析— 是如何進行選舉的?

開篇

在上一篇文章當中,我們講解了NodeImpl在init方法裡面會初始化話的動作,選舉也是在這個方法裡面進行的,這篇文章來從這個方法裡詳細講一下選舉的過程。

由於我這裡介紹的是如何實現的,所以請大家先看一下原理:SOFAJRaft 選舉機制剖析 | SOFAJRaft 實現原理

文章比較長,我也慢慢的寫了半個月時間~

選舉過程分析

我在這裡只把有關選舉的程式碼列舉出來,其他的程式碼暫且忽略
NodeImpl#init

public boolean init(final NodeOptions opts) {
        ....
    // Init timers
    //設定投票計時器
    this.voteTimer = new RepeatedTimer("JRaft-VoteTimer", this.options.getElectionTimeoutMs()) {

        @Override
        protected void onTrigger() {
            //處理投票超時
            handleVoteTimeout();
        }

        @Override
        protected int adjustTimeout(final int timeoutMs) {
            //在一定範圍內返回一個隨機的時間戳
            return randomTimeout(timeoutMs);
        }
    };
    //設定預投票計時器
    //當leader在規定的一段時間內沒有與 Follower 艦船進行通訊時,
    // Follower 就可以認為leader已經不能正常擔任旗艦的職責,則 Follower 可以去嘗試接替leader的角色。
    // 這段通訊超時被稱為 Election Timeout
    //候選者在發起投票之前,先發起預投票
    this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) {

        @Override
        protected void onTrigger() {
            handleElectionTimeout();
        }

        @Override
        protected int adjustTimeout(final int timeoutMs) {
            //在一定範圍內返回一個隨機的時間戳
            //為了避免同時發起選舉而導致失敗
            return randomTimeout(timeoutMs);
        }
    };
    //leader下臺的計時器
    //定時檢查是否需要重新選舉leader
    this.stepDownTimer = new RepeatedTimer("JRaft-StepDownTimer", this.options.getElectionTimeoutMs() >> 1) {

        @Override
        protected void onTrigger() {
            handleStepDownTimeout();
        }
    };
        ....
    if (!this.conf.isEmpty()) {
        //新啟動的node需要重新選舉
        stepDown(this.currTerm, false, new Status());
    }
        ....
}

在這個init方法裡面會初始化三個計時器是和選舉有關的:

  • voteTimer:這個timer負責定期的檢查,如果當前的state的狀態是候選者(STATE_CANDIDATE),那麼就會發起選舉
  • electionTimer:在一定時間內如果leader沒有與 Follower 進行通訊時,Follower 就可以認為leader已經不能正常擔任leader的職責,那麼就會進行選舉,在選舉之前會先發起預投票,如果沒有得到半數以上節點的反饋,則候選者就會識趣的放棄參選。所以這個timer負責預投票
  • stepDownTimer:定時檢查是否需要重新選舉leader,如果當前的leader沒有獲得超過半數的Follower響應,那麼這個leader就應該下臺然後重新選舉。

RepeatedTimer的分析我已經寫好了:2. SOFAJRaft原始碼分析—JRaft的定時任務排程器是怎麼做的?

我們先跟著init方法的思路往下看,一般來說this.conf裡面裝的是整個叢集的節點資訊,是不會為空的,所以會呼叫stepDown,所以先從這個方法看起。

leader下臺

private void stepDown(final long term, final boolean wakeupCandidate, final Status status) {
    LOG.debug("Node {} stepDown, term={}, newTerm={}, wakeupCandidate={}.", getNodeId(), this.currTerm, term,
        wakeupCandidate);
    //校驗一下當前節點的狀態是否有異常,或正在關閉
    if (!this.state.isActive()) {
        return;
    }
    //如果是候選者,那麼停止選舉
    if (this.state == State.STATE_CANDIDATE) {
        //呼叫voteTimer的stop方法
        stopVoteTimer();
        //如果當前狀態是leader或TRANSFERRING
    } else if (this.state.compareTo(State.STATE_TRANSFERRING) <= 0) {
        //讓啟動的stepDownTimer停止運作
        stopStepDownTimer();
        //清空選票箱中的內容
        this.ballotBox.clearPendingTasks();
        // signal fsm leader stop immediately
        if (this.state == State.STATE_LEADER) {
            //傳送leader下臺的事件給其他Follower
            onLeaderStop(status);
        }
    }
    // reset leader_id
    //重置當前節點的leader
    resetLeaderId(PeerId.emptyPeer(), status);

    // soft state in memory
    this.state = State.STATE_FOLLOWER;
    //重置Configuration的上下文
    this.confCtx.reset();
    updateLastLeaderTimestamp(Utils.monotonicMs());
    if (this.snapshotExecutor != null) {
        //停止當前的快照生成
        this.snapshotExecutor.interruptDownloadingSnapshots(term);
    }

    //設定任期為大的那個
    // meta state
    if (term > this.currTerm) {
        this.currTerm = term;
        this.votedId = PeerId.emptyPeer();
        //重設元資料資訊儲存到檔案中
        this.metaStorage.setTermAndVotedFor(term, this.votedId);
    }

    if (wakeupCandidate) {
        this.wakingCandidate = this.replicatorGroup.stopAllAndFindTheNextCandidate(this.conf);
        if (this.wakingCandidate != null) {
            Replicator.sendTimeoutNowAndStop(this.wakingCandidate, this.options.getElectionTimeoutMs());
        }
    } else {
        //把replicatorGroup裡面的所有replicator標記為stop
        this.replicatorGroup.stopAll();
    }
    //leader轉移的時候會用到
    if (this.stopTransferArg != null) {
        if (this.transferTimer != null) {
            this.transferTimer.cancel(true);
        }
        // There is at most one StopTransferTimer at the same term, it's safe to
        // mark stopTransferArg to NULL
        this.stopTransferArg = null;
    }
    //啟動
    this.electionTimer.start();
}

一個leader的下臺需要做很多交接的工作:

  1. 如果當前的節點是個候選人(STATE_CANDIDATE),那麼這個時候會讓它暫時不要投票
  2. 如果當前的節點狀態是(STATE_TRANSFERRING)表示正在轉交leader或是leader(STATE_LEADER),那麼就需要把當前節點的stepDownTimer這個定時器給關閉
  3. 如果當前是leader(STATE_LEADER),那麼就需要告訴狀態機leader下臺了,可以在狀態機中對下臺的動作做處理
  4. 重置當前節點的leader,把當前節點的state狀態設定為Follower,重置confCtx上下文
  5. 停止當前的快照生成,設定新的任期,讓所有的複製節點停止工作
  6. 啟動electionTimer

呼叫stopVoteTimer和stopStepDownTimer方法裡面主要是呼叫相應的RepeatedTimer的stop方法,在stop方法裡面會將stopped狀態設定為ture,並將timeout設定為取消,並將這個timeout加入到cancelledTimeouts集合中去:
如果看了2. SOFAJRaft原始碼分析—JRaft的定時任務排程器是怎麼做的?這篇文章的話,那麼下面這段程式碼應該一看就明白是怎麼回事了的。

public void stop() {
    this.lock.lock();
    try {
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        if (this.timeout != null) {
            this.timeout.cancel();
            this.running = false;
            this.timeout = null;
        }
    } finally {
        this.lock.unlock();
    }
}

狀態機處理LEADER_STOP事件

在呼叫NodeImpl的onLeaderStop方法中,實際上是呼叫了FSMCallerImpl的onLeaderStop方法
NodeImpl#onLeaderStop

private void onLeaderStop(final Status status) {
    this.replicatorGroup.clearFailureReplicators();
    this.fsmCaller.onLeaderStop(status);
}

FSMCallerImpl#onLeaderStop

public boolean onLeaderStop(final Status status) {
    return enqueueTask((task, sequence) -> {
          //設定當前task的狀態為LEADER_STOP
        task.type = TaskType.LEADER_STOP;
        task.status = new Status(status);
    });
}

private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
    if (this.shutdownLatch != null) {
        // Shutting down
        LOG.warn("FSMCaller is stopped, can not apply new task.");
        return false;
    }
    //使用Disruptor釋出事件
    this.taskQueue.publishEvent(tpl);
    return true;
}

這個方法裡像taskQueue佇列裡面釋出了一個LEADER_STOP事件,taskQueue是在FSMCallerImpl的init方法中被初始化的:

public boolean init(final FSMCallerOptions opts) {
    .....
    this.disruptor = DisruptorBuilder.<ApplyTask>newInstance() //
            .setEventFactory(new ApplyTaskFactory()) //
            .setRingBufferSize(opts.getDisruptorBufferSize()) //
            .setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-Disruptor-", true)) //
            .setProducerType(ProducerType.MULTI) //
            .setWaitStrategy(new BlockingWaitStrategy()) //
            .build();
    this.disruptor.handleEventsWith(new ApplyTaskHandler());
    this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
    this.taskQueue = this.disruptor.start();
    .....
}

在taskQueue中釋出了一個任務之後會交給ApplyTaskHandler進行處理

ApplyTaskHandler

private class ApplyTaskHandler implements EventHandler<ApplyTask> {
    // max committed index in current batch, reset to -1 every batch
    private long maxCommittedIndex = -1;

    @Override
    public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
        this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
    }
}

每當有任務到達taskQueue佇列的時候會呼叫ApplyTaskHandler的onEvent方法來處理事件,具體的執行邏輯由runApplyTask方法進行處理

FSMCallerImpl#runApplyTask

private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) {
    CountDownLatch shutdown = null;
    ...
     switch (task.type) {
         ...
        case LEADER_STOP:
            this.currTask = TaskType.LEADER_STOP;
            doLeaderStop(task.status);
            break;
        ...
    }
        ....
}

在runApplyTask方法裡會對很多的事件進行處理,我們這裡只看LEADER_STOP是怎麼做的:

在switch裡會呼叫doLeaderStop方法,這個方法會呼叫到FSMCallerImpl裡面封裝的StateMachine狀態機的onLeaderStart方法:

private void doLeaderStop(final Status status) {
    this.fsm.onLeaderStop(status);
}

這樣就可以對leader停止時進行客製化的處理了。

重置leader

接下來會呼叫resetLeaderId(PeerId.emptyPeer(), status);方法來重置leader

private void resetLeaderId(final PeerId newLeaderId, final Status status) {
    if (newLeaderId.isEmpty()) {
        //這個判斷表示如果當前節點是候選者或者是Follower,並且已經有leader了
        if (!this.leaderId.isEmpty() && this.state.compareTo(State.STATE_TRANSFERRING) > 0) {
            //向狀態機裝釋出停止跟隨該leader的事件
            this.fsmCaller.onStopFollowing(new LeaderChangeContext(this.leaderId.copy(), this.currTerm, status));
        }
        //把當前的leader設定為一個空值
        this.leaderId = PeerId.emptyPeer();
    } else {
        //如果當前節點沒有leader
        if (this.leaderId == null || this.leaderId.isEmpty()) {
            //那麼釋出要跟隨該leader的事件
            this.fsmCaller.onStartFollowing(new LeaderChangeContext(newLeaderId, this.currTerm, status));
        }
        this.leaderId = newLeaderId.copy();
    }
}

這個方法由兩個作用,如果傳入的newLeaderId不是個空的,那麼就會設定一個新的leader,並向狀態機發送一個START_FOLLOWING事件;如果傳入的newLeaderId是空的,那麼就會發送一個STOP_FOLLOWING事件,並把當前的leader置空。

啟動electionTimer,進行leader選舉

electionTimer是RepeatedTimer的實現類,在這裡我就不多說了,上一篇文章已經介紹過了。

我這裡來看看electionTimer的onTrigger方法是怎麼處理選舉事件的,electionTimer的onTrigger方法會呼叫NodeImpl的handleElectionTimeout方法,所以直接看這個方法:

NodeImpl#handleElectionTimeout

private void handleElectionTimeout() {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        if (this.state != State.STATE_FOLLOWER) {
            return;
        }
        //如果當前選舉沒有超時則說明此輪選舉有效
        if (isCurrentLeaderValid()) {
            return;
        }
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT, "Lost connection from leader %s.",
            this.leaderId));
        doUnlock = false;
        //預投票 (pre-vote) 環節
        //候選者在發起投票之前,先發起預投票,
        // 如果沒有得到半數以上節點的反饋,則候選者就會識趣的放棄參選
        preVote();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

在這個方法裡,首先會加上一個寫鎖,然後進行校驗,最後先發起一個預投票。

校驗的時候會校驗當前的狀態是不是follower,校驗leader和follower上次的通訊時間是不是超過了ElectionTimeoutMs,如果沒有超過,說明leader存活,沒必要發起選舉;如果通訊超時,那麼會將leader置空,然後呼叫預選舉。

NodeImpl#isCurrentLeaderValid

private boolean isCurrentLeaderValid() {
    return Utils.monotonicMs() - this.lastLeaderTimestamp < this.options.getElectionTimeoutMs();
}

用當前時間和上次leader通訊時間相減,如果小於ElectionTimeoutMs(預設1s),那麼就沒有超時,說明leader有效

預選票preVote

我們在handleElectionTimeout方法中最後呼叫了preVote方法,接下來重點看一下這個方法。

下面我將preVote拆分成幾部分來進行講解:
NodeImpl#preVote part1

private void preVote() {
    long oldTerm;
    try {
        LOG.info("Node {} term {} start preVote.", getNodeId(), this.currTerm);
        if (this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
            LOG.warn(
                "Node {} term {} doesn't do preVote when installing snapshot as the configuration may be out of date.",
                getNodeId());
            return;
        }
        //conf裡面記錄了叢集節點的資訊,如果當前的節點不包含在叢集裡說明是由問題的
        if (!this.conf.contains(this.serverId)) {
            LOG.warn("Node {} can't do preVote as it is not in conf <{}>.", getNodeId(), this.conf);
            return;
        }
        //設定一下當前的任期
        oldTerm = this.currTerm;
    } finally {
        this.writeLock.unlock();
    } 
      ....
}

這部分程式碼是一開始進到preVote這個方法首先要經過一些校驗,例如當前的節點不能再安裝快照的時候進行選舉;檢視一下當前的節點是不是在自己設定的conf裡面,conf這個屬性會包含了叢集的所有節點;最後設定一下當前的任期後解鎖。

NodeImpl#preVote part2

private void preVote() {
      ....
    //返回最新的log實體類
    final LogId lastLogId = this.logManager.getLastLogId(true);

    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        // pre_vote need defense ABA after unlock&writeLock
        //因為在上面沒有重新加鎖的間隙裡可能會被別的執行緒改變了,所以這裡校驗一下
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
            return;
        }
        //初始化預投票投票箱
        this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        for (final PeerId peer : this.conf.listPeers()) {
            //如果遍歷的節點是當前節點就跳過
            if (peer.equals(this.serverId)) {
                continue;
            }
            //失聯的節點也跳過
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            //設定一個回撥的類
            final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
            //向被遍歷到的這個節點發送一個預投票的請求
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(true) // it's a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm + 1) // next term,注意這裡傳送過去的任期會加一
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.preVote(peer.getEndpoint(), done.request, done);
        }
        //自己也可以投給自己
        this.prevVoteCtx.grant(this.serverId);
        if (this.prevVoteCtx.isGranted()) {
            doUnlock = false;
            electSelf();
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

這部分程式碼:

  1. 首先會獲取最新的log資訊,由LogId封裝,裡面包含兩部分,一部分是這個日誌的index和寫入這個日誌所對應當時節點的一個term任期
  2. 初始化預投票投票箱
  3. 遍歷所有的叢集節點
  4. 如果遍歷的節點是當前節點就跳過,如果遍歷的節點因為宕機或者手動下線等原因連線不上也跳過
  5. 向遍歷的節點發送一個RequestVoteRequest請求預投票給自己
  6. 最後因為自己也是叢集節點的一員,所以自己也投票給自己

初始化預投票投票箱

初始化預投票箱是呼叫了Ballot的init方法進行初始化,分別傳入新的叢集節點資訊,和老的叢集節點資訊

public boolean init(Configuration conf, Configuration oldConf) {
    this.peers.clear();
    this.oldPeers.clear();
    quorum = oldQuorum = 0;
    int index = 0;
    //初始化新的節點
    if (conf != null) {
        for (PeerId peer : conf) {
            this.peers.add(new UnfoundPeerId(peer, index++, false));
        }
    }
    //設定需要多少票數才能成為leader
    this.quorum = this.peers.size() / 2 + 1;
    ....
    return true;
}

我這裡為了使邏輯更清晰,假設沒有oldConf,省略oldConf相關設定。
這個方法裡會遍歷所有的peer節點,並將peer封裝成UnfoundPeerId插入到peers集合中;然後設定quorum屬性,這個屬性會在每獲得一個投票就減1,當減到0以下時說明獲得了足夠多的票數,就代表預投票成功。

發起預投票請求

//設定一個回撥的類
final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
//向被遍歷到的這個節點發送一個預投票的請求
done.request = RequestVoteRequest.newBuilder() //
    .setPreVote(true) // it's a pre-vote request.
    .setGroupId(this.groupId) //
    .setServerId(this.serverId.toString()) //
    .setPeerId(peer.toString()) //
    .setTerm(this.currTerm + 1) // next term,注意這裡傳送過去的任期會加一
    .setLastLogIndex(lastLogId.getIndex()) //
    .setLastLogTerm(lastLogId.getTerm()) //
    .build();
this.rpcService.preVote(peer.getEndpoint(), done.request, done);

在構造RequestVoteRequest的時候,會將PreVote屬性設定為true,表示這次請求是預投票;設定當前節點為ServerId;傳給對方的任期是當前節點的任期加一。最後在傳送成功收到響應之後會回撥OnPreVoteRpcDone的run方法。

OnPreVoteRpcDone#run

public void run(final Status status) {
    NodeImpl.this.metrics.recordLatency("pre-vote", Utils.monotonicMs() - this.startMs);
    if (!status.isOk()) {
        LOG.warn("Node {} PreVote to {} error: {}.", getNodeId(), this.peer, status);
    } else {
        handlePreVoteResponse(this.peer, this.term, getResponse());
    }
}

在這個方法中如果收到正常的響應,那麼會呼叫handlePreVoteResponse方法處理響應

OnPreVoteRpcDone#handlePreVoteResponse

public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        //只有follower才可以嘗試發起選舉
        if (this.state != State.STATE_FOLLOWER) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
                getNodeId(), peerId, this.state);
            return;
        }
        
        if (term != this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
                peerId, term, this.currTerm);
            return;
        }
        //如果返回的任期大於當前的任期,那麼這次請求也是無效的
        if (response.getTerm() > this.currTerm) {
            LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
                response.getTerm(), this.currTerm);
            stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                "Raft node receives higher term pre_vote_response."));
            return;
        }
        LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId,
            response.getTerm(), response.getGranted());
        // check granted quorum?
        if (response.getGranted()) {
            this.prevVoteCtx.grant(peerId);
            //得到了半數以上的響應
            if (this.prevVoteCtx.isGranted()) {
                doUnlock = false;
                //進行選舉
                electSelf();
            }
        }
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

這裡做了3重校驗,我們分別來談談:

  1. 第一重試校驗了當前的狀態,如果不是FOLLOWER那麼就不能發起選舉。因為如果是leader節點,那麼它不會選舉,只能stepdown下臺,把自己變成FOLLOWER後重新選舉;如果是CANDIDATE,那麼只能進行由FOLLOWER發起的投票,所以從功能上來說,只能FOLLOWER發起選舉。
    從Raft 的設計上來說也只能由FOLLOWER來發起選舉,所以這裡進行了校驗。
  2. 第二重校驗主要是校驗傳送請求時的任期和接受到響應時的任期還是不是一個,如果不是那麼說明已經不是上次那輪的選舉了,是一次失效的選舉
  3. 第三重校驗是校驗響應返回的任期是不是大於當前的任期,如果大於當前的任期,那麼重置當前的leader

校驗完之後響應的節點會返回一個授權,如果授權通過的話則呼叫Ballot的grant方法,表示給當前的節點投一票

Ballot#grant

public void grant(PeerId peerId) {
    this.grant(peerId, new PosHint());
}

public PosHint grant(PeerId peerId, PosHint hint) {
    UnfoundPeerId peer = findPeer(peerId, peers, hint.pos0);
    if (peer != null) {
        if (!peer.found) {
            peer.found = true;
            this.quorum--;
        }
        hint.pos0 = peer.index;
    } else {
        hint.pos0 = -1;
    }
    .... 
    return hint;
}

grant方法會根據peerId去叢集集合裡面去找被封裝的UnfoundPeerId例項,然後判斷一下,如果沒有被記錄過,那麼就將quorum減一,表示收到一票,然後將found設定為ture表示已經找過了。

在查詢UnfoundPeerId例項的時候方法裡面做了一個很有趣的設定:
首先在存入到peers集合裡面的時候是這樣的:

int index = 0;
for (PeerId peer : conf) {
    this.peers.add(new UnfoundPeerId(peer, index++, false));
}

這裡會遍歷conf,然後會存入index,index從零開始。
然後在查詢的時候會傳入peerId和posHint還有peers集合:

private UnfoundPeerId findPeer(PeerId peerId, List<UnfoundPeerId> peers, int posHint) {
    if (posHint < 0 || posHint >= peers.size() || !peers.get(posHint).peerId.equals(peerId)) {
        for (UnfoundPeerId ufp : peers) {
            if (ufp.peerId.equals(peerId)) {
                return ufp;
            }
        }
        return null;
    }

    return peers.get(posHint);
}

這裡傳入的posHint預設是-1 ,即如果是第一次傳入,那麼會遍歷整個peers集合,然後一個個比對之後返回。

因為PosHint例項會在呼叫完之後將pos0設定為peer的index,如果grant方法不是第一次呼叫,那麼在呼叫findPeer方法的時候就可以直接通過get方法獲取,不用再遍歷整個集合了。

這種寫法也可以運用到平時的程式碼中去。

呼叫了grant方法之後會呼叫Ballot的isGranted判斷一下是否達到了半數以上的響應。
Ballot#isGranted

public boolean isGranted() {
    return this.quorum <= 0 && oldQuorum <= 0;
}

即判斷一下投票箱裡面的票數是不是被減到了0。如果返回是的話,那麼就呼叫electSelf進行選舉。

選舉的方法暫時先不看,我們來看看預選舉的請求是怎麼被處理的

響應RequestVoteRequest請求

RequestVoteRequest請求的處理器是在RaftRpcServerFactory的addRaftRequestProcessors方法中被安置的,具體的處理時RequestVoteRequestProcessor。

具體的處理方法是交由processRequest0方法來處理的。

RequestVoteRequestProcessor#processRequest0

public Message processRequest0(RaftServerService service, RequestVoteRequest request, RpcRequestClosure done) {
    //如果是預選舉
    if (request.getPreVote()) {
        return service.handlePreVoteRequest(request);
    } else {
        return service.handleRequestVoteRequest(request);
    }
}

因為這個處理器可以處理選舉和預選舉,所以加了個判斷。預選舉的方法交給NodeImpl的handlePreVoteRequest來具體實現的。

NodeImpl#handlePreVoteRequest

public Message handlePreVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        //校驗這個節點是不是正常的節點
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
                getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        //傳送過來的request請求攜帶的ServerId格式不能錯
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
                request.getServerId());
        }
        boolean granted = false;
        // noinspection ConstantConditions
        do {
            //已經有leader的情況
            if (this.leaderId != null && !this.leaderId.isEmpty() && isCurrentLeaderValid()) {
                LOG.info(
                    "Node {} ignore PreVoteRequest from {}, term={}, currTerm={}, because the leader {}'s lease is still valid.",
                    getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, this.leaderId);
                break;
            }
            //請求的任期小於當前的任期
            if (request.getTerm() < this.currTerm) {
                LOG.info("Node {} ignore PreVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                // A follower replicator may not be started when this node become leader, so we must check it.
                  //那麼這個節點也可能是leader,所以校驗一下請求的節點是不是複製節點,重新加入到replicatorGroup中
                checkReplicator(candidateId);
                break;
            } else if (request.getTerm() == this.currTerm + 1) {
                // A follower replicator may not be started when this node become leader, so we must check it.
                // check replicator state
                //因為請求的任期和當前的任期相等,那麼這個節點也可能是leader,
                 // 所以校驗一下請求的節點是不是複製節點,重新加入到replicatorGroup中
                checkReplicator(candidateId);
            }
            doUnlock = false;
            this.writeLock.unlock();
            //獲取最新的日誌
            final LogId lastLogId = this.logManager.getLastLogId(true);

            doUnlock = true;
            this.writeLock.lock();
            final LogId requestLastLogId = new LogId(request.getLastLogIndex(), request.getLastLogTerm());
            //比較當前節點的日誌完整度和請求節點的日誌完整度
            granted = requestLastLogId.compareTo(lastLogId) >= 0;

            LOG.info(
                "Node {} received PreVoteRequest from {}, term={}, currTerm={}, granted={}, requestLastLogId={}, lastLogId={}.",
                getNodeId(), request.getServerId(), request.getTerm(), this.currTerm, granted, requestLastLogId,
                lastLogId);
        } while (false);//這個while蠻有意思,為了用break想盡了辦法

        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            .setGranted(granted) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

這個方法裡面也是蠻有意思的,寫的很長,但是邏輯很清楚。

  1. 首先呼叫isActive,看一下當前節點是不是正常的節點,不是正常節點要返回Error資訊
  2. 將請求傳過來的ServerId解析到candidateId例項中
  3. 校驗當前的節點如果有leader,並且leader有效的,那麼就直接break,返回granted為false
  4. 如果當前的任期大於請求的任期,那麼呼叫checkReplicator檢查自己是不是leader,如果是leader,那麼將當前節點從failureReplicators移除,重新加入到replicatorMap中。然後直接break
  5. 請求任期和當前任期相等的情況也要校驗,只是不用break
  6. 如果請求的日誌比當前的最新的日誌還要新,那麼返回granted為true,代表授權成功

這裡有一個有意思的地方是,因為java中只能在迴圈中goto,所以這裡使用了do-while(false)只做單次的迴圈,這樣就可以do程式碼塊裡使用break了。

下面稍微看一下checkReplicator:
NodeImpl#checkReplicator

private void checkReplicator(final PeerId candidateId) {
    if (this.state == State.STATE_LEADER) {
        this.replicatorGroup.checkReplicator(candidateId, false);
    }
}

這裡判斷一下是不是leader,然後就會呼叫ReplicatorGroupImpl的checkReplicator

ReplicatorGroupImpl#checkReplicator

private final ConcurrentMap<PeerId, ThreadId> replicatorMap      = new ConcurrentHashMap<>();

private final Set<PeerId>                     failureReplicators = new ConcurrentHashSet<>();

public void checkReplicator(final PeerId peer, final boolean lockNode) {
    //根據傳入的peer獲取相應的ThreadId
    final ThreadId rid = this.replicatorMap.get(peer);
    // noinspection StatementWithEmptyBody
    if (rid == null) {
        // Create replicator if it's not found for leader.
        final NodeImpl node = this.commonOptions.getNode();
        if (lockNode) {
            node.writeLock.lock();
        }
        try {
            //如果當前的節點是leader,並且傳入的peer在failureReplicators中,那麼重新新增到replicatorMap
            if (node.isLeader() && this.failureReplicators.contains(peer) && addReplicator(peer)) {
                this.failureReplicators.remove(peer);
            }
        } finally {
            if (lockNode) {
                node.writeLock.unlock();
            }
        }
    } else { // NOPMD
        // Unblock it right now.
        // Replicator.unBlockAndSendNow(rid);
    }
}

checkReplicator會從replicatorMap根據傳入的peer例項校驗一下是不是為空。因為replicatorMap裡面存放了叢集所有的節點。然後通過ReplicatorGroupImpl的commonOptions獲取當前的Node例項,如果當前的node例項是leader,並且如果存在失敗集合failureReplicators中的話就重新新增進replicatorMap中。

ReplicatorGroupImpl#addReplicator

public boolean addReplicator(final PeerId peer) {
    //校驗當前的任期
    Requires.requireTrue(this.commonOptions.getTerm() != 0);
    //如果replicatorMap裡面已經有這個節點了,那麼將它從failureReplicators集合中移除
    if (this.replicatorMap.containsKey(peer)) {
        this.failureReplicators.remove(peer);
        return true;
    }
    //賦值一個新的ReplicatorOptions
    final ReplicatorOptions opts = this.commonOptions == null ? new ReplicatorOptions() : this.commonOptions.copy();
    //新的ReplicatorOptions新增這個PeerId
    opts.setPeerId(peer);
    final ThreadId rid = Replicator.start(opts, this.raftOptions);
    if (rid == null) {
        LOG.error("Fail to start replicator to peer={}.", peer);
        this.failureReplicators.add(peer);
        return false;
    }
    return this.replicatorMap.put(peer, rid) == null;
}

addReplicator裡面主要是做了兩件事:1. 將要加入的節點從failureReplicators集合裡移除;2. 將要加入的節點放入到replicatorMap集合中去。

投票electSelf

private void electSelf() {
    long oldTerm;
    try {
        LOG.info("Node {} start vote and grant vote self, term={}.", getNodeId(), this.currTerm);
        //1. 如果當前節點不在叢集裡面則不進行選舉
        if (!this.conf.contains(this.serverId)) {
            LOG.warn("Node {} can't do electSelf as it is not in {}.", getNodeId(), this.conf);
            return;
        }
        //2. 大概是因為要進行正式選舉了,把預選舉關掉
        if (this.state == State.STATE_FOLLOWER) {
            LOG.debug("Node {} stop election timer, term={}.", getNodeId(), this.currTerm);
            this.electionTimer.stop();
        }
        //3. 清空leader
        resetLeaderId(PeerId.emptyPeer(), new Status(RaftError.ERAFTTIMEDOUT,
            "A follower's leader_id is reset to NULL as it begins to request_vote."));
        this.state = State.STATE_CANDIDATE;
        this.currTerm++;
        this.votedId = this.serverId.copy();
        LOG.debug("Node {} start vote timer, term={} .", getNodeId(), this.currTerm);
        //4. 開始發起投票定時器,因為可能投票失敗需要迴圈發起投票
        this.voteTimer.start();
        //5. 初始化投票箱
        this.voteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
        oldTerm = this.currTerm;
    } finally {
        this.writeLock.unlock();
    }

    final LogId lastLogId = this.logManager.getLastLogId(true);

    this.writeLock.lock();
    try {
        // vote need defense ABA after unlock&writeLock
        if (oldTerm != this.currTerm) {
            LOG.warn("Node {} raise term {} when getLastLogId.", getNodeId(), this.currTerm);
            return;
        }
        //6. 遍歷所有節點
        for (final PeerId peer : this.conf.listPeers()) {
            if (peer.equals(this.serverId)) {
                continue;
            }
            if (!this.rpcService.connect(peer.getEndpoint())) {
                LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
                continue;
            }
            final OnRequestVoteRpcDone done = new OnRequestVoteRpcDone(peer, this.currTerm, this);
            done.request = RequestVoteRequest.newBuilder() //
                .setPreVote(false) // It's not a pre-vote request.
                .setGroupId(this.groupId) //
                .setServerId(this.serverId.toString()) //
                .setPeerId(peer.toString()) //
                .setTerm(this.currTerm) //
                .setLastLogIndex(lastLogId.getIndex()) //
                .setLastLogTerm(lastLogId.getTerm()) //
                .build();
            this.rpcService.requestVote(peer.getEndpoint(), done.request, done);
        }

        this.metaStorage.setTermAndVotedFor(this.currTerm, this.serverId);
        this.voteCtx.grant(this.serverId);
        if (this.voteCtx.isGranted()) {
            //7. 投票成功,那麼就晉升為leader
            becomeLeader();
        }
    } finally {
        this.writeLock.unlock();
    }
}

不要看這個方法這麼長,其實都是和前面預選舉的方法preVote重複度很高的。方法太長,所以標了號,從上面號碼來一步步講解:

  1. 對當前的節點進行校驗,如果當前節點不在叢集裡面則不進行選舉
  2. 因為是Follower發起的選舉,所以大概是因為要進行正式選舉了,把預選舉定時器關掉
  3. 清空leader再進行選舉,注意這裡會把votedId設定為當前節點,代表自己參選
  4. 開始發起投票定時器,因為可能投票失敗需要迴圈發起投票,voteTimer裡面會根據當前的CANDIDATE狀態呼叫electSelf進行選舉
  5. 呼叫init方法初始化投票箱,這裡和prevVoteCtx是一樣的
  6. 遍歷所有節點,然後向其他叢集節點發送RequestVoteRequest請求,這裡也是和preVote一樣的,請求是被RequestVoteRequestProcessor處理器處理的。
  7. 如果有超過半數以上的節點投票選中,那麼就呼叫becomeLeader晉升為leader

我先來看看RequestVoteRequestProcessor怎麼處理的選舉:
在RequestVoteRequestProcessor的processRequest0會呼叫NodeImpl的handleRequestVoteRequest來處理具體的邏輯。

處理投票請求

NodeImpl#handleRequestVoteRequest

public Message handleRequestVoteRequest(final RequestVoteRequest request) {
    boolean doUnlock = true;
    this.writeLock.lock();
    try {
        //是否存活
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
                getNodeId(), this.state.name());
        }
        final PeerId candidateId = new PeerId();
        if (!candidateId.parse(request.getServerId())) {
            LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
                request.getServerId());
        }

        // noinspection ConstantConditions
        do {
            // check term
            if (request.getTerm() >= this.currTerm) {
                LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                        request.getServerId(), request.getTerm(), this.currTerm);
                //1. 如果請求的任期大於當前任期
                // increase current term, change state to follower
                if (request.getTerm() > this.currTerm) {
                    stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
                        "Raft node receives higher term RequestVoteRequest."));
                }
            } else {
                // ignore older term
                LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getServerId(), request.getTerm(), this.currTerm);
                break;
            }
            doUnlock = false;
            this.writeLock.unlock();

            final LogId lastLogId = this.logManager.getLastLogId(true);

            doUnlock = true;
            this.writeLock.lock();
            // vote need ABA check after unlock&writeLock
            if (request.getTerm() != this.currTerm) {
                LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
                break;
            }
            //2. 判斷日誌完整度
            final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm())
                .compareTo(lastLogId) >= 0;
            //3. 判斷當前的節點是不是已經投過票了
            if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
                stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE,
                    "Raft node votes for some candidate, step down to restart election_timer."));
                this.votedId = candidateId.copy();
                this.metaStorage.setVotedFor(candidateId);
            }
        } while (false);

        return RequestVoteResponse.newBuilder() //
            .setTerm(this.currTerm) //
            //4.同意投票的條件是當前的任期和請求的任期一樣,並且已經將votedId設定為請求節點
            .setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
            .build();
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
    }
}

這個方法大致也和handlePreVoteRequest差不多。我這裡只分析一下我標註的。

  1. 這裡是判斷當前的任期是小於請求的任期的,並且呼叫stepDown將請求任期設定為當前的任期,將當前的狀態設定被Follower
  2. 作為一個leader來做日誌肯定是要比被請求的節點完整,所以這裡判斷一下日誌是不是比被請求的節點日誌完整
  3. 如果日誌是完整的,並且被請求的節點沒有投票給其他的候選人,那麼就將votedId設定為當前請求的節點
  4. 給請求傳送響應,同意投票的條件是當前的任期和請求的任期一樣,並且已經將votedId設定為請求節點

晉升leader

投票完畢之後如果收到的票數大於一半,那麼就會晉升為leader,呼叫becomeLeader方法。

private void becomeLeader() {
    Requires.requireTrue(this.state == State.STATE_CANDIDATE, "Illegal state: " + this.state);
    LOG.info("Node {} become leader of group, term={}, conf={}, oldConf={}.", getNodeId(), this.currTerm,
        this.conf.getConf(), this.conf.getOldConf());
    // cancel candidate vote timer
    //晉升leader之後就會把選舉的定時器關閉了
    stopVoteTimer();
    //設定當前的狀態為leader
    this.state = State.STATE_LEADER;
    this.leaderId = this.serverId.copy();
    //複製叢集中設定新的任期
    this.replicatorGroup.resetTerm(this.currTerm);
    //遍歷所有的叢集節點
    for (final PeerId peer : this.conf.listPeers()) {
        if (peer.equals(this.serverId)) {
            continue;
        }
        LOG.debug("Node {} add replicator, term={}, peer={}.", getNodeId(), this.currTerm, peer);
        //如果成為leader,那麼需要把自己的日誌資訊複製到其他節點
        if (!this.replicatorGroup.addReplicator(peer)) {
            LOG.error("Fail to add replicator, peer={}.", peer);
        }
    }
    // init commit manager
    this.ballotBox.resetPendingIndex(this.logManager.getLastLogIndex() + 1);
    // Register _conf_ctx to reject configuration changing before the first log
    // is committed.
    if (this.confCtx.isBusy()) {
        throw new IllegalStateException();
    }
    this.confCtx.flush(this.conf.getConf(), this.conf.getOldConf());
    //如果是leader了,那麼就要定時的檢查不是有資格勝任
    this.stepDownTimer.start();
}

這個方法裡面首先會停止選舉定時器,然後設定當前的狀態為leader,並設值任期,然後遍歷所有的節點將節點加入到複製叢集中,最後將stepDownTimer開啟,定時對leader進行校驗是不是又半數以上的節點響應當前的leader。

好了,到這裡就講完了,希望下次還可以see you aga