ZooKeeper原始碼淺析(一)
1.基本架構
2.ZAB協議
ZooKeeper並沒有完全採用Paxos演算法,而是使用了一種稱為ZooKeeper Atomic Broadcast(ZAB,zookeeper原子訊息廣播協議)的協議作為其資料一致性的核心演算法。
2.1 選舉Leader時,需要獲得過半數伺服器的投票才能選舉成功;同時,只有當叢集中已有過半數的機器與該Leader伺服器完成狀態同步(資料同步)之後,Leader才能開始對外提供服務。
2.2 所有事務請求必須由一個全域唯一的伺服器來協調處理,這樣的伺服器稱為Leader伺服器,而餘下的其他伺服器則稱為Follower伺服器。Leader伺服器負責將一個客戶端事務請求轉換成一個事務Proposal(提議),並將該Proposal分發給叢集中所有的Follower伺服器。之後Leader伺服器等待Follower伺服器的反饋(ACK),一旦收到超過半數的Follower伺服器的正確反饋,Leader就會再次向所有的Follower伺服器分發Commit訊息,要求其將前一個Proposal進行提交。
3.Leader和Follower啟動過程
4.請求處理
4.1請求處理鏈
4.1.1leader請求處理鏈
4.1.2follower請求處理鏈
4.2處理流程
以服務端為Leader時處理create請求為例,流程如下:
FollowerZooKeeperServer與LeaderZooKeeperServer處理流程的差別在於:FollowerRequestProcessor會將事務請求(以及sync請求)轉發給leader,SendAckRequestProcessor負責向leader返回事務提議的ACK響應,其餘的處理器都是一致的。SendAckRequestProcessor與AckRequestProcessor的區別在於,AckRequestProcessor是leader端的本地呼叫。FollowerRequestProcessor轉發事務請求的程式碼如下:
- /**
- * Main loop of FollowerRequestProcessor. Takes each request from queuedRequests,
- * hands it to nextProcessor first, and then forwards sync and write-type requests
- * to the leader. Request types not listed in the switch below (e.g. reads) are
- * only passed down the local processor chain.
- */
- public void run() {
- try {
- while (!finished) {
- Request request = queuedRequests.take();
- if (LOG.isTraceEnabled()) {
- ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
- 'F', request, "");
- }
- // requestOfDeath is a sentinel value: receiving it ends the loop.
- if (request == Request.requestOfDeath) {
- break;
- }
- // We want to queue the request to be processed before we submit
- // the request to the leader so that we are ready to receive
- // the response
- nextProcessor.processRequest(request);
- // We now ship the request to the leader. As with all
- // other quorum operations, sync also follows this code
- // path, but different from others, we need to keep track
- // of the sync operations this follower has pending, so we
- // add it to pendingSyncs.
- switch (request.type) {
- case OpCode.sync:
- zks.pendingSyncs.add(request);
- zks.getFollower().request(request);
- break;
- case OpCode.create:
- case OpCode.delete:
- case OpCode.setData:
- case OpCode.setACL:
- case OpCode.createSession:
- case OpCode.closeSession:
- case OpCode.multi:
- zks.getFollower().request(request);
- break;
- }
- }
- } catch (Exception e) {
- // Any exception thrown inside the loop is logged and terminates it.
- LOG.error("Unexpected exception causing exit", e);
- }
- LOG.info("FollowerRequestProcessor exited loop!");
- }
5.資料同步
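ZooKeeper叢集資料同步分為4類,分別為直接差異化同步(DIFF)、先回滾再差異化同步(TRUNC+DIFF)、僅回滾同步(TRUNC)和全量同步(SNAP)。在同步之前,leader伺服器會先確定三個值。第一個是peerLastZxid,即該Learner伺服器最後處理的ZXID。第二個是minCommittedLog,即leader伺服器提議快取佇列committedLog中的最小ZXID。第三個是maxCommittedLog,即committedLog中的最大ZXID。然後根據這3個ZXID值判斷同步型別並進行同步:
- 若peerLastZxid介於minCommittedLog與maxCommittedLog之間,則進行DIFF;若Learner擁有leader未見過的提議,則先TRUNC再DIFF。
- 若peerLastZxid大於maxCommittedLog,則進行TRUNC。
- 其餘情況(peerLastZxid小於minCommittedLog,或committedLog為空)採用SNAP;但若雙方ZXID已一致,則傳送空的DIFF。

程式碼見LearnerHandler的run方法: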
Java程式碼- .....
- long peerLastZxid;
- StateSummary ss = null;
- long zxid = qp.getZxid();
- long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
- if (this.getVersion() < 0x10000) {
- // we are going to have to extrapolate the epoch information
- long epoch = ZxidUtils.getEpochFromZxid(zxid);
- ss = new StateSummary(epoch, zxid);
- // fake the message
- leader.waitForEpochAck(this.getSid(), ss);
- } else {
- byte ver[] = new byte[4];
- ByteBuffer.wrap(ver).putInt(0x10000);
- QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
- oa.writeRecord(newEpochPacket, "packet");
- bufferedOutput.flush();
- QuorumPacket ackEpochPacket = new QuorumPacket();
- ia.readRecord(ackEpochPacket, "packet");
- if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
- LOG.error(ackEpochPacket.toString()
- + " is not ACKEPOCH");
- return;
- ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
- ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
- leader.waitForEpochAck(this.getSid(), ss);
- }
- peerLastZxid = ss.getLastZxid();
- /* the default to send to the follower */
- int packetToSend = Leader.SNAP;
- long zxidToSend = 0;
- long leaderLastZxid = 0;
- /** the packets that the follower needs to get updates from **/
- long updates = peerLastZxid;
- /* we are sending the diff check if we have proposals in memory to be able to
- * send a diff to the
- */
- ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
- ReadLock rl = lock.readLock();
- try {
- rl.lock();
- final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
- final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
- LOG.info("Synchronizing with Follower sid: " + sid
- +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
- +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
- +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
- LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
- if (proposals.size() != 0) {
- LOG.debug("proposal size is {}", proposals.size());
- if ((maxCommittedLog >= peerLastZxid)
- && (minCommittedLog <= peerLastZxid)) {
- LOG.debug("Sending proposals to follower");
- // as we look through proposals, this variable keeps track of previous
- // proposal Id.
- long prevProposalZxid = minCommittedLog;
- // Keep track of whether we are about to send the first packet.
- // Before sending the first packet, we have to tell the learner
- // whether to expect a trunc or a diff
- boolean firstPacket=true;
- for (Proposal propose: proposals) {
- // skip the proposals the peer already has
- if (propose.packet.getZxid() <= peerLastZxid) {
- prevProposalZxid = propose.packet.getZxid();
- continue;
- } else {
- // If we are sending the first packet, figure out whether to trunc
- // in case the follower has some proposals that the leader doesn't
- if (firstPacket) {
- firstPacket = false;
- // Does the peer have some proposals that the leader hasn't seen yet
- if (prevProposalZxid < peerLastZxid) {
- // send a trunc message before sending the diff
- packetToSend = Leader.TRUNC;
- LOG.info("Sending TRUNC");
- zxidToSend = prevProposalZxid;
- updates = zxidToSend;
- }
- else {
- // Just send the diff
- packetToSend = Leader.DIFF;
- LOG.info("Sending diff");
- zxidToSend = maxCommittedLog;
- }
- }
- queuePacket(propose.packet);
- QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
- null, null);
- queuePacket(qcommit);
- }
- }
- } else if (peerLastZxid > maxCommittedLog) {
- LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
- Long.toHexString(maxCommittedLog),
- Long.toHexString(updates));
- packetToSend = Leader.TRUNC;
- zxidToSend = maxCommittedLog;
- updates = zxidToSend;
- } else {
- LOG.warn("Unhandled proposal scenario");
- }
- } else {
- // just let the state transfer happen
- LOG.debug("proposals is empty");
- }
- leaderLastZxid = leader.startForwarding(this, updates);
- if (peerLastZxid == leaderLastZxid) {
- LOG.debug("Leader and follower are in sync, sending empty diff. zxid=0x{}",
- Long.toHexString(leaderLastZxid));
- // We are in sync so we'll do an empty diff
- packetToSend = Leader.DIFF;
- zxidToSend = leaderLastZxid;
- }
- } finally {
- rl.unlock();
- }
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- ZxidUtils.makeZxid(newEpoch, 0), null, null);
- if (getVersion() < 0x10000) {
- oa.writeRecord(newLeaderQP, "packet");
- } else {
- queuedPackets.add(newLeaderQP);
- }
- bufferedOutput.flush();
- //Need to set the zxidToSend to the latest zxid
- if (packetToSend == Leader.SNAP) {
- zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
- }
- oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
- bufferedOutput.flush();
- /* if we are not truncating or sending a diff just send a snapshot */
- if (packetToSend == Leader.SNAP) {
- LOG.info("Sending snapshot last zxid of peer is 0x"
- + Long.toHexString(peerLastZxid) + " "
- + " zxid of leader is 0x"
- + Long.toHexString(leaderLastZxid)
- + "sent zxid of db as 0x"
- + Long.toHexString(zxidToSend));
- // Dump data to peer
- leader.zk.getZKDatabase().serializeSnapshot(oa);
- oa.writeString("BenWasHere", "signature");
- }
- bufferedOutput.flush();
- // Start sending packets
- new Thread() {
- public void run() {
- Thread.currentThread().setName(
- "Sender-" + sock.getRemoteSocketAddress());
- try {
- sendPackets();
- } catch (InterruptedException e) {
- LOG.warn("Unexpected interruption",e);
- }
- }
- }.start();
- /*
- * Have to wait for the first ACK, wait until
- * the leader is ready, and only then we can
- * start processing messages.
- */
- qp = new QuorumPacket();
- ia.readRecord(qp, "packet");
- if(qp.getType() != Leader.ACK){
- LOG.error("Next packet was supposed to be an ACK");
- return;
- }
- LOG.info("Received NEWLEADER-ACK message from " + getSid());
- leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());
- .....
6.watch
6.1服務端
在請求處理鏈的最後端FinalRequestProcessor的processRequest()中會判斷是否需要處理watch。
註冊watch會呼叫DataTree.getData()方法將當前的ServerCnxn和path註冊到dataWatches或者childWatches。以getData為例程式碼如下
Java程式碼- case OpCode.getData: {
- ...
- // The connection is passed as the watcher only when the client asked for a watch
- // (getWatch() is true); otherwise null is passed, presumably meaning no watch is set.
- byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
- getDataRequest.getWatch() ? cnxn : null);
- rsp = new GetDataResponse(b, stat);
- break;
觸發watch發生在事務請求被應用到資料時。DataTree.processTxn修改資料後,會通過WatchManager.triggerWatch()觸發對應的watch,進而呼叫當前ServerCnxn的process方法,將事件通知返回給客戶端。以create為例程式碼如下:
Datatree.createnode()程式碼- public String createNode(String path, byte data[], List<ACL> acl,
- long ephemeralOwner, int parentCVersion, long zxid, long time)
- throws KeeperException.NoNodeException,
- KeeperException.NodeExistsException {
- .....
- // At the end of createNode (node creation code elided above): notify the data
- // watchers of the new path with NodeCreated and the child watchers of its parent
- // with NodeChildrenChanged. An empty parent name is mapped to "/" (presumably for
- // children of the root node).
- dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
- childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
- Event.EventType.NodeChildrenChanged);
- return path;
- }
- /**
- * Fires the watchers registered on path with an event of the given type in state
- * SyncConnected. Matching watchers are removed from watchTable (and path from their
- * watch2Paths entry) before being notified, so each registration fires at most once.
- * Watchers contained in supress are removed as well but are not notified.
- * Notification happens outside the synchronized section.
- *
- * @return all watchers removed for path (including suppressed ones), or null if none
- */
- public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
- WatchedEvent e = new WatchedEvent(type,
- KeeperState.SyncConnected, path);
- HashSet<Watcher> watchers;
- synchronized (this) {
- // Watches are one-shot: remove them as soon as they are triggered.
- watchers = watchTable.remove(path);
- if (watchers == null || watchers.isEmpty()) {
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.EVENT_DELIVERY_TRACE_MASK,
- "No watchers for " + path);
- }
- return null;
- }
- // Keep the reverse index (watcher -> paths) consistent with watchTable.
- for (Watcher w : watchers) {
- HashSet<String> paths = watch2Paths.get(w);
- if (paths != null) {
- paths.remove(path);
- }
- }
- }
- for (Watcher w : watchers) {
- if (supress != null && supress.contains(w)) {
- continue;
- }
- w.process(e);
- }
- return watchers;
- }
- /**
- * Delivers a watch event to this connection's client. The event is converted to its
- * wire form (WatcherEvent) and sent with a ReplyHeader of xid -1, zxid -1 and err 0;
- * on the client side, readResponse treats xid -1 as a notification.
- * The method is synchronized on this connection.
- */
- @Override
- synchronized public void process(WatchedEvent event) {
- ReplyHeader h = new ReplyHeader(-1, -1L, 0);
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
- "Deliver event " + event + " to 0x"
- + Long.toHexString(this.sessionId)
- + " through " + this);
- }
- // Convert WatchedEvent to a type that can be sent over the wire
- WatcherEvent e = event.getWrapper();
- sendResponse(h, e, "notification");
- }
6.2客戶端
註冊watch時,客戶端會將請求標記為需要watch監聽(request.setWatch(true)),同時封裝一個DataWatchRegistration,儲存路徑和watcher的對應關係。待服務端響應返回後,finishPacket會呼叫其register方法,完成客戶端的watch註冊。以getData為例程式碼如下:
Zookeeper.getdata()程式碼- public byte[] getData(final String path, Watcher watcher, Stat stat)
- throws KeeperException, InterruptedException
- {
- .....
- // the watch contains the un-chroot path
- // A DataWatchRegistration is created only when a watcher was supplied. It is handed
- // to submitRequest, and ClientCnxn.finishPacket calls its register method once the
- // reply has been received.
- WatchRegistration wcb = null;
- if (watcher != null) {
- wcb = new DataWatchRegistration(watcher, clientPath);
- }
- ......
- RequestHeader h = new RequestHeader();
- h.setType(ZooDefs.OpCode.getData);
- GetDataRequest request = new GetDataRequest();
- // serverPath is presumably the chroot-prefixed path (computed in the elided code).
- request.setPath(serverPath);
- // Tells the server whether to register a watch for this read.
- request.setWatch(watcher != null);
- GetDataResponse response = new GetDataResponse();
- ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
- .....
- }
- /**
- * Completes a packet whose reply has been processed. If a watch registration is
- * attached, it is registered with the reply's error code (presumably so the watch is
- * only recorded on success; confirm in WatchRegistration.register).
- * Synchronous packets (cb == null) are marked finished and threads waiting on the
- * packet are woken via notifyAll; asynchronous packets are marked finished and
- * queued to eventThread (presumably where their callback is invoked).
- */
- private void finishPacket(Packet p) {
- if (p.watchRegistration != null) {
- p.watchRegistration.register(p.replyHeader.getErr());
- }
- if (p.cb == null) {
- synchronized (p) {
- p.finished = true;
- p.notifyAll();
- }
- } else {
- p.finished = true;
- eventThread.queuePacket(p);
- }
- }
- /**
- * Serializes this packet into bb: a 4-byte length prefix, the request header (if any),
- * then the request body. A ConnectRequest is additionally followed by the readOnly
- * flag. The length prefix is first written as -1 and then overwritten with the number
- * of bytes that follow it.
- */
- public void createBB() {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
- boa.writeInt(-1, "len"); // We'll fill this in later
- if (requestHeader != null) {
- requestHeader.serialize(boa, "header");
- }
- if (request instanceof ConnectRequest) {
- request.serialize(boa, "connect");
- // append "am-I-allowed-to-be-readonly" flag
- boa.writeBool(readOnly, "readOnly");
- } else if (request != null) {
- request.serialize(boa, "request");
- }
- baos.close();
- this.bb = ByteBuffer.wrap(baos.toByteArray());
- // A freshly wrapped buffer is at position 0, so this overwrites the -1 placeholder
- // with the payload length (total size minus the 4-byte prefix itself).
- this.bb.putInt(this.bb.capacity() - 4);
- // Move the position back to 0 so the whole buffer, prefix included, is sent.
- this.bb.rewind();
- } catch (IOException e) {
- // Serialization errors are only logged; bb may be left unassigned in that case.
- LOG.warn("Ignoring unexpected exception", e);
- }
- }
- void readResponse(ByteBuffer incomingBuffer) throws IOException {
- .....
- if (replyHdr.getXid() == -1) {
- // -1 means notification
- if (LOG.isDebugEnabled()) {
- LOG.debug("Got notification sessionid:0x"
- + Long.toHexString(sessionId));
- }
- WatcherEvent event = new WatcherEvent();
- event.deserialize(bbia, "response");