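ZooKeeper Source Code Analysis, Part 1: The Server Startup Process
Introduction to ZooKeeper
ZooKeeper is open-source software that provides a distributed coordination service for distributed applications. It exposes a simple set of atomic primitives that distributed applications can build on to implement higher-level services for synchronization, configuration maintenance, group management, and naming. It is designed to be easy to program against, and its data model will feel familiar to anyone who knows a directory-tree file system. It runs on Java and has client bindings for both Java and C.
Coordination services are notoriously hard to get right. They are especially prone to errors such as race conditions and deadlock. The motivation behind ZooKeeper is to relieve distributed applications of the burden of implementing coordination services from scratch.
Features of ZooKeeper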
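1. Simple: ZooKeeper lets distributed processes coordinate with each other through a shared hierarchical namespace that is organized much like a standard file system. The namespace consists of data registers (called znodes), which in ZooKeeper's terms are similar to files and directories. Unlike a typical file system, which is designed for storage, ZooKeeper keeps its data in memory, which allows it to achieve high throughput and low latency.
The ZooKeeper implementation puts a premium on high performance, high availability, and strictly ordered access. Its performance means it can be used in large distributed systems. Its reliability keeps it from becoming a single point of failure. Its strict ordering means that sophisticated synchronization primitives can be implemented at the client.
2. Replicated: Like the distributed processes it coordinates, ZooKeeper itself is designed to be replicated over a set of hosts called an ensemble. ZooKeeper service diagram:
The servers that make up the ZooKeeper service must all know about each other. They maintain an in-memory image of the state, along with transaction logs and snapshots in a persistent store. As long as a majority of the servers are available, the ZooKeeper service is available.
A client connects to a single ZooKeeper server. The client maintains a TCP connection through which it sends requests, gets responses, gets watch events, and sends heartbeats. If the TCP connection to the server breaks, the client connects to a different server.
3. Ordered: ZooKeeper stamps each update with a number that reflects the order of all ZooKeeper transactions. Subsequent operations can use that order to implement higher-level abstractions, such as synchronization primitives.
4. Fast: ZooKeeper is especially fast in read-dominant workloads. ZooKeeper applications run on thousands of machines, and it performs best where reads are more common than writes, at ratios of around 10:1.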
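The majority rule mentioned under "Replicated" can be made concrete with a little arithmetic. The following is a minimal sketch, not ZooKeeper code: the class QuorumMath and its methods are hypothetical names, and it assumes every server carries an equal vote.

```java
// Hypothetical helper, not part of ZooKeeper: plain majority-quorum arithmetic,
// assuming one equal vote per server.
final class QuorumMath {
    private QuorumMath() {}

    /** True when strictly more than half of the ensemble is reachable. */
    static boolean hasMajority(int ensembleSize, int aliveServers) {
        if (ensembleSize <= 0 || aliveServers < 0 || aliveServers > ensembleSize) {
            throw new IllegalArgumentException("invalid ensemble sizes");
        }
        return aliveServers > ensembleSize / 2;
    }

    /** Largest number of failed servers the ensemble can lose and still have a majority. */
    static int toleratedFailures(int ensembleSize) {
        if (ensembleSize <= 0) {
            throw new IllegalArgumentException("ensemble must not be empty");
        }
        return (ensembleSize - 1) / 2;
    }
}
```

Under this rule an ensemble of 4 tolerates no more failures than an ensemble of 3, which is one reason odd ensemble sizes are the usual choice.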
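Data model and hierarchical namespace
The namespace that ZooKeeper provides is much like that of a standard file system. A name is a sequence of path elements separated by a slash (/). Every node in ZooKeeper's namespace is identified by a path.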
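To illustrate what "a sequence of path elements separated by a slash" means, here is a small sketch that splits an absolute path into its elements and finds its parent. ZnodePathSketch is a hypothetical class written for this article; ZooKeeper's own path validation applies further rules that are not reproduced here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of slash-separated znode paths; not ZooKeeper's validator.
final class ZnodePathSketch {
    private ZnodePathSketch() {}

    /** Splits an absolute path such as "/app/config" into ["app", "config"]. */
    static List<String> elements(String path) {
        if (path == null || !path.startsWith("/")) {
            throw new IllegalArgumentException("path must start with /");
        }
        List<String> out = new ArrayList<>();
        if (path.equals("/")) {
            return out; // the root has no elements
        }
        // The -1 limit keeps trailing empty strings, so "/app/" is rejected below.
        for (String part : path.substring(1).split("/", -1)) {
            if (part.isEmpty()) {
                throw new IllegalArgumentException("empty path element in " + path);
            }
            out.add(part);
        }
        return out;
    }

    /** Returns the parent path, e.g. "/app" for "/app/config" and "/" for "/app". */
    static String parent(String path) {
        if (elements(path).isEmpty()) {
            throw new IllegalArgumentException("the root has no parent");
        }
        int idx = path.lastIndexOf('/');
        return idx == 0 ? "/" : path.substring(0, idx);
    }
}
```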
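Nodes and ephemeral nodes
Unlike a standard file system, each node in a ZooKeeper namespace can have data associated with it as well as children. It is like having a file system that allows a file to also be a directory. (ZooKeeper was designed to store coordination data: status information, configuration, location information, and so on, so the data stored at each node is usually small, in the byte to kilobyte range.) We use the term znode to make it clear that we are talking about ZooKeeper data nodes.
Znodes maintain a stat structure that includes version numbers for data changes and ACL changes, plus timestamps, which allows cache validation and coordinated updates. Each time a znode's data changes, the version number increases. For instance, whenever a client retrieves data, it also receives the version of the data.
The data stored at each znode in a namespace is read and written atomically. A read gets all the data bytes associated with a znode, and a write replaces all of the data. Each node has an Access Control List (ACL) that restricts who can do what.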
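The version number is what makes coordinated updates possible: a writer can say "replace the data only if it is still at the version I read". The sketch below models that idea for a single node. VersionedNode is a hypothetical class, not ZooKeeper's implementation; treating -1 as "any version" mirrors the convention of the ZooKeeper client API.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical single-node model of whole-value reads and version-checked writes.
final class VersionedNode {
    private byte[] data;
    private int version; // starts at 0 and grows by one per successful write

    VersionedNode(byte[] initial) {
        this.data = initial.clone();
    }

    synchronized int version() {
        return version;
    }

    /** A read returns all of the node's bytes. */
    synchronized byte[] read() {
        return data.clone();
    }

    /**
     * Replaces all of the data if expectedVersion matches the current version
     * (or is -1, meaning "any version"). Returns the new version.
     */
    synchronized int write(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                "bad version: expected " + expectedVersion + ", actual " + version);
        }
        data = newData.clone();
        return ++version;
    }

    /** Convenience for string payloads, used only in this sketch. */
    static byte[] utf8(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }
}
```

A writer that loses the race gets an exception instead of silently overwriting someone else's update, and can re-read and retry.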
ZooKeeper also has the notion of ephemeral nodes. These znodes exist as long as the session that created them is active. When the session ends, the znode is deleted.
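Conditional updates and watches
ZooKeeper supports the concept of watches. Clients can set a watch on a znode. A watch is triggered and removed when the znode changes. When a watch is triggered, the client receives a packet saying that the znode has changed. If the connection between the client and one of the ZooKeeper servers is broken, the client receives a local notification.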
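The "triggered and removed" behaviour means a watch as described here fires once. The following sketch models that with an in-memory registry. OneShotWatches and its method names are hypothetical and only illustrate the semantics; they are not ZooKeeper's watch manager.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical registry that models one-shot watches on znode paths.
final class OneShotWatches {
    private final Map<String, List<Consumer<String>>> watches = new HashMap<>();

    /** Registers a callback to be told once that the node at path changed. */
    void watch(String path, Consumer<String> callback) {
        watches.computeIfAbsent(path, p -> new ArrayList<>()).add(callback);
    }

    /**
     * Called when a node changes: fires every watch set on the path and
     * removes them in the same step. Returns how many watches fired.
     */
    int nodeChanged(String path) {
        List<Consumer<String>> fired = watches.remove(path);
        if (fired == null) {
            return 0;
        }
        for (Consumer<String> callback : fired) {
            callback.accept(path);
        }
        return fired.size();
    }
}
```

A client that wants to keep observing a node has to set a new watch after each notification.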
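Guarantees
ZooKeeper is very fast and very simple. Because its goal is to be a basis for building more complicated services, such as synchronization, it provides a set of guarantees. These are:
- Sequential consistency: updates from a client are applied in the order in which they were sent.
- Atomicity: updates either succeed or fail; there are no partial results.
- Single system image: a client sees the same view of the service regardless of the server it connects to.
- Reliability: once an update has been applied, it persists from that time forward until a client overwrites it.
- Timeliness: the clients' view of the system is guaranteed to be up to date within a certain time bound.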
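Simple API
One of the design goals of ZooKeeper is to provide a very simple programming interface. As a result, it supports only these operations:
- create: creates a node at a location in the tree
- delete: deletes a node
- exists: tests whether a node exists at a location
- get data: reads the data from a node
- set data: writes data to a node
- get children: retrieves the list of children of a node
- sync: waits for data to be propagated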
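Implementation
The figure "ZooKeeper components" shows the high-level components of the ZooKeeper service. With the exception of the request processor, each of the servers that make up the ZooKeeper service replicates its own copy of each of the components.
Figure: ZooKeeper components
The replicated database is an in-memory database containing the entire data tree. Updates are logged to disk for recoverability, and writes are serialized to disk before they are applied to the in-memory database.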
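The "log first, then apply" ordering is what makes the in-memory database recoverable. The sketch below shows the idea with a list standing in for the on-disk transaction log. WriteAheadSketch is a hypothetical class, and the string pair used as a transaction is an assumption made for brevity; real ZooKeeper transactions and snapshots are considerably richer.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of write-ahead logging in front of an in-memory map.
final class WriteAheadSketch {
    private final List<String[]> log;                  // stand-in for the durable log
    private final Map<String, String> memory = new HashMap<>();

    /** Rebuilds the in-memory state by replaying whatever the log already holds. */
    WriteAheadSketch(List<String[]> durableLog) {
        this.log = durableLog;
        for (String[] txn : durableLog) {
            apply(txn);
        }
    }

    /** Records the change in the log first, then applies it to memory. */
    void set(String path, String value) {
        String[] txn = {path, value};
        log.add(txn);   // 1. make the change durable
        apply(txn);     // 2. only then change the in-memory database
    }

    String get(String path) {
        return memory.get(path);
    }

    private void apply(String[] txn) {
        memory.put(txn[0], txn[1]);
    }
}
```

If the process dies after step 1 but before step 2, a restart replays the log and arrives at the same state, which is the property the paragraph above describes.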
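Every ZooKeeper server services clients. Each client connects to exactly one server to submit requests. Read requests are served from the local replica of that server's database. Write requests change the state of the service and are handled by the request processor.
As part of the agreement protocol, all write requests from clients are handled by a single server, the ZooKeeper leader. The remaining ZooKeeper servers are called followers; they receive message proposals from the leader and agree on message delivery. The messaging layer takes care of replacing the leader on failure and of syncing followers with the leader.
ZooKeeper uses a custom atomic messaging protocol. Because the messaging layer is atomic, ZooKeeper can guarantee that the local replicas never diverge. When the leader receives a write request, it calculates what the state of the system will be when the write is applied and transforms this into a transaction that captures the new state.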
Starting ZooKeeper
Server startup
bin/zkServer.sh start
The start branch of the script looks like this:
start)
    echo  -n "Starting zookeeper ... "
    # If a PID file exists and that process is still alive, do nothing.
    if [ -f "$ZOOPIDFILE" ]; then
      if kill -0 `cat "$ZOOPIDFILE"` > /dev/null 2>&1; then
         echo $command already running as process `cat "$ZOOPIDFILE"`.
         exit 0
      fi
    fi
    # Launch the JVM in the background with $ZOOMAIN as the main class.
    nohup "$JAVA" $ZOO_DATADIR_AUTOCREATE "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" \
    "-Dzookeeper.log.file=${ZOO_LOG_FILE}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
    -XX:+HeapDumpOnOutOfMemoryError -XX:OnOutOfMemoryError='kill -9 %p' \
    -cp "$CLASSPATH" $JVMFLAGS $ZOOMAIN "$ZOOCFG" > "$_ZOO_DAEMON_OUT" 2>&1 < /dev/null &
    if [ $? -eq 0 ]
    then
      # Record the PID, then check after one second that the process is still there.
      if /bin/echo -n $! > "$ZOOPIDFILE"
      then
        sleep 1
        pid=$(cat "${ZOOPIDFILE}")
        if ps -p "${pid}" > /dev/null 2>&1; then
          echo STARTED
        else
          echo FAILED TO START
          exit 1
        fi
      else
        echo FAILED TO WRITE PID
        exit 1
      fi
    else
      echo SERVER DID NOT START
      exit 1
    fi
    ;;
In this script:
ZOOMAIN is the entry point of the program. Its class is:
org.apache.zookeeper.server.quorum.QuorumPeerMain
Its main method is:
/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(2);
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(3);
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(4);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}
main delegates to initializeAndRun:
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
            .getDataDir(), config.getDataLogDir(), config
            .getSnapRetainCount(), config.getPurgeInterval());
    purgeMgr.start();

    if (args.length == 1 && config.isDistributed()) {
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}
The code above has three main parts:
1. Parse the configuration file. zkServer.sh passes its path in $ZOOCFG; with the stock scripts this defaults to conf/zoo.cfg, one directory above bin.
/**
 * Parse a ZooKeeper configuration file
 * @param path the path of the configuration file
 * @throws ConfigException error processing configuration
 */
public void parse(String path) throws ConfigException {
    LOG.info("Reading configuration from: " + path);

    try {
        File configFile = (new VerifyingFileFactory.Builder(LOG)
            .warnForRelativePath()
            .failForNonExistingPath()
            .build()).create(path);

        Properties cfg = new Properties();
        FileInputStream in = new FileInputStream(configFile);
        try {
            cfg.load(in);
            configFileStr = path;
        } finally {
            in.close();
        }

        parseProperties(cfg);
    } catch (IOException e) {
        throw new ConfigException("Error processing " + path, e);
    } catch (IllegalArgumentException e) {
        throw new ConfigException("Error processing " + path, e);
    }

    if (dynamicConfigFileStr!=null) {
        try {
            Properties dynamicCfg = new Properties();
            FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
            try {
                dynamicCfg.load(inConfig);
                if (dynamicCfg.getProperty("version") != null) {
                    throw new ConfigException("dynamic file shouldn't have version inside");
                }

                String version = getVersionFromFilename(dynamicConfigFileStr);
                // If there isn't any version associated with the filename,
                // the default version is 0.
                if (version != null) {
                    dynamicCfg.setProperty("version", version);
                }
            } finally {
                inConfig.close();
            }
            setupQuorumPeerConfig(dynamicCfg, false);

        } catch (IOException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
        }
        File nextDynamicConfigFile = new File(configFileStr + nextDynamicConfigFileSuffix);
        if (nextDynamicConfigFile.exists()) {
            try {
                Properties dynamicConfigNextCfg = new Properties();
                FileInputStream inConfigNext = new FileInputStream(nextDynamicConfigFile);
                try {
                    dynamicConfigNextCfg.load(inConfigNext);
                } finally {
                    inConfigNext.close();
                }
                boolean isHierarchical = false;
                for (Entry<Object, Object> entry : dynamicConfigNextCfg.entrySet()) {
                    String key = entry.getKey().toString().trim();
                    if (key.startsWith("group") || key.startsWith("weight")) {
                        isHierarchical = true;
                        break;
                    }
                }
                lastSeenQuorumVerifier = createQuorumVerifier(dynamicConfigNextCfg, isHierarchical);
            } catch (IOException e) {
                LOG.warn("NextQuorumVerifier is initiated to null");
            }
        }
    }
}
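The parse method above is built on java.util.Properties. The following sketch shows the same loading step on an in-memory string, plus a deliberately simplified check for whether a configuration describes more than one server. ConfigSketch and looksDistributed are hypothetical names; the real isDistributed() decision is made by QuorumPeerConfig and is not reproduced here, so counting server.N keys is only an assumption for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch: loading a zoo.cfg-style file body with java.util.Properties.
final class ConfigSketch {
    private ConfigSketch() {}

    /** Parses key=value lines; try-with-resources closes the reader for us. */
    static Properties load(String text) {
        Properties cfg = new Properties();
        try (StringReader in = new StringReader(text)) {
            cfg.load(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return cfg;
    }

    /** Counts entries whose key looks like "server.N". */
    static int serverCount(Properties cfg) {
        int count = 0;
        for (String key : cfg.stringPropertyNames()) {
            if (key.trim().startsWith("server.")) {
                count++;
            }
        }
        return count;
    }

    /** Simplified stand-in for the quorum check: more than one server entry. */
    static boolean looksDistributed(Properties cfg) {
        return serverCount(cfg) > 1;
    }
}
```

With no server entries at all, the sketch reports a non-distributed configuration, which corresponds to the standalone branch of initializeAndRun.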
2. Start and schedule the purge task
// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
        .getDataDir(), config.getDataLogDir(), config
        .getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
This calls the start method:
/**
 * Validates the purge configuration and schedules the purge task. Purge
 * task keeps the most recent <code>snapRetainCount</code> number of
 * snapshots and deletes the remaining for every <code>purgeInterval</code>
 * hour(s).
 * <p>
 * <code>purgeInterval</code> of <code>0</code> or
 * <code>negative integer</code> will not schedule the purge task.
 * </p>
 *
 * @see PurgeTxnLog#purge(File, File, int)
 */
public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    timer = new Timer("PurgeTask", true);
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
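The scheduling pattern in start() is plain java.util.Timer usage: a daemon timer thread, a TimerTask, and scheduleAtFixedRate with an initial delay of 0. The sketch below exercises the same calls with a millisecond period so that it finishes quickly. PurgeTimerSketch and runPeriodically are hypothetical names, and the countdown latch exists only so the caller can observe the runs.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demonstration of the Timer/TimerTask pattern used by start().
final class PurgeTimerSketch {
    private PurgeTimerSketch() {}

    /**
     * Schedules a task at a fixed rate on a daemon timer and waits until it has
     * run the requested number of times. Returns false if the period is not
     * positive (nothing is scheduled, as in start()) or if the timeout elapses.
     */
    static boolean runPeriodically(long periodMillis, int runs, long timeoutMillis) {
        if (periodMillis <= 0) {
            return false; // mirror the "zero or negative interval" guard
        }
        CountDownLatch remaining = new CountDownLatch(runs);
        Timer timer = new Timer("PurgeTaskSketch", true); // true = daemon thread
        TimerTask task = new TimerTask() {
            @Override
            public void run() {
                remaining.countDown(); // the real task would purge old files here
            }
        };
        timer.scheduleAtFixedRate(task, 0, periodMillis);
        try {
            return remaining.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            timer.cancel();
        }
    }
}
```

Because the timer thread is a daemon, it does not keep the JVM alive on its own, which suits a background housekeeping job.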
As the code above shows, the purge work starts a Timer. PurgeTask extends TimerTask (a task that a timer can schedule to run once or repeatedly). PurgeTask is implemented as follows:
static class PurgeTask extends TimerTask {
    private File logsDir;
    private File snapsDir;
    private int snapRetainCount;

    public PurgeTask(File dataDir, File snapDir, int count) {
        logsDir = dataDir;
        snapsDir = snapDir;
        snapRetainCount = count;
    }

    @Override
    public void run() {
        LOG.info("Purge task started.");
        try {
            PurgeTxnLog.purge(logsDir, snapsDir, snapRetainCount);
        } catch (Exception e) {
            LOG.error("Error occured while purging.", e);
        }
        LOG.info("Purge task completed.");
    }
}
run() calls the purge method:
/**
 * Purges the snapshot and logs keeping the last num snapshots and the
 * corresponding logs. If logs are rolling or a new snapshot is created
 * during this process, these newest N snapshots or any data logs will be
 * excluded from current purging cycle.
 *
 * @param dataDir the dir that has the logs
 * @param snapDir the dir that has the snapshots
 * @param num the number of snapshots to keep
 * @throws IOException
 */
public static void purge(File dataDir, File snapDir, int num) throws IOException {
    if (num < 3) {
        throw new IllegalArgumentException(COUNT_ERR_MSG);
    }

    FileTxnSnapLog txnLog = new FileTxnSnapLog(dataDir, snapDir);

    List<File> snaps = txnLog.findNRecentSnapshots(num);
    retainNRecentSnapshots(txnLog, snaps);
}
purge first finds the num most recent snapshots, then hands them to retainNRecentSnapshots:
static void retainNRecentSnapshots(FileTxnSnapLog txnLog, List<File> snaps) {
    // found any valid recent snapshots?
    if (snaps.size() == 0)
        return;
    // the oldest snapshot that is kept marks the retention boundary
    File snapShot = snaps.get(snaps.size() -1);
    final long leastZxidToBeRetain = Util.getZxidFromName(
            snapShot.getName(), PREFIX_SNAPSHOT);

    // accepts only files with the given prefix whose zxid is below the boundary
    class MyFileFilter implements FileFilter{
        private final String prefix;
        MyFileFilter(String prefix){
            this.prefix=prefix;
        }
        public boolean accept(File f){
            if(!f.getName().startsWith(prefix + "."))
                return false;
            long fZxid = Util.getZxidFromName(f.getName(), prefix);
            if (fZxid >= leastZxidToBeRetain) {
                return false;
            }
            return true;
        }
    }
    // add all non-excluded log files
    List<File> files = new ArrayList<File>(Arrays.asList(txnLog
            .getDataDir().listFiles(new MyFileFilter(PREFIX_LOG))));
    // add all non-excluded snapshot files to the deletion list
    files.addAll(Arrays.asList(txnLog.getSnapDir().listFiles(
            new MyFileFilter(PREFIX_SNAPSHOT))));
    // remove the old files
    for(File f: files)
    {
        System.out.println("Removing file: "+
            DateFormat.getDateTimeInstance().format(f.lastModified())+
            "\t"+f.getPath());
        if(!f.delete()){
            System.err.println("Failed to remove "+f.getPath());
        }
    }
}
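The retention rule is easier to follow on plain file names than on File objects. The sketch below applies the same boundary logic as the excerpt to a list of names of the form snapshot.<hex zxid> and log.<hex zxid>. RetentionSketch is a hypothetical class; zxidFromName is written here on the assumption that the suffix after the dot is a hexadecimal zxid, and no files are touched.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, file-system-free model of the retention rule in the excerpt above.
final class RetentionSketch {
    private RetentionSketch() {}

    /** Parses the hex zxid from names like "snapshot.1a"; returns -1 if the name does not match. */
    static long zxidFromName(String name, String prefix) {
        String[] parts = name.split("\\.");
        if (parts.length == 2 && parts[0].equals(prefix)) {
            try {
                return Long.parseLong(parts[1], 16);
            } catch (NumberFormatException e) {
                return -1L;
            }
        }
        return -1L;
    }

    /** Returns the names that would be deleted when keeping the retainCount newest snapshots. */
    static List<String> filesToDelete(List<String> names, int retainCount) {
        if (retainCount < 3) {
            throw new IllegalArgumentException("retainCount must be at least 3");
        }
        List<String> snaps = new ArrayList<>();
        for (String name : names) {
            if (zxidFromName(name, "snapshot") >= 0) {
                snaps.add(name);
            }
        }
        List<String> doomed = new ArrayList<>();
        if (snaps.isEmpty()) {
            return doomed; // no valid snapshot, so nothing is purged
        }
        // Newest first; the last snapshot that is kept defines the boundary zxid.
        snaps.sort(Comparator.comparingLong((String n) -> zxidFromName(n, "snapshot")).reversed());
        int keep = Math.min(retainCount, snaps.size());
        long leastZxidToRetain = zxidFromName(snaps.get(keep - 1), "snapshot");
        for (String name : names) {
            long zxid = Math.max(zxidFromName(name, "snapshot"), zxidFromName(name, "log"));
            if (zxid >= 0 && zxid < leastZxidToRetain) {
                doomed.add(name);
            }
        }
        return doomed;
    }
}
```

For example, with snapshots at zxids 1, 5, a and f (hex) and a retain count of 3, the boundary is 5, so only snapshot.1 and any log with a zxid below 5 are selected for deletion.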
3. Start the ZooKeeper server
3.1 Standalone startup
/*
 * Start up the ZooKeeper server.
 *
 * @param args the configfile or the port datadir [ticktime]
 */
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    try {
        main.initializeAndRun(args);
    } catch (IllegalArgumentException e) {
        LOG.error("Invalid arguments, exiting abnormally", e);
        LOG.info(USAGE);
        System.err.println(USAGE);
        System.exit(2);
    } catch (ConfigException e) {
        LOG.error("Invalid config, exiting abnormally", e);
        System.err.println("Invalid config, exiting abnormally");
        System.exit(2);
    } catch (DatadirException e) {
        LOG.error("Unable to access datadir, exiting abnormally", e);
        System.err.println("Unable to access datadir, exiting abnormally");
        System.exit(3);
    } catch (AdminServerException e) {
        LOG.error("Unable to start AdminServer, exiting abnormally", e);
        System.err.println("Unable to start AdminServer, exiting abnormally");
        System.exit(4);
    } catch (Exception e) {
        LOG.error("Unexpected exception, exiting abnormally", e);
        System.exit(1);
    }
    LOG.info("Exiting normally");
    System.exit(0);
}
This calls:
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    ServerConfig config = new ServerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    } else {
        config.parse(args);
    }

    runFromConfig(config);
}
The startup itself happens in runFromConfig:
/**
 * Run from a ServerConfig.
 * @param config ServerConfig to use.
 * @throws IOException
 * @throws AdminServerException
 */
public void runFromConfig(ServerConfig config) throws IOException, AdminServerException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call
        // run() in this thread.
        // create a file logger url from the command line args
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
        ZooKeeperServer zkServer = new ZooKeeperServer( txnLog,
                config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);

        // Start Admin server
        adminServer = AdminServerFactory.createAdminServer();
        adminServer.setZooKeeperServer(zkServer);
        adminServer.start();

        boolean needStartZKServer = true;
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
            cnxnFactory.startup(zkServer);
            // zkServer has been started. So we don't need to start it again in secureCnxnFactory.
            needStartZKServer = false;
        }
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
            secureCnxnFactory.startup(zkServer, needStartZKServer);
        }

        containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
                Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                Integer.getInteger("znode.container.maxPerMinute", 10000)
        );
        containerManager.start();

        if (cnxnFactory != null) {
            cnxnFactory.join();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.join();
        }

        if (zkServer.isRunning()) {
            zkServer.shutdown();
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
The call cnxnFactory.startup(zkServer), as implemented in NettyServerCnxnFactory:
@Override
public void startup(ZooKeeperServer zks, boolean startServer)
        throws IOException, InterruptedException {
    start();
    setZooKeeperServer(zks);
    if (startServer) {
        zks.startdata();
        zks.startup();
    }
}
zks.startup() in ZooKeeperServer then starts the session tracker and builds the request processor chain:
public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();
    }
    startSessionTracker();
    setupRequestProcessors();

    registerJMX();

    state = State.RUNNING;
    notifyAll();
}

protected void setupRequestProcessors() {
    // The chain is built back to front: Prep -> Sync -> Final
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}
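setupRequestProcessors wires three processors into a chain in which each stage hands the request to the next one. The sketch below reproduces only that wiring, synchronously and with strings as requests. ProcessorChainSketch and its Stage interface are hypothetical; in the excerpt above the Sync and Prep processors are started as separate threads, which this sketch does not model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, single-threaded model of a Prep -> Sync -> Final processor chain.
final class ProcessorChainSketch {
    private ProcessorChainSketch() {}

    /** One stage of the pipeline; a stand-in for the RequestProcessor role. */
    interface Stage {
        void process(String request);
    }

    /** Pushes one request through the chain and returns the order of the stages visited. */
    static List<String> run(String request) {
        List<String> trace = new ArrayList<>();
        // Built back to front, like setupRequestProcessors: each stage knows its successor.
        Stage finalStage = r -> trace.add("final:" + r);
        Stage syncStage = r -> {
            trace.add("sync:" + r);
            finalStage.process(r);
        };
        Stage prepStage = r -> {
            trace.add("prep:" + r);
            syncStage.process(r);
        };
        prepStage.process(request); // requests always enter at the first processor
        return trace;
    }
}
```

Building the chain from the last stage backwards is what allows each constructor to receive its successor as an argument.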
3.2 Cluster startup
public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException
{
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(),
                    false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(),
                    true);
        }

        quorumPeer = new QuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    config.getDataLogDir(),
                    config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
            config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier()!=null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

        quorumPeer.start();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
As the code above shows, QuorumPeer's start() and join() methods drive the main flow.
QuorumPeer extends ZooKeeperThread, which in turn extends Thread, so QuorumPeer is indirectly a Thread.
@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    startLeaderElection();
    super.start();
}
3.2.1 On startup, first restore the data from disk into the in-memory database
private void loadDataBase() {
    try {
        zkDb.loadDataBase();

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;
            LOG.info(ACCEPTED_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch) + " is less than the current epoch, " + ZxidUtils.zxidToString(currentEpoch));
        }
    } catch(IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}
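loadDataBase derives an epoch from the last processed zxid through ZxidUtils.getEpochFromZxid. A zxid is a 64-bit number whose high 32 bits hold the epoch and whose low 32 bits hold a counter within that epoch. The sketch below shows that bit layout in isolation. ZxidSketch is a hypothetical class written for this article and should be read as an illustration of the layout rather than as the ZxidUtils source.

```java
// Hypothetical illustration of the zxid bit layout: epoch in the high 32 bits,
// per-epoch counter in the low 32 bits.
final class ZxidSketch {
    private ZxidSketch() {}

    /** Extracts the epoch (high 32 bits). */
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    /** Extracts the counter (low 32 bits). */
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    /** Combines an epoch and a counter into one zxid. */
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }
}
```

Because the epoch occupies the high bits, any zxid from a later epoch compares greater than every zxid from an earlier one, which is what the epoch checks in loadDataBase rely on.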
This calls ZKDatabase.loadDataBase:
/**
 * load the database from the disk onto memory and also add
 * the transactions to the committedlog in memory.
 * @return the last valid zxid on disk
 * @throws IOException
 */
public long loadDataBase() throws IOException {
    PlayBackListener listener=new PlayBackListener(){
        public void onTxnLoaded(TxnHeader hdr,Record txn){
            Request r = new Request(0, hdr.getCxid(),hdr.getType(), hdr, txn, hdr.getZxid());
            addCommittedProposal(r);
        }
    };

    long zxid = snapLog.restore(dataTree,sessionsWithTimeouts,listener);
    initialized = true;
    return zxid;
}

/**
 * maintains a list of last <i>committedLog</i>
 *  or so committed requests. This is used for
 * fast follower synchronization.
 * @param request committed request
 */
public void addCommittedProposal(Request request) {
    WriteLock wl = logLock.writeLock();
    try {
        wl.lock();
        if (committedLog.size() > commitLogCount) {
            committedLog.removeFirst();
            minCommittedLog = committedLog.getFirst().packet.getZxid();
        }
        if (committedLog.isEmpty()) {
            minCommittedLog = request.zxid;
            maxCommittedLog = request.zxid;
        }

        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
        try {
            request.getHdr().serialize(boa, "hdr");
            if (request.getTxn() != null) {
                request.getTxn().serialize(boa, "txn");
            }
            baos.close();
        } catch (IOException e) {
            LOG.error("This really should be impossible", e);
        }
        QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                baos.toByteArray(), null);
        Proposal p = new Proposal();
        p.packet = pp;
        p.request = request;
        committedLog.add(p);
        maxCommittedLog = p.packet.getZxid();
    } finally {
        wl.unlock();
    }
}
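addCommittedProposal keeps a sliding window of recent proposals and tracks the smallest and largest zxid in it. The sketch below isolates that window as a bounded queue of zxids. CommittedLogSketch is a hypothetical class; unlike the excerpt, which trims once the size exceeds commitLogCount, the sketch enforces a strict capacity, and it stores bare zxids rather than serialized proposals.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical bounded window over recently committed zxids.
final class CommittedLogSketch {
    private final int capacity;
    private final Deque<Long> zxids = new ArrayDeque<>();

    CommittedLogSketch(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    /** Appends a zxid, evicting the oldest entry when the window is full. */
    synchronized void add(long zxid) {
        if (zxids.size() >= capacity) {
            zxids.removeFirst();
        }
        zxids.addLast(zxid);
    }

    /** Oldest zxid still in the window; throws NoSuchElementException when empty. */
    synchronized long min() {
        return zxids.getFirst();
    }

    /** Newest zxid in the window; throws NoSuchElementException when empty. */
    synchronized long max() {
        return zxids.getLast();
    }

    synchronized int size() {
        return zxids.size();
    }
}
```

A follower whose last zxid falls between min() and max() can be caught up from the window alone, which is the "fast follower synchronization" the Javadoc above refers to.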
3.2.2 Start NettyServerCnxnFactory to bind the service port
@Override
public void start() {
    LOG.info("binding to port " + localAddress);
    parentChannel = bootstrap.bind(localAddress);
}
3.2.3 Leader election algorithm
synchronized public void startLeaderElection() {
    try {
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }