1. 程式人生 > >ZK叢集如何保證資料一致性原始碼閱讀

ZK叢集如何保證資料一致性原始碼閱讀

什麼是資料一致性?

只有當服務端的ZK存在多臺時,才會出現數據一致性的問題, 服務端存在多臺伺服器,他們被劃分成了不同的角色,只有一臺Leader,多臺Follower和多臺Observer, 他們中的任意一臺都能響應客戶端的讀請求,任意一臺也都能接收寫請求, 不同的是,Follower和Observer接收到客戶端的寫請求後不能直接處理這個請求而是將這個請求轉發給Leader,由Leader發起原子廣播完成資料一致性

理論上ZK叢集中的每一個節點的作用都是相同的,他們應該和單機時一樣,各個節點存放的資料保持一致才行

Leader接收到Follower轉發過來的寫請求後發起提議,要求每一個Follower都對這次寫請求進行投票Observer不參加投票,繼續響應client的讀請求),Follower收到請求後,如果認為可以執行寫操作,就傳送給leader確認ack, 這裡存在一個過半機制,就是說,在Leader發起的這次請求中如果存在一半以上的Follower響應了ack,Leader就認為這次的寫操作通過了決議,向Follower傳送commit,讓它們把最新的操作寫進自己的檔案系統

還有新新增一臺ZK伺服器到叢集中,也面臨著資料一致性的問題,它需要去Leader中讀取同步資料

Zab協議(ZooKeeper Atomic Brocadcast)

什麼是Zab協議

Zab協議是一個分散式一致性演算法,讓ZK擁有了崩潰恢復和原子廣播的能力,進而保證叢集中的資料一致性

ZK對Zab的協議的實現架構: 主備模型,任何Learner節點接收到非事務請求查詢本地快取然後返回,任何事務操作都需要轉發給Leader,由Leader發起決議,同叢集中超過半數的Follower返回確認ack時,Leader進行廣播,要求全部節點提交事務

特性:

  • 保證在leader上提交的事務最終被所有的伺服器提交
  • 保證丟棄沒有經過半數檢驗的事務

Zab協議的作用

  • 使用一個單獨的程序,保持leader和Learner之間的socket通訊,閱讀原始碼這個Thread就是learnerHandler,任何寫請求都將由Leader在叢集中進行原子廣播事務
  • 保證了全部的變更序列在全域性被順序引用,寫操作中都需要先check然後才能寫,比如我們向create /a/b 它在建立b時,會先檢查a存在否? 而且,事務性的request存在於佇列中,先進先出,保證了他們之間的順序

Zab協議原理

  • 選舉: 在Follower中選舉中一個Leader
  • 發現: Leader中會維護一個Follower的列表並與之通訊
  • 同步: Leader會把自己的資料同步給Follower, 做到多副本儲存,體現了CAP的A和P 高可用和分割槽容錯
  • 廣播: Leader接受Follower的事務Proposal,然後將這個事務性的proposal廣播給其他learner

Zab協議內容

當整個叢集啟動過程中,或者當 Leader 伺服器出現網路中弄斷、崩潰退出或重啟等異常時,Zab協議就會 進入崩潰恢復模式,選舉產生新的Leader。

當選舉產生了新的 Leader,同時叢集中有過半的機器與該 Leader 伺服器完成了狀態同步(即資料同步)之後,Zab協議就會退出崩潰恢復模式,進入訊息廣播模式。

當Leader出現崩潰退出或者機器重啟,亦或是叢集中不存在超過半數的伺服器與Leader儲存正常通訊,Zab就會再一次進入崩潰恢復,發起新一輪Leader選舉並實現資料同步。同步完成後又會進入訊息廣播模式,接收事務請求

參考部落格-簡書 -_Zy

原始碼入口

單機版本還是叢集版本的啟動流程中,前部分幾乎是相同的,一直到QuorumPeerMain.javainitializeAndRun()方法,單機模式下執行的是ZooKeeperServerMain.main(args);, 叢集模式下,執行的是runFromConfig(config);

因此當前部落格從QuorumPeerMainrunFromConfig()開始

其中的QuorumPeer.java可以看成ZK叢集中的每一個server實體,下面程式碼大部分篇幅是在當前server的屬性完成初始化

   // todo 叢集啟動的邏輯
    public void runFromConfig(QuorumPeerConfig config) throws IOException {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      } 
  
      LOG.info("Starting quorum peer");
      try {

          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();

          cnxnFactory.configure(config.getClientPortAddress(),
                                config.getMaxClientCnxns());

          // todo new QuorumPeer()  可以理解成, 建立了叢集中的一個server
          quorumPeer = getQuorumPeer();

          // todo 將配置檔案中解析出來的檔案原封不動的賦值給我們的new 的QuorumPeer
          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  new File(config.getDataLogDir()),
                  new File(config.getDataDir())));
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());


          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){
              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }

          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();

          // todo 著重看這個方法
          quorumPeer.start();

          quorumPeer.join();

