1. 程式人生 > >Zookeeper源碼閱讀(十四) 單機Server

Zookeeper源碼閱讀(十四) 單機Server

調用 集群 add hit ron ide 操作 段落 election

前言

前面兩篇主要說了下client-server的session相關的內容,到這裏client的內容以及client-server的連接的內容也就基本告一段落了,剩下的部分就是server部分內部的結構,zk的選舉以及server部分的工作機制等了。 這一篇主要說下單機server的啟動過程,裏面會涉及到一些server內部的工作機制和機構。

Server架構

技術分享圖片

可以看到Zookeeper的server端主要分為幾個大模塊,ZKDatabase是zk server內部的內存數據庫,內部維護了節點數據等關鍵數據,負責快照和日誌的記錄,同時也管理了session的超時和集群的選舉,其他的部分主要有負責管理session的sessiontracker,負責處理請求的請求鏈和Learner模塊(集群溝通?目前還不是特別清楚)。

單機版Server啟動

單機版Server主要流程:

技術分享圖片

從上圖中可以看到單機server啟動可以分為預啟動和初始化兩個部分。

預啟動

1. 統一由QuorumPeerMain作為啟動類

無論單機或集群,在zkServer.cmd和zkServer.sh中都配置了QuorumPeerMain作為啟動入口類。

2. 解析zoo.cfg

用過ZK的同學都知道zoo.cfg是用戶配置的zookeeper核心配置文件,ticktime,dataDir,dataLogDir,集群ip:port等都配置在其中。在實例化QuorumPeerMain對象後會去解析zoo.cfg文件。

QuorumPeerMain(Main)->QuorumPeerMain(initializeAndRun)->QuorumPeerConfig(parse)->QuorumPeerConfig(parseProperties)

parseProperties函數太長了。。。而且都是很簡單的property文件取值的操作,可以簡單看下。

3. 創建並啟動歷史文件清理器DatadirCleanupManager

DatadirCleanupManager的start方法負責自動清理歷史的快照和事務日誌。

public void start() {
    if (PurgeTaskStatus.STARTED == purgeTaskStatus) {
        LOG.warn("Purge task is already running.");
        return;
    }
    // Don't schedule the purge task with zero or negative purge interval.
    if (purgeInterval <= 0) {
        LOG.info("Purge task is not scheduled.");
        return;
    }

    timer = new Timer("PurgeTask", true);//利用了java的Timer類來做定時任務
    TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);//PurgeTask是一個timertask
    timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));//設置頻率

    purgeTaskStatus = PurgeTaskStatus.STARTED;
}
4. 判斷啟動模式
if (args.length == 1 && config.servers.size() > 0) {//通過解析zoo.cfg server數量來判斷是否是集群
    runFromConfig(config);//如果是集群,直接用QuorumPeerMain中集群啟動方法
} else {
    LOG.warn("Either no config or no quorum defined in config, running "
            + " in standalone mode");
    // there is only server in the quorum -- run as standalone
    ZooKeeperServerMain.main(args);//如果是單機就用單機啟動方法
}
5. 再次解析zoo.cfg

ZooKeeperServerMain(Main)->ZooKeeperServerMain(initializeAndRun)->ServerConfig(parse)->QuorumPeerConfig(parse)->QuorumPeerConfig(parseProperties)。

這裏之所以還要進行一次解析是因為這裏是調用的zookeeperserver的main方法,無法把原來解析的參數傳入。而且配置文件比較小,解析並不是特別耗資源,可以接受。

6. 創建ZookeeperServer實例

ZookeeperServer是server端的核心類,在啟動時會創建zookeeperserver的一個實例。

final ZooKeeperServer zkServer = new ZooKeeperServer();

到這裏位置就完成了所謂的預啟動,可以看出,在預啟動階段Zookeeper的server幹了下面幾件事:

  1. 清理歷史快照和log文件;
  2. 解析配置文件並進行初步的分析,判斷Server端狀態(Standalone/Cluster);
  3. 實例化ZookeeperServer。

初始化

在實例化了zookeeperserver之後,zookeeper server端的啟動過程便來到了初始化階段,這個過程也是比較長的。

首先是在runfromconfig方法中:

public void runFromConfig(ServerConfig config) throws IOException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // Note that this thread isn't going to be doing anything else,
        // so rather than spawning another thread, we will just call
        // run() in this thread.
        // create a file logger url from the command line args
        final ZooKeeperServer zkServer = new ZooKeeperServer();//1
        // Registers shutdown handler which will be used to know the
        // server error or shutdown state changes.
        final CountDownLatch shutdownLatch = new CountDownLatch(1);//2
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));

        txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(//3
                config.dataDir));
        zkServer.setTxnLogFactory(txnLog);
        zkServer.setTickTime(config.tickTime);
        zkServer.setMinSessionTimeout(config.minSessionTimeout);
        zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
        cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(),
                config.getMaxClientCnxns());
        cnxnFactory.startup(zkServer);
        // Watch status of ZooKeeper server. It will do a graceful shutdown
        // if the server is not running or hits an internal error.
        shutdownLatch.await();
        shutdown();

        cnxnFactory.join();
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        if (txnLog != null) {
            txnLog.close();
        }
    }
}
1. 創建服務器統計器ServerStats

在上面代碼的1處,也就是zookeeperserver的構造函數內實例化了Server的統計器ServerStats。

public ZooKeeperServer() {
    serverStats = new ServerStats(this); //ServerStats統計了基本的server的數據如收發packet數,延遲信息等。
    listener = new ZooKeeperServerListenerImpl(this);
}

簡單介紹下ServerStats:

/**
 * Basic Server Statistics
 */
 //從註釋也可以知道ServerStats是server數據的基礎類
public class ServerStats {
    private long packetsSent;//zkserver啟動後,或是最近一次充值服務端統計信息後,服務端->客戶端發送的響應次數
    private long packetsReceived;//zkserver啟動後,或是最近一次充值服務端統計信息後,服務端收到的客戶端發送的響應次數
    private long maxLatency;//zkserver啟動後,或是最近一次充值服務端統計信息後,server端請求處理的最大延時
    private long minLatency = Long.MAX_VALUE;//zkserver啟動後,或是最近一次充值服務端統計信息後,server端請求處理的最小延時
    private long totalLatency = 0;//zkserver啟動後,或是最近一次充值服務端統計信息後,server端請求處理的總延時
    private long count = 0;//zkserver啟動後,或是最近一次充值服務端統計信息後,server端處理的客戶端請求總次數

    private final Provider provider;//provider對象提供部分統計數據,如下

    public interface Provider {
        public long getOutstandingRequests();//獲取隊列中還沒有被處理的請求數量,在zookeeperserver和finalrequestprocessor中
        public long getLastProcessedZxid();//獲得目前最新的zxid
        public String getState();//獲取服務器狀態
        public int getNumAliveConnections();//獲取存活的客戶端連接總數
    }
    
    public ServerStats(Provider provider) {//構造器
        this.provider = provider;
    }
2. 創建數據管理器FileTxnSnapLog

FileTxnSnapLog是Zookeeper上層服務器和底層數據存儲之間的對接層,提供了一系列操作數據文件的接口,如事務日誌文件和快照數據文件。Zookeeper根據zoo.cfg文件中解析出的快照數據目錄dataDir和事務日誌目錄dataLogDir來創建FileTxnSnapLog。

其實這裏的FileTxnSnapLog就是包含了第3,4篇中講的快照和日誌類FileTxnLog,FileSnap的功能類,FileTxnLog,FileSnap的生成以來dataDir和snapDir(dataLogDir)來生成。

3. 設置服務器tickTime和會話超時時間限制
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
4. 創建ServerCnxnFactory

通過配置系統屬性zookeper.serverCnxnFactory來指定使用Zookeeper自己實現的NIO還是使用Netty框架作為Zookeeper服務端網絡連接工廠。

cnxnFactory = ServerCnxnFactory.createFactory();//創建ServerCnxnFactory
cnxnFactory.configure(config.getClientPortAddress(),
        config.getMaxClientCnxns());//初始化ServerCnxnFactory
cnxnFactory.startup(zkServer);//啟動ServerCnxnFactory

之前提到過這裏利用到了反射。

static public ServerCnxnFactory createFactory() throws IOException {
    String serverCnxnFactoryName =
        System.getProperty(ZOOKEEPER_SERVER_CNXN_FACTORY);//讀取配置
    if (serverCnxnFactoryName == null) {
        serverCnxnFactoryName = NIOServerCnxnFactory.class.getName();//默認是NIO實現
    }
    try {
        ServerCnxnFactory serverCnxnFactory = (ServerCnxnFactory) Class.forName(serverCnxnFactoryName)//如果配置了,則按照配置來實例化。反射
                .getDeclaredConstructor().newInstance();
        LOG.info("Using {} as server connection factory", serverCnxnFactoryName);
        return serverCnxnFactory;
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + serverCnxnFactoryName);
        ioe.initCause(e);
        throw ioe;
    }
}
5. 初始化ServerCnxnFactory

NIOServerCnxnFactory(configure)

@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
    configureSaslLogin();

    thread = new ZooKeeperThread(this, "NIOServerCxn.Factory:" + addr);//傳入的runnable對象是ServerCnxnFactory的實現類
thread.setDaemon(true);//設置為daemon線程
    maxClientCnxns = maxcc;//NIO相關的設置
    this.ss = ServerSocketChannel.open();
    ss.socket().setReuseAddress(true);
    LOG.info("binding to port " + addr);
    ss.socket().bind(addr);
    ss.configureBlocking(false);
    ss.register(selector, SelectionKey.OP_ACCEPT);
}

可以看到,Zookeeper會初始化Thread作為ServerCnxnFactory的主線程,然後再初始化NIO服務器。而zookeeper初始化的thread傳入的runnable對象依然是ServerCnxnFactory的實現類,也就是說run的時候依然是執行ServerCnxnFactory。

6. 啟動ServerCnxnFactory主線程
@Override
public void startup(ZooKeeperServer zks) throws IOException,
        InterruptedException {
    start();//啟動主線程
    setZooKeeperServer(zks);//設置server端對象
    zks.startdata();//恢復數據等
    zks.startup();
}

其中start方法啟動了線程。

public void start() {
    // ensure thread is started once and only once
    if (thread.getState() == Thread.State.NEW) {//如果是剛啟動
        thread.start();//啟動線程
    }
}

NIOServerCnxnFactory的run方法是NIO異步連接的一些基本設置如建立連接等。

7. 恢復本地數據

每次zk啟動時,都需要從本地塊找數據文件和事務日誌文件中進行數據恢復。NIOServerCnxnFactory(startdata)->ZooKeeperServer(startdata)中進行了恢復data的操作。

8. 創建並啟動session管理器

所謂的session管理器就是前面說的sessiontracker。

public synchronized void startup() {
    if (sessionTracker == null) {
        createSessionTracker();//創建session管理器
    }
    startSessionTracker();//啟動session管理器
    setupRequestProcessors();//初始化zookeeper請求處理鏈

    registerJMX();//註冊JMX服務

    setState(State.RUNNING);
    notifyAll();
}
protected void createSessionTracker() {
    sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
            tickTime, 1, getZooKeeperServerListener());//創建sessiontrack,初始化管理器裏的sessionsWithTimeout,expirationInterval等變量,特別的是,初始化sessionId也在這裏一起做了
}
9. 初始化Zookeeper的請求處理鏈
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this,
            finalProcessor);
    ((SyncRequestProcessor)syncProcessor).start();
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

技術分享圖片

zookeeper請求處理方式基於責任鏈模式,也就是說在server端有多個請求處理器一次來處理一個客戶端請求。在服務器啟動的時候,會將這些處理器串聯起來形成一個處理鏈。上圖是單機server的處理器。

10. 註冊JMX服務

ZK 服務器的信息會以JXM的方式暴露給外部。這裏還不太了解。

11. 註冊ZK服務器實例
public void submitRequest(Request si) {
    if (firstProcessor == null) {
        synchronized (this) {
            try {
                // Since all requests are passed to the request
                // processor it should wait for setting up the request
                // processor chain. The state will be updated to RUNNING
                // after the setup.
                while (state == State.INITIAL) {//如果在初始化時會wait住
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Une

通過上面代碼的註釋可以知道在初始化時請求提交處理會wait住,而這個函數是在NIOServerCnxnFactory(run)->NIOServerCnxn(doIO)->ZookeeperServer(processConnectRequest)->ZookeeperServer(createSession)->ZookeeperServer(submitRequest)中調用的,也即是前面在啟動ServerCnxnFactory主線程時便會在這裏wait住。

setState(State.RUNNING);
notifyAll();

而在這裏便會notify時線程可以完全工作。

思考

JMX需要再學習

請求鏈和日誌/快照清理過程

參考

http://www.cnblogs.com/leesf456/p/6105276.html

https://www.jianshu.com/p/47cb9e6d309d
https://my.oschina.net/pingpangkuangmo/blog/491673

https://my.oschina.net/xianggao/blog/537902

https://www.jianshu.com/p/76d6d674530b

從paxos到zk

Zookeeper源碼閱讀(十四) 單機Server