1. 程式人生 > >zookeeper原始碼分析之一服務端啟動過程

zookeeper原始碼分析之一服務端啟動過程

zookeeper簡介

     zookeeper是為分散式應用提供分散式協作服務的開源軟體。它提供了一組簡單的原子操作,分散式應用可以基於這些原子操作來實現更高層次的同步服務,配置維護,組管理和命名。zookeeper的設計使基於它的程式設計非常容易,若我們熟悉目錄樹結構的檔案系統,也會很容易使用zookeeper的資料模型樣式。它執行在java上,有java和c的客戶端。

  協作服務因難於獲取正確而臭名遠揚,他們特別易於出錯如競爭條件和死鎖。zookeeper的動機是減輕分散式應用中從零開始實現協作服務的壓力。

zookeeper的特點

1.簡單:zookeeper執行分散式進行通過一個共享的層次名稱空間來進行協作,該名稱空間的組織類似於標準的檔案系統。名稱空間包括資料註冊器(稱之為znode),在zookeeper看來,這類似於檔案和目錄。與典型的檔案系統設計用來儲存不同的是,zookeeper資料是存放在記憶體中,這意味著zookeeper可以實現很高的吞吐量和低延遲。

 ZooKeeper 實現在高效能,高可用性,嚴格有序的訪問方面有很大的優勢。在效能方面的優勢使它可以應用在大型的的分散式系統。在可靠性方面,避免單點故障。嚴格的順序訪問使它在客戶端可以實現複雜的同步原語。

2. 可複製:類似於分散式程序的協作,zookeeper本身很容易在一組主機(稱之為集合)中實現複製。zookeeper服務示意圖:

  組成ZooKeeper服務的一組伺服器都必須知道對方的。它們儲存了記憶體映像的狀態,以及在持久儲存中的事務日誌和快照。只要大部分的伺服器可用,ZooKeeper服務將可用。
客戶端連線到一臺ZooKeeper伺服器。客戶端維護一個TCP連線,通過它傳送請求,得到響應,得到監視事件,併發送心跳。如果TCP連線到伺服器中斷,客戶端可以連線到不同的伺服器。

3. 有序:ZooKeeper給每次更新使用數字打標記,它反映了所有zookeeper事務的順序。隨後的操作可以使用這些順序來實現更高級別的抽象,如同步原語。

4.快速:它特別快,在“讀為主”的工作中,ZooKeeper 應用程式執行在數千臺機器,它在讀遠比寫更多的時候(在10:1的比例)表現的最好。

資料模型與層次名稱空間

ZooKeeper提供的名稱空間更像是一個標準的檔案系統。一個名字是一個由一個(或)分隔的路徑元素的序列。zookeeper名稱空間的每個節點由路徑來標示。

節點和臨時節點

  不像標準的檔案系統,在ZooKeeper 名稱空間中每個節點都有與它相關的資料以及子節點。它就像這樣一個檔案系統,它允許一個檔案也可以是一個目錄。(zookeeper是用來儲存協作資料:狀態資訊,配置,位置資訊等,因此,儲存在每個節點的資料通常是很小的,在位元組到千位元組範圍。)我們使用術語znode來表明我們談論的是zookeeper資料節點。
  znodes儲存一個數據結構,該資料結構包括資料變化的版本號和時間戳,ACL的變化,這些資訊允許快取驗證和協作更新。一個znode的資料的每次變化,版本號的增加。例如,每當客戶檢索資料時,它也接收到資料的版本。
  在一個名稱空間中的每個節點儲存的資料的讀寫都是原子性的。讀獲取一個Znode所有的資料位元組;寫替換所有的資料。每個節點都有一個訪問控制列表(ACL),限制誰可以做什麼。
  zookeeper也有臨時節點的概念。這些znodes只要建立znode的會話是活躍的,它就存在的。當會話結束時,這些znode被刪除。

條件更新與監控

ZooKeeper支援監控的概念。客戶端可以在一個znode上設定一個監控。當znode發生變化時會觸發或者移除監控。當監控觸發時,客戶端接收到一個報文,表明znode發生了變化。若客戶端和一個zookeeper伺服器的連線損壞時,客戶端接收到一個本地通知。

保障

ZooKeeper非常快速和簡單. 雖然它的目標是為建設更為複雜的服務,例如同步,它提供了一系列的保證。這些是:

  • 順序一致性----客戶端的更新將被應用於它們被髮送的命令中。

  • 原子性-- - 更新要麼成功要麼失敗,不存在部分成功或者部分失敗.

  • 單系統映像 ---- 不管連線到哪臺伺服器,客戶端看到相同的服務檢視.

  • 可靠性---- 一旦一個更新發生,直到下次一個客戶端重新了更新,否則從更新的時間後都會保持。

  • 及時性--- - 在一定時間範圍內保證系統的客戶檢視是最新的.

