1. 程式人生 > >zookeeper原始碼淺析(二)之Leader選擇

zookeeper原始碼淺析(二)之Leader選擇

1.入口函式QuorumPeerMain主執行緒啟動 QuorumPeerMain.runFromConfig()程式碼
  1. public void runFromConfig(QuorumPeerConfig config) throws IOException {    // Builds a QuorumPeer from config, starts it, and blocks in join() until the peer thread ends
  2.       ......    
  3.       
  4.       LOG.info("Starting quorum peer"
    );    
  5.       try {    
  6.         //Connection factory for the server that handles client reads and writes (usually port 2181)
  7.           ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();    
  8.           cnxnFactory.configure(config.getClientPortAddress(),    
  9.                                 config.getMaxClientCnxns());    
  10.         //The main logical thread of ZooKeeper; responsible for election, voting, etc.
  11.           quorumPeer = new QuorumPeer();    
  12.           quorumPeer.setClientPortAddress(config.getClientPortAddress());    
  13.           quorumPeer.setTxnFactory(new FileTxnSnapLog(    
  14.                       new File(config.getDataLogDir()),    
  15.                       new File(config.getDataDir())));    
  16.         //Addresses of the servers in the ensemble
  17.           quorumPeer.setQuorumPeers(config.getServers());    
  18.           quorumPeer.setElectionType(config.getElectionAlg());    
  19.         //This server's id within the ensemble
  20.           quorumPeer.setMyid(config.getServerId());    
  21.           quorumPeer.setTickTime(config.getTickTime());    
  22.           quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());    
  23.           quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());    
  24.           quorumPeer.setInitLimit(config.getInitLimit());    
  25.           quorumPeer.setSyncLimit(config.getSyncLimit());    
  26.         //How a vote is decided; by default it passes with more than half of the votes
  27.           quorumPeer.setQuorumVerifier(config.getQuorumVerifier());    
  28.           quorumPeer.setCnxnFactory(cnxnFactory);    
  29.           quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));    
  30.           quorumPeer.setLearnerType(config.getPeerType());    
  31.         //Start the main thread, then wait for it to finish
  32.           quorumPeer.start();    
  33.           quorumPeer.join();    
  34.       } catch (InterruptedException e) {    
  35.           // warn, but generally this is ok    
  36.           LOG.warn("Quorum Peer interrupted", e);    
  37.       }    
  38.     }    

 2.QuorumPeer複寫Thread.start()方法,依序載入資料、啟動客戶端IO執行緒、初始化選舉,最後啟動執行緒本身

QuorumPeer.start()程式碼
  1.  @Override    
  2.    public synchronized void start() {    
  3. //Restore the database and the epoch values
  4.        loadDataBase();    
  5. //Start the I/O thread that serves clients
  6.        cnxnFactory.start();    
  7. //Initialize leader election (mainly selects the election algorithm from the configured type)
  8.        startLeaderElection();    
  9. //Start this thread itself
  10.        super.start();    
  11.    }    

  3.通過QuorumPeer.loadDataBase()載入資料,初始化zkDb、currentEpoch、acceptedEpoch。

QuorumPeer.loadDataBase()程式碼
  1. private void loadDataBase() {  
  2.         try {  
  3.             //Restore the database from local files
  4.             zkDb.loadDataBase();  
  5.   
  6.             // load the epochs  
  7.             //Recover the epoch from the latest zxid; a zxid is 64 bits: the high 32 bits are the epoch, the low 32 bits are the counter
  8.             long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;  
  9.             long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);  
  10.             try {  
  11.                 currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);  
  12.             } catch(FileNotFoundException e) {  
  13.                 .....  
  14.             }  
  15.             if (epochOfZxid > currentEpoch) {  // the stored current epoch must not lag behind the epoch of the last processed zxid
  16.                 throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);  
  17.             }  
  18.             try {  
  19.                 acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);  
  20.             } catch(FileNotFoundException e) {  
  21.                 .....  
  22.             }  
  23.             if (acceptedEpoch < currentEpoch) {  // the accepted epoch must be at least the current epoch
  24.                 throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + " is less than the accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch));  
  25.             }  
  26.         } catch(IOException ie) {  
  27.            .....  
  28.         }  
  29.     }  

 4.通過QuorumPeer.startLeaderElection()初始化electionAlg、currentVote。

