1. 程式人生 > >ZooKeeper系列之(五):領導者工作模式

ZooKeeper系列之(五):領導者工作模式

領導者就是Leader,是整個叢集的寫事務流程負責人。

一輪選舉結束時產生新的Leader,並且Epoch加1。同時新的Leader先將自己的zxid設定為Epoch左移32位(Epoch<32),將是叢集中最大的zxid。

Leader監聽Socket等待Follower的連線請求,每次新的Follower連線的時候都會啟動一個LearnerHandler執行緒專門處理與該Follower的互動。LearnerHandler迴圈接收Follower的訊息包,並交給Leader進行處理。

leader啟動流程:

1. Leader選舉完成之後,Peer確認了自己是Leader的身份,在QuromPeer的主執行緒中執行Leader的邏輯

2. 建立Leader物件,並建立Server繫結在QuorumAddress上,用於和其他Follower之間相互通訊

3. 呼叫Leader::lead函式,執行Leader的真正的邏輯

a) 呼叫ZooKeeperServer::loadData,從磁碟中恢復資料和session列表

b) 啟用新的epoch,zookeeper中的zxid是64位,用於唯一標示一個操作,zxid的高32位是epoch,每次Leader切換加1,低32位為序列號,每次操作加1

c) 啟動繫結在QuorumAddress上的Server,為每個Follower的連線建立一個LearnerHandler,用於和Follower做互動,這裡的邏輯另外單獨論述

d) 向所有的Follower傳送一個NEWLEADER包,宣告自己額Leader身份,並在initLimit時間內等待大多數的Follower完成和Leader的同步,併發送ACK包,表示Follower已經和Leader完成同步並可以對外提供服務

e) 這時Leader和Client之間的互動在cnxnFactory的Server中,Leader和Follower之間的互動在LearnerHandler所屬的執行緒中

f) 然後呼叫Leader::lead函式的QuromPeer執行緒在每個tickTime中都會發送ping訊息給其他的follower,follower在接收到ping訊息後會回覆一個ping訊息,並附帶上follower的session tracker裡的所有session資訊,leader收到follower的ping訊息後,根據傳回的session資訊更新自己的session資訊 。

Leader在接收到Follower的註冊請求之後(Follower呼叫connectToLeader方法),等待收到FOLLOWERINFO包:

QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if (qp.getType() == Leader.OBSERVERINFO) {
     learnerType = LearnerType.OBSERVER;
}
long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
leader.waitForEpochAck(this.getSid(), ss);
peerLastZxid = ss.getLastZxid();

1) lastAcceptedEpoch:是Follower的Epoch值。

2) Zxid:是Follower的zxid

3) newEpoch:Leader根據FOLLOWERINF的值計算出新的Epoch

4) newLeaderZxid:根據新的Epoch計算新的Leader的zxid

然後給Follower傳送LEADERINFO包,將新的zxid和Epoch告訴Follower,好讓Follower知道應該要同步哪些資料。

Leader然後傳送快照包給Follower,Follower根據快照包將本地資料庫恢復到與Leader相同。

如果Follower的事務比Leader少一些(在minCommittedLog 和maxCommittedLog之間),則不需發SNAP包,而是發DIFF包,同時將需補充的事務通過PROPOSAL和COMMIT發給Follower執行。相關邏輯在syncFollower,queueCommittedProposals,startSendingPackets等方法中實現。這部分主要程式碼如下所示:

boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, newLeaderZxid, 
leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
if (needSnap) {             
    try {
       long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
       oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
       bufferedOutput.flush();
       leader.zk.getZKDatabase().serializeSnapshot(oa);
       oa.writeString("BenWasHere", "signature");
       bufferedOutput.flush();
     } finally {
       snapshot.close();
     }
} 
startSendingPackets();

startSendingPackets將需要同步的事務傳送給Follower,事務同步完成後,Leader傳送NEWLEADER包給Follower。

然後等Follower回覆第一個ACK包。收到ACK之後呼叫Leader的waitForNewLeaderAck方法告訴Leader該Follower已經完成同步。

當Leader收到足夠多的waitForNewLeaderAck方法呼叫後(通常超過半數),知道大部分Follower已經註冊到本Leader上來了,這時候Leader才能確保正式發揮