簡單api

zookeeper設計目標之一是提供一個簡單的程式設計介面,因此,它只支援下面這些操作:

create

在節點樹上某個位置上建立一個新的節點。

delete

刪除一個節點

exists

測試某位置的節點是否存在

get data

從一個節點讀取資料

set data

向一個節點寫入資料

get children

檢索一個節點的一組子節點

sync

等待資料傳播至一致。

實現

zookeeper元件顯示了zookeeper服務的高階元件。除了request processor,組成zookeeper服務的每個伺服器複製它的每個元件的copy。

zookeeper元件

replicated database是一個包含整個資料數的記憶體資料庫. 為了可復原,更新被寫到磁碟上,寫操作在應用到記憶體資料庫之前,先序列化到磁碟。

每個zookeeper伺服器給所有的客戶端提供服務。客戶端恰恰連線到一個伺服器來提交請求。讀請求由每個伺服器資料庫的本地複製提供服務。寫請求改變了服務的狀態,由request processor來處理。

作為通訊協議的一部分,所有客戶端的寫請求由一個單獨的伺服器處理,這個伺服器是zookeeper的leader伺服器,其餘的zookeeper伺服器叫做follower,follower從leader接收訊息並達成訊息傳輸。訊息層在失敗後替換leader並同步到連線到leader所有的follower。

ZooKeeper使用自定義的原子訊息協議. 因訊息層是原子性的, ZooKeeper 可以保證本地複製不會衝突. 當leader接收到一個寫請求,當寫操作應用到系統時,leader計算出系統的狀態,並轉化成一個捕捉新狀態的事務.

 zookeeper啟動

  服務端啟動

bin/zkServer.sh start

 其中,啟動命令如下:

start)
    echo  -n "Starting zookeeper ... "
    if [ -f "$ZOOPIDFILE" ]; then
      if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
         echo $command already running as process `cat "$ZOOPIDFILE"`.
         exit 0
      fi
    fi
    nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
    if [ $? -eq 0 ]
    then
      if /bin/echo -n $! > "$ZOOPIDFILE"
      then
        sleep 1
        pid=$(cat "${ZOOPIDFILE}")
        if ps -p "${pid}" > /dev/null 2>&1; then
          echo STARTED
        else
          echo FAILED TO START
          exit 1
        fi
      else
        echo FAILED TO WRITE PID
        exit 1
      fi
    else
      echo SERVER DID NOT START
      exit 1
    fi
    ;;

其中:

ZOOMAIN 是啟動程式的入口,其類為:

org.apache.zookeeper.server.quorum.QuorumPeerMain

它的啟動方法為:

 /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            System.exit(2);
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            System.exit(2);
        } catch (DatadirException e) {
            LOG.error("Unable to access datadir, exiting abnormally", e);
            System.err.println("Unable to access datadir, exiting abnormally");
            System.exit(3);
        } catch (AdminServerException e) {
            LOG.error("Unable to start AdminServer, exiting abnormally", e);
            System.err.println("Unable to start AdminServer, exiting abnormally");
            System.exit(4);
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            System.exit(1);
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }

呼叫初始化方法及run方法:

    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

 上述程式碼主要分3部分:

 1. 解析配置檔案,預設的配置檔案為上一級目錄