QuorumPeer.startLeaderElection()程式碼
  1. synchronized public void startLeaderElection() {  // Initializes currentVote, myQuorumAddr and electionAlg
  2.         try {  
  3.                 //Vote for ourselves first
  4.             currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());  
  5.         } catch(IOException e) {  
  6.             RuntimeException re = new RuntimeException(e.getMessage());  
  7.             re.setStackTrace(e.getStackTrace());  
  8.             throw re;  
  9.         }  
  10.         //Look up this server's own address among the configured peers
  11.         for (QuorumServer p : getView().values()) {  
  12.             if (p.id == myid) {  
  13.                 myQuorumAddr = p.addr;  
  14.                 break;  
  15.             }  
  16.         }  
  17.         .....  
  18.         this.electionAlg = createElectionAlgorithm(electionType);  
  19.     }  

 5.獲取選舉演算法,預設為FastLeaderElection演算法。從3.4.0版本開始,zookeeper廢棄了0,1,2三種演算法。

QuorumPeer.createElectionAlgorithm()程式碼
  1. protected Election createElectionAlgorithm(int electionAlgorithm){  // Maps the numeric election type to an Election instance; may return null
  2.         Election le=null;  
  3.                   
  4.         //TODO: use a factory rather than a switch  
  5.         switch (electionAlgorithm) {  
  6.         case 0:  
  7.             le = new LeaderElection(this);  
  8.             break;  
  9.         case 1:  
  10.             le = new AuthFastLeaderElection(this);  
  11.             break;  
  12.         case 2:  
  13.             le = new AuthFastLeaderElection(this, true);  
  14.             break;  
  15.         case 3:  
  16.             //Class responsible for the leader-election I/O
  17.             qcm = new QuorumCnxManager(this);  
  18.             QuorumCnxManager.Listener listener = qcm.listener;  
  19.             //Start the election listener thread bound to the configured election port and wait for the other servers in the ensemble to connect
  20.             //e.g. with server.1=hadoop1:2888:3888 in the config file, server.1's election port is 3888, and 2888 is the port on which the leader exchanges data with the other servers
  21.             //For the config file format see QuorumPeerConfig.parseProperties()
  22.             if(listener != null){  
  23.                 listener.start();  
  24.                 //TCP-based election algorithm
  25.                 le = new FastLeaderElection(this, qcm);  
  26.             } else {  
  27.                 LOG.error("Null listener when initializing cnx manager");  
  28.             }  
  29.             break;  
  30.         default:  
  31.             assert false;  
  32.         }  
  33.         return le;  
  34.     }  
6. FastLeaderElection初始化 FastLeaderElection.starter()程式碼
  1. private void starter(QuorumPeer self, QuorumCnxManager manager) {  // Resets the proposal and creates the send/receive queues and the Messenger
  2.        this.self = self;  
  3.        proposedLeader = -1;  
  4.        proposedZxid = -1;  
  5.        //Application-level send queue; elements are ToSend objects
  6.        sendqueue = new LinkedBlockingQueue<ToSend>();  
  7.     //Application-level receive queue; elements are Notification objects
  8.        recvqueue = new LinkedBlockingQueue<Notification>();  
  9.     //Messenger contains the WorkerSender and WorkerReceiver threads
  10.     //WorkerSender: application-level sender thread, hands messages to the I/O class QuorumCnxManager
  11.     //WorkerReceiver: application-level receiver thread, takes messages from the I/O class QuorumCnxManager
  12.        this.messenger = new Messenger(manager);  
  13.    }  