跟進quorumPeer.start()方法,原始碼如下, 主要做了如下幾件事

  • 資料恢復
  • 經過上下文的工廠,啟動這個執行緒類,使當前的server擁有接受client請求的能力(但是RequestProcessor沒有初始化,因此它能接受request,卻不能處理request)
  • 選舉Leader,在這個過程中會在Follower中選舉出一個leader,確立好叢集中的 Leader,Follower,Observer的三大角色
  • 啟動當前執行緒類QuorumPeer.java
@Override
public synchronized void start() {
    // todo 從磁碟中載入資料到記憶體中
    loadDataBase();

    // todo 啟動上下文的這個工廠,他是個執行緒類, 接受客戶端的請求
    cnxnFactory.start();

    // todo 開啟leader的選舉工作
    startLeaderElection();

    // todo 確定伺服器的角色, 啟動的就是當前類的run方法在900行
    super.start();
}

看一下QuorumPeer.java的run方法,部分原始碼如下,邏輯很清楚通過了上面的角色的選舉之後,叢集中各個節點的角色已經確定下來了,那擁有不同角色的節點就會進入下面程式碼中不同的case分支中

  • looking : 正在進行領導者的選舉
  • observer: 觀察者
  • leading : 叢集的leader
  • following: 叢集的Follower
 while (running) {
                switch (getPeerState()) {

        case LOOKING:
            LOG.info("LOOKING");

            if (Boolean.getBoolean("readonlymode.enabled")) {
                LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                // Create read-only server but don't start it immediately
                final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                        logFactory, this,
                        new ZooKeeperServer.BasicDataTreeBuilder(),
                        this.zkDb);

                Thread roZkMgr = new Thread() {
                    public void run() {
                        try {
                            // lower-bound grace period to 2 secs
                            sleep(Math.max(2000, tickTime));
                            if (ServerState.LOOKING.equals(getPeerState())) {
                                roZk.startup();
                            }
                        } catch (InterruptedException e) {
                            LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                        } catch (Exception e) {
                            LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                        }
                    }
                };
                try {
                    roZkMgr.start();
                    setBCVote(null);
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                    setPeerState(ServerState.LOOKING);
                } finally {
                    roZkMgr.interrupt();
                    roZk.shutdown();
                }
            } else {
                try {
                    setBCVote(null);
                    setCurrentVote(makeLEStrategy().lookForLeader());
                } catch (Exception e) {
                    LOG.warn("Unexpected exception", e);
                    setPeerState(ServerState.LOOKING);
                }
            }
            break;
        case OBSERVING:
            try {
                LOG.info("OBSERVING");
                setObserver(makeObserver(logFactory));
                observer.observeLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e );                        
            } finally {
                observer.shutdown();
                setObserver(null);
                setPeerState(ServerState.LOOKING);
            }
            break;
        case FOLLOWING:
            // todo server 當選follow角色
            try {
                LOG.info("FOLLOWING");
                setFollower(makeFollower(logFactory));
                follower.followLeader();
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                follower.shutdown();
                setFollower(null);
                setPeerState(ServerState.LOOKING);
            }
            break;
        case LEADING:
            // todo 伺服器成功當選成leader
            LOG.info("LEADING");
            try {
                setLeader(makeLeader(logFactory));
                // todo 跟進lead
                leader.lead();
                setLeader(null);
            } catch (Exception e) {
                LOG.warn("Unexpected exception",e);
            } finally {
                if (leader != null) {
                    leader.shutdown("Forcing shutdown");
                    setLeader(null);
                }
                setPeerState(ServerState.LOOKING);
            }
            break;
        }
    }

下面看一下,server當選成不同的角色後,後幹了什麼

總覽Leader&Follower

當選成Leader

跟進原始碼,上面程式碼片段中makeLeader() 由這個方法建立了一個Leader的封裝類

protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
        // todo 跟進它的Leader 構造方法
        return new Leader(this, new LeaderZooKeeperServer(logFactory,
                this,new ZooKeeperServer.BasicDataTreeBuilder(), this.zkDb));
    }

這是LeaderZooKeeperServer的繼承圖,可以看到其實他繼承了單機模式下的ZKServer