config/zookeeper.properties或者config/zookeeper.cfg
  /**
     * Parse a ZooKeeper configuration file
     * @param path the patch of the configuration file
     * @throws ConfigException error processing configuration
     */
    public void parse(String path) throws ConfigException {
        LOG.info("Reading configuration from: " + path);
       
        try {
            File configFile = (new VerifyingFileFactory.Builder(LOG)
                .warnForRelativePath()
                .failForNonExistingPath()
                .build()).create(path);
                
            Properties cfg = new Properties();
            FileInputStream in = new FileInputStream(configFile);
            try {
                cfg.load(in);
                configFileStr = path;
            } finally {
                in.close();
            }
            
            parseProperties(cfg);
        } catch (IOException e) {
            throw new ConfigException("Error processing " + path, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + path, e);
        }   
        
        if (dynamicConfigFileStr!=null) {
           try {           
               Properties dynamicCfg = new Properties();
               FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
               try {
                   dynamicCfg.load(inConfig);
                   if (dynamicCfg.getProperty("version") != null) {
                       throw new ConfigException("dynamic file shouldn't have version inside");
                   }

                   String version = getVersionFromFilename(dynamicConfigFileStr);
                   // If there isn't any version associated with the filename,
                   // the default version is 0.
                   if (version != null) {
                       dynamicCfg.setProperty("version", version);
                   }
               } finally {
                   inConfig.close();
               }
               setupQuorumPeerConfig(dynamicCfg, false);

           } catch (IOException e) {
               throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
           } catch (IllegalArgumentException e) {
               throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
           }        
           File nextDynamicConfigFile = new File(configFileStr + nextDynamicConfigFileSuffix);
           if (nextDynamicConfigFile.exists()) {
               try {           
                   Properties dynamicConfigNextCfg = new Properties();
                   FileInputStream inConfigNext = new FileInputStream(nextDynamicConfigFile);       
                   try {
                       dynamicConfigNextCfg.load(inConfigNext);
                   } finally {
                       inConfigNext.close();
                   }
                   boolean isHierarchical = false;
                   for (Entry<Object, Object> entry : dynamicConfigNextCfg.entrySet()) {
                       String key = entry.getKey().toString().trim();  
                       if (key.startsWith("group") || key.startsWith("weight")) {
                           isHierarchical = true;
                           break;
                       }
                   }
                   lastSeenQuorumVerifier = createQuorumVerifier(dynamicConfigNextCfg, isHierarchical);
               } catch (IOException e) {
                   LOG.warn("NextQuorumVerifier is initiated to null");
               }
           }
        }
    }

2. 啟動安排清除任務

        // Start and schedule the the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

呼叫start方法:

/**
     * Validates the purge configuration and schedules the purge task. Purge
     * task keeps the most recent <code>snapRetainCount</code> number of
     * snapshots and deletes the remaining for every <code>purgeInterval</code>
     * hour(s).
     * <p>
     * <code>purgeInterval</code> of <code>0</code> or
     * <code>negative integer</code> will not schedule the purge task.
     * </p>
     * 
     * @see PurgeTxnLog#purge(File, File, int)
     */
    public void start() {
        if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
            LOG.warn("Purge task is already running.");
            return;
        }
        // Don't schedule the purge task with zero or negative purge interval.
        if (purgeInterval <= 0) {
            LOG.info("Purge task is not scheduled.");
            return;
        }

        timer = new Timer("PurgeTask", true);
        TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
        timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

        purgeTaskStatus = PurgeTaskStatus.STARTED;
    }

 從上面程式碼可以看到,清除工作啟動了一個定時器timer,PurgeTask繼承實現了TimeTask(一個可以被定時器安排執行一次或者多次的task),PurgeTask的實現如下:

    static class PurgeTask extends TimerTask {
        private File logsDir;
        private File snapsDir;
        private int snapRetainCount;

        public PurgeTask(File dataDir, File snapDir, int count) {
            logsDir = dataDir;
            snapsDir = snapDir;
            snapRetainCount = count;
        }

        @Override
        public void run() {
            LOG.info("Purge task started.");
            try {
                PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
            } catch (Exception e) {
                LOG.error("Error occured while purging.", e);
            }
            LOG.info("Purge task completed.");
        }
    }

呼叫purge方法:

/**
     * Purges the snapshot and logs keeping the last num snapshots and the
     * corresponding logs. If logs are rolling or a new snapshot is created
     * during this process, these newest N snapshots or any data logs will be
     * excluded from current purging cycle.
     *
     * @param dataDir the dir that has the logs
     * @param snapDir the dir that has the snapshots
     * @param num the number of snapshots to keep
     * @throws IOException
     */
    public static void purge(File dataDir, File snapDir, int num) throws IOException {
        if (num < 3) {
            throw new IllegalArgumentException(COUNT_ERR_MSG);
        }

        FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

        List<File> snaps = txnLog.findNRecentSnapshots(num);
        retainNRecentSnapshots(txnLog, snaps);
    }