7.QuorumPeer執行緒啟動 QuorumPeer.run()程式碼
  1. @Override  
  2.     public void run() {  // Peer thread body: loops while running and dispatches on the current peer state
  3.         .....  
  4.         try {  
  5.             /*  
  6.              * Main loop  
  7.              */  
  8.             while (running) {  
  9.                 switch (getPeerState()) {  
  10.                 //If the state is LOOKING, enter the election process
  11.                 case LOOKING:  
  12.                     LOG.info("LOOKING");  
  13.                         .....  // NOTE(review): the `if` that pairs with the `else` further down is elided from this listing
  14.                         try {  
  15.                             roZkMgr.start();  
  16.                             //Run the election algorithm to look for a leader
  17.                             setCurrentVote(makeLEStrategy().lookForLeader());  
  18.                         } catch (Exception e) {  
  19.                             LOG.warn("Unexpected exception",e);  
  20.                             setPeerState(ServerState.LOOKING);  
  21.                         } finally {  
  22.                             // If the thread is in the grace period, interrupt  
  23.                             // to come out of waiting.  
  24.                             roZkMgr.interrupt();  
  25.                             roZk.shutdown();  
  26.                         }  
  27.                     } else {  
  28.                         try {  
  29.                             setCurrentVote(makeLEStrategy().lookForLeader());  
  30.                         } catch (Exception e) {  
  31.                             LOG.warn("Unexpected exception", e);  
  32.                             setPeerState(ServerState.LOOKING);  
  33.                         }  
  34.                     }  
  35.                     break;  
  36.                 //Once the election completes the state changes, and the object for the new role is created
  37.                 case OBSERVING:  
  38.                     try {  
  39.                         LOG.info("OBSERVING");  
  40.                         setObserver(makeObserver(logFactory));  
  41.                         observer.observeLeader();  
  42.                     } catch (Exception e) {  
  43.                         LOG.warn("Unexpected exception",e );                          
  44.                     } finally {  
  45.                         observer.shutdown();  
  46.                         setObserver(null);  
  47.                         setPeerState(ServerState.LOOKING);  
  48.                     }  
  49.                     break;  
  50.                 case FOLLOWING:  
  51.                     try {  
  52.                         LOG.info("FOLLOWING");  
  53.                         setFollower(makeFollower(logFactory));  
  54.                         follower.followLeader();  
  55.                     } catch (Exception e) {  
  56.                         LOG.warn("Unexpected exception",e);  
  57.                     } finally {  
  58.                         follower.shutdown();  
  59.                         setFollower(null);  
  60.                         setPeerState(ServerState.LOOKING);  
  61.                     }  
  62.                     break;  
  63.                 case LEADING:  
  64.                     LOG.info("LEADING");  
  65.                     try {  
  66.                         setLeader(makeLeader(logFactory));  
  67.                         leader.lead();  
  68.                         setLeader(null);  
  69.                     } catch (Exception e) {  
  70.                         LOG.warn("Unexpected exception",e);  
  71.                     } finally {  
  72.                         if (leader != null) {  
  73.                             leader.shutdown("Forcing shutdown");  
  74.                             setLeader(null);  
  75.                         }  
  76.                         setPeerState(ServerState.LOOKING);  
  77.                     }  
  78.                     break;  
  79.                 }  
  80.             }  
  81.         } finally {  
  82.           .....  
  83.         }  
  84.     }  

 8.FastLeaderElection的選舉流程