呼叫leader.lead()方法,這個方法主要做了如下幾件事

  • 建立了StateSummary物件
    • 這個物件封裝了zxid以及currentEpoch, 其中zxid就是最後一次和znode相關的事務id,後者是當前的epoch 它有64位,高32位標記是第幾代Leader,後32位是當前代leader提交的事務次數,Follower只識別高版本的前32位為Leader
  • 針對每一個Learner都開啟了一條新的執行緒LearnerCnxAcceptor,這條執行緒負責Leader和Learner(Observer+Follower)之間的IO交流
  • LearnerCnxAcceptorrun()方法中,只要有新的連線來了,新開啟了一條新的執行緒,LearnerHander,由他負責Leader中接受每一個參議員的packet,以及監聽新連線的到來
  • leader啟動...
 void lead() throws IOException, InterruptedException {

        zk.registerJMX(new LeaderBean(this, zk), self.jmxLocalPeerBean);

        try {
            self.tick.set(0);
            zk.loadData();
            // todo  建立了  封裝有狀態比較邏輯的物件
            leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());

            // todo 建立一個新的執行緒,為了新的 followers 來連線
            cnxAcceptor = new LearnerCnxAcceptor();
            cnxAcceptor.start();
            
            readyToStart = true;
            // todo
            long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
            
            zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
            
            synchronized(this){
                lastProposed = zk.getZxid();
            }
            
            newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
                    null, null);


            if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
                LOG.info("NEWLEADER proposal has Zxid of "
                        + Long.toHexString(newLeaderProposal.packet.getZxid()));
            }
            
            waitForEpochAck(self.getId(), leaderStateSummary);
            self.setCurrentEpoch(epoch);


            try {
                waitForNewLeaderAck(self.getId(), zk.getZxid());
            } catch (InterruptedException e) {
                shutdown("Waiting for a quorum of followers, only synced with sids: [ "
                        + getSidSetString(newLeaderProposal.ackSet) + " ]");
                HashSet<Long> followerSet = new HashSet<Long>();
                for (LearnerHandler f : learners)
                    followerSet.add(f.getSid());
                    
                if (self.getQuorumVerifier().containsQuorum(followerSet)) {
                    LOG.warn("Enough followers present. "
                            + "Perhaps the initTicks need to be increased.");
                }
                Thread.sleep(self.tickTime);
                self.tick.incrementAndGet();
                return;
            }

            // todo 啟動server
            startZkServer();
            
   
            String initialZxid = System.getProperty("zookeeper.testingonly.initialZxid");
            if (initialZxid != null) {
                long zxid = Long.parseLong(initialZxid);
                zk.setZxid((zk.getZxid() & 0xffffffff00000000L) | zxid);
            }
            
            if (!System.getProperty("zookeeper.leaderServes", "yes").equals("no")) {
                self.cnxnFactory.setZooKeeperServer(zk);
            }

            boolean tickSkip = true;
    
            while (true) {
                Thread.sleep(self.tickTime / 2);
                if (!tickSkip) {
                    self.tick.incrementAndGet();
                }
                HashSet<Long> syncedSet = new HashSet<Long>();

                // lock on the followers when we use it.
                syncedSet.add(self.getId());

                for (LearnerHandler f : getLearners()) {
                    // Synced set is used to check we have a supporting quorum, so only
                    // PARTICIPANT, not OBSERVER, learners should be used
                    if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
                        syncedSet.add(f.getSid());
                    }
                    f.ping();
                }

                // check leader running status
                if (!this.isRunning()) {
                    shutdown("Unexpected internal error");
                    return;
                }

              if (!tickSkip && !self.getQuorumVerifier().containsQuorum(syncedSet)) {
                //if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
                    // Lost quorum, shutdown
                    shutdown("Not sufficient followers synced, only synced with sids: [ "
                            + getSidSetString(syncedSet) + " ]");
                    // make sure the order is the same!
                    // the leader goes to looking
                    return;
              } 
              tickSkip = !tickSkip;
            }
        } finally {
            zk.unregisterJMX(this);
        }
    }

當選成Follower

通過上面的case分支進入FOLLOWING塊,進入followerLeader方法

下面的Follower.java中的程式碼的主要邏輯:

  • 和Leader建立起連線
  • registerWithLeader()註冊進Leader
  • syncWithLeader()從Leader中同步資料並完成啟動
  • while(true){...}中接受leader傳送過來的packet,處理packet
void followLeader() throws InterruptedException {

    fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
    try {
        // todo 找出LeaderServer
        QuorumServer leaderServer = findLeader();            
        try {
            // todo 和Leader建立連線
            connectToLeader(leaderServer.addr, leaderServer.hostname);

            // todo 註冊在leader上(會往leader上傳送資料)
            //todo 這個Epoch代表當前是第幾輪選舉leader, 這個值給leader使用,由leader從接收到的最大的epoch中選出最大的,然後統一所有learner中的epoch值
            long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);

            long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
            if (newEpoch < self.getAcceptedEpoch()) {
                LOG.error("Proposed leader epoch " + ZxidUtils.zxidToString(newEpochZxid)
                        + " is less than our accepted epoch " + ZxidUtils.zxidToString(self.getAcceptedEpoch()));
                throw new IOException("Error: Epoch of leader is lower");
            }
            // todo 從leader同步資料, 同時也是在這個方法中完成初始化啟動的
            syncWithLeader(newEpochZxid);
            QuorumPacket qp = new QuorumPacket();

            // todo 在follower中開啟無線迴圈, 不停的接收服務端的pakcet,然後處理packet
            while (this.isRunning()) {
                readPacket(qp);
                // todo (接受leader傳送的提議)
                processPacket(qp);
            }
        } catch (Exception e) {
            LOG.warn("Exception when following the leader", e);
            try {
                sock.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            // clear pending revalidations
            pendingRevalidations.clear();
        }
    } finally {
        zk.unregisterJMX((Learner)this);
    }
}