先獲取日誌檔案和快照,然後呼叫retainNRecentSnapshots方法處理:

 static void retainNRecentSnapshots(FileTxnSnapLog txnLog, List<File> snaps) {
        // found any valid recent snapshots?
        if (snaps.size() == 0)
            return;
        File snapShot = snaps.get(snaps.size() -1);
        final long leastZxidToBeRetain = Util.getZxidFromName(
                snapShot.getName(), PREFIX_SNAPSHOT);

        class MyFileFilter implements FileFilter{
            private final String prefix;
            MyFileFilter(String prefix){
                this.prefix=prefix;
            }
            public boolean accept(File f){
                if(!f.getName().startsWith(prefix + "."))
                    return false;
                long fZxid = Util.getZxidFromName(f.getName(), prefix);
                if (fZxid >= leastZxidToBeRetain) {
                    return false;
                }
                return true;
            }
        }
        // add all non-excluded log files
        List<File> files = new ArrayList<File>(Arrays.asList(txnLog
                .getDataDir().listFiles(new MyFileFilter(PREFIX_LOG))));
        // add all non-excluded snapshot files to the deletion list
        files.addAll(Arrays.asList(txnLog.getSnapDir().listFiles(
                new MyFileFilter(PREFIX_SNAPSHOT))));
        // remove the old files
        for(File f: files)
        {
            System.out.println("Removing file: "+
                DateFormat.getDateTimeInstance().format(f.lastModified())+
                "\t"+f.getPath());
            if(!f.delete()){
                System.err.println("Failed to remove "+f.getPath());
            }
        }

    }

3. 啟動zookeeper 伺服器

    3.1 啟動單機

   /*
     * Start up the ZooKeeper server.
     *
     * @param args the configfile or the port datadir [ticktime]
     */
    public static void main(String[] args) {
        ZooKeeperServerMain main = new ZooKeeperServerMain();
        try {
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            System.exit(2);
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            System.exit(2);
        } catch (DatadirException e) {
            LOG.error("Unable to access datadir, exiting abnormally", e);
            System.err.println("Unable to access datadir, exiting abnormally");
            System.exit(3);
        } catch (AdminServerException e) {
            LOG.error("Unable to start AdminServer, exiting abnormally", e);
            System.err.println("Unable to start AdminServer, exiting abnormally");
            System.exit(4);
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            System.exit(1);
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }

呼叫方法:

   protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException
    {
        try {
            ManagedUtil.registerLog4jMBeans();
        } catch (JMException e) {
            LOG.warn("Unable to register log4j JMX control", e);
        }

        ServerConfig config = new ServerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        } else {
            config.parse(args);
        }

        runFromConfig(config);
    }

啟動過程:

 /**
     * Run from a ServerConfig.
     * @param config ServerConfig to use.
     * @throws IOException
     * @throws AdminServerException
     */
    public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
        LOG.info("Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            // Note that this thread isn't going to be doing anything else,
            // so rather than spawning another thread, we will just call
            // run() in this thread.
            // create a file logger url from the command line args
            txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
            ZooKeeperServer zkServer = new ZooKeeperServer( txnLog,
                    config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);

            // Start Admin server
            adminServer = AdminServerFactory.createAdminServer();
            adminServer.setZooKeeperServer(zkServer);
            adminServer.start();

            boolean needStartZKServer = true;
            if (config.getClientPortAddress() != null) {
                cnxnFactory = ServerCnxnFactory.createFactory();
                cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
                cnxnFactory.startup(zkServer);
                // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
                needStartZKServer = false;
            }
            if (config.getSecureClientPortAddress() != null) {
                secureCnxnFactory = ServerCnxnFactory.createFactory();
                secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
                secureCnxnFactory.startup(zkServer, needStartZKServer);
            }

            containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
                    Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                    Integer.getInteger("znode.container.maxPerMinute", 10000)
            );
            containerManager.start();

            if (cnxnFactory != null) {
                cnxnFactory.join();
            }
            if (secureCnxnFactory != null) {
                secureCnxnFactory.join();
            }

            if (zkServer.isRunning()) {
                zkServer.shutdown();
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Server interrupted", e);
        } finally {
            if (txnLog != null) {
                txnLog.close();
            }
        }
    }
cnxnFactory.startup(zkServer);[NettyServerCnxnFactory]
    @Override
    public void startup(ZooKeeperServer zks, boolean startServer)
            throws IOException, InterruptedException {
        start();
        setZooKeeperServer(zks);
        if (startServer) {
            zks.startdata();
            zks.startup();
        }
    }
    public synchronized void startup() {
        if (sessionTracker == null) {
            createSessionTracker();
        }
        startSessionTracker();
        setupRequestProcessors();

        registerJMX();

        state = State.RUNNING;
        notifyAll();
    }

    protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this,
                finalProcessor);
        ((SyncRequestProcessor)syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
        ((PrepRequestProcessor)firstProcessor).start();
    }