FastLeaderElection.lookForLeader()程式碼
  1. public Vote lookForLeader() throws InterruptedException {  
  2.        .....  
  3.        try {  
  4.         //收到的投票    
  5.            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();  
  6.   
  7.            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();  
  8.   
  9.            int notTimeout = finalizeWait;  
  10.   
  11.            synchronized(this){  
  12.                logicalclock++;  
  13.             //先投給自己    
  14.                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());  
  15.            }  
  16.   
  17.            LOG.info("New election. My id =  " + self.getId() +  
  18.                    ", proposed zxid=0x" + Long.toHexString(proposedZxid));  
  19.         //傳送投票,包括發給自己         
  20.            sendNotifications();  
  21.   
  22.            /*  
  23.             * Loop in which we exchange notifications until we find a leader  
  24.             */  
  25.            //主迴圈,直到選出leader    
  26.            while ((self.getPeerState() == ServerState.LOOKING) &&  
  27.                    (!stop)){  
  28.                /*  
  29.                 * Remove next notification from queue, times out after 2 times  
  30.                 * the termination time  
  31.                 */  
  32.             //從IO執行緒裡拿到投票訊息,自己的投票也在這裡處理     
  33.                Notification n = recvqueue.poll(notTimeout,  
  34.                        TimeUnit.MILLISECONDS);  
  35.   
  36.                /*  
  37.                 * Sends more notifications if haven't received enough.  
  38.                 * Otherwise processes new notification.  
  39.                 */  
  40.             //如果空閒     
  41.                if(n == null){  
  42.                 //訊息發完了,繼續傳送,一直到選出leader為止    
  43.                    if(manager.haveDelivered()){  
  44.                        sendNotifications();  
  45.                    } else {  
  46.                     //訊息還在,可能其他server還沒啟動,嘗試連線     
  47.                        manager.connectAll();  
  48.                    }  
  49.   
  50.                    /*  
  51.                     * Exponential backoff  
  52.                     */  
  53.                  //延長超時時間    
  54.                    int tmpTimeOut = notTimeout*2;  
  55.                    notTimeout = (tmpTimeOut < maxNotificationInterval?  
  56.                            tmpTimeOut : maxNotificationInterval);  
  57.                    LOG.info("Notification time out: " + notTimeout);  
  58.                }  
  59.             //收到了投票訊息    
  60.                else if(self.getVotingView().containsKey(n.sid)) {  
  61.                    /*  
  62.                     * Only proceed if the vote comes from a replica in the  
  63.                     * voting view.  
  64.                     */  
  65.                    switch (n.state) {  
  66.                 //LOOKING訊息,則    
  67.                    case LOOKING:  
  68.                     //檢查下收到的這張選票是否可以勝出,依次比較選舉輪數epoch,事務zxid,伺服器編號server id  
  69.                        // If notification > current, replace and send messages out  
  70.                        if (n.electionEpoch > logicalclock) {  
  71.                            logicalclock = n.electionEpoch;  
  72.                            recvset.clear();  
  73.                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,  
  74.                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {  
  75.                                updateProposal(n.leader, n.zxid, n.peerEpoch);  
  76.                            } else {  
  77.                                updateProposal(getInitId(),  
  78.                                        getInitLastLoggedZxid(),  
  79.                                        getPeerEpoch());  
  80.                            }  
  81.                            sendNotifications();  
  82.                        } else if (n.electionEpoch < logicalclock) {  
  83.                            if(LOG.isDebugEnabled()){  
  84.                                LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"  
  85.                                        + Long.toHexString(n.electionEpoch)  
  86.                                        + ", logicalclock=0x" + Long.toHexString(logicalclock));  
  87.                            }  
  88.                            break;  
  89.                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,  
  90.                                proposedLeader, proposedZxid, proposedEpoch)) {  
  91.                         //勝出了,就把自己的投票修改為對方的,然後廣播訊息    
  92.                            updateProposal(n.leader, n.zxid, n.peerEpoch);  
  93.                            sendNotifications();  
  94.                        }  
  95.                        .....  
  96.                     //新增到本機投票集合,用來做選舉終結判斷   
  97.                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));  
  98.                     //選舉是否結束,預設演算法是超過半數server同意    
  99.                        if (termPredicate(recvset,  
  100.                                new Vote(proposedLeader, proposedZxid,  
  101.                                        logicalclock, proposedEpoch))) {  
  102.   
  103.                            // Verify if there is any change in the proposed leader  
  104.                            while((n = recvqueue.poll(finalizeWait,  
  105.                                    TimeUnit.MILLISECONDS)) != null){  
  106.                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,  
  107.                                        proposedLeader, proposedZxid, proposedEpoch)){  
  108.                                    recvqueue.put(n);  
  109.                                    break;  
  110.                                }  
  111.                            }  
  112.   
  113.                            /*  
  114.                             * This predicate is true once we don't read any new  
  115.                             * relevant message from the reception queue  
  116.                             */  
  117.                            if (n == null) {  
  118.                             //修改狀態,LEADING or FOLLOWING    
  119.                                self.setPeerState((proposedLeader == self.getId()) ?  
  120.                                        ServerState.LEADING: learningState());  
  121.                             //返回最終的選票結果   
  122.                                Vote endVote = new Vote(proposedLeader,  
  123.                                        proposedZxid, proposedEpoch);  
  124.                                leaveInstance(endVote);  
  125.                                return endVote;  
  126.                            }  
  127.                        }  
  128.                        break;  
  129.                 //OBSERVING機器不參與選舉        
  130.         &n