9. SOFAJRaft Source Code Analysis: How Does a Follower Use a Snapshot to Catch Up with the Leader's Log Quickly?

Preface

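The snapshot mechanism was introduced mainly to solve two problems:

  1. How a newly added JRaft node can quickly catch up with the latest data
  2. How a Raft node can efficiently recover to the latest data after a failure and restart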

Snapshot Source Code Analysis

Generating a snapshot file on a Raft node

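To enable SOFAJRaft's snapshot mechanism, the user sets the "snapshotUri" property of the NodeOptions configuration class (the directory where snapshot files are stored). Once this property is set, a timer task named "JRaft-SnapshotTimer" is started by default and takes snapshots automatically. The interval is given by the "snapshotIntervalSecs" property of NodeOptions and defaults to 3600 seconds. The timer is created as follows: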

NodeImpl#init

this.snapshotTimer = new RepeatedTimer("JRaft-SnapshotTimer", this.options.getSnapshotIntervalSecs() * 1000) {

    @Override
    protected void onTrigger() {
        handleSnapshotTimeout();
    }
};

private void handleSnapshotTimeout() {
    this.writeLock.lock();
    try {
        if (!this.state.isActive()) {
            return;
        }
    } finally {
        this.writeLock.unlock();
    }
    // do_snapshot in another thread to avoid blocking the timer thread.
    // (i.e. call doSnapshot asynchronously)
    Utils.runInThread(() -> doSnapshot(null));
}

private void doSnapshot(final Closure done) {
    if (this.snapshotExecutor != null) {
        this.snapshotExecutor.doSnapshot(done);
    } else {
        if (done != null) {
            final Status status = new Status(RaftError.EINVAL, "Snapshot is not supported");
            Utils.runClosureInThread(done, status);
        }
    }
}
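The pattern above (a timer tick that only checks that the node is active and then hands the real work to another thread) can be sketched with the JDK's ScheduledExecutorService. This is a minimal sketch, not JRaft's RepeatedTimer: the class SnapshotTimerSketch and its methods are hypothetical, and the caller is assumed to pass snapshotIntervalSecs * 1000 as the interval, as NodeImpl#init does.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the "JRaft-SnapshotTimer" (not JRaft's RepeatedTimer).
// Each tick only checks that the node is active, then runs the snapshot on a
// separate thread so a slow snapshot never blocks the timer thread.
class SnapshotTimerSketch implements AutoCloseable {
    private final ScheduledExecutorService timer =
        Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "JRaft-SnapshotTimer"));
    private final AtomicBoolean active = new AtomicBoolean(true); // plays the role of state.isActive()
    private final Runnable doSnapshot;

    SnapshotTimerSketch(Runnable doSnapshot) {
        this.doSnapshot = doSnapshot;
    }

    // The caller passes snapshotIntervalSecs * 1000, as NodeImpl#init does.
    void start(long intervalMs) {
        timer.scheduleAtFixedRate(this::onTrigger, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    void deactivate() {
        active.set(false);
    }

    private void onTrigger() {
        if (!active.get()) {
            return; // node is not active: skip this tick
        }
        // Plays the role of Utils.runInThread(() -> doSnapshot(null)).
        new Thread(doSnapshot, "snapshot-worker").start();
    }

    @Override
    public void close() {
        timer.shutdownNow();
    }
}
```

Because the timer thread never does the snapshot itself, a slow snapshot cannot delay later ticks; overlapping saves are instead rejected by the savingSnapshot check inside doSnapshot.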

NodeImpl#doSnapshot ultimately delegates to the snapshot executor's doSnapshot method, which we look at next.

SnapshotExecutorImpl#doSnapshot

public void doSnapshot(final Closure done) {
    boolean doUnlock = true;
    this.lock.lock();
    try {
        // the executor has been stopped
        if (this.stopped) {
            Utils.runClosureInThread(done, new Status(RaftError.EPERM, "Is stopped."));
            return;
        }
        // another snapshot is being downloaded
        if (this.downloadingSnapshot.get() != null) {
            Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is loading another snapshot."));
            return;
        }
        // another snapshot is being saved
        if (this.savingSnapshot) {
            Utils.runClosureInThread(done, new Status(RaftError.EBUSY, "Is saving another snapshot."));
            return;
        }
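        // If the index last applied by the state machine equals the last index
        // covered by the latest snapshot, no new data has been applied since then,
        // so generating another snapshot would be pointless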
        if (this.fsmCaller.getLastAppliedIndex() == this.lastSnapshotIndex) {
            // There might be false positive as the getLastAppliedIndex() is being
            // updated. But it's fine since we will do next snapshot saving in a
            // predictable time.
            doUnlock = false;

            this.lock.unlock();
            this.logManager.clearBufferedLogs();
            Utils.runClosureInThread(done);
            return;
        }
        // create a snapshot writer to write the data
        final SnapshotWriter writer = this.snapshotStorage.create();
        if (writer == null) {
            Utils.runClosureInThread(done, new Status(RaftError.EIO, "Fail to create writer."));
            reportError(RaftError.EIO.getNumber(), "Fail to create snapshot writer.");
            return;
        }
        this.savingSnapshot = true;
        // wrap the callback and the snapshot writer
        final SaveSnapshotDone saveSnapshotDone = new SaveSnapshotDone(writer, done, null);
        // hand the snapshot save over to the state machine
        if (!this.fsmCaller.onSnapshotSave(saveSnapshotDone)) {
            Utils.runClosureInThread(done, new Status(RaftError.EHOSTDOWN, "The raft node is down."));
            return;
        }
        this.runningJobs.incrementAndGet();
    } finally {
        if (doUnlock) {
            this.lock.unlock();
        }
    }
}

doSnapshot first runs several checks and then calls the FSMCaller's onSnapshotSave method to save the snapshot.
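The early-return checks in doSnapshot reduce to a small decision function. The sketch below is a simplified, hypothetical model (SnapshotGuard and SnapshotDecision are not JRaft names): it leaves out locking, writer creation and the callbacks, and keeps only the order of the checks.

```java
// Hypothetical, simplified model of the early-return checks in
// SnapshotExecutorImpl#doSnapshot (locking and callbacks omitted).
enum SnapshotDecision { STOPPED, BUSY_DOWNLOADING, BUSY_SAVING, NOTHING_NEW, SAVE }

final class SnapshotGuard {
    private SnapshotGuard() {
    }

    static SnapshotDecision decide(boolean stopped, boolean downloading, boolean saving,
                                   long lastAppliedIndex, long lastSnapshotIndex) {
        if (stopped) {
            return SnapshotDecision.STOPPED;          // EPERM "Is stopped."
        }
        if (downloading) {
            return SnapshotDecision.BUSY_DOWNLOADING; // EBUSY "Is loading another snapshot."
        }
        if (saving) {
            return SnapshotDecision.BUSY_SAVING;      // EBUSY "Is saving another snapshot."
        }
        if (lastAppliedIndex == lastSnapshotIndex) {
            // Nothing applied since the last snapshot: a new one would be identical,
            // so the real code just clears buffered logs and reports success.
            return SnapshotDecision.NOTHING_NEW;
        }
        return SnapshotDecision.SAVE;                 // create a writer and call onSnapshotSave
    }
}
```

For example, decide(false, false, false, 100, 100) yields NOTHING_NEW, while decide(false, false, false, 120, 100) yields SAVE.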

FSMCallerImpl#onSnapshotSave

public boolean onSnapshotSave(final SaveSnapshotClosure done) {
    // publish an event to be handled by ApplyTaskHandler
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.SNAPSHOT_SAVE;
        task.done = done;
    });
}

FSMCallerImpl#onSnapshotSave publishes the event to a Disruptor queue, where it is handled by ApplyTaskHandler.
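The hand-off to ApplyTaskHandler is a single-consumer queue: any thread may publish a task, but one thread processes tasks in publication order. The sketch below substitutes a java.util.concurrent BlockingQueue for the Disruptor ring buffer; FsmCallerSketch and the reduced TaskType enum are hypothetical names, and the consumer only records which task it saw instead of calling doSnapshotSave or doSnapshotLoad.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for FSMCallerImpl's task queue. A BlockingQueue replaces
// the Disruptor ring buffer; one consumer thread plays the role of ApplyTaskHandler.
enum TaskType { SNAPSHOT_SAVE, SNAPSHOT_LOAD, SHUTDOWN }

final class FsmCallerSketch {
    private final BlockingQueue<TaskType> queue = new LinkedBlockingQueue<>();
    private final List<TaskType> handled = new ArrayList<>(); // written only by the consumer
    private final Thread consumer = new Thread(this::runLoop, "apply-task-handler");

    void start() {
        consumer.start();
    }

    // Like enqueueTask(...): any thread may publish; returns false if rejected.
    boolean enqueue(TaskType type) {
        return queue.offer(type);
    }

    private void runLoop() {
        try {
            while (true) {
                TaskType type = queue.take();
                if (type == TaskType.SHUTDOWN) {
                    return;
                }
                // A real handler would dispatch to doSnapshotSave / doSnapshotLoad here.
                handled.add(type);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Stops the consumer and returns the tasks it handled, in order.
    List<TaskType> shutdownAndJoin() throws InterruptedException {
        queue.offer(TaskType.SHUTDOWN);
        consumer.join(); // join() makes the consumer's writes to 'handled' visible here
        return List.copyOf(handled);
    }
}
```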

The handler eventually calls doSnapshotSave:

private void doSnapshotSave(final SaveSnapshotClosure done) {
    Requires.requireNonNull(done, "SaveSnapshotClosure is null");
    // put the last applied index and term into metaBuilder
    final long lastAppliedIndex = this.lastAppliedIndex.get();
    final RaftOutter.SnapshotMeta.Builder metaBuilder = RaftOutter.SnapshotMeta.newBuilder() //
        .setLastIncludedIndex(lastAppliedIndex) //
        .setLastIncludedTerm(this.lastAppliedTerm);
    // put the current configuration into metaBuilder
    final ConfigurationEntry confEntry = this.logManager.getConfiguration(lastAppliedIndex);
    if (confEntry == null || confEntry.isEmpty()) {
        LOG.error("Empty conf entry for lastAppliedIndex={}", lastAppliedIndex);
        Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "Empty conf entry for lastAppliedIndex=%s",
            lastAppliedIndex));
        return;
    }
    for (final PeerId peer : confEntry.getConf()) {
        metaBuilder.addPeers(peer.toString());
    }
    if (confEntry.getOldConf() != null) {
        for (final PeerId peer : confEntry.getOldConf()) {
            metaBuilder.addOldPeers(peer.toString());
        }
    }
    // hand the metadata to done, which returns the snapshot writer
    final SnapshotWriter writer = done.start(metaBuilder.build());
    if (writer == null) {
        done.run(new Status(RaftError.EINVAL, "snapshot_storage create SnapshotWriter failed"));
        return;
    }
    // let the user state machine write the snapshot
    this.fsm.onSnapshotSave(writer, done);
}
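doSnapshotSave records the last applied index and term, plus the membership (peers, and old peers if any) in effect at that index. A plain-Java model of that metadata, with hypothetical names standing in for the protobuf RaftOutter.SnapshotMeta builder, could look like this:

```java
import java.util.List;

// Hypothetical plain-Java mirror of the fields doSnapshotSave writes into
// RaftOutter.SnapshotMeta (the real code uses a protobuf builder).
final class SnapshotMetaSketch {
    final long lastIncludedIndex;  // lastAppliedIndex at snapshot time
    final long lastIncludedTerm;   // term of that entry
    final List<String> peers;      // current configuration
    final List<String> oldPeers;   // old configuration, empty if none

    private SnapshotMetaSketch(long index, long term, List<String> peers, List<String> oldPeers) {
        this.lastIncludedIndex = index;
        this.lastIncludedTerm = term;
        this.peers = List.copyOf(peers);
        this.oldPeers = List.copyOf(oldPeers);
    }

    // Fails like doSnapshotSave does when no configuration is known for the index.
    static SnapshotMetaSketch of(long lastAppliedIndex, long lastAppliedTerm,
                                 List<String> conf, List<String> oldConf) {
        if (conf == null || conf.isEmpty()) {
            throw new IllegalStateException("Empty conf entry for lastAppliedIndex=" + lastAppliedIndex);
        }
        return new SnapshotMetaSketch(lastAppliedIndex, lastAppliedTerm, conf,
            oldConf == null ? List.of() : oldConf);
    }
}
```

Storing the configuration alongside the index is what lets a node that restores from the snapshot know its membership without replaying the configuration-change log entries that the snapshot replaced.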

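This method fills metaBuilder with the snapshot metadata (last applied index and term, and the current configuration) and then calls the user state machine's onSnapshotSave method. To see how that method is implemented, we can look at the official Counter example: https://www.sofastack.tech/projects/sofa-jraft/counter-example/ .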

CounterStateMachine#onSnapshotSave

public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {
    final long currVal = this.value.get();
    // write the value to disk asynchronously
    Utils.runInThread(() -> {
        final CounterSnapshotFile snapshot = new CounterSnapshotFile(writer.getPath() + File.separator + "data");
        if (snapshot.save(currVal)) {
            if (writer.addFile("data")) {
                done.run(Status.OK());
            } else {
                done.run(new Status(RaftError.EIO, "Fail to add file to writer"));
            }
        } else {
            done.run(new Status(RaftError.EIO, "Fail to save counter snapshot %s", snapshot.getPath()));
        }
    });
}

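This method writes the current value to a file and, once the snapshot file is saved, calls the supplied closure's run(status) to tell the caller whether the save succeeded or failed.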
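The save path of the counter example can be reproduced with java.nio alone. This sketch assumes the value is stored as decimal text in a file named data under the snapshot directory (the file name matches the code above; the text encoding is an assumption). CounterSnapshotSaver is hypothetical, it runs synchronously instead of through Utils.runInThread, and a String status stands in for JRaft's Status.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

// Hypothetical sketch of the counter example's save step. Assumption: the value
// is stored as decimal text in "<snapshotDir>/data". Runs synchronously; the
// callback stands in for writer.addFile("data") followed by done.run(status).
final class CounterSnapshotSaver {
    private CounterSnapshotSaver() {
    }

    static void save(Path snapshotDir, long value, Consumer<String> done) {
        Path data = snapshotDir.resolve("data");
        try {
            Files.write(data, Long.toString(value).getBytes(StandardCharsets.UTF_8));
            done.accept("OK");
        } catch (IOException e) {
            done.accept("EIO: Fail to save counter snapshot " + data);
        }
    }
}
```

The important contract, in the sketch as in the counter example, is that the callback is invoked exactly once, with success only after the file is fully written.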

Since the closure passed in here is a SaveSnapshotDone instance, the call lands in SaveSnapshotDone#run:

SaveSnapshotDone

public void run(final Status status) {
    Utils.runInThread(() -> continueRun(status));
}

void continueRun(final Status st) {
    // validate the index, record index and term, and mark the save as finished
    final int ret = onSnapshotSaveDone(st, this.meta, this.writer);
    if (ret != 0 && st.isOk()) {
        st.setError(ret, "node call onSnapshotSaveDone failed");
    }
    if (this.done != null) {
        Utils.runClosureInThread(this.done, st);
    }
}

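run invokes continueRun asynchronously, which calls onSnapshotSaveDone. That method validates the index, records the new snapshot's index and term, and marks the snapshot save as finished.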
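A simplified model of that bookkeeping is below. It assumes a finished snapshot is kept only if its last included index is strictly greater than the previous snapshot's; SnapshotBookkeeping and SaveResult are hypothetical names, and JRaft's numeric error codes are reduced to an enum.

```java
// Hypothetical, simplified model of onSnapshotSaveDone's bookkeeping.
// Assumption: a snapshot is kept only if it is strictly newer than the last one.
enum SaveResult { SAVED, STALE, FAILED }

final class SnapshotBookkeeping {
    long lastSnapshotIndex;
    long lastSnapshotTerm;
    boolean savingSnapshot = true; // set by doSnapshot before the save started

    SaveResult onSnapshotSaveDone(boolean writeOk, long metaIndex, long metaTerm) {
        try {
            if (!writeOk) {
                return SaveResult.FAILED;   // the state machine reported an error
            }
            if (metaIndex <= lastSnapshotIndex) {
                return SaveResult.STALE;    // not newer than what we already have
            }
            lastSnapshotIndex = metaIndex;  // remember the new snapshot's position
            lastSnapshotTerm = metaTerm;
            return SaveResult.SAVED;
        } finally {
            savingSnapshot = false;         // the save is over, whatever the outcome
        }
    }
}
```

Clearing savingSnapshot in a finally block mirrors the requirement that the next doSnapshot call must not be blocked forever by a failed save.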

Installing a snapshot

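When JRaft sends log entries to a Follower, it checks whether it should send a snapshot instead. This lets the Follower catch up with the Leader's log quickly without replaying very old log entries, which both reduces network load and makes log replication more efficient.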

Replicator#sendEntries

private boolean sendEntries(final long nextSendingIndex) {
    final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
    // fill this Replicator's common fields into rb
    if (!fillCommonFields(rb, nextSendingIndex - 1, false)) {
        // unlock id in installSnapshot
        installSnapshot();
        return false;
    }
    // ... (omitted)
}

When fillCommonFields fails, installSnapshot is called to send an RPC request to the Follower.
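The code above shows only one trigger: fillCommonFields returning false. This sketch assumes that happens when the term of the entry at nextSendingIndex - 1 can no longer be looked up because it has been compacted into a snapshot (the index is neither 0, nor the snapshot's last index, nor still in the log). Since the rest of sendEntries is omitted above, it additionally assumes the leader also falls back to a snapshot when the next entry to send is itself older than the first index still in its log. ReplicationPlanner is a hypothetical name.

```java
// Hypothetical model of the leader's choice between AppendEntries and
// InstallSnapshot for one follower. Assumes nextSendingIndex <= lastLogIndex + 1.
final class ReplicationPlanner {
    enum Action { APPEND_ENTRIES, INSTALL_SNAPSHOT }

    private ReplicationPlanner() {
    }

    // firstLogIndex: first index still in the leader's log after compaction.
    // lastSnapshotIndex: last index covered by the leader's latest snapshot.
    static Action plan(long nextSendingIndex, long firstLogIndex, long lastSnapshotIndex) {
        long prevLogIndex = nextSendingIndex - 1;
        // The term of prevLogIndex is known if it is 0, if it is the snapshot's
        // last index, or if the entry is still present in the log.
        boolean prevTermKnown = prevLogIndex == 0
            || prevLogIndex == lastSnapshotIndex
            || prevLogIndex >= firstLogIndex;
        if (!prevTermKnown) {
            return Action.INSTALL_SNAPSHOT; // fillCommonFields would return false
        }
        if (nextSendingIndex < firstLogIndex) {
            return Action.INSTALL_SNAPSHOT; // assumed: the next entry itself was compacted
        }
        return Action.APPEND_ENTRIES;
    }
}
```

For example, with a snapshot covering index 1000 and the log starting at 1001, a follower whose nextIndex is 500 gets a snapshot, while one whose nextIndex is 1001 receives ordinary AppendEntries.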

Replicator#installSnapshot

void installSnapshot() {
    // a snapshot is already being installed
    if (this.state == State.Snapshot) {
        LOG.warn("Replicator {} is installing snapshot, ignore the new request.", this.options.getPeerId());
        this.id.unlock();
        return;
    }
    boolean doUnlock = true;
    try {
        Requires.requireTrue(this.reader == null,
            "Replicator %s already has a snapshot reader, current state is %s", this.options.getPeerId(),
            this.state);
        // open a SnapshotReader
        this.reader = this.options.getSnapshotStorage().open();
        // if the snapshot cannot be opened (e.g. snapshots are not enabled), report an error and return
        if (this.reader == null) {
            final NodeImpl node = this.options.getNode();
            final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
            error.setStatus(new Status(RaftError.EIO, "Fail to open snapshot"));
            this.id.unlock();
            doUnlock = false;
            node.onError(error);
            return;
        }
        // generate a URI through which other nodes can copy the snapshot
        final String uri = this.reader.generateURIForCopy();
        if (uri == null) {
            final NodeImpl node = this.options.getNode();
            final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
            error.setStatus(new Status(RaftError.EIO, "Fail to generate uri for snapshot reader"));
            releaseReader();
            this.id.unlock();
            doUnlock = false;
            node.onError(error);
            return;
        }
        // load the snapshot metadata from file
        final RaftOutter.SnapshotMeta meta = this.reader.load();
        if (meta == null) {
            final String snapshotPath = this.reader.getPath();
            final NodeImpl node = this.options.getNode();
            final RaftException error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT);
            error.setStatus(new Status(RaftError.EIO, "Fail to load meta from %s", snapshotPath));
            releaseReader();
            this.id.unlock();
            doUnlock = false;
            node.onError(error);
            return;
        }
        // build the request
        final InstallSnapshotRequest.Builder rb = InstallSnapshotRequest.newBuilder();
        rb.setTerm(this.options.getTerm());
        rb.setGroupId(this.options.getGroupId());
        rb.setServerId(this.options.getServerId().toString());
        rb.setPeerId(this.options.getPeerId().toString());
        rb.setMeta(meta);
        rb.setUri(uri);

        this.statInfo.runningState = RunningState.INSTALLING_SNAPSHOT;
        this.statInfo.lastLogIncluded = meta.getLastIncludedIndex();
        this.statInfo.lastTermIncluded = meta.getLastIncludedTerm();

        final InstallSnapshotRequest request = rb.build();
        this.state = State.Snapshot;
        // noinspection NonAtomicOperationOnVolatileField
        this.installSnapshotCounter++;
        final long monotonicSendTimeMs = Utils.monotonicMs();
        final int stateVersion = this.version;
        final int seq = getAndIncrementReqSeq();
        // send the InstallSnapshotRequest
        final Future<Message> rpcFuture = this.rpcService.installSnapshot(this.options.getPeerId().getEndpoint(),
            request, new RpcResponseClosureAdapter<InstallSnapshotResponse>() {

                @Override
                public void run(final Status status) {
                    onRpcReturned(Replicator.this.id, RequestType.Snapshot, status, request, getResponse(), seq,
                        stateVersion, monotonicSendTimeMs);
                }
            });
        addInflight(RequestType.Snapshot, this.nextIndex, 0, 0, seq, rpcFuture);
    } finally {
        if (doUnlock) {
            this.id.unlock();
        }
    }
}

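Before sending the InstallSnapshotRequest, the leader performs several checks:

  1. Whether the user has set the "snapshotUri" property of NodeOptions. If not, snapshots are disabled and the returned reader is null
  2. Whether a URI for fetching the snapshot can be generated
  3. Whether the snapshot metadata can be loaded from file

If all of these checks pass, an InstallSnapshotRequest is sent to the Follower. There it is handled by the InstallSnapshotRequestProcessor and ultimately dispatched to NodeImpl's handleInstallSnapshot method, which contains the actual logic.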
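The three checks map naturally onto a function that either produces the request or fails with the matching error message. In this hypothetical sketch, LocalSnapshot stands in for the leader's SnapshotReader (a null uri or a null index/term models generateURIForCopy() or load() failing), an IllegalStateException stands in for reporting the error through node.onError(...), and the URI used in the tests is made up.

```java
// Hypothetical slice of the leader's local snapshot: a null uri models
// generateURIForCopy() failing, a null index/term models load() failing.
final class LocalSnapshot {
    final String uri;
    final Long lastIncludedIndex;
    final Long lastIncludedTerm;

    LocalSnapshot(String uri, Long lastIncludedIndex, Long lastIncludedTerm) {
        this.uri = uri;
        this.lastIncludedIndex = lastIncludedIndex;
        this.lastIncludedTerm = lastIncludedTerm;
    }
}

// Hypothetical stand-in for the protobuf InstallSnapshotRequest.
final class InstallRequestSketch {
    final long term;
    final String groupId;
    final String serverId;
    final String peerId;
    final long lastIncludedIndex;
    final long lastIncludedTerm;
    final String uri;

    InstallRequestSketch(long term, String groupId, String serverId, String peerId,
                         long lastIncludedIndex, long lastIncludedTerm, String uri) {
        this.term = term;
        this.groupId = groupId;
        this.serverId = serverId;
        this.peerId = peerId;
        this.lastIncludedIndex = lastIncludedIndex;
        this.lastIncludedTerm = lastIncludedTerm;
        this.uri = uri;
    }
}

final class InstallSnapshotPreparer {
    private InstallSnapshotPreparer() {
    }

    // Throws IllegalStateException where JRaft would call node.onError(...) with EIO.
    static InstallRequestSketch prepare(LocalSnapshot snapshot, long term, String groupId,
                                        String serverId, String peerId) {
        if (snapshot == null) {
            throw new IllegalStateException("Fail to open snapshot"); // e.g. snapshotUri not set
        }
        if (snapshot.uri == null) {
            throw new IllegalStateException("Fail to generate uri for snapshot reader");
        }
        if (snapshot.lastIncludedIndex == null || snapshot.lastIncludedTerm == null) {
            throw new IllegalStateException("Fail to load meta");
        }
        return new InstallRequestSketch(term, groupId, serverId, peerId,
            snapshot.lastIncludedIndex, snapshot.lastIncludedTerm, snapshot.uri);
    }
}
```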

NodeImpl#handleInstallSnapshot

public Message handleInstallSnapshot(final InstallSnapshotRequest request, final RpcRequestClosure done) {
    // without a snapshot executor, snapshots are unsupported: return an error response
    if (this.snapshotExecutor == null) {
        return RpcResponseFactory.newResponse(RaftError.EINVAL, "Not supported snapshot");
    }
    // parse the serverId carried by the request into a PeerId
    final PeerId serverId = new PeerId();
    if (!serverId.parse(request.getServerId())) {
        LOG.warn("Node {} ignore InstallSnapshotRequest from {} bad server id.", getNodeId(),
         request.getServerId());
        return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse serverId failed: %s", request.getServerId());
    }

    this.writeLock.lock();
    try {
        // check that this node is in an active state
        if (!this.state.isActive()) {
            LOG.warn("Node {} ignore InstallSnapshotRequest as it is not in active state {}.", getNodeId(),
                    this.state);
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s:%s is not in active state, state %s.",
                    this.groupId, this.serverId, this.state.name());
        }
        // reject the request if its term is older than this node's current term
        if (request.getTerm() < this.currTerm) {
            LOG.warn("Node {} ignore stale InstallSnapshotRequest from {}, term={}, currTerm={}.", getNodeId(),
                    request.getPeerId(), request.getTerm(), this.currTerm);
            return InstallSnapshotResponse.newBuilder() //
                    .setTerm(this.currTerm) //
                    .setSuccess(false) //
                    .build();
        }
        // step down if needed, then check that the sender is the legitimate leader
        checkStepDown(request.getTerm(), serverId);

        if (!serverId.equals(this.leaderId)) {
            LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.",
                    serverId, this.currTerm, this.leaderId);
            // Increase the term by 1 and make both leaders step down to minimize the
            // loss of split brain
            stepDown(request.getTerm() + 1, false, new Status(RaftError.ELEADERCONFLICT,
                    "More than one leader in the same term."));
            return InstallSnapshotResponse.newBuilder() //
                    .setTerm(request.getTerm() + 1) //
                    .setSuccess(false) //
                    .build();
        }

    } finally {
        this.writeLock.unlock();
    }
    final long startMs = Utils.monotonicMs();
    try {
        if (LOG.isInfoEnabled()) {
            LOG.info(
                    "Node {} received InstallSnapshotRequest from {}, lastIncludedLogIndex={}, " +
                     "lastIncludedLogTerm={}, lastLogId={}.",
                    getNodeId(), request.getServerId(), request.getMeta().getLastIncludedIndex(), request.getMeta()
                            .getLastIncludedTerm(), this.logManager.getLastLogId(false));
        }
        // install the snapshot
        this.snapshotExecutor.installSnapshot(request, InstallSnapshotResponse.newBuilder(), done);
        return null;
    } finally {
        this.metrics.recordLatency("install-snapshot", Utils.monotonicMs() - startMs);
    }
}

After this series of checks, the method calls the snapshot executor's installSnapshot to perform the installation.
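The term and leader checks are the Raft-specific part of this handler. The sketch below is a simplified, hypothetical model: it treats checkStepDown as "adopt a higher term, and follow the sender if no leader is known", ignores the candidate-state case, and represents peers as plain strings.

```java
// Hypothetical, simplified model of the term and leader checks in
// handleInstallSnapshot. checkStepDown is reduced to "adopt a higher term and
// follow the sender when no leader is known"; the candidate case is ignored.
final class FollowerTermCheck {
    enum Verdict { REJECT_STALE_TERM, LEADER_CONFLICT, ACCEPT }

    long currTerm;
    String leaderId; // null means "no known leader"

    FollowerTermCheck(long currTerm, String leaderId) {
        this.currTerm = currTerm;
        this.leaderId = leaderId;
    }

    Verdict check(long requestTerm, String serverId) {
        if (requestTerm < currTerm) {
            return Verdict.REJECT_STALE_TERM; // reply success=false with currTerm
        }
        if (requestTerm > currTerm) {         // step down to the newer term
            currTerm = requestTerm;
            leaderId = null;
        }
        if (leaderId == null) {
            leaderId = serverId;              // the sender becomes our leader
        }
        if (!serverId.equals(leaderId)) {
            // Two leaders in one term: bump the term so both step down.
            currTerm = requestTerm + 1;
            leaderId = null;
            return Verdict.LEADER_CONFLICT;   // reply success=false with requestTerm + 1
        }
        return Verdict.ACCEPT;                // go on to snapshotExecutor.installSnapshot
    }
}
```

Answering a conflict with requestTerm + 1 is what forces the second self-declared leader to step down as well, which is the split-brain mitigation described in the comment in the original code.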

SnapshotExecutorImpl#installSnapshot

public void installSnapshot(final InstallSnapshotRequest request, final InstallSnapshotResponse.Builder response,
                            final RpcRequestClosure done) {
    final SnapshotMeta meta = request.getMeta();
    // create a task object for downloading the snapshot
    final DownloadingSnapshot ds = new DownloadingSnapshot(request, response, done);
    //DON'T access request, response, and done after this point
    //as the retry snapshot will replace this one.
    // register the download task
    if (!registerDownloadingSnapshot(ds)) {
        LOG.warn("Fail to register downloading snapshot");
        // This RPC will be responded by the previous session
        return;
    }
    Requires.requireNonNull(this.curCopier, "curCopier");
    try {
        // block until the copy job finishes
        this.curCopier.join();
    } catch (final InterruptedException e) {
        // restore the interrupt flag: an interrupted copy means a newer snapshot is being received and the old download was stopped
        Thread.currentThread().interrupt();
        LOG.warn("Install snapshot copy job was canceled.");
        return;
    }
    // load the downloaded snapshot files
    loadDownloadingSnapshot(ds, meta);
}

This method calls registerDownloadingSnapshot to register the snapshot download, calls join to block until the download completes, and then calls loadDownloadingSnapshot to load the downloaded files.
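The register / join / load sequence can be modeled with one thread and an AtomicReference. This is a heavily simplified, hypothetical sketch: the real registerDownloadingSnapshot also handles retries of the same snapshot and newer or older snapshots already in flight, whereas here any concurrent install is simply rejected as busy. A failed copy maps to the error response sent in loadDownloadingSnapshot, and success maps to handing the snapshot to the state machine.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical, heavily simplified model of SnapshotExecutorImpl#installSnapshot:
// register the download, copy on a separate thread, join() until the copy
// finishes, then either hand the snapshot to the state machine or fail.
final class SnapshotInstallerSketch {
    enum Outcome { REJECTED_BUSY, INTERRUPTED, COPY_FAILED, LOADED }

    private final AtomicReference<Object> downloading = new AtomicReference<>();

    Outcome install(BooleanSupplier copyJob, Runnable loadIntoStateMachine) {
        Object session = new Object();
        // Simplified registerDownloadingSnapshot: only one download at a time.
        if (!downloading.compareAndSet(null, session)) {
            return Outcome.REJECTED_BUSY;
        }
        try {
            boolean[] copyOk = new boolean[1];
            Thread copier = new Thread(() -> copyOk[0] = copyJob.getAsBoolean(), "snapshot-copier");
            copier.start();
            try {
                copier.join();                  // like curCopier.join()
            } catch (InterruptedException e) {
                copier.interrupt();
                Thread.currentThread().interrupt();
                return Outcome.INTERRUPTED;     // the wait was cancelled
            }
            if (!copyOk[0]) {
                return Outcome.COPY_FAILED;     // respond with the copier's error
            }
            loadIntoStateMachine.run();         // like fsmCaller.onSnapshotLoad(...)
            return Outcome.LOADED;
        } finally {
            downloading.compareAndSet(session, null); // unregister this download
        }
    }
}
```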

SnapshotExecutorImpl#loadDownloadingSnapshot

void loadDownloadingSnapshot(final DownloadingSnapshot ds, final SnapshotMeta meta) {
    SnapshotReader reader;
    this.lock.lock();
    try {
        // if ds is no longer the registered download, a newer snapshot is being received
        if (ds != this.downloadingSnapshot.get()) {
            //It is interrupted and response by other request,just return
            return;
        }
        Requires.requireNonNull(this.curCopier, "curCopier");
        reader = this.curCopier.getReader();
        // check whether the copier finished successfully
        if (!this.curCopier.isOk()) {
            if (this.curCopier.getCode() == RaftError.EIO.getNumber()) {
                reportError(this.curCopier.getCode(), this.curCopier.getErrorMsg());
            }
            Utils.closeQuietly(reader);
            ds.done.run(this.curCopier);
            Utils.closeQuietly(this.curCopier);
            this.curCopier = null;
            this.downloadingSnapshot.set(null);
            this.runningJobs.countDown();
            return;
        }
        Utils.closeQuietly(this.curCopier);
        this.curCopier = null;
        // check whether the reader is valid
        if (reader == null || !reader.isOk()) {
            Utils.closeQuietly(reader);
            this.downloadingSnapshot.set(null);
            ds.done.sendResponse(RpcResponseFactory.newResponse(RaftError.EINTERNAL,
                "Fail to copy snapshot from %s", ds.request.getUri()));
            this.runningJobs.countDown();
            return;
        }
        this.loadingSnapshot = true;
        this.loadingSnapshotMeta = meta;
    } finally {
        this.lock.unlock();
    }
    // the snapshot was downloaded; hand it to the state machine for installation
    final InstallSnapshotDone installSnapshotDone = new InstallSnapshotDone(reader);
    // submit the snapshot-load event to the state machine caller
    if (!this.fsmCaller.onSnapshotLoad(installSnapshotDone)) {
        LOG.warn("Fail to  call fsm onSnapshotLoad");
        installSnapshotDone.run(new Status(RaftError.EHOSTDOWN, "This raft node is down"));
    }
}

After these checks, the FSMCaller's onSnapshotLoad method is called to install the snapshot.

FSMCallerImpl#onSnapshotLoad

public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.SNAPSHOT_LOAD;
        task.done = done;
    });
}

onSnapshotLoad publishes a task of type TaskType.SNAPSHOT_LOAD to the Disruptor queue. The task is handled by ApplyTaskHandler, which eventually calls doSnapshotLoad.

FSMCallerImpl#doSnapshotLoad

private void doSnapshotLoad(final LoadSnapshotClosure done) {
    // ... (omitted)
    if (!this.fsm.onSnapshotLoad(reader)) {
        done.run(new Status(-1, "StateMachine onSnapshotLoad failed"));
        final RaftException e = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE,
            RaftError.ESTATEMACHINE, "StateMachine onSnapshotLoad failed");
        setError(e);
        return;
    }
    // ... (omitted)
    done.run(Status.OK());
}

doSnapshotLoad eventually calls the user state machine's onSnapshotLoad implementation. Taking CounterStateMachine as an example:

CounterStateMachine#onSnapshotLoad

public boolean onSnapshotLoad(final SnapshotReader reader) {
    if (isLeader()) {
        LOG.warn("Leader is not supposed to load snapshot");
        return false;
    }
    if (reader.getFileMeta("data") == null) {
        LOG.error("Fail to find data file in {}", reader.getPath());
        return false;
    }
    final CounterSnapshotFile snapshot = new CounterSnapshotFile(reader.getPath() + File.separator + "data");
    try {
        this.value.set(snapshot.load());
        return true;
    } catch (final IOException e) {
        LOG.error("Fail to load snapshot from {}", snapshot.getPath());
        return false;
    }

}

onSnapshotLoad reads the value from the snapshot file and sets it into value, at which point the data has been loaded.
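The matching load path can be sketched the same way, again assuming the data file holds the value as decimal text. CounterSnapshotLoader is hypothetical; Files.exists stands in for reader.getFileMeta("data") != null, and the leader check mirrors the one in CounterStateMachine#onSnapshotLoad.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the counter example's load path, mirroring the checks
// in CounterStateMachine#onSnapshotLoad: leaders refuse, a missing "data" file
// fails, otherwise the stored value replaces the in-memory counter.
final class CounterSnapshotLoader {
    private CounterSnapshotLoader() {
    }

    static boolean load(Path snapshotDir, boolean isLeader, AtomicLong value) {
        if (isLeader) {
            return false; // "Leader is not supposed to load snapshot"
        }
        Path data = snapshotDir.resolve("data");
        if (!Files.exists(data)) {
            return false; // stands in for reader.getFileMeta("data") == null
        }
        try {
            String text = new String(Files.readAllBytes(data), StandardCharsets.UTF_8).trim();
            value.set(Long.parseLong(text));
            return true;
        } catch (IOException | NumberFormatException e) {
            return false; // unreadable or corrupt file: report failure
        }
    }
}
```

Returning false rather than throwing matches the boolean contract of onSnapshotLoad shown above, where doSnapshotLoad turns a false result into an ESTATEMACHINE error.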