3.2 叢集啟動

 public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }

      LOG.info("Starting quorum peer");
      try {
          ServerCnxnFactory cnxnFactory = null;
          ServerCnxnFactory secureCnxnFactory = null;

          if (config.getClientPortAddress() != null) {
              cnxnFactory = ServerCnxnFactory.createFactory();
              cnxnFactory.configure(config.getClientPortAddress(),
                      config.getMaxClientCnxns(),
                      false);
          }

          if (config.getSecureClientPortAddress() != null) {
              secureCnxnFactory = ServerCnxnFactory.createFactory();
              secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                      config.getMaxClientCnxns(),
                      true);
          }

          quorumPeer = new QuorumPeer();
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      config.getDataLogDir(),
                      config.getDataDir()));
          quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
          quorumPeer.enableLocalSessionsUpgrading(
              config.isLocalSessionsUpgradingEnabled());
          //quorumPeer.setQuorumPeers(config.getAllMembers());
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setConfigFileName(config.getConfigFilename());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
          if (config.getLastSeenQuorumVerifier()!=null) {
              quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
          }
          quorumPeer.initConfigInZKDatabase();
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          
          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

從上述程式碼可以看出,QuorumPeer的start()方法和join()方法是主流程。

QuorumPeer繼承了ZooKeeperThread,ZooKeeperThread繼承自Thread,故QuorumPeer間接繼承了Thread。

    @Override
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
         }
        loadDataBase();
        startServerCnxnFactory();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        startLeaderElection();
        super.start();
    }

3.2.1. 啟動時先從記憶體資料庫中恢復資料

 private void loadDataBase() {
        try {
            zkDb.loadDataBase();

            // load the epochs
            long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
            long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
            try {
                currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
            } catch(FileNotFoundException e) {
                // pick a reasonable epoch number
                // this should only happen once when moving to a
                // new code version
                currentEpoch = epochOfZxid;
                LOG.info(CURRENT_EPOCH_FILENAME
                        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                        currentEpoch);
                writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
            }
            if (epochOfZxid > currentEpoch) {
                throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
            }
            try {
                acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
            } catch(FileNotFoundException e) {
                // pick a reasonable epoch number
                // this should only happen once when moving to a
                // new code version
                acceptedEpoch = epochOfZxid;
                LOG.info(ACCEPTED_EPOCH_FILENAME
                        + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                        acceptedEpoch);
                writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
            }
            if (acceptedEpoch < currentEpoch) {
                throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
            }
        } catch(IOException ie) {
            LOG.error("Unable to load database on disk", ie);
            throw new RuntimeException("Unable to run quorum server ", ie);
        }
    }

呼叫

 /**
     * load the database from the disk onto memory and also add
     * the transactions to the committedlog in memory.
     * @return the last valid zxid on disk
     * @throws IOException
     */
    public long loadDataBase() throws IOException {
        PlayBackListener listener=new PlayBackListener(){
            public void onTxnLoaded(TxnHeader hdr,Record txn){
                Request r = new Request(0, hdr.getCxid(),hdr.getType(), hdr, txn, hdr.getZxid());
                addCommittedProposal(r);
            }
        };

        long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
        initialized = true;
        return zxid;
    }

    /**
     * maintains a list of last <i>committedLog</i>
     *  or so committed requests. This is used for
     * fast follower synchronization.
     * @param request committed request
     */
    public void addCommittedProposal(Request request) {
        WriteLock wl = logLock.writeLock();
        try {
            wl.lock();
            if (committedLog.size() > commitLogCount) {
                committedLog.removeFirst();
                minCommittedLog = committedLog.getFirst().packet.getZxid();
            }
            if (committedLog.isEmpty()) {
                minCommittedLog = request.zxid;
                maxCommittedLog = request.zxid;
            }

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            try {
                request.getHdr().serialize(boa, "hdr");
                if (request.getTxn() != null) {
                    request.getTxn().serialize(boa, "txn");
                }
                baos.close();
            } catch (IOException e) {
                LOG.error("This really should be impossible", e);
            }
            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                    baos.toByteArray(), null);
            Proposal p = new Proposal();
            p.packet = pp;
            p.request = request;
            committedLog.add(p);
            maxCommittedLog = p.packet.getZxid();
        } finally {
            wl.unlock();
        }
    }

3.2.2 啟動NettyServerCnxnFactory繫結服務

    @Override
    public void start() {
        LOG.info("binding to port " + localAddress);
        parentChannel = bootstrap.bind(localAddress);
    }

3.2.3 選舉演算法

 synchronized public void startLeaderElection() {
       try {
           if (getPeerState() == ServerState.LOOKING) {
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       } catch(IOException e) {
           RuntimeException re = new RuntimeException(e.getMessage());
           re.setStackTrace(e.getStackTrace());
           throw re;
       }