Leader&Follower互動的細節流程

這部分的邏輯流程圖如下

Follower同Leader之間建立起Socket長連線

  • 在Follower中原始碼如下, 嘗試五次和Leader建立連線,重試五次後放棄
protected void connectToLeader(InetSocketAddress addr, String hostname)
        throws IOException, ConnectException, InterruptedException {
    sock = new Socket();        
    sock.setSoTimeout(self.tickTime * self.initLimit);
    for (int tries = 0; tries < 5; tries++) {
        try {
            sock.connect(addr, self.tickTime * self.syncLimit);
            sock.setTcpNoDelay(nodelay);
            break;
        } catch (IOException e) {
            if (tries == 4) {
                LOG.error("Unexpected exception",e);
                throw e;
            } else {
                LOG.warn("Unexpected exception, tries="+tries+
                        ", connecting to " + addr,e);
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
        }
        Thread.sleep(1000);
    }

    self.authLearner.authenticate(sock, hostname);

    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
            sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}   
  • 在Leader中等待建立連線, 每當向上面有客戶端請求和Leader建立連線,就在如下的run()邏輯中的LearnerHandler()為每一條新的連線開啟一條新的執行緒
Leader.java

@Override
public void run() {
try {
    while (!stop) {
        // todo 下面的主要邏輯就是,在當前執行緒中輪詢,只要有一條連線進來就單獨開啟一條執行緒(LearnerHandler)
        try{
            // todo 從serversocket中獲取連線
            Socket s = ss.accept();
            // start with the initLimit, once the ack is processed in LearnerHandler switch to the syncLimit
            // todo  從initlimit開始,在learnerhandler中處理ack之後,切換到synclimit
            s.setSoTimeout(self.tickTime * self.initLimit);
            s.setTcpNoDelay(nodelay);// todo 禁用delay演算法

            // todo 讀取socket中的資料
            BufferedInputStream is = new BufferedInputStream(s.getInputStream());

            // todo 建立處理所有leanner資訊的 handler,他也執行緒類
            LearnerHandler fh = new LearnerHandler(s, is, Leader.this);

            fh.start();

Follower向Leader傳送註冊訊息

protected long registerWithLeader(int pktType) throws IOException{
    /*
     * Send follower info, including last zxid and sid
     */
    long lastLoggedZxid = self.getLastLoggedZxid();
    QuorumPacket qp = new QuorumPacket();                
    qp.setType(pktType);
    qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));
    
    /*
     * Add sid to payload
     */
    LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
    ByteArrayOutputStream bsid = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
    boa.writeRecord(li, "LearnerInfo");
    qp.setData(bsid.toByteArray());
    // todo 往leader傳送資料
    writePacket(qp, true);
    readPacket(qp);        

LearnerHandler接收資料

下面的接受解析請求的邏輯,learner接收到Follower的註冊響應後首先是從請求中,將request解析出來, 然後驗證一下,如果不是Leader.FOLLOWERINFO 或者是Leader.Observer 型別的直接返回了,如果是接著往下處理

引出了epoch的概念,它全長64位,前32位代表的是第幾代Leader,因為網路或者其他原因,leader是可能掛掉的,Leader有屬於自己的一個epoch編號,從1,2..開始,一旦Leader掛了,從新選出來的Leader的epoch就會更新,肯定會比原來老leader的epoch值大, 後32位標記的就是當前leader發起的第幾次決議

看它是怎麼處理的,通過程式碼,它會選出所有的Follower中最大的epoch值,並且在此基礎上+1,作為最新的epoch值,當然這是Leader自己選出來的值,那Follower能不能同意這個值呢?,跟進leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);,它裡面使用了過半檢查機制,不滿足半數檢驗就會wait(), 那什麼時候喚醒呢? 其實只要叢集中再有其他的Follower啟動,會重複執行以上的邏輯,再次來到這個方法進行半數檢驗,就有可能喚醒

if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) {
        waitingForNewEpoch = false;
        self.setAcceptedEpoch(epoch);
        connectingFollowers.notifyAll();
    } else {
        long start = Time.currentElapsedTime();
        long cur = start;
        long end = start + self.getInitLimit()*self.getTickTime();
        while(waitingForNewEpoch && cur < end) {
            connectingFollowers.wait(end - cur);
            cur = Time.currentElapsedTime();
        }
        if (waitingForNewEpoch) {
            throw new InterruptedException("Timeout while waiting for epoch from quorum");        
        }
    }

再往後,leader向Follower傳送確認ack,包含最新的epoch+zxid,告訴Follower以後它的事務就從這個zxid開始,這個ack的header= Leader.LEADERINFO

傳送完成之後,leader開始等待Follower的響應的ack

public void run() {
try {
    leader.addLearnerHandler(this);
    tickOfNextAckDeadline = leader.self.tick.get()
            + leader.self.initLimit + leader.self.syncLimit;

    ia = BinaryInputArchive.getArchive(bufferedInput);
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    oa = BinaryOutputArchive.getArchive(bufferedOutput);

    QuorumPacket qp = new QuorumPacket();
    // todo 讀取follower傳送過來的資料
    ia.readRecord(qp, "packet");
    // todo 第一次Follower傳送的註冊請求的header = Leader.FOLLOWERINFO
    // todo leader 遇到非FOLLOWERINFO的 和 OBSERVERINFO的訊息直接返回
    if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
        LOG.error("First packet " + qp.toString()
                + " is not FOLLOWERINFO or OBSERVERINFO!");
        return;
    }
    
    .
    .
    .
    
    //獲取出Follower中最後一次epoch
    long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
    long peerLastZxid;
    StateSummary ss = null;
    long zxid = qp.getZxid();
    // todo leader用當前方法從眾多follower中選出epoch值最大的(而且還會再最大的基礎上加1)
    // todo this.getSid()指定的 learner 的myid
    // todo this.getSid()指定的 learner 的lastAcceptedEpoch
    long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
    .
    .
    .
    } else {
        byte ver[] = new byte[4];
        ByteBuffer.wrap(ver).putInt(0x10000);
        // todo leader接收到learner的資料之後,給learnner 傳送LEADERINFO型別的響應
        // todo 返回了最新的epoch
        QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
        oa.writeRecord(newEpochPacket, "packet");
        bufferedOutput.flush();
        QuorumPacket ackEpochPacket = new QuorumPacket();
        ia.readRecord(ackEpochPacket, "packet");
        if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
            LOG.error(ackEpochPacket.toString()
                    + " is not ACKEPOCH");
            return;
        }
        ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
        ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
        // todo 等待learner的響應ack
        leader.waitForEpochAck(this.getSid(), ss);
    }

Follower接收leader的資訊,併發送響應

Follower獲取到leader的相應的資訊,解析出當前leader的 leaderProtocolVersion,然後給leader傳送 header=Leader.ACKEPOCH的ack

    protected long registerWithLeader(int pktType) throws IOException{
     .
     .
     .
     .
        readPacket(qp);        
        final long newEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
        if (qp.getType() == Leader.LEADERINFO) {
            // we are connected to a 1.0 server so accept the new epoch and read the next packet
            leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
            byte epochBytes[] = new byte[4];
            final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
            if (newEpoch > self.getAcceptedEpoch()) {
                wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
                self.setAcceptedEpoch(newEpoch);
            } else if (newEpoch == self.getAcceptedEpoch()) {
                // since we have already acked an epoch equal to the leaders, we cannot ack
                // again, but we still need to send our lastZxid to the leader so that we can
                // sync with it if it does assume leadership of the epoch.
                // the -1 indicates that this reply should not count as an ack for the new epoch
                wrappedEpochBytes.putInt(-1);
            } else {
                throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
            }
            QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
            writePacket(ackNewEpoch, true);
            return ZxidUtils.makeZxid(newEpoch, 0);

Leader接收到Follower的ack後,開始同步資料的邏輯

看一下,如果Follower中最後一次的事務id和leader中的事務id值相同的話,說明沒有資料可以同步

在看單機版本的ZKServer啟動時,可以發現,在FinalRequestProcessor中存在一個commitedlog集合,這個集合中的存放著已經被持久化了的request,它的作用就是為了給當前Follower同步資料使用,因為Follower可以通過Leader最近的一次快照快速回複數據,但是快照是不定時打一次的,這就有可能出現缺失資料,所以搞了個commitedlog

用法:

  • 檢視當前Follower的zxid是不是處於commitedlog集合中,最大的和最下的zxid之間,在這之間的話就說明從當前的Follower的zxid到commitedlog中最大的zxid之間的request中,都需要執行一遍,這種方式就稱為Leader.DIFF,僅僅同步不一樣的
    • 第一點: 並沒有挨個傳送同步的請求,而是把他們放到一個集合中,統一發送
    • QuorumPacket的型別是Leader.COMMIT, Follower接收到這個commit之後,直接會提交同步這個集合中的request,完成資料的同步操作
  • 檢視Follower中最後一次的zxid比Leader中的最大的zxid事務id還大,不管37 21 直接要求Follower將超過Leader的部分trunc,說白了就是刪除掉

  • 如果Follower中最大的zxid比leader中最小的zxid還小,使用快照的同步方式

區別:

快照中資料序列化後,使用Socket傳送到Follower

給Follower傳送同步資料的命令是通過下面方法中的一條單獨的執行緒完成的

if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
            // Follower is already sync with us, send empty diff
            LOG.info("leader and follower are in sync, zxid=0x{}",
                    Long.toHexString(peerLastZxid));
            packetToSend = Leader.DIFF;
            zxidToSend = peerLastZxid;
  } else if (proposals.size() != 0) {
   .
   .
   .
  if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
    LOG.debug("Sending proposals to follower");

    // as we look through proposals, this variable keeps track of previous proposal Id.
    // todo 當我們檢視以前的建議時,這個變數存放的是之前最小的 建議id
    long prevProposalZxid = minCommittedLog;

    // Keep track of whether we are about to send the first packet.
    // todo 跟蹤我們是否要傳送第一個包
    // Before sending the first packet, we have to tell the learner
    //todo 在我們傳送第一個包之前, 我們要告訴leanner是期待一個 trunc 還是一個 diff
    // whether to expect a trunc or a diff
    boolean firstPacket=true;

    // If we are here, we can use committedLog to sync with follower. Then we only need to decide whether to send trunc or not
    // todo 當我們執行到這裡了,我們使用 committedLog 來給Follower提供資料同步
    packetToSend = Leader.DIFF;
    zxidToSend = maxCommittedLog;

    for (Proposal propose: proposals) {
        // skip the proposals the peer already has
        if (propose.packet.getZxid() <= peerLastZxid) {
            prevProposalZxid = propose.packet.getZxid();
            continue;
        } else {
            // If we are sending the first packet, figure out whether to trunc
            // in case the follower has some proposals that the leader doesn't
            // todo 當我們傳送第一個packet時, 弄明白是否trunc, 以防leader沒有Follower擁有的proposals
            if (firstPacket) {
                firstPacket = false;
                // Does the peer have some proposals that the leader hasn't seen yet
                if (prevProposalZxid < peerLastZxid) {
                    // send a trunc message before sending the diff
                    packetToSend = Leader.TRUNC;                                        
                    zxidToSend = prevProposalZxid;
                    updates = zxidToSend;
                }
            }
            // todo 放入佇列(未傳送)
            queuePacket(propose.packet);
            // todo 這一步就是leader給leanner傳送的commit響應. leanner接收到這個響應之後無須在傳送確認請求,直接同步資料
            QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                    null, null);
            queuePacket(qcommit);
        }
    }
} else if (peerLastZxid > maxCommittedLog) {
    // todo leanner最後一次提交的zxid 事務id比 leader中最大的事務id還大
    LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
            Long.toHexString(maxCommittedLog),
            Long.toHexString(updates));

    packetToSend = Leader.TRUNC;
    zxidToSend = maxCommittedLog;
    updates = zxidToSend;
} else {
    LOG.warn("Unhandled proposal scenario");
}
.
.

     bufferedOutput.flush();
            //Need to set the zxidToSend to the latest zxid
            // todo 需要將zxidToSend 設定成最新的zxid
            if (packetToSend == Leader.SNAP) {
                zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
            }
            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
            bufferedOutput.flush();
            
            /* if we are not truncating or sending a diff just send a snapshot */
            if (packetToSend == Leader.SNAP) {
                LOG.info("Sending snapshot last zxid of peer is 0x"
                        + Long.toHexString(peerLastZxid) + " " 
                        + " zxid of leader is 0x"
                        + Long.toHexString(leaderLastZxid)
                        + "sent zxid of db as 0x" 
                        + Long.toHexString(zxidToSend));
                // Dump data to peer
                // todo 從快照中同步資料
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                // todo 快照直接通過socket傳送出去
                oa.writeString("BenWasHere", "signature");
            }
            bufferedOutput.flush();
            
            // Start sending packets
            //todo 建立一條新的執行緒,用這條執行緒傳送上面存放到佇列裡面的資料
            new Thread() {
                public void run() {
                    Thread.currentThread().setName(
                            "Sender-" + sock.getRemoteSocketAddress());
                    try {
                        sendPackets();
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected interruption",e);
                    }
                }
            }.start();
            

