
ZooKeeper Source Code Analysis: Server Startup Flow

I. The Startup Class

From bin/zkServer.cmd we can see that the startup class is org.apache.zookeeper.server.quorum.QuorumPeerMain. Its structure is as follows:

It is an ordinary class: it mainly holds a QuorumPeer field (in cluster mode each ZooKeeper node is represented by a QuorumPeer), the main entry function, and two initialization methods.

II. The Startup Flow

QuorumPeerMain.initializeAndRun(args): parses the configuration, starts the scheduled task that automatically purges old data, and, in cluster mode, runs the cluster startup code.
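These three steps map almost one-to-one onto the method body. A condensed sketch of initializeAndRun (based on the 3.5.x source; exception handling and logging are trimmed, and signatures may differ slightly in other versions):

    protected void initializeAndRun(String[] args)
            throws ConfigException, IOException, AdminServerException {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);                       // 1. parse zoo.cfg
        }

        // 2. schedule the snapshot/txn-log purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
                config.getDataDir(), config.getDataLogDir(),
                config.getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);                       // 3. cluster mode
        } else {
            // standalone mode falls through to ZooKeeperServerMain
            ZooKeeperServerMain.main(args);
        }
    }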

1 Parsing the Configuration

QuorumPeerConfig.parse

    public void parse(String path) throws ConfigException {
        LOG.info("Reading configuration from: " + path);

        try {
            // builder pattern
            File configFile = (new VerifyingFileFactory.Builder(LOG)
                .warnForRelativePath()
                .failForNonExistingPath()
                .build()).create(path);

            Properties cfg = new Properties();
            FileInputStream in = new FileInputStream(configFile);
            try {
                cfg.load(in);
                configFileStr = path;
            } finally {
                in.close();
            }

            parseProperties(cfg);
        } catch (IOException e) {
            throw new ConfigException("Error processing " + path, e);
        } catch (IllegalArgumentException e) {
            throw new ConfigException("Error processing " + path, e);
        }

        // dynamic configuration support
        if (dynamicConfigFileStr != null) {
            try {
                Properties dynamicCfg = new Properties();
                FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
                ...
                setupQuorumPeerConfig(dynamicCfg, false);
                ...
        }
    }

2 The Data Auto-Purge Task

DatadirCleanupManager consists of a Timer and a PurgeTask.
First, note that ZooKeeper mainly persists two kinds of files, snapshots and logs: a snapshot is a point-in-time copy of the data, and the log is the transaction log kept consistent with the snapshot.
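A minimal sketch of how the timer and the purge task fit together (assuming the 3.5.x-style DatadirCleanupManager; field names and the PurgeTxnLog call may differ slightly between versions):

    // inside DatadirCleanupManager (sketch)
    public void start() {
        if (purgeInterval <= 0) {                  // autopurge.purgeInterval <= 0 disables purging
            LOG.info("Purge task is not scheduled.");
            return;
        }
        timer = new Timer("PurgeTask", true);      // daemon timer thread
        TimerTask task = new PurgeTask(dataLogDir, snapDir, snapRetainCount);
        timer.scheduleAtFixedRate(task, 0, TimeUnit.HOURS.toMillis(purgeInterval));
    }

    static class PurgeTask extends TimerTask {
        private final File logsDir;
        private final File snapsDir;
        private final int keepCount;

        PurgeTask(File logsDir, File snapsDir, int keepCount) {
            this.logsDir = logsDir;
            this.snapsDir = snapsDir;
            this.keepCount = keepCount;
        }

        @Override
        public void run() {
            try {
                // keep the newest keepCount snapshots, delete older snapshots and txn logs
                PurgeTxnLog.purge(logsDir, snapsDir, keepCount);
            } catch (Exception e) {
                LOG.error("Error occurred while purging.", e);
            }
        }
    }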

3 Cluster-Mode Startup Code

This is where the QuorumPeer instance is built: its properties are set from the configuration and then it is started.

3.1 Important configuration properties of QuorumPeer:

  • ServerCnxnFactory cnxnFactory: the default implementation is NIOServerCnxnFactory; it is responsible for establishing connections with clients and for client communication
  • FileTxnSnapLog logFactory: used for transaction-log recording and snapshot storage; it can load data from disk into the ZKDatabase, and can persist the data in the ZKDatabase together with the sessions into snapshot files
  • int electionType: the type of election algorithm. The default is 3, which uses the FastLeaderElection algorithm
  • long myid: the number written in the myid file; it identifies the node
  • int tickTime: the heartbeat interval used for session checking
  • minSessionTimeout, maxSessionTimeout: bound the sessionTimeout that a client may request
  • initLimit: during the initialization phase, the timeout for communicating with the leader and for receiving responses from a quorum of nodes; the timeout is initLimit*tickTime
  • syncLimit: after the initialization phase, it takes over from initLimit as the timeout for subsequent communication; the timeout is syncLimit*tickTime
  • ZKDatabase zkDb: stores the ZooKeeper tree data; it is the server's in-memory database
  • quorumConfig: used to verify whether a quorum of nodes has acknowledged something. The default is QuorumMaj, the simplest majority rule with no weighting (see the sketch after this list)
  • learnerType: one of PARTICIPANT or OBSERVER. A PARTICIPANT takes part in voting and may become a Follower or the Leader; an OBSERVER does not vote and its role never changes
  • quorumPeers: each QuorumServer holds the node's IP address, the port used to communicate with the Leader, and the port used for election voting
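As a concrete illustration of the default quorum check, QuorumMaj essentially just counts acknowledgements against half of the voting members (a simplified sketch; the real class also handles weighted groups and reconfiguration versions, and implements the other QuorumVerifier methods omitted here):

    // Simplified sketch of the default QuorumMaj check
    public class QuorumMaj {
        private final int half;                    // number of voting members / 2

        public QuorumMaj(int numberOfVotingMembers) {
            this.half = numberOfVotingMembers / 2;
        }

        public boolean containsQuorum(Set<Long> ackSet) {
            return ackSet.size() > half;           // strict majority, no weights
        }
    }

The runFromConfig method below then applies the properties above to the QuorumPeer instance: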
public void runFromConfig(QuorumPeerConfig config)
            throws IOException, AdminServerException
    {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }

      LOG.info("Starting quorum peer");
      try {
          ServerCnxnFactory cnxnFactory = null;
          ServerCnxnFactory secureCnxnFactory = null;

          if (config.getClientPortAddress() != null) {
              cnxnFactory = ServerCnxnFactory.createFactory();
              cnxnFactory.configure(config.getClientPortAddress(),
                      config.getMaxClientCnxns(),
                      false);
          }

          if (config.getSecureClientPortAddress() != null) {
              secureCnxnFactory = ServerCnxnFactory.createFactory();
              secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                      config.getMaxClientCnxns(),
                      true);
          }

          quorumPeer = getQuorumPeer();
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      config.getDataLogDir(),
                      config.getDataDir()));
          quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
          quorumPeer.enableLocalSessionsUpgrading(
              config.isLocalSessionsUpgradingEnabled());
          //quorumPeer.setQuorumPeers(config.getAllMembers());
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setConfigFileName(config.getConfigFilename());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
          if (config.getLastSeenQuorumVerifier()!=null) {
              quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
          }
          quorumPeer.initConfigInZKDatabase();
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

3.2 quorumPeer.start()

QuorumPeer extends Thread; its start flow is as follows:

Flow: start → load the database → start the server and listen for client requests → start the Jetty admin service (management page) → start leader election → run loop → end.
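These steps correspond directly to QuorumPeer.start(). A condensed version (from the 3.5.x source; validation and error handling trimmed):

    @Override
    public synchronized void start() {
        loadDataBase();              // restore ZKDatabase from the latest snapshot + txn log
        startServerCnxnFactory();    // start the NIO threads that accept client connections
        try {
            adminServer.start();     // Jetty-based AdminServer, port 8080 by default
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
        }
        startLeaderElection();       // create the initial vote and the election algorithm
        super.start();               // enter the run() loop (LOOKING/FOLLOWING/LEADING/OBSERVING)
    }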

3.2.1 Loading the Database

ZKDatabase: its class comment describes it as ZooKeeper's in-memory database, containing the sessions, the data tree, and the committed log.

    protected DataTree dataTree;
    protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts; // session id -> timeout; the session id can be correlated with entries in the log
    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();

DataTree:


    private final ConcurrentHashMap<String, DataNode> nodes =
        new ConcurrentHashMap<String, DataNode>(); // maps each path to its DataNode
    private final WatchManager dataWatches = new WatchManager(); // watches triggered by data updates
    private final WatchManager childWatches = new WatchManager(); // watches triggered by child-node updates
    private final PathTrie pTrie = new PathTrie(); // tracks quota node information

    private final Map<Long, HashSet<String>> ephemerals =
        new ConcurrentHashMap<Long, HashSet<String>>(); // each session's ephemeral nodes

DataNode contains its children, a data[] byte array holding the node's own data, and stat, the node's persisted stat information.

TODO: analysis of WatchManager

QuorumPeer.loadDataBase calls zkDb.loadDataBase, which restores the ZKDatabase from the latest snapshot and then replays the not-yet-snapshotted transactions from the latest log.
After zkDb.loadDataBase returns, the latest zxid is available, from which the current election epoch can be derived.
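The zxid-to-epoch relationship is a simple bit split: the high 32 bits of a zxid are the epoch, the low 32 bits a counter. Roughly what QuorumPeer.loadDataBase does with it (a sketch; the real method also reads/writes the currentEpoch and acceptedEpoch files and handles upgrade cases):

    long lastProcessedZxid = zkDb.loadDataBase();                     // restore snapshot, replay txn log
    long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); // epoch = zxid >> 32
    // the epoch recovered from disk tells the peer which election "generation" it is in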

3.2.2 Starting the Client Listener Service

Default implementation: NIOServerCnxnFactory.start()

    public void start() {
        stopped = false;
        if (workerPool == null) {
            workerPool = new WorkerService(
                "NIOWorker", numWorkerThreads, false);
        }
        for(SelectorThread thread : selectorThreads) {
            if (thread.getState() == Thread.State.NEW) {
                thread.start();
            }
        }
        // ensure thread is started once and only once
        if (acceptThread.getState() == Thread.State.NEW) {
            acceptThread.start();
        }
        if (expirerThread.getState() == Thread.State.NEW) {
            expirerThread.start();
        }
    }

Take a look at the class comment:

/**
 * NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
 * NIO non-blocking socket calls. Communication between threads is handled via
 * queues.
 *
 *   - 1   accept thread, which accepts new connections and assigns to a
 *         selector thread
 *   - 1-N selector threads, each of which selects on 1/N of the connections.
 *         The reason the factory supports more than one selector thread is that
 *         with large numbers of connections, select() itself can become a
 *         performance bottleneck.
 *   - 0-M socket I/O worker threads, which perform basic socket reads and
 *         writes. If configured with 0 worker threads, the selector threads
 *         do the socket I/O directly.
 *   - 1   connection expiration thread, which closes idle connections; this is
 *         necessary to expire connections on which no session is established.
 *
 * Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
 * 1 connection expiration thread, 4 selector threads, and 64 worker threads.
 */
  • An initialized worker thread pool, workerPool, holding an ArrayList<ExecutorService> workers in which each worker is a fixed thread pool of size 1: workers.add(Executors.newFixedThreadPool(1, new DaemonThreadFactory(threadNamePrefix, i)))

  • Multiple selector threads; each selector thread only selects on the connections assigned to it, so that with a very large number of connections a single selector does not become a performance bottleneck

  • One accept thread, which only handles new connections and hands them off to a selector thread

  • One expirer thread, which checks for and closes idle connections

start() starts all of the threads above.

Let's look at the AcceptThread first.
It is initialized in NIOServerCnxnFactory.configure(): a ServerSocketChannel is opened, bound to the port, and passed to the AcceptThread constructor together with the selectorThreads. When the AcceptThread is constructed it registers interest in the server socket's OP_ACCEPT event; in its run loop it waits for new-connection events, and finally, in doAccept(), it hands the accepted socket over to one of the selector threads (by calling selectorThread.addAcceptedConnection).

    public void configure(InetSocketAddress addr, int maxcc, boolean secure) throws IOException {
        ...
        this.ss = ServerSocketChannel.open();
        ss.socket().setReuseAddress(true);
        LOG.info("binding to port " + addr);
        ss.socket().bind(addr);
        ss.configureBlocking(false);
        acceptThread = new AcceptThread(ss, addr, selectorThreads);
    }

    public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
                Set<SelectorThread> selectorThreads) throws IOException {
            super("NIOServerCxnFactory.AcceptThread:" + addr);
            this.acceptSocket = ss;
            this.acceptKey =
                acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
            this.selectorThreads = Collections.unmodifiableList(
                new ArrayList<SelectorThread>(selectorThreads));
            selectorIterator = this.selectorThreads.iterator();
        }
    private boolean doAccept() {
            boolean accepted = false;
            SocketChannel sc = null;
            try {
                sc = acceptSocket.accept();
                ...
                SelectorThread selectorThread = selectorIterator.next();
                if (!selectorThread.addAcceptedConnection(sc)) {
                    throw new IOException(
                        "Unable to add connection to selector queue"
                        + (stopped ? " (shutdown in progress)" : ""));
                }

            } catch (IOException e) {
                ...
            }
            return accepted;
        }

Now look at the SelectorThread.
First, its run loop:

      public void run() {
            try {  
                while (!stopped) {
                    try {
                        // pick up read/write events and hand the actual I/O to a worker thread
                        select();
                        // handle newly accepted connections: register for reads and create an NIOServerCnxn
                        processAcceptedConnections();
                        // after handleIO, the socket's read/write interest ops are temporarily cleared;
                        // this restores them
                        processInterestOpsUpdateRequests();
                    } catch (RuntimeException e) {
                        LOG.warn("Ignoring unexpected runtime exception", e);
                    } catch (Exception e) {
                        LOG.warn("Ignoring unexpected exception", e);
                    }
                }
                ...
                //after leaving the loop, close and clean up the connections
             }

       private void select() {
                ...
                selector.select();

                Set<SelectionKey> selected = selector.selectedKeys();
                ArrayList<SelectionKey> selectedList =
                    new ArrayList<SelectionKey>(selected);
                Collections.shuffle(selectedList);
                Iterator<SelectionKey> selectedKeys = selectedList.iterator();
                while(!stopped && selectedKeys.hasNext()) {
                    ...
                    if (key.isReadable() || key.isWritable()) {
                        handleIO(key);
                    } 
                }
                ...
        }

        private void handleIO(SelectionKey key) {
            IOWorkRequest workRequest = new IOWorkRequest(this, key);
            NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();

            // Stop selecting this key while processing on its
            // connection
            cnxn.disableSelectable();
            key.interestOps(0);
            touchCnxn(cnxn);
            workerPool.schedule(workRequest);
        }

SelectorThread.addAcceptedConnection enqueues the newly accepted socket and calls wakeupSelector() to break the select() blocking in run(), so that processAcceptedConnections can run right away.
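The method itself is short (shown here roughly as in the 3.5.x source):

    public boolean addAcceptedConnection(SocketChannel accepted) {
        if (stopped || !acceptedQueue.offer(accepted)) {
            return false;                 // factory shutting down or queue full
        }
        wakeupSelector();                 // selector.wakeup(): unblock select() immediately
        return true;
    }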

The threads in the workerPool handle the data on each socket; the processing is ultimately done by ZooKeeperServer.processPacket. See the data-serialization section for how the data itself is handled.
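Between the worker thread and processPacket sits IOWorkRequest.doWork, which drives the actual socket I/O and then re-arms the selector. A trimmed sketch (assuming the 3.5.x NIOServerCnxnFactory; key-validity and shutdown handling are omitted):

    // IOWorkRequest.doWork(), executed on a workerPool thread (sketch)
    public void doWork() throws InterruptedException {
        if (!key.isValid()) {
            selectorThread.cleanupSelectionKey(key);
            return;
        }
        if (key.isReadable() || key.isWritable()) {
            cnxn.doIO(key);          // actual socket read/write; complete requests
                                     // end up in ZooKeeperServer.processPacket
            touchCnxn(cnxn);         // refresh the connection's expiration time
        }
        cnxn.enableSelectable();     // let the selector watch this connection again
        // ask the selector thread to restore the interest ops it cleared in handleIO
        if (!selectorThread.addInterestOpsUpdateRequest(key)) {
            cnxn.close();
        }
    }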

3.2.3 Starting the Admin Service

By default the admin service is provided by Jetty and occupies port 8080. When deploying a pseudo-cluster on a single machine, Jetty must be disabled.
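Assuming ZooKeeper 3.5+, the relevant zoo.cfg properties look roughly like this (the admin server can also be controlled through the zookeeper.admin.* system properties):

    # zoo.cfg: avoid the 8080 clash when several nodes run on one machine
    # disable the Jetty AdminServer entirely:
    admin.enableServer=false
    # ...or keep it enabled but give each node its own port:
    # admin.serverPort=9081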

3.2.4 Election

An election algorithm is selected via createElectionAlgorithm(3); the default is 3, the fast leader election, and the other algorithms are on their way to being deprecated. For algorithm 3 this boils down to: FastLeaderElection fle = new FastLeaderElection(this, qcm); fle.start();

FastLeaderElection needs a QuorumPeer (the local node), a QuorumCnxManager (which manages the connections to the other nodes), the two queue fields sendqueue and recvqueue, and a Messenger, which is responsible for sending and receiving messages.

    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
        this.stop = false;
        this.manager = manager;
        starter(self, manager);
    }

    private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;

        sendqueue = new LinkedBlockingQueue<ToSend>();
        recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new Messenger(manager);
    }
        Messenger(QuorumCnxManager manager) {

            this.ws = new WorkerSender(manager);

            this.wsThread = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
            this.wsThread.setDaemon(true);

            this.wr = new WorkerReceiver(manager);

            this.wrThread = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
            this.wrThread.setDaemon(true);
        }

FastLeaderElection.start() simply calls messenger.start(), which starts message sending and receiving.

Message sending is implemented by WorkerSender:

       class WorkerSender extends ZooKeeperThread {
            volatile boolean stop;
            QuorumCnxManager manager;

            WorkerSender(QuorumCnxManager manager){
                super("WorkerSender");
                this.stop = false;
                this.manager = manager;
            }

            public void run() {
                while (!stop) {
                    try {
                        ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
                        if(m == null) continue;

                        process(m);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
                LOG.info("WorkerSender is down");
            }

            /**
             * Called by run() once there is a new message to send.
             *
             * @param m     message to send
             */
            void process(ToSend m) {
                ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                                    m.leader,
                                                    m.zxid,
                                                    m.electionEpoch,
                                                    m.peerEpoch,
                                                    m.configData);

                manager.toSend(m.sid, requestBuffer);

            }
        }

Message receiving is implemented by WorkerReceiver:

                Message response;
                while (!stop) {
                    // Sleeps on receive
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                        // unpack the message
                        ...
                        Notification n = new Notification();
                        // continue unpacking; check the message version

                        // if the message comes from a server that is not a voting participant,
                        // reply directly with our own current vote
                        if(!self.getCurrentAndNextConfigVoters().contains(response.sid)) {
                            Vote current = self.getCurrentVote();
                            QuorumVerifier qv = self.getQuorumVerifier();
                            ToSend notmsg = new ToSend(ToSend.mType.notification,
                                    current.getId(),
                                    current.getZxid(),
                                    logicalclock.get(),
                                    self.getPeerState(),
                                    response.sid,
                                    current.getPeerEpoch(),
                                    qv.toString().getBytes());

                            sendqueue.offer(notmsg);
                        } else {

                            /*
                             * If this server is looking, then send proposed leader
                             */

                            if(self.getPeerState() == QuorumPeer.ServerState.LOOKING){
                                recvqueue.offer(n);

                                /*
                                 * Send a notification back if the peer that sent this
                                 * message is also looking and its logical clock is
                                 * lagging behind.
                                 */
                                if((ackstate == QuorumPeer.ServerState.LOOKING)
                                        && (n.electionEpoch < logicalclock.get())){
                                    Vote v = getVote();
                                    QuorumVerifier qv = self.getQuorumVerifier();
                                    ToSend notmsg = new ToSend(ToSend.mType.notification,
                                            v.getId(),
                                            v.getZxid(),
                                            logicalclock.get(),
                                            self.getPeerState(),
                                            response.sid,
                                            v.getPeerEpoch(),
                                            qv.toString().getBytes());
                                    sendqueue.offer(notmsg);
                                }
                            } else {
                                /*
                                 * If this server is not looking, but the one that sent the ack
                                 * is looking, then send back what it believes to be the leader.
                                 */
                                Vote current = self.getCurrentVote();
                                if(ackstate == QuorumPeer.ServerState.LOOKING){
                                    if(LOG.isDebugEnabled()){
                                        LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
                                                self.getId(),
                                                response.sid,
                                                Long.toHexString(current.getZxid()),
                                                current.getId(),
                                                Long.toHexString(self.getQuorumVerifier().getVersion()));
                                    }

                                    QuorumVerifier qv = self.getQuorumVerifier();
                                    ToSend notmsg = new ToSend(
                                            ToSend.mType.notification,
                                            current.getId(),
                                            current.getZxid(),
                                            current.getElectionEpoch(),
                                            self.getPeerState(),
                                            response.sid,
                                            current.getPeerEpoch(),
                                            qv.toString().getBytes());
                                    sendqueue.offer(notmsg);
                                }
                            }


To summarize the receiving flow:

Flow: decode the message; is the sender a voting participant? If not, reply with our current vote and stop. If it is, check whether this server is itself in the LOOKING state: if yes, enqueue the notification and, if the sender's election epoch lags behind ours, also send back our current vote; if this server is not looking but the sender is, tell it what we currently believe the leader is.

3.2.5 The Run Loop

Inside while(running), the peer is handled differently according to its state (a condensed sketch of the loop follows the list below):

  • LOOKING: the election phase; the peer keeps taking part in the election (lookForLeader()) until it finishes (at which point the state is switched)
  • OBSERVING: the observing phase; the peer communicates with the Leader to obtain its own state information
  • LEADING: the peer is the Leader; it builds the Leader service and blocks in lead(); only after the Leader fails does it return to the LOOKING state
  • FOLLOWING: the peer is a Follower; it builds the Follower service and blocks in followLeader(); only after that fails does it go back to the LOOKING state
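A condensed sketch of the run() loop (based on the 3.5.x source; try/catch blocks, shutdown handling, and the reconfig details are omitted):

    while (running) {
        switch (getPeerState()) {
        case LOOKING:
            // blocks in the election until a leader is chosen, then the state changes
            setCurrentVote(makeLEStrategy().lookForLeader());
            break;
        case OBSERVING:
            setObserver(makeObserver(logFactory));
            observer.observeLeader();        // follow the leader without voting
            break;
        case FOLLOWING:
            setFollower(makeFollower(logFactory));
            follower.followLeader();         // blocks until the leader is lost
            break;
        case LEADING:
            setLeader(makeLeader(logFactory));
            leader.lead();                   // blocks while this peer is the leader
            setLeader(null);
            break;
        }
    }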