
8. SOFAJRaft Source Code Analysis: How Does JRaft Implement Log Replication?

Preface

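A few days ago I had dinner with a senior engineer from Tencent, and we talked about my understanding of SOFAJRaft. I assumed I understood it well, until he asked me how logs are replicated between the nodes of a SOFAJRaft cluster.
I was speechless and could not explain how it works, so this time I am going to analyze how log replication is done in SOFAJRaft.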

The Leader Sends a Probe to Get the Follower's LastLogIndex

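After the Leader node establishes a connection with a Follower through a Replicator, it sends a Probe request. The purpose is to find out how far the Follower's log already extends, so that the Leader knows which entries to send next.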

The call chain is roughly:

NodeImpl#becomeLeader->replicatorGroup#addReplicator->Replicator#start->Replicator#sendEmptyEntries

Finally, Replicator's sendEmptyEntries method sends the probe that obtains the Follower's LastLogIndex.
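To make the goal of the probe concrete, here is a minimal Java sketch of how a leader can converge on the Follower's log position. Everything in it is hypothetical and simplified: `ProbeSketch`, `followerHandle`, and `findNextIndex` are illustrative names, not SOFAJRaft APIs, and a log is modeled as an array of terms where index 0 stands for the empty prefix. Jumping to `lastLogIndex + 1` when the Follower's log is shorter follows the check at the end of `onAppendEntriesReturned` shown later; stepping back one entry on a term mismatch is an assumption borrowed from textbook Raft.

```java
// Hypothetical sketch of probe-based nextIndex discovery; not part of SOFAJRaft.
final class ProbeSketch {
    // Simplified AppendEntriesResponse: success flag plus the follower's lastLogIndex.
    static final class Response {
        final boolean success;
        final long lastLogIndex;

        Response(boolean success, long lastLogIndex) {
            this.success = success;
            this.lastLogIndex = lastLogIndex;
        }
    }

    // terms[i] is the term of log index i; terms[0] = 0 stands for the empty prefix.
    static long termAt(long[] terms, long index) {
        return index < terms.length ? terms[(int) index] : -1; // -1: no such entry
    }

    // Follower: accept only if its entry at prevLogIndex carries prevLogTerm.
    static Response followerHandle(long[] followerTerms, long prevLogIndex, long prevLogTerm) {
        boolean match = termAt(followerTerms, prevLogIndex) == prevLogTerm;
        return new Response(match, followerTerms.length - 1);
    }

    // Leader: start at its own lastLogIndex + 1 and probe until the follower accepts.
    static long findNextIndex(long[] leaderTerms, long[] followerTerms) {
        long nextIndex = leaderTerms.length; // leader's lastLogIndex + 1
        while (true) {
            long prevLogIndex = nextIndex - 1;
            Response resp = followerHandle(followerTerms, prevLogIndex, leaderTerms[(int) prevLogIndex]);
            if (resp.success) {
                return nextIndex;
            }
            if (resp.lastLogIndex + 1 < nextIndex) {
                nextIndex = resp.lastLogIndex + 1; // follower's log is shorter: jump to its end
            } else {
                nextIndex--; // assumption: step back one entry, as in textbook Raft
            }
        }
    }
}
```

For a leader log with terms [0, 1, 1, 2, 3] and a follower log [0, 1, 1], the first probe fails, the follower's lastLogIndex of 2 moves nextIndex straight to 3, and the second probe succeeds.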

Replicator#sendEmptyEntries

private void sendEmptyEntries(final boolean isHeartbeat,
                              final RpcResponseClosure<AppendEntriesResponse> heartBeatClosure) {
    final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
    // fill in the common fields, such as Term, GroupId and ServerId
    if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) {
        // id is unlock in installSnapshot
        installSnapshot();
        if (isHeartbeat && heartBeatClosure != null) {
            Utils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN,
                "Fail to send heartbeat to peer %s", this.options.getPeerId()));
        }
        return;
    }
    try {
        final long monotonicSendTimeMs = Utils.monotonicMs();
        final AppendEntriesRequest request = rb.build();

        if (isHeartbeat) {
            // ... heartbeat code omitted ...
        } else {
            // (I did not see statInfo being used anywhere)
            // Sending a probe request.
            // the leader sends a probe to get the Follower's LastLogIndex
            this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
            // set lastLogIndex to one less than firstLogIndex
            this.statInfo.firstLogIndex = this.nextIndex;
            this.statInfo.lastLogIndex = this.nextIndex - 1;
            this.appendEntriesCounter++;
            // mark this Replicator as probing
            this.state = State.Probe;
            final int stateVersion = this.version;
            // return reqSeq, then increment it
            final int seq = getAndIncrementReqSeq();
            final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
                request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {

                    @Override
                    public void run(final Status status) {
                        onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request,
                            getResponse(), seq, stateVersion, monotonicSendTimeMs);
                    }

                });
            // an Inflight records a batch of logEntries that has been packed into a replication request and sent
            // record this (empty) probe request as an Inflight
            addInflight(RequestType.AppendEntries, this.nextIndex, 0, 0, seq, rpcFuture);
        }
        LOG.debug("Node {} send HeartbeatRequest to {} term {} lastCommittedIndex {}", this.options.getNode()
            .getNodeId(), this.options.getPeerId(), this.options.getTerm(), request.getCommittedIndex());
    } finally {
        this.id.unlock();
    }
}

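Here sendEmptyEntries is called with isHeartbeat set to false and heartBeatClosure set to null, because the goal is to send a probe that finds the Follower's log position.
It first calls fillCommonFields, which sets the term, groupId, ServerId, PeerId, prevLogIndex and so on into rb: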

private boolean fillCommonFields(final AppendEntriesRequest.Builder rb, long prevLogIndex, final boolean isHeartbeat) {
    final long prevLogTerm = this.options.getLogManager().getTerm(prevLogIndex);
    // ... (omitted)
    rb.setTerm(this.options.getTerm());
    rb.setGroupId(this.options.getGroupId());
    rb.setServerId(this.options.getServerId().toString());
    rb.setPeerId(this.options.getPeerId().toString());
    rb.setPrevLogIndex(prevLogIndex);
    rb.setPrevLogTerm(prevLogTerm);
    rb.setCommittedIndex(this.options.getBallotBox().getLastCommittedIndex());
    return true;
}

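Note that prevLogIndex is nextIndex - 1, the index of the entry just before the next one to be sent.
Next, the fields of the statInfo instance are set, although I did not see the statInfo object used anywhere.
Then an AppendEntriesRequest is sent to the Follower, and onRpcReturned handles the response.
After sending the request, addInflight creates an Inflight instance and adds it to the inflights collection: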

private void addInflight(final RequestType reqType, final long startIndex, final int count, final int size,
                         final int seq, final Future<Message> rpcInfly) {
    this.rpcInFly = new Inflight(reqType, startIndex, count, size, seq, rpcInfly);
    this.inflights.add(this.rpcInFly);
    this.nodeMetrics.recordSize("replicate-inflights-count", this.inflights.size());
}

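Inflight is an abstraction over a batch of logEntries that has been sent: it records which logEntries have already been packed into a log replication request and sent out. Here the request is recorded as an Inflight.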
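The queue of Inflights can be modeled in a few lines. The sketch below is a simplified, hypothetical model rather than the Replicator code: `InflightSketch`, `send`, and `onResponse` are illustrative names, `nextIndex` is treated as the first index the follower has not yet acknowledged, and responses are assumed to be handled in send order (JRaft enforces that ordering with sequence numbers, as shown later).

```java
import java.util.ArrayDeque;

// Hypothetical model of a leader's in-flight batch queue; not the SOFAJRaft Replicator.
final class InflightSketch {
    // One batch that has been sent but not yet acknowledged.
    static final class Inflight {
        final long startIndex;
        final int count;

        Inflight(long startIndex, int count) {
            this.startIndex = startIndex;
            this.count = count;
        }
    }

    private final ArrayDeque<Inflight> inflights = new ArrayDeque<>();
    long nextIndex; // first index the follower has not acknowledged yet

    InflightSketch(long nextIndex) {
        this.nextIndex = nextIndex;
    }

    // The next batch starts right after the last in-flight batch (pipelining).
    long nextSendIndex() {
        Inflight last = inflights.peekLast();
        return last == null ? nextIndex : last.startIndex + last.count;
    }

    // Record a batch as sent without waiting for earlier batches to be acknowledged.
    void send(int count) {
        inflights.addLast(new Inflight(nextSendIndex(), count));
    }

    // Handle the response for the oldest in-flight batch.
    void onResponse(boolean success) {
        Inflight head = inflights.pollFirst();
        if (head == null) {
            return;
        }
        if (success) {
            nextIndex = head.startIndex + head.count;
        } else {
            // Drop every later batch; sending resumes from nextIndex.
            inflights.clear();
        }
    }

    int inflightCount() {
        return inflights.size();
    }
}
```

Clearing the queue on a failure is what lets the leader resend the failed batch and everything after it in the original order, because the next send starts again from `nextIndex`.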

The Leader Sends Logs to the Follower in Batches

Replicator#sendEntries

private boolean sendEntries(final long nextSendingIndex) {
    final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
    // fill this Replicator's common fields into rb
    if (!fillCommonFields(rb, nextSendingIndex - 1, false)) {
        // unlock id in installSnapshot
        installSnapshot();
        return false;
    }

    ByteBufferCollector dataBuf = null;
    // maximum number of entries per request (1024 by default)
    final int maxEntriesSize = this.raftOptions.getMaxEntriesSize();

    // an object-pool technique is used here to avoid repeatedly creating objects
    final RecyclableByteBufferList byteBufList = RecyclableByteBufferList.newInstance();
    try {
        // loop over the logEntries: data goes into byteBufList, metadata into emb
        for (int i = 0; i < maxEntriesSize; i++) {
            final RaftOutter.EntryMeta.Builder emb = RaftOutter.EntryMeta.newBuilder();
            // nextSendingIndex is the next index to send; i is the offset
            if (!prepareEntry(nextSendingIndex, i, emb, byteBufList)) {
                break;
            }
            rb.addEntries(emb.build());
        }
        // an EntriesCount of 0 means LogManager has no new data for now
        if (rb.getEntriesCount() == 0) {
            if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) {
                installSnapshot();
                return false;
            }
            // _id is unlock in _wait_more
            waitMoreEntries(nextSendingIndex);
            return false;
        }
        // copy the data in byteBufList into rb
        if (byteBufList.getCapacity() > 0) {
            dataBuf = ByteBufferCollector.allocateByRecyclers(byteBufList.getCapacity());
            for (final ByteBuffer b : byteBufList) {
                dataBuf.put(b);
            }
            final ByteBuffer buf = dataBuf.getBuffer();
            buf.flip();
            rb.setData(ZeroByteStringHelper.wrap(buf));
        }
    } finally {
        // recycle byteBufList
        RecycleUtil.recycle(byteBufList);
    }

    final AppendEntriesRequest request = rb.build();
    if (LOG.isDebugEnabled()) {
        LOG.debug(
            "Node {} send AppendEntriesRequest to {} term {} lastCommittedIndex {} prevLogIndex {} prevLogTerm {} logIndex {} count {}",
            this.options.getNode().getNodeId(), this.options.getPeerId(), this.options.getTerm(),
            request.getCommittedIndex(), request.getPrevLogIndex(), request.getPrevLogTerm(), nextSendingIndex,
            request.getEntriesCount());
    }
    // (I could not find where statInfo is used)
    this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
    this.statInfo.firstLogIndex = rb.getPrevLogIndex() + 1;
    this.statInfo.lastLogIndex = rb.getPrevLogIndex() + rb.getEntriesCount();

    final Recyclable recyclable = dataBuf;
    final int v = this.version;
    final long monotonicSendTimeMs = Utils.monotonicMs();
    final int seq = getAndIncrementReqSeq();
    final Future<Message> rpcFuture = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(),
        request, -1, new RpcResponseClosureAdapter<AppendEntriesResponse>() {

            @Override
            public void run(final Status status) {
                // recycle resources
                RecycleUtil.recycle(recyclable);
                onRpcReturned(Replicator.this.id, RequestType.AppendEntries, status, request, getResponse(), seq,
                    v, monotonicSendTimeMs);
            }

        });
    // add an Inflight
    addInflight(RequestType.AppendEntries, nextSendingIndex, request.getEntriesCount(), request.getData().size(),
        seq, rpcFuture);
    return true;

}
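  1. First, fillCommonFields writes this Replicator's common fields into rb;
  2. prepareEntry computes the log index from nextSendingIndex and the loop offset i, looks up the corresponding LogEntry in LogManager, copies the LogEntry's attributes into emb, and adds its data to the RecyclableByteBufferList;
  3. If there are no new LogEntries, EntriesCount is 0: if nextSendingIndex is already below LogManager's first log index, a snapshot is installed; otherwise the Replicator waits for more entries. Either way the method returns;
  4. The data in byteBufList is copied into rb, so the per-entry metadata (term, type, data length, etc.) sits in rb's entries list, while the actual payloads follow as one block in rb's data field;
  5. The AppendEntriesRequest is built and sent;
  6. An Inflight is added to the queue. The Leader maintains a queue and appends one Inflight for each batch of logEntries it sends. When it learns that a batch failed to replicate, it can rely on the Inflights in the queue to resend that batch and all subsequent logs to the follower. This guarantees both that replication completes and that the order of the replicated logs is preserved.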
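The separation between per-entry metadata and one shared data block (step 4) can be illustrated with plain Java. This is a hypothetical sketch: `PackSketch` and its `EntryMeta` are stand-ins holding only a term and `dataLen`, not the protobuf `RaftOutter.EntryMeta`, and the layout is simplified. The Follower recovers each payload by reading `dataLen` bytes in order, which is what `handleAppendEntriesRequest` does with `allData.get(bs, 0, bs.length)`.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of metadata/data separation; not SOFAJRaft code.
final class PackSketch {
    // Stand-in for per-entry metadata; only the fields needed here.
    static final class EntryMeta {
        final long term;
        final int dataLen;

        EntryMeta(long term, int dataLen) {
            this.term = term;
            this.dataLen = dataLen;
        }
    }

    // Leader side: record each payload's length in metadata and concatenate the payloads.
    static ByteBuffer pack(List<byte[]> payloads, long term, List<EntryMeta> metasOut) {
        int total = 0;
        for (byte[] p : payloads) {
            total += p.length;
        }
        ByteBuffer data = ByteBuffer.allocate(total);
        for (byte[] p : payloads) {
            metasOut.add(new EntryMeta(term, p.length));
            data.put(p);
        }
        data.flip(); // switch from writing to reading, like buf.flip() in sendEntries
        return data;
    }

    // Follower side: walk the metadata and read dataLen bytes for each entry.
    static List<byte[]> unpack(List<EntryMeta> metas, ByteBuffer data) {
        List<byte[]> out = new ArrayList<>();
        for (EntryMeta m : metas) {
            byte[] bs = new byte[m.dataLen];
            data.get(bs, 0, bs.length);
            out.add(bs);
        }
        return out;
    }
}
```

Keeping payloads in one contiguous buffer lets the leader send them without wrapping each entry separately, while the small metadata list tells the receiver where each entry ends.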

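RecyclableByteBufferList is instantiated from an object pool; for details on the object pool, see my earlier article: 7. SOFAJRaft Source Code Analysis: How to Implement a Lightweight Object Pool?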

Next, let's look in detail at the methods used inside sendEntries.

prepareEntry: Filling in the emb Attributes

Replicator#prepareEntry

boolean prepareEntry(final long nextSendingIndex, final int offset, final RaftOutter.EntryMeta.Builder emb,
                     final RecyclableByteBufferList dateBuffer) {
    if (dateBuffer.getCapacity() >= this.raftOptions.getMaxBodySize()) {
        return false;
    }
    // compute the index to send now
    final long logIndex = nextSendingIndex + offset;
    // if this index can no longer be found in LogManager, return immediately
    final LogEntry entry = this.options.getLogManager().getEntry(logIndex);
    if (entry == null) {
        return false;
    }
    // copy the LogEntry's attributes into emb
    emb.setTerm(entry.getId().getTerm());
    if (entry.hasChecksum()) {
        emb.setChecksum(entry.getChecksum()); //since 1.2.6
    }
    emb.setType(entry.getType());
    if (entry.getPeers() != null) {
        Requires.requireTrue(!entry.getPeers().isEmpty(), "Empty peers at logIndex=%d", logIndex);
        for (final PeerId peer : entry.getPeers()) {
            emb.addPeers(peer.toString());
        }
        if (entry.getOldPeers() != null) {
            for (final PeerId peer : entry.getOldPeers()) {
                emb.addOldPeers(peer.toString());
            }
        }
    } else {
        Requires.requireTrue(entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION,
            "Empty peers but is ENTRY_TYPE_CONFIGURATION type at logIndex=%d", logIndex);
    }
    final int remaining = entry.getData() != null ? entry.getData().remaining() : 0;
    emb.setDataLen(remaining);
    // put the LogEntry's data into dateBuffer
    if (entry.getData() != null) {
        // should slice entry data
        dateBuffer.add(entry.getData().slice());
    }
    return true;
}
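  1. Check whether the capacity of the incoming dateBuffer has already reached the configured maximum body size (512 * 1024 by default); if so, return false.
  2. Compute logIndex from the starting index and the offset, then fetch the LogEntry from LogManager. If the result is null, the entry cannot be found, so return false; the caller's if check then breaks out of the loop.
  3. Copy the LogEntry's attributes into emb and finally append its data to dateBuffer, keeping the data separate from the metadata.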
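The stopping conditions of the batching loop in sendEntries and prepareEntry can be condensed into a small sketch. `BatchSketch` and `batch` are hypothetical names; entries are represented only by their payload sizes, and the limits correspond to maxEntriesSize (1024 by default) and maxBodySize (512 * 1024 by default) quoted above. Because the byte check runs before an entry is added, the last entry of a batch may push the total past maxBodySize.

```java
// Hypothetical model of the batching limits in sendEntries/prepareEntry; not SOFAJRaft code.
final class BatchSketch {
    // Returns how many entries, starting at nextSendingIndex, fit into one request.
    // sizes[i] is the payload size of log index i; an index past the end ends the batch.
    static int batch(int[] sizes, int nextSendingIndex, int maxEntriesSize, int maxBodySize) {
        int bufferedBytes = 0;
        int count = 0;
        for (int offset = 0; offset < maxEntriesSize; offset++) {
            if (bufferedBytes >= maxBodySize) {
                break; // like prepareEntry returning false once the buffer is full
            }
            int logIndex = nextSendingIndex + offset;
            if (logIndex >= sizes.length) {
                break; // like getEntry returning null: no more entries yet
            }
            bufferedBytes += sizes[logIndex];
            count++;
        }
        return count;
    }
}
```

A result of 0 corresponds to the `rb.getEntriesCount() == 0` branch in sendEntries, where the Replicator either installs a snapshot or waits for more entries.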

The Follower Handles the Leader's Log Replication Request

After the leader sends an AppendEntriesRequest, the request is handled on the Follower by AppendEntriesRequestProcessor.

The method that does the work is processRequest0:

public Message processRequest0(final RaftServerService service, final AppendEntriesRequest request,
                               final RpcRequestClosure done) {

    final Node node = (Node) service;

    // pipelining is enabled by default
    if (node.getRaftOptions().isReplicatorPipeline()) {
        final String groupId = request.getGroupId();
        final String peerId = request.getPeerId();
        // get the request sequence number, tracked per groupId + peerId
        final int reqSequence = getAndIncrementSequence(groupId, peerId, done.getBizContext().getConnection());
        // the Follower handles the log request sent by the leader
        final Message response = service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
            reqSequence, groupId, peerId));
        // null means the response is sent later by a callback; a non-null response (errors, empty requests) is sent now
        if (response != null) {
            sendSequenceResponse(groupId, peerId, reqSequence, done.getAsyncContext(), done.getBizContext(),
                response);
        }
        return null;
    } else {
        return service.handleAppendEntriesRequest(request, done);
    }
}

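Calling service.handleAppendEntriesRequest lands in NodeImpl#handleAppendEntriesRequest. That method returns a response directly only in error cases or when the leader sent no entries; in the normal case it returns null, and the response is sent later by a callback once the entries are persisted.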

Handling the Log Replication Request

NodeImpl#handleAppendEntriesRequest

public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) {
    boolean doUnlock = true;
    final long startMs = Utils.monotonicMs();
    this.writeLock.lock();
    // number of log entries in the request
    final int entriesCount = request.getEntriesCount();
    try {
        // check that this node is active
        if (!this.state.isActive()) {
            LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
                getNodeId(), this.state.name());
        }
        // check that the incoming serverId can be parsed
        final PeerId serverId = new PeerId();
        if (!serverId.parse(request.getServerId())) {
            LOG.warn("Node {} received AppendEntriesRequest from {} serverId bad format.", getNodeId(),
                request.getServerId());
            return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse serverId failed: %s.",
                request.getServerId());
        }
        // Check stale term
        if (request.getTerm() < this.currTerm) {
            LOG.warn("Node {} ignore stale AppendEntriesRequest from {}, term={}, currTerm={}.", getNodeId(),
                request.getServerId(), request.getTerm(), this.currTerm);
            return AppendEntriesResponse.newBuilder() //
                .setSuccess(false) //
                .setTerm(this.currTerm) //
                .build();
        }

        // Check term and state to step down
        // if this node is not a Follower, it steps down
        checkStepDown(request.getTerm(), serverId);
        // the requesting node is not this node's leader
        if (!serverId.equals(this.leaderId)) {
            LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.",
                serverId, this.currTerm, this.leaderId);
            // Increase the term by 1 and make both leaders step down to minimize the
            // loss of split brain
            stepDown(request.getTerm() + 1, false, new Status(RaftError.ELEADERCONFLICT,
                "More than one leader in the same term."));
            return AppendEntriesResponse.newBuilder() //
                .setSuccess(false) //
                .setTerm(request.getTerm() + 1) //
                .build();
        }

        updateLastLeaderTimestamp(Utils.monotonicMs());

        // reject new entries while a snapshot is being installed
        if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
            LOG.warn("Node {} received AppendEntriesRequest while installing snapshot.", getNodeId());
            return RpcResponseFactory.newResponse(RaftError.EBUSY, "Node %s:%s is installing snapshot.",
                this.groupId, this.serverId);
        }
        // prevLogIndex is the sender's nextIndex - 1
        final long prevLogIndex = request.getPrevLogIndex();
        final long prevLogTerm = request.getPrevLogTerm();
        final long localPrevLogTerm = this.logManager.getTerm(prevLogIndex);
        // the sender's term at prevLogIndex does not match the local term at that index
        if (localPrevLogTerm != prevLogTerm) {
            final long lastLogIndex = this.logManager.getLastLogIndex();

            LOG.warn(
                "Node {} reject term_unmatched AppendEntriesRequest from {}, term={}, prevLogIndex={}, prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.",
                getNodeId(), request.getServerId(), request.getTerm(), prevLogIndex, prevLogTerm, localPrevLogTerm,
                lastLogIndex, entriesCount);

            return AppendEntriesResponse.newBuilder() //
                .setSuccess(false) //
                .setTerm(this.currTerm) //
                .setLastLogIndex(lastLogIndex) //
                .build();
        }
        // respond to a heartbeat or to a probe sent by sendEmptyEntries
        if (entriesCount == 0) {
            // heartbeat
            final AppendEntriesResponse.Builder respBuilder = AppendEntriesResponse.newBuilder() //
                .setSuccess(true) //
                .setTerm(this.currTerm)
                // return this node's latest log index
                .setLastLogIndex(this.logManager.getLastLogIndex());
            doUnlock = false;
            this.writeLock.unlock();
            // see the comments at FollowerStableClosure#run()
            this.ballotBox.setLastCommittedIndex(Math.min(request.getCommittedIndex(), prevLogIndex));
            return respBuilder.build();
        }

        // Parse request
        long index = prevLogIndex;
        final List<LogEntry> entries = new ArrayList<>(entriesCount);
        ByteBuffer allData = null;
        if (request.hasData()) {
            allData = request.getData().asReadOnlyByteBuffer();
        }
        // get the metadata of all entries
        final List<RaftOutter.EntryMeta> entriesList = request.getEntriesList();
        for (int i = 0; i < entriesCount; i++) {
            final RaftOutter.EntryMeta entry = entriesList.get(i);
            index++;
            if (entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_UNKNOWN) {
                // set the logEntry attributes
                final LogEntry logEntry = new LogEntry();
                logEntry.setId(new LogId(index, entry.getTerm()));
                logEntry.setType(entry.getType());
                if (entry.hasChecksum()) {
                    logEntry.setChecksum(entry.getChecksum()); // since 1.2.6
                }
                // copy the entry's data into the logEntry
                final long dataLen = entry.getDataLen();
                if (dataLen > 0) {
                    final byte[] bs = new byte[(int) dataLen];
                    assert allData != null;
                    allData.get(bs, 0, bs.length);
                    logEntry.setData(ByteBuffer.wrap(bs));
                }

                if (entry.getPeersCount() > 0) {
                    // only configuration entries may carry peers
                    if (entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
                        throw new IllegalStateException(
                                "Invalid log entry that contains peers but is not ENTRY_TYPE_CONFIGURATION type: "
                                        + entry.getType());
                    }

                    final List<PeerId> peers = new ArrayList<>(entry.getPeersCount());
                    for (final String peerStr : entry.getPeersList()) {
                        final PeerId peer = new PeerId();
                        peer.parse(peerStr);
                        peers.add(peer);
                    }
                    logEntry.setPeers(peers);

                    if (entry.getOldPeersCount() > 0) {
                        final List<PeerId> oldPeers = new ArrayList<>(entry.getOldPeersCount());
                        for (final String peerStr : entry.getOldPeersList()) {
                            final PeerId peer = new PeerId();
                            peer.parse(peerStr);
                            oldPeers.add(peer);
                        }
                        logEntry.setOldPeers(oldPeers);
                    }
                } else if (entry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
                    throw new IllegalStateException(
                            "Invalid log entry that contains zero peers but is ENTRY_TYPE_CONFIGURATION type");
                }

                // Validate checksum
                if (this.raftOptions.isEnableLogEntryChecksum() && logEntry.isCorrupted()) {
                    long realChecksum = logEntry.checksum();
                    LOG.error(
                            "Corrupted log entry received from leader, index={}, term={}, expectedChecksum={}, " +
                             "realChecksum={}",
                            logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
                            realChecksum);
                    return RpcResponseFactory.newResponse(RaftError.EINVAL,
                            "The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d",
                            logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
                            realChecksum);
                }

                entries.add(logEntry);
            }
        }
        // persist the entries; the callback sends the response
        final FollowerStableClosure closure = new FollowerStableClosure(request, AppendEntriesResponse.newBuilder()
            .setTerm(this.currTerm), this, done, this.currTerm);
        this.logManager.appendEntries(entries, closure);
        // update configuration after _log_manager updated its memory status
        this.conf = this.logManager.checkAndSetConfiguration(this.conf);
        return null;
    } finally {
        if (doUnlock) {
            this.writeLock.unlock();
        }
        this.metrics.recordLatency("handle-append-entries", Utils.monotonicMs() - startMs);
        this.metrics.recordSize("handle-append-entries-count", entriesCount);
    }
}

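handleAppendEntriesRequest is long, but most of it is validation; the actual processing logic is short:

  1. Check whether the Node is still active; if not, return an error response.
  2. Check whether the request's serverId is well formed; if not, return an error response.
  3. Check whether the request's term is lower than the current term; if so, return a failed AppendEntriesResponse carrying the current term.
  4. Call checkStepDown to examine the current term and state (for example, whether there is a leader) and step down if needed.
  5. Check whether the request's serverId equals this node's leaderId, i.e. whether the request really came from the leader; if not, step down and return a failed AppendEntriesResponse.
  6. If the request carries entries while a snapshot is being installed, return an EBUSY error.
  7. Check whether the term of the local entry at prevLogIndex matches the prevLogTerm in the request; if not, return a failed AppendEntriesResponse carrying the local lastLogIndex.
  8. If entriesCount is zero, the leader sent either a heartbeat or a probe from sendEmptyEntries; return an AppendEntriesResponse with the current term and the latest index.
  9. Otherwise, iterate over all entries in the request.
  10. Create a LogEntry for each, set its data and attributes, and add it to the entries list.
  11. Call logManager to append the batch, which writes the logs to RocksDB.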
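The decision order of these checks can be summarized in a compact sketch. It is a hypothetical model: `FollowerSketch`, `Result`, and `handle` are illustrative names, the log is an array of terms (index 0 is the empty prefix), and the serverId, leader-identity, snapshot, and checksum checks are left out. The commit index is advanced to min(committedIndex, prevLogIndex), as in the heartbeat branch, because the Follower has only verified that its log matches the leader's up to prevLogIndex; never moving it backwards is an assumption of the sketch.

```java
// Hypothetical model of the follower-side checks; not SOFAJRaft's NodeImpl.
final class FollowerSketch {
    static final class Result {
        final boolean success;
        final long term;
        final long lastLogIndex;

        Result(boolean success, long term, long lastLogIndex) {
            this.success = success;
            this.term = term;
            this.lastLogIndex = lastLogIndex;
        }
    }

    long currTerm;
    final long[] terms;  // terms[i] = term of log index i; terms[0] = 0
    long committedIndex; // starts at 0

    FollowerSketch(long currTerm, long[] terms) {
        this.currTerm = currTerm;
        this.terms = terms;
    }

    // Returns null when entries would be appended and answered asynchronously.
    Result handle(long reqTerm, long prevLogIndex, long prevLogTerm, int entriesCount, long leaderCommit) {
        long lastLogIndex = terms.length - 1;
        // 1. stale leader: reject and report our term
        if (reqTerm < currTerm) {
            return new Result(false, currTerm, lastLogIndex);
        }
        currTerm = reqTerm; // simplified stand-in for checkStepDown
        // 2. log matching: our entry at prevLogIndex must carry prevLogTerm (-1: missing)
        long localPrevLogTerm = prevLogIndex < terms.length ? terms[(int) prevLogIndex] : -1;
        if (localPrevLogTerm != prevLogTerm) {
            return new Result(false, currTerm, lastLogIndex); // leader backs off using lastLogIndex
        }
        // 3. heartbeat or probe: nothing to append
        if (entriesCount == 0) {
            committedIndex = Math.max(committedIndex, Math.min(leaderCommit, prevLogIndex));
            return new Result(true, currTerm, lastLogIndex);
        }
        // 4. entries would be persisted here; the response is sent later by a callback
        return null;
    }
}
```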

Sending the Response to the Leader

The response is ultimately sent to the leader by AppendEntriesRequestProcessor's sendSequenceResponse:

void sendSequenceResponse(final String groupId, final String peerId, final int seq,
                          final AsyncContext asyncContext, final BizContext bizContext, final Message msg) {
    final Connection connection = bizContext.getConnection();
    // get the context, keyed by groupId and peerId
    final PeerRequestContext ctx = getPeerRequestContext(groupId, peerId, connection);
    final PriorityQueue<SequenceMessage> respQueue = ctx.responseQueue;
    assert (respQueue != null);

    synchronized (Utils.withLockObject(respQueue)) {
        // put the response into the priority queue
        respQueue.add(new SequenceMessage(asyncContext, msg, seq));
        // check whether too many responses are pending (256 by default)
        if (!ctx.hasTooManyPendingResponses()) {
            while (!respQueue.isEmpty()) {
                final SequenceMessage queuedPipelinedResponse = respQueue.peek();
                // if the sequence does not match, do not send the response yet
                if (queuedPipelinedResponse.sequence != getNextRequiredSequence(groupId, peerId, connection)) {
                    // sequence mismatch, waiting for next response.
                    break;
                }
                respQueue.remove();
                try {
                    // send the response
                    queuedPipelinedResponse.sendResponse();
                } finally {
                    // increment the required sequence
                    getAndIncrementNextRequiredSequence(groupId, peerId, connection);
                }
            }
        } else {
            LOG.warn("Closed connection to peer {}/{}, because of too many pending responses, queued={}, max={}",
                ctx.groupId, peerId, respQueue.size(), ctx.maxPendingResponses);
            connection.close();
            // Close the connection if there are too many pending responses in queue.
            removePeerRequestContext(groupId, peerId);
        }
    }
}

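This method pushes each response into a PriorityQueue, which keeps them sorted by sequence number. It then compares the smallest sequence number with nextRequiredSequence; if they differ, the responses are out of order, so nothing is sent until the missing one arrives. If too many responses pile up, the connection is closed.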
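The logic above is a classic reorder buffer, sketched below with java.util.PriorityQueue. `ResponseReorderSketch`, `offer`, and the String payloads are hypothetical; "sending" is modeled as appending to a list, and `maxPendingResponses` plays the role of the pending limit (256 per the comment in the code). Returning false stands in for the branch where JRaft closes the connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical reorder buffer modeled on sendSequenceResponse; not SOFAJRaft code.
final class ResponseReorderSketch {
    static final class SequenceMessage {
        final int sequence;
        final String msg;

        SequenceMessage(int sequence, String msg) {
            this.sequence = sequence;
            this.msg = msg;
        }
    }

    // Min-heap on sequence: the smallest pending sequence is always at the head.
    private final PriorityQueue<SequenceMessage> respQueue =
        new PriorityQueue<>((a, b) -> Integer.compare(a.sequence, b.sequence));
    private final int maxPendingResponses;
    private int nextRequiredSequence = 0;
    final List<String> sent = new ArrayList<>(); // responses "sent", in order

    ResponseReorderSketch(int maxPendingResponses) {
        this.maxPendingResponses = maxPendingResponses;
    }

    // Returns false when too many responses are pending (JRaft closes the connection then).
    boolean offer(int sequence, String msg) {
        respQueue.add(new SequenceMessage(sequence, msg));
        if (respQueue.size() > maxPendingResponses) {
            return false;
        }
        // Flush every response that is next in line; stop at the first gap.
        while (!respQueue.isEmpty() && respQueue.peek().sequence == nextRequiredSequence) {
            sent.add(respQueue.poll().msg);
            nextRequiredSequence++;
        }
        return true;
    }
}
```

Responses 1 and 2 wait in the heap until response 0 arrives, after which all three leave in order.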

The Leader Handles the Log Replication Response

When the Leader receives the Follower's response, it calls Replicator's onRpcReturned method:

static void onRpcReturned(final ThreadId id, final RequestType reqType, final Status status, final Message request,
                          final Message response, final int seq, final int stateVersion, final long rpcSendTime) {
    if (id == null) {
        return;
    }
    final long startTimeMs = Utils.nowMs();
    Replicator r;
    if ((r = (Replicator) id.lock()) == null) {
        return;
    }
    // check the version: every resetInflights increments version
    if (stateVersion != r.version) {
        LOG.debug(
            "Replicator {} ignored old version response {}, current version is {}, request is {}\n, and response is {}\n, status is {}.",
            r, stateVersion, r.version, request, response, status);
        id.unlock();
        return;
    }
    // priority queue ordered by seq, smallest first
    final PriorityQueue<RpcResponse> holdingQueue = r.pendingResponses;
    // a priority queue is needed because responses are asynchronous: a smaller seq may arrive after a larger one
    holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));
    // by default holdingQueue may hold at most 256 responses
    if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
        LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
            holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
        // clear the in-flight state and send a new probe
        r.resetInflights();
        r.state = State.Probe;
        r.sendEmptyEntries(false);
        return;
    }

    boolean continueSendEntries = false;

    final boolean isLogDebugEnabled = LOG.isDebugEnabled();
    StringBuilder sb = null;
    if (isLogDebugEnabled) {
        sb = new StringBuilder("Replicator ").append(r).append(" is processing RPC responses,");
    }
    try {
        int processed = 0;
        while (!holdingQueue.isEmpty()) {
            // peek the response with the smallest seq in holdingQueue
            final RpcResponse queuedPipelinedResponse = holdingQueue.peek();

            // if an earlier response has not arrived yet, the sequence does not match, so stop here
            //sequence mismatch, waiting for next response.
            if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
                // if some responses were already processed, just break out of the loop
                if (processed > 0) {
                    if (isLogDebugEnabled) {
                        sb.append("has processed ").append(processed).append(" responses,");
                    }
                    break;
                } else {
                    //Do not processed any responses, UNLOCK id and return.
                    continueSendEntries = false;
                    id.unlock();
                    return;
                }
            }
            // the seq matches, so remove the smallest-seq response from the queue
            holdingQueue.remove();
            processed++;
            // take the first element of the inflights queue
            final Inflight inflight = r.pollInflight();
            // an Inflight is queued whenever a request is sent;
            // if there is none, ignore this response
            if (inflight == null) {
                // The previous in-flight requests were cleared.
                if (isLogDebugEnabled) {
                    sb.append("ignore response because request not found:").append(queuedPipelinedResponse)
                        .append(",\n");
                }
                continue;
            }
            // a seq mismatch means the order is broken: reset the state
            if (inflight.seq != queuedPipelinedResponse.seq) {
                // reset state
                LOG.warn(
                    "Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
                    r, inflight.seq, queuedPipelinedResponse.seq);
                r.resetInflights();
                r.state = State.Probe;
                continueSendEntries = false;
                // block the replicator for a while, depending on the error type
                r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
                return;
            }
            try {
                switch (queuedPipelinedResponse.requestType) {
                    case AppendEntries:
                        // handle a log replication response
                        continueSendEntries = onAppendEntriesReturned(id, inflight, queuedPipelinedResponse.status,
                            (AppendEntriesRequest) queuedPipelinedResponse.request,
                            (AppendEntriesResponse) queuedPipelinedResponse.response, rpcSendTime, startTimeMs, r);
                        break;
                    case Snapshot:
                        // handle a snapshot installation response
                        continueSendEntries = onInstallSnapshotReturned(id, r, queuedPipelinedResponse.status,
                            (InstallSnapshotRequest) queuedPipelinedResponse.request,
                            (InstallSnapshotResponse) queuedPipelinedResponse.response);
                        break;
                }
            } finally {
                if (continueSendEntries) {
                    // Success, increase the response sequence.
                    r.getAndIncrementRequiredNextSeq();
                } else {
                    // The id is already unlocked in onAppendEntriesReturned/onInstallSnapshotReturned, we SHOULD break out.
                    break;
                }
            }
        }
    } finally {
        if (isLogDebugEnabled) {
            sb.append(", after processed, continue to send entries: ").append(continueSendEntries);
            LOG.debug(sb.toString());
        }
        if (continueSendEntries) {
            // unlock in sendEntries.
            r.sendEntries();
        }
    }
}
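  1. Check the version number. Every call to resetInflights increments version, so this confirms the response belongs to the current batch of requests.
  2. Get the Replicator's pendingResponses queue, wrap the current response in an RpcResponse instance, and add it to the queue.
  3. If the queue holds more than 256 elements, clear the in-flight state and start over with a new probe.
  4. Compare the smallest seq in holdingQueue with requiredNextSeq. If they differ, break out of the loop when some responses have already been processed; otherwise unlock and return.
  5. Take the first element of the inflights queue; if its seq does not match, the order is broken, so reset the state.
  6. Call onAppendEntriesReturned to handle the log replication response.
  7. If handling succeeds, call sendEntries to continue replicating logs to the Follower.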
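The interplay of the version check (step 1) and in-order processing (step 4) can be sketched as follows. All names are hypothetical (`LeaderResponseSketch`, `send`, `onRpcReturned` with a reduced signature), and only the rules "resetInflights increments version, older versions are ignored" and "process responses only when seq equals requiredNextSeq" come from the code above. Aligning `reqSeq` and `requiredNextSeq` on reset is an assumption of this sketch.

```java
import java.util.PriorityQueue;

// Hypothetical model of the leader's response handling; not SOFAJRaft's Replicator.
final class LeaderResponseSketch {
    // Min-heap of sequence numbers of responses that arrived but were not processed yet.
    private final PriorityQueue<Integer> pendingResponses = new PriorityQueue<>();
    int version = 0;
    int reqSeq = 0;          // next sequence number to hand out
    int requiredNextSeq = 0; // next sequence number the leader may process
    int processed = 0;

    // Sending a request hands out a sequence number; the caller also remembers version.
    int send() {
        return reqSeq++;
    }

    // Abandon everything in flight; aligning both counters is an assumption here.
    void resetInflights() {
        version++;
        pendingResponses.clear();
        int rs = Math.max(reqSeq, requiredNextSeq);
        reqSeq = rs;
        requiredNextSeq = rs;
    }

    void onRpcReturned(int seq, int stateVersion) {
        if (stateVersion != version) {
            return; // response to a batch abandoned by an earlier reset: ignore it
        }
        pendingResponses.add(seq);
        // Process strictly in send order; stop at the first gap.
        while (!pendingResponses.isEmpty() && pendingResponses.peek() == requiredNextSeq) {
            pendingResponses.poll();
            processed++;
            requiredNextSeq++;
        }
    }
}
```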

Replicator#onAppendEntriesReturned

private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight inflight, final Status status,
                                               final AppendEntriesRequest request,
                                               final AppendEntriesResponse response, final long rpcSendTime,
                                               final long startTimeMs, final Replicator r) {
    // check that the response matches the expected in-flight batch
    if (inflight.startIndex != request.getPrevLogIndex() + 1) {
        LOG.warn(
            "Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, request prevLogIndex={}, reset the replicator state and probe again.",
            r, inflight.startIndex, request.getPrevLogIndex());
        r.resetInflights();
        r.state = State.Probe;
        // unlock id in sendEmptyEntries
        r.sendEmptyEntries(false);
        return false;
    }
    // record metrics
    if (request.getEntriesCount() > 0) {
        r.nodeMetrics.recordLatency("replicate-entries", Utils.monotonicMs() - rpcSendTime);
        r.nodeMetrics.recordSize("replicate-entries-count", request.getEntriesCount());
        r.nodeMetrics.recordSize("replicate-entries-bytes", request.getData() != null ? request.getData().size()
            : 0);
    }

    final boolean isLogDebugEnabled = LOG.isDebugEnabled();
    StringBuilder sb = null;
    if (isLogDebugEnabled) {
        sb = new StringBuilder("Node "). //
            append(r.options.getGroupId()).append(":").append(r.options.getServerId()). //
            append(" received AppendEntriesResponse from "). //
            append(r.options.getPeerId()). //
            append(" prevLogIndex=").append(request.getPrevLogIndex()). //
            append(" prevLogTerm=").append(request.getPrevLogTerm()). //
            append(" count=").append(request.getEntriesCount());
    }
    // if no successful response was received (follower crash, RPC failure, etc.),
    // block for a while before trying again
    if (!status.isOk()) {
        // If the follower crashes, any RPC to the follower fails immediately,
        // so we need to block the follower for a while instead of looping until
        // it comes back or be removed
        // dummy_id is unlock in block
        if (isLogDebugEnabled) {
            sb.append(" fail, sleep.");
            LOG.debug(sb.toString());
        }
        // Notify every registered Replicator state listener, if any
        notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
        if (++r.consecutiveErrorTimes % 10 == 0) {
            LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}", r.options.getPeerId(),
                r.consecutiveErrorTimes, status);
        }
        r.resetInflights();
        r.state = State.Probe;
        // id is unlocked in block
        r.block(startTimeMs, status.getCode());
        return false;
    }
    r.consecutiveErrorTimes = 0;
    // The follower rejected the request (success == false)
    if (!response.getSuccess()) {
        // A higher term means a new Leader has been elected (possibly after a network partition):
        // step down and follow the new Leader
        if (response.getTerm() > r.options.getTerm()) {
            if (isLogDebugEnabled) {
                sb.append(" fail, greater term ").append(response.getTerm()).append(" expect term ")
                    .append(r.options.getTerm());
                LOG.debug(sb.toString());
            }
            // Get the NodeImpl object that represents the local node
            final NodeImpl node = r.options.getNode();
            r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
            r.destroy();
            // Step down by raising the local term to the one carried in the response
            node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
                "Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
            return false;
        }
        if (isLogDebugEnabled) {
            sb.append(" fail, find nextIndex remote lastLogIndex ").append(response.getLastLogIndex())
                .append(" local nextIndex ").append(r.nextIndex);
            LOG.debug(sb.toString());
        }
        if (rpcSendTime > r.lastRpcSendTimestamp) {
            r.lastRpcSendTimestamp = rpcSendTime;
        }
        // Fail, reset the state to try again from nextIndex.
        r.resetInflights();
        // If the follower's lastLogIndex + 1 is below nextIndex, move nextIndex back to the follower's end
        // prev_log_index and prev_log_term doesn't match
        if (response.getLastLogIndex() + 1 < r.nextIndex) {
            LOG.debug("LastLogIndex at peer={} is {}", r.options.getPeerId(), response.getLastLogIndex());
            // The peer contains less logs than leader
            r.nextIndex = response.getLastLogIndex() + 1;
        } else {
            // The peer contains logs from old term which should be truncated,
            // decrease _last_log_at_peer by one to test the right index to keep
            if (r.nextIndex > 1) {
                LOG.debug("logIndex={} dismatch", r.nextIndex);
                r.nextIndex--;
            } else {
                LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen",
                    r.options.getPeerId());
            }
        }
        // After a rejection, probe the follower again to find the point where the logs agree
        // dummy_id is unlock in _send_heartbeat
        r.sendEmptyEntries(false);
        return false;
    }
    if (isLogDebugEnabled) {
        sb.append(", success");
        LOG.debug(sb.toString());
    }
    // Success: still verify that the response term matches the leader's term
    if (response.getTerm() != r.options.getTerm()) {
        r.resetInflights();
        r.state = State.Probe;
        LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm());
        id.unlock();
        return false;
    }
    if (rpcSendTime > r.lastRpcSendTimestamp) {
        r.lastRpcSendTimestamp = rpcSendTime;
    }
    // Number of log entries carried by this request
    final int entriesSize = request.getEntriesCount();
    if (entriesSize > 0) {
        // The peer acknowledged [nextIndex, nextIndex + entriesSize - 1]: grant its vote in the ballot box
        r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Replicated logs in [{}, {}] to peer {}", r.nextIndex, r.nextIndex + entriesSize - 1,
                r.options.getPeerId());
        }
    } else {
        // The request is probe request, change the state into Replicate.
        r.state = State.Replicate;
    }
    r.nextIndex += entriesSize;
    r.hasSucceeded = true;
    r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false);
    // dummy_id is unlock in _send_entries
    if (r.timeoutNowIndex > 0 && r.timeoutNowIndex < r.nextIndex) {
        r.sendTimeoutNow(false, false);
    }
    return true;
}
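
The rejection branch in the middle of the method decides where the next probe starts. It walks nextIndex back toward the point where the Leader's and Follower's logs agree. Pulled out as a standalone function, it looks roughly like the sketch below. The class NextIndexBackoff and the method nextIndexAfterReject are hypothetical names for illustration, not part of JRaft.

```java
// Hypothetical helper mirroring the rejection branch of onAppendEntriesReturned (not JRaft API).
final class NextIndexBackoff {

    private NextIndexBackoff() {
    }

    /**
     * Returns the nextIndex to probe after a follower rejects an AppendEntries request.
     *
     * @param nextIndex        the leader's current nextIndex for this follower
     * @param peerLastLogIndex the lastLogIndex reported in the follower's response
     */
    static long nextIndexAfterReject(long nextIndex, long peerLastLogIndex) {
        if (peerLastLogIndex + 1 < nextIndex) {
            // The follower has fewer entries than assumed: jump straight to its end.
            return peerLastLogIndex + 1;
        }
        if (nextIndex > 1) {
            // The follower holds entries from an older term: step back one index and retry.
            return nextIndex - 1;
        }
        // Index 0 should never mismatch; keep nextIndex (the real code logs an error here).
        return nextIndex;
    }

    public static void main(String[] args) {
        System.out.println(nextIndexAfterReject(11, 5));  // prints 6: follower is behind
        System.out.println(nextIndexAfterReject(11, 10)); // prints 10: prevLogTerm conflict
        System.out.println(nextIndexAfterReject(1, 0));   // prints 1: nothing left to back off
    }
}
```

The first rule is a shortcut. When the Follower is simply behind, the Leader jumps straight to lastLogIndex + 1 instead of backing off one entry per round trip. The decrement-by-one path applies only when the Follower already has an entry at prevLogIndex but its term does not match.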

The onAppendEntriesReturned method is also very long, so let's be patient and go through it step by step:

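  1. Check that the response belongs to the expected in-flight request: inflight.startIndex must equal the request's prevLogIndex + 1. If it does not, reset the in-flight requests, switch back to Probe, and probe again.
  2. Record metrics and assemble the debug log message.
  3. If the RPC status is not OK (for example, the Follower crashed), notify the listeners, reset the in-flight requests, switch back to Probe, and block for a while before sending again.
  4. If success in the response is false, check the term first. A higher term means the Leader has changed, possibly after a network partition, so this node steps down and follows the new Leader. Otherwise, reset the in-flight requests and adjust nextIndex. If the Follower is behind, move nextIndex to its lastLogIndex + 1; if the logs conflict, decrement nextIndex by one. Then send another probe.
  5. If the response succeeded, verify that its term matches the Leader's term. Then record the acknowledgment through BallotBox#commitAt, which lets the commit index advance once a majority holds the entries. Finally, advance nextIndex by the number of entries sent and, if this was a probe response, switch the state to Replicate.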
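
Step 5 hands the acknowledged range to BallotBox#commitAt. The idea behind it is roughly this: every uncommitted index keeps a ballot, and each acknowledgment grants one vote. Once a majority holds an entry, the commit index can move past it. The following is a simplified model under those assumptions only. SimpleBallotBox and its methods are hypothetical. The real BallotBox also deals with things such as configuration changes, locking, and notifying the state machine, and this sketch ignores terms entirely.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of quorum commit, loosely following the idea of
 * BallotBox#commitAt. Terms, configuration changes and locking are deliberately ignored.
 */
final class SimpleBallotBox {
    private final int quorum;                                    // majority of the group
    private final List<Set<String>> ballots = new ArrayList<>(); // voters per uncommitted index
    private long pendingIndex;                                   // log index of ballots.get(0)
    private long lastCommittedIndex;

    SimpleBallotBox(int groupSize, long firstIndex) {
        this.quorum = groupSize / 2 + 1;
        this.pendingIndex = firstIndex;
        this.lastCommittedIndex = firstIndex - 1;
    }

    /** The leader appended one new entry: open an empty ballot for it. */
    void appendPendingTask() {
        ballots.add(new HashSet<>());
    }

    /** A peer has persisted entries [firstLogIndex, lastLogIndex]: grant its vote on each. */
    void commitAt(long firstLogIndex, long lastLogIndex, String peer) {
        long start = Math.max(firstLogIndex, pendingIndex);
        long end = Math.min(lastLogIndex, pendingIndex + ballots.size() - 1);
        long newCommitted = lastCommittedIndex;
        for (long i = start; i <= end; i++) {
            Set<String> voters = ballots.get((int) (i - pendingIndex));
            voters.add(peer); // a Set, so repeated acks from the same peer count once
            if (voters.size() >= quorum) {
                // A majority holds entry i; in Raft, i and every entry before it are committed.
                newCommitted = i;
            }
        }
        if (newCommitted == lastCommittedIndex) {
            return; // nothing new reached a majority
        }
        // Drop the ballots of the newly committed entries and slide the window forward.
        ballots.subList(0, (int) (newCommitted - pendingIndex) + 1).clear();
        pendingIndex = newCommitted + 1;
        lastCommittedIndex = newCommitted;
    }

    long getLastCommittedIndex() {
        return lastCommittedIndex;
    }

    public static void main(String[] args) {
        SimpleBallotBox box = new SimpleBallotBox(3, 1); // 3 nodes, quorum = 2
        for (int i = 0; i < 3; i++) {
            box.appendPendingTask();                     // entries 1..3
        }
        box.commitAt(1, 3, "leader");                    // the leader's own write counts as a vote
        System.out.println(box.getLastCommittedIndex()); // prints 0
        box.commitAt(1, 2, "follower-1");
        System.out.println(box.getLastCommittedIndex()); // prints 2
        box.commitAt(3, 3, "follower-2");
        System.out.println(box.getLastCommittedIndex()); // prints 3
    }
}
```

This is why the Replicator only has to report the range [nextIndex, nextIndex + entriesSize - 1] for its own peer. Whether that range becomes committed depends on the votes gathered from the other Replicators and from the Leader's local write.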