Follower接受到Leader不同的同步資料命名,做出不同的動作

這個方法又是超級長的,好在也不會很難讀,根據不同的type選擇不同的資料恢復方法

  • 如果是Snap,則將自己的ZKDB清空,然後載入Leader的快照
  • 如果是trunc,就將不合法的zxid的記錄全部刪除,然後重新載入
  • 如果是diff型別的,會進一步進入到while (self.isRunning()) {..}迴圈塊的case模組,將需要同步的request全部新增到集合中packetsCommitted.add(qp.getZxid());,收到服務端的 UPTODATE後才會跳出這個迴圈
  • 通過下面的程式碼檢視,Follower並沒有先消費leader傳送過來的request,因為它現在沒有完成啟動,沒法交給Processor處理,因此它需要先啟動,就在下面的zk.startup();完成啟動
  • 啟動之後,將這裡request載入到記憶體完成資料同步
protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
    QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
    QuorumPacket qp = new QuorumPacket();
    long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
    //todo 同步資料時,如果是diff這種情況, 我們不需要去生成一個快照,因為事務將在現有的快照的基礎上完成同步
    //todo 如果是 snap 或者 trunc 時,需要生成快照
    boolean snapshotNeeded = true;
    // todo 從leader中讀取出一個 packet
    readPacket(qp);
    LinkedList<Long> packetsCommitted = new LinkedList<Long>();
    LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
    synchronized (zk) {
        // todo diff
        if (qp.getType() == Leader.DIFF) {
            LOG.info("Getting a diff from the leader 0x{}", Long.toHexString(qp.getZxid()));
            // todo 修改了一下這個變數的值,這個變數的值在下面的程式碼中賦值給了 writeToTxnLog
            snapshotNeeded = false;
        }
        else if (qp.getType() == Leader.SNAP) {
            // todo 快照
            LOG.info("Getting a snapshot from leader 0x" + Long.toHexString(qp.getZxid()));
            // The leader is going to dump the database clear our own database and read
            // todo 清空我們自己的ZKDB 使用leader傳送的快照重建
            zk.getZKDatabase().clear();
            // todo leaderIs就是server傳送過來的資料,進行反序列化
            zk.getZKDatabase().deserializeSnapshot(leaderIs);

            String signature = leaderIs.readString("signature");
            if (!signature.equals("BenWasHere")) {
                LOG.error("Missing signature. Got " + signature);
                throw new IOException("Missing signature");                   
            }
            // todo 同步當前Follower中最大事務zxid
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());

        } else if (qp.getType() == Leader.TRUNC) {
            //we need to truncate the log to the lastzxid of the leader
            LOG.warn("Truncating log to get in sync with the leader 0x"
                    + Long.toHexString(qp.getZxid()));
            // TODO 刪除log資料
            boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
            if (!truncated) {
                // not able to truncate the log
                LOG.error("Not able to truncate the log "
                        + Long.toHexString(qp.getZxid()));
                System.exit(13);
            }
            zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
        }
        else {
            LOG.error("Got unexpected packet from leader "
                    + qp.getType() + " exiting ... " );
            System.exit(13);

        }
        zk.createSessionTracker();
        
        long lastQueued = 0;

        boolean isPreZAB1_0 = true;
        // todo 如果不拍攝快照,請確保事務不應用於記憶體,而是寫入事務日誌
        // todo diff模式下,snapshotNeeded=false
        //todo  writeToTxnLog = true
        boolean writeToTxnLog = !snapshotNeeded;

        outerLoop:
        while (self.isRunning()) {
            // todo 在這個迴圈中繼續讀取資料, 如果是diff的話,就會讀取到下面拿到commit case
            readPacket(qp);
            switch(qp.getType()) {
            case Leader.PROPOSAL:
                PacketInFlight pif = new PacketInFlight();
                pif.hdr = new TxnHeader();
                pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
                if (pif.hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(pif.hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
                }
                lastQueued = pif.hdr.getZxid();
                packetsNotCommitted.add(pif);
                break;
            case Leader.COMMIT:
                if (!writeToTxnLog) { //todo  diff模式下 條件為false
                    pif = packetsNotCommitted.peekFirst();
                    if (pif.hdr.getZxid() != qp.getZxid()) {
                        LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                    } else {
                        zk.processTxn(pif.hdr, pif.rec);
                        packetsNotCommitted.remove();
                    }
                } else {//todo 進入這個分支
                    // todo 讀取到的qa 新增到packetsCommitted linkedList中 , 這個佇列在下面程式碼中使用
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            case Leader.INFORM:
                /*
                 * Only observer get this type of packet. We treat this
                 * as receiving PROPOSAL and COMMMIT.
                 */
                PacketInFlight packet = new PacketInFlight();
                packet.hdr = new TxnHeader();
                packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
                // Log warning message if txn comes out-of-order
                if (packet.hdr.getZxid() != lastQueued + 1) {
                    LOG.warn("Got zxid 0x"
                            + Long.toHexString(packet.hdr.getZxid())
                            + " expected 0x"
                            + Long.toHexString(lastQueued + 1));
                }
                lastQueued = packet.hdr.getZxid();
                if (!writeToTxnLog) {
                    // Apply to db directly if we haven't taken the snapshot
                    zk.processTxn(packet.hdr, packet.rec);
                } else {
                    packetsNotCommitted.add(packet);
                    packetsCommitted.add(qp.getZxid());
                }
                break;
            case Leader.UPTODATE:
                // todo 想讓下面的程式碼使用上面的佇列就得跳出這個while 迴圈
                // todo 這個while迴圈在當前case中完成跳出
                // todo 也就是說,只有獲取到Leader的uptoDate 請求時才來退出
                if (isPreZAB1_0) {
                    zk.takeSnapshot();
                    self.setCurrentEpoch(newEpoch);
                }
                self.cnxnFactory.setZooKeeperServer(zk);                
                break outerLoop;
            case Leader.NEWLEADER: // Getting NEWLEADER here instead of in discovery 
                // means this is Zab 1.0
                // Create updatingEpoch file and remove it after current
                // epoch is set. QuorumPeer.loadDataBase() uses this file to
                // detect the case where the server was terminated after
                // taking a snapshot but before setting the current epoch.
                File updating = new File(self.getTxnFactory().getSnapDir(),
                                    QuorumPeer.UPDATING_EPOCH_FILENAME);
                if (!updating.exists() && !updating.createNewFile()) {
                    throw new IOException("Failed to create " +
                                          updating.toString());
                }
                if (snapshotNeeded) {
                    zk.takeSnapshot();
                }
                self.setCurrentEpoch(newEpoch);
                if (!updating.delete()) {
                    throw new IOException("Failed to delete " +
                                          updating.toString());
                }
                writeToTxnLog = true; //Anything after this needs to go to the transaction log, not applied directly in memory
                isPreZAB1_0 = false;
                writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
                break;
            }
        }
    }
    ack.setZxid(ZxidUtils.makeZxid(newEpoch, 0));
    writePacket(ack, true);
    sock.setSoTimeout(self.tickTime * self.syncLimit);


    // todo follower 完成初始化啟動, 在跟下去就很熟悉了, 和單機啟動流程神似
    zk.startup();
    
    self.updateElectionVote(newEpoch);

   
    if (zk instanceof FollowerZooKeeperServer) {

    } else if (zk instanceof ObserverZooKeeperServer) {
     
     ///////////////////////////////////////////////////////////////////////////////
        ObserverZooKeeperServer ozk = (ObserverZooKeeperServer) zk;
        for (PacketInFlight p : packetsNotCommitted) {
            Long zxid = packetsCommitted.peekFirst();
            if (p.hdr.getZxid() != zxid) {
           
                LOG.warn("Committing " + Long.toHexString(zxid)
                        + ", but next proposal is "
                        + Long.toHexString(p.hdr.getZxid()));
                continue;
            }
            packetsCommitted.remove();
            Request request = new Request(null, p.hdr.getClientId(),
                    p.hdr.getCxid(), p.hdr.getType(), null, null);
            request.txn = p.rec;
            request.hdr = p.hdr;
            ozk.commitRequest(request);
        }
   ///////////////////////////////////////////////////////////////////////////////
    } else {
        // New server type need to handle in-flight packets
        throw new UnsupportedOperationException("Unknown server type");
    }
}

再回想一下,現在的狀態就是完成了Follower.java中的方法followLeader()

現在的階段是,server啟動完成了,資料也和leader同步了,並且在下面的這個迴圈中可以和Leader一直保持IO交流

// todo 從leader同步資料, 同時也是在這個方法中完成初始化啟動的
    syncWithLeader(newEpochZxid);
    QuorumPacket qp = new QuorumPacket();

    // todo 在follower中開啟無線迴圈, 不停的接收服務端的pakcet,然後處理packet
    while (this.isRunning()) {
        readPacket(qp);
        // todo (接受leader傳送的提議)
        processPacket(qp);
    }

Follower同步完資料,再跟Leader打交道就是 有客戶端有了寫請求,Follower需要將這個寫請求轉發leader進行廣播

Leader中就在下面的邏輯中進行處理,

learnerHandler.javarun()

      case Leader.REQUEST:
                    // todo follower 接收到client的寫請求之後,進入到這個case分支
                    bb = ByteBuffer.wrap(qp.getData());
                    sessionId = bb.getLong();
                    cxid = bb.getInt();
                    type = bb.getInt();
                    bb = bb.slice();
                    Request si;
                    if(type == OpCode.sync){
                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                    } else {
                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                    }
                    si.setOwner(this);
                    leader.zk.submitRequest(si);
                    break;

它開啟了和Follower進行IO交流的執行緒之後,同樣會執行啟動的程式碼


總結: 在本篇部落格中,可以看到在Follower向Leader同步資料的過程中的幾個階段

  • 發現: leader發現Follower並與之建立通訊
  • 同步: Follower可以主要通過兩種方式完成和leader的資料同步工作
    • 通過Leader的快照
    • 通過leader的commitedLog中存放的包含snapshot的已經被持久化的request
  • 原子廣播: 這種情景是當Follower接收到客戶端的寫請求時,它會將這個請求轉發給Leader,因為要保證資料的一致性(原始碼就在learnerHandler的run()方法的最後的while無限迴圈中CASE: Request)
    由Leader發起原子廣播,通知叢集中的全部節點提交事務,完成資料一致性