系統模型

資料模型

ZNode是ZK中資料的最小單元,每個ZNode上都可以儲存資料,同時還可以掛載子節點,形成一個層次化的名稱空間——樹.

Zk中每個資料節點都稱為ZNode,所有ZNode形成樹形結構。

事務ID

事務是指ZK改變伺服器狀態的操作,一般包括節點的建立、刪除、資料節點內容更新和客戶端會話建立與失效。 對每一個事務請求,ZK都會分配一個全域性事務ID,ZXID來表示。

節點特性

節點型別

ZK中,每個資料節點都是有生命週期的,zk中節點可以分為持久節點、臨時節點和順序節點三大類。 可以組合形成 持久節點、持久順序節點、臨時節點、臨時順序節點

狀態資訊

stat物件標識了一個數據節點的所有狀態資訊: * czxid 標識該資料節點在被建立時的事務ID * mzxid 該資料節點最後一次被更新的事務ID * ctime 建立時的時間 * mtime 最後一次被更新的時間 * version 資料節點的版本號 * cversion 子節點的版本號 * aversion 節點的ACL版本號 * ephemeralOwner 建立該臨時節點的會話的sessionID * dataLength 資料內容的長度 * numChildren 當前節點的子節點個數 * pzxid 子節點列表最後一次被修改的事務ID

版本——保證分散式資料原子性操作

ZK為資料節點引入了版本的概念,每個資料節點都具有三中型別的版本資訊,對資料節點的任何更新操作都會引起版本號的變化。 * version 當前資料節點資料內容的版本號 * cversion 當前資料節點子節點的版本號 * aversion 當前資料節點的ACL變更版本號

ZK的版本號值得是對資料節點的修改次數,即使前後兩次變更使得資料沒有變化,版本號也會增加。

Watcher——資料變更通知

ZK提供了資料的分佈-訂閱功能。ZK中引入Watcher機制實現這種分散式的通知功能。ZK允許客戶端向服務端註冊一個監聽,當服務端的一些指定事件觸發了這個Watcher,那麼就指向客戶端傳送一個事件通知來實現分散式的通知功能。 ![IMAGE](quiver-image-url/00BAF71A323A184EC09F78D695B6DE90.jpg =684x315)

Watcher介面

在ZK中,介面類Watcher用於表示一個標準的事件處理處理器,其定義了事件通知相關的邏輯,包含KeepState和EventType兩個列舉類,分別代表通知狀態和事件型別。

Watcher 事件

 @InterfaceAudience.Public
        public enum KeeperState {
            /** Unused, this state is never generated by the server */
            @Deprecated
            Unknown (-1),

            /** The client is in the disconnected state - it is not connected
             * to any server in the ensemble. */
            Disconnected (0),

            /** Unused, this state is never generated by the server */
            @Deprecated
            NoSyncConnected (1),

            /** The client is in the connected state - it is connected
             * to a server in the ensemble (one of the servers specified
             * in the host connection parameter during ZooKeeper client
             * creation). */
            SyncConnected (3),

            /**
             * Auth failed state
             */
            AuthFailed (4),

            /**
             * The client is connected to a read-only server, that is the
             * server which is not currently connected to the majority.
             * The only operations allowed after receiving this state is
             * read operations.
             * This state is generated for read-only clients only since
             * read/write clients aren't allowed to connect to r/o servers.
             */
            ConnectedReadOnly (5),

            /**
              * SaslAuthenticated: used to notify clients that they are SASL-authenticated,
              * so that they can perform Zookeeper actions with their SASL-authorized permissions.
              */
            SaslAuthenticated(6),

            /** The serving cluster has expired this session. The ZooKeeper
             * client connection (the session) is no longer valid. You must
             * create a new client connection (instantiate a new ZooKeeper
             * instance) if you with to access the ensemble. */
            Expired (-112);

            private final int intValue;     // Integer representation of value
                                            // for sending over wire

            KeeperState(int intValue) {
                this.intValue = intValue;
            }

            public int getIntValue() {
                return intValue;
            }

            public static KeeperState fromInt(int intValue) {
                switch(intValue) {
                    case   -1: return KeeperState.Unknown;
                    case    0: return KeeperState.Disconnected;
                    case    1: return KeeperState.NoSyncConnected;
                    case    3: return KeeperState.SyncConnected;
                    case    4: return KeeperState.AuthFailed;
                    case    5: return KeeperState.ConnectedReadOnly;
                    case    6: return KeeperState.SaslAuthenticated;
                    case -112: return KeeperState.Expired;

                    default:
                        throw new RuntimeException("Invalid integer value for conversion to KeeperState");
                }
            }
        }
 ```

#序列化與協議
##使用Jute進行序列化

package com.paxos.zk.chp07;

import org.apache.jute.*; import org.apache.zookeeper.server.ByteBufferInputStream;

import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer;

public class MockRequestHeader implements Record {

private long sessionId;

private String type;

public long getSessionId() {
    return sessionId;
}

public void setSessionId(long sessionId) {
    this.sessionId = sessionId;
}

public String getType() {
    return type;
}

public void setType(String type) {
    this.type = type;
}

@Override
public String toString() {
    return "MockRequestHeader{" +
            "sessionId=" + sessionId +
            ", type='" + type + '\'' +
            '}';
}

@Override
public void serialize(OutputArchive archive, String tag) throws IOException {
    archive.startRecord(this, tag);
    archive.writeLong(sessionId, "sessionId");
    archive.writeString(type, "type");
    archive.endRecord(this, tag);
}

@Override
public void deserialize(InputArchive archive, String tag) throws IOException {

    archive.startRecord(tag);
    sessionId = archive.readLong("sessionId");
    type = archive.readString("type");
    archive.endRecord(tag);
}

public static void main(String[] args) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
    MockRequestHeader requestHeader =   new MockRequestHeader();
    requestHeader.setSessionId(-1L);
    requestHeader.setType("test");

    requestHeader.serialize(boa, "header");

    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());

    ByteBufferInputStream bbis = new ByteBufferInputStream(bb);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    MockRequestHeader header2 = new MockRequestHeader();
    header2.deserialize(bbia, "header");
    System.out.println(header2);

    bbis.close();
    baos.close();
}

}

##深入Jute
###Record介面

/** * Interface that is implemented by generated classes. * */ @InterfaceAudience.Public public interface Record { public void serialize(OutputArchive archive, String tag) throws IOException; public void deserialize(InputArchive archive, String tag) throws IOException; }

###OutputArchive 和InputArchive
提供基本的讀寫方法
###通訊協議
實現各個更新和相應的Response
#客戶端
客戶端的核心元件如下:
* ZooKeeper例項:客戶端的入口
* ClientWatchManager:客戶端watch管理器
* HostProvider:客戶端地址列表管理器
* ClientCnxn:客戶端核心執行緒,其內部包含兩個執行緒:SendThread和EventThread。前者負責ZK客戶端和服務端直接的網路IO通訊,後者負責對服務端事件進行處理。

客戶端整體初始化和啟動步驟如下:
1. 設定預設Watch
2. 設定ZK伺服器地址列表
3. 建立CLientCnxn

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException { LOG.info(“Initiating client connection, connectString=” + connectString + ” sessionTimeout=” + sessionTimeout + ” watcher=” + watcher);

    watchManager.defaultWatcher = watcher;

    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
##一次會話的建立過程
###初始化階段
1. 初始化ZK物件
2. 設定會話預設Watcher
3. 構造ZK伺服器地址列表管理器:HostProvider
4. 建立並初始化客戶端網路聯結器:ClientCnxn
5. 初始化SendThread和EventThread
6. 啟動SendThread和EventThread
7. 獲取一個伺服器地址
8. 建立TCP連線
9. 構造ConnectRequest請求
10. 傳送請求
11. 接收服務端響應
12. 處理Response
13. 連線成功
14. 生成事件:SyncConnected-None
15. 查詢Watcher
16. 處理事件

###伺服器地址列表

/** * A parser for ZooKeeper Client connect strings. * * This class is not meant to be seen or used outside of ZooKeeper itself. * * The chrootPath member should be replaced by a Path object in issue * ZOOKEEPER-849. * * @see org.apache.zookeeper.ZooKeeper */ public final class ConnectStringParser {

private static final int DEFAULT_PORT = 2181;

private final String chrootPath;

private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
###Chroot:客戶端隔離名稱空間
Chroot特性:允許每個客戶端為自己設定一個名稱空間,如果一個ZK客戶端設定了Chroot,那麼該客戶端對伺服器的任何操作,都會被限制在其自己的名稱空間下。
客戶端可以通過在connectString中新增字尾的方式來設定Chroot。
### HostProvider:地址列表管理器
在ConnectStringParser解析器中,對伺服器地址做一個簡單的處理,並將伺服器地址和相應的埠封裝成一個InetSocketAddress物件,以List形式儲存。然後經過處理的地址列表進一步封裝到StaticHostProvider類中。
HostProvider是StaticHostProvider的介面。

@InterfaceAudience.Public public interface HostProvider { public int size();

/**
 * The next host to try to connect to.
 *
 * For a spinDelay of 0 there should be no wait.
 *
 * @param spinDelay Milliseconds to wait if all hosts have been tried once.
 * @return The next host to try to connect to with resolved address. If the host is not resolvable, the unresolved
 * address will be returned.
 */
public InetSocketAddress next(long spinDelay);

/**
 * Notify the HostProvider of a successful connection.
 * 
 * The HostProvider may use this notification to reset it's inner state.
 */
public void onConnected();

}

###StaticHostProvider

public final class StaticHostProvider implements HostProvider { public interface Resolver { InetAddress[] getAllByName(String name) throws UnknownHostException; }

private static final Logger LOG = LoggerFactory
        .getLogger(StaticHostProvider.class);

private final List<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>(5);

private int lastIndex = -1;

private int currentIndex = -1;

private Resolver resolver;
##ClientCnxn
ClientCnxn是ZK的核心工作類,負責維護客戶端與服務端之間的網路連線並進行一系列網路通訊。

/** * This class manages the socket i/o for the client. ClientCnxn maintains a list * of available servers to connect to and “transparently” switches servers it is * connected to as needed. * */ public class ClientCnxn

###Packet
Packet是ClientCnxn內部定義的一個對協議層的封裝,作為ZK中請求與相應的載體,程式碼結構如下:

/** * This class allows us to pass the headers and the relevant records around. */ static class Packet { RequestHeader requestHeader;

    ReplyHeader replyHeader;

    Record request;

    Record response;

    ByteBuffer bb;

    /** Client's view of the path (may differ due to chroot) **/
    String clientPath;
    /** Servers's view of the path (may differ due to chroot) **/
    String serverPath;

    boolean finished;

    AsyncCallback cb;

    Object ctx;

    WatchRegistration watchRegistration;

    public boolean readOnly;
其中包含,請求頭、響應頭、請求體、響應體、節點路徑和註冊的Watch。
createBB物件負責對Packet屬性進行序列化,最終生成ByteBuffer物件。

public void createBB() { try { ByteArrayOutputStream baos = new ByteArrayOutputStream(); BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); boa.writeInt(-1, “len”); // We’ll fill this in later if (requestHeader != null) { requestHeader.serialize(boa, “header”); } if (request instanceof ConnectRequest) { request.serialize(boa, “connect”); // append “am-I-allowed-to-be-readonly” flag boa.writeBool(readOnly, “readOnly”); } else if (request != null) { request.serialize(boa, “request”); } baos.close(); this.bb = ByteBuffer.wrap(baos.toByteArray()); this.bb.putInt(this.bb.capacity() - 4); this.bb.rewind(); } catch (IOException e) { LOG.warn(“Ignoring unexpected exception”, e); } }

outgoingQueue和pendingQueue
ClientCnxn中,兩個比較核心的佇列outgoingQueue和pendingQueue,分別代表客戶端的請求傳送佇列和服務端的響應等待佇列。Outgoing佇列是一個請求傳送佇列,專門使用者儲存哪些餘姚傳送到服務端的Packet集合。Pending佇列為了儲存已經從客戶端傳送到服務端,但是需要等到服務端響應的Packet集合。

/** * These are the packets that have been sent and are waiting for a response. */ private final LinkedList pendingQueue = new LinkedList();

/**
 * These are the packets that need to be sent.
 */
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
##ClientCnxnSocket:底層Socket通訊
ClientCnxnSocketNIO預設實現了ZK的底層通訊,核心是doIO邏輯。
###請求傳送
正常情況下,從outgoing佇列中提取一個可傳送的Packet物件,同時生成一個客戶端請求序號XID並將其設定到Packet請求頭中,序列化後傳送。請求傳送完成後,會立即將該Packet儲存到Pending佇列中,以便等待服務端相應返回後進行處理。
###相應接收
客戶端獲取服務端的相應資料後,根據客戶端不同的請求型別,會進行不同的處理:
* 如果檢測到當前客戶端尚未初始化,說明當前客戶端與服務端正在進行會話建立,那麼就直接將接收到的ByteBuffer序列化成ConnectResponse物件。
* 如果當前客戶端已經和服務端正常會話,冰球接收到服務端相應是一個事件,那麼ZK客戶端會將接收到的ByteBuffer序列化成WatcherEvent物件,並將該事件放入待處理佇列。
* 如果是一個常規的請求相應,那麼會從PendingQueue中取出一個Packet來進行相應的處理。ZK會檢驗服務端響應中包含的XID值來確保請求處理的順序性,然後將接收到的ByteBuffer序列化成相應的Response。

###SendThread

/** * This class services the outgoing request queue and generates the heart * beats. It also spawns the ReadThread. */ class SendThread extends ZooKeeperThread { private long lastPingSentNs; private final ClientCnxnSocket clientCnxnSocket; private Random r = new Random(System.nanoTime()); private boolean isFirstConnect = true;

###EventThread

class EventThread extends ZooKeeperThread { private final LinkedBlockingQueue waitingEvents = new LinkedBlockingQueue();

    /** This is really the queued session state until the event
     * thread actually processes the event and hands it to the watcher.
     * But for all intents and purposes this is the state.
     */
    private volatile KeeperState sessionState = KeeperState.Disconnected;

   private volatile boolean wasKilled = false;
   private volatile boolean isRunning = false;

    EventThread() {
        super(makeThreadName("-EventThread"));
        setDaemon(true);
    }
#會話
##會話建立
###session
Session是ZK中的會話實體,代表了一個客戶端會話。包含四個基本屬性:
* sessionID:會話id,用來唯一標識一個會話
* TimeOut:會話超時時間
* TickTime:下次會話超時時間。
* isClosing:標記會話是否已經被關閉

###SessionTracker
SessionTracker是ZK服務端的會話管理器,負責會話的建立、管理和清理等工作。

/** * This is the basic interface that ZooKeeperServer uses to track sessions. The * standalone and leader ZooKeeperServer use the same SessionTracker. The * FollowerZooKeeperServer uses a SessionTracker which is basically a simple * shell to track information to be forwarded to the leader. */ public interface SessionTracker { public static interface Session { long getSessionId(); int getTimeout(); boolean isClosing(); } public static interface SessionExpirer { void expire(Session session);

    long getServerId();
}
```
/**
 * This is a full featured SessionTracker. It tracks session in grouped by tick
 * interval. It always rounds up the tick interval to provide a sort of grace
 * period. Sessions are thus expired in batches made up of sessions that expire
 * in a given interval.
 */
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(SessionTrackerImpl.class);

    HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();

    HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();

    ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
    long nextSessionId = 0;
    long nextExpirationTime;

    int expirationInterval;
  • sessionById:用來根絕sessionID管理session實體
  • sessionWithTimeout:根據sessionID來管理會話的超時時間。
  • sessionSets:根據下次會話超時時間點來歸檔會話,便於進行會話管理和超時檢查。

建立連線

服務端對於客戶端建立會話的請求分為四個步驟:處理ConnectRequest請求、會話建立、處理器鏈路處理、會話相應。

會話管理

分桶策略

ZK的會話管理主要由SessionTracker負責,採用一種特殊的會話管理方式,稱之為分桶策略。分桶策略是指,將類似的會話放在同一區塊中進行管理,以便於ZK對會話進行不同區塊的格里處理以及同一區塊的統一處理。

伺服器啟動

單機伺服器啟動

![IMAGE](quiver-image-url/24008D21E77CC4BD8444FE8F089EED34.jpg =967x473) ZK伺服器啟動分為五個主要步驟:配置檔案解析、初始化資料管理器、初始化網路IO、資料恢復和對外訪問。

預啟動

預啟動步驟如下: 1. 統一由QuorumPeerMain作為啟動類。無論是單機版還是叢集版,ZK都是配置QuorumPeerMain作為入口類啟動。

 /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            System.exit(2);
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            System.exit(2);
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            System.exit(1);
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }
  1. 解析配置檔案
 protected void initializeAndRun(String[] args)
        throws ConfigException, IOException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.servers.size() > 0) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }
  1. 建立並啟動歷史檔案清理器DatadirCleanupManager
  2. 判斷當前啟動是器群模式還是單機模式
  3. 再次解析配置檔案
  4. 建立ZK例項
public void runFromConfig(QuorumPeerConfig config) throws IOException {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }

      LOG.info("Starting quorum peer");
      try {
          ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
          cnxnFactory.configure(config.getClientPortAddress(),
                                config.getMaxClientCnxns());

          quorumPeer = getQuorumPeer();

          quorumPeer.setQuorumPeers(config.getServers());
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                  new File(config.getDataLogDir()),
                  new File(config.getDataDir())));
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
          quorumPeer.setClientPortAddress(config.getClientPortAddress());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());

          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){
              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }

          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();

          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

初始化

初始化步驟如下: 1. 建立伺服器統計器ServerStats


/**
 * Basic Server Statistics
 */
public class ServerStats {
    private long packetsSent;
    private long packetsReceived;
    private long maxLatency;
    private long minLatency = Long.MAX_VALUE;
    private long totalLatency = 0;
    private long count = 0;
    private AtomicLong fsyncThresholdExceedCount = new AtomicLong(0);
  1. 建立ZK資料管理器FileTxnSnapLog
/**
 * This is a helper class 
 * above the implementations 
 * of txnlog and snapshot 
 * classes
 */
public class FileTxnSnapLog {
    //the direcotry containing the 
    //the transaction logs
    private final File dataDir;
    //the directory containing the
    //the snapshot directory
    private final File snapDir;
    private TxnLog txnLog;
    private SnapShot snapLog;
    public final static int VERSION = 2;
    public final static String version = "version-";
  1. 設定伺服器tickTime和會話超時時間
  2. 建立ServerCnxnFactory
  3. 初始化ServerCnxnFactory
  4. 啟動ServerCnxnFactory主執行緒
  5. 恢復本地資料
  6. 建立並啟動會話管理器SessionTracker
  7. 初始化ZK的請求處理鏈
  8. 註冊KMX
  9. 註冊ZK伺服器例項

叢集版初始化

![IMAGE](quiver-image-url/D6AC260F715831A9CF7ACDC0E2F5CE03.jpg =964x540)

預啟動

  1. 統一由QuorumPeerMain作為啟動類
  2. 解析配置檔案zoo.cfg
  3. 建立並啟動歷史檔案清理器DatadirCleanupManager
  4. 判斷是叢集模式還是單機模式

初始化

public static void main(String[] args) {
        ZooKeeperServerMain main = new ZooKeeperServerMain();
        try {
            main.initializeAndRun(args);
        } catch (IllegalArgumentException e) {
            LOG.error("Invalid arguments, exiting abnormally", e);
            LOG.info(USAGE);
            System.err.println(USAGE);
            System.exit(2);
        } catch (ConfigException e) {
            LOG.error("Invalid config, exiting abnormally", e);
            System.err.println("Invalid config, exiting abnormally");
            System.exit(2);
        } catch (Exception e) {
            LOG.error("Unexpected exception, exiting abnormally", e);
            System.exit(1);
        }
        LOG.info("Exiting normally");
        System.exit(0);
    }
 /**
     * Run from a ServerConfig.
     * @param config ServerConfig to use.
     * @throws IOException
     */
    public void runFromConfig(ServerConfig config) throws IOException {
        LOG.info("Starting server");
        FileTxnSnapLog txnLog = null;
        try {
            // Note that this thread isn't going to be doing anything else,
            // so rather than spawning another thread, we will just call
            // run() in this thread.
            // create a file logger url from the command line args
            final ZooKeeperServer zkServer = new ZooKeeperServer();
            // Registers shutdown handler which will be used to know the
            // server error or shutdown state changes.
            final CountDownLatch shutdownLatch = new CountDownLatch(1);
            zkServer.registerServerShutdownHandler(
                    new ZooKeeperServerShutdownHandler(shutdownLatch));

            txnLog = new FileTxnSnapLog(new File(config.dataLogDir), new File(
                    config.dataDir));
            txnLog.setServerStats(zkServer.serverStats());
            zkServer.setTxnLogFactory(txnLog);
            zkServer.setTickTime(config.tickTime);
            zkServer.setMinSessionTimeout(config.minSessionTimeout);
            zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns());
            cnxnFactory.startup(zkServer);
            // Watch status of ZooKeeper server. It will do a graceful shutdown
            // if the server is not running or hits an internal error.
            shutdownLatch.await();
            shutdown();

            cnxnFactory.join();
            if (zkServer.canShutdown()) {
                zkServer.shutdown(true);
            }
        } catch (InterruptedException e) {
            // warn, but generally this is ok
            LOG.warn("Server interrupted", e);
        } finally {
            if (txnLog != null) {
                txnLog.close();
            }
        }
    }
  1. 建立ServerCnxnFactory
  2. 初始化ServerCnxnFactory
  3. 建立ZK的資料管理器FileTxnSnapLog
  4. 建立QuorumPeer例項
  5. 建立記憶體資料庫ZKDatabaser
  6. 初始化QuorumPeer
  7. 恢復本地資料
  8. 啟動ServerCnxnFactory

Leader選舉

  1. 初始化Leader選舉。ZK首先根據自身的SID,lastLoggedZxid和當前伺服器的epoch來生成一個初始化的投票,簡單來說就是每個伺服器都給自己投票。ZK根據zoo.cfg的electionAlg屬性來指定對應的選舉演算法。
  2. 註冊JMX服務
  3. 檢測當前伺服器狀態
  4. Leader選舉

Leader和Follower啟動期互動過程

![IMAGE](quiver-image-url/6182D5E7FBA76FFEE149FD97CB64AD3F.jpg =970x514) 1. 建立Leader伺服器和Follower伺服器 2. Leader伺服器啟動Follower接收器LearnerCnxAcceptor 3. Learner伺服器開始和Leader建立連線 4. Leader伺服器建立LearnerHandler 5. 向leader註冊 6. Leader解析Learner資訊,計算新的epoch 7. 傳送Leader狀態 8. Leaner傳送ACK訊息 9. 資料同步 10. 啟動leader和Learner伺服器

Leader選舉

伺服器啟動時期的Leader選舉

  1. 每個Server會發出一個投票

每次投票包含的最基本的元素有:所推舉的伺服器的myid和ZXID,初始化階段每個伺服器都會給自己投票(myid, zxid),然後將投票結果發給叢集中的其他機器。 2. 接收來自各個伺服器的投票

每個伺服器都會接收來自其他伺服器的投票。 3. 處理投票。 在接收來自其他伺服器的投票後,針對每一個投票,伺服器都需要將別人的投票和自己的投票PK,規則如下: * 優先檢查ZXID.ZXID比較大的優先作為Leader * 如果ZXID相同,那麼比較myid,myid較大的伺服器作為Leader

  1. 統計投票

伺服器統計局所有投票,判斷是否已經有過半(大於或等於(n/2+1))的機器接收到相同的投票資訊 5. 改變伺服器狀態

伺服器執行期間的Leader選舉

  1. 變更狀態。Leader掛掉後,其他非Observer伺服器都會將自己的伺服器狀態變更為LOOKING,然後進入選舉Leader流程
  2. 每個Server發乎投票,與氣動期間基本相同。
  3. 接受各個伺服器的投票
  4. 處理投票
  5. 統計投票
  6. 改變伺服器狀態

Leader選舉演算法

在ZK中提供了三種選舉演算法,分別是LeaderElection、UDP版本的FastLeaderElection和TCP版本的FastLeaderElection,可以通過配置檔案zoo.cfg中使用electionAlg屬性來指定。最新版本已經廢棄了其他演算法,只保留TCP版本的FastLeaderElection選舉蘇凡。

SID:伺服器ID

SID是一個數字,用來唯一標識一臺ZK叢集中的機器,每臺機器不能重複,和myid值一致。

ZXID:事務ID

ZXID是一個事務id,用來唯一標識一次伺服器狀態的變更。在某一時刻,叢集每臺機器的ZXID值不一定全都一致,這和ZK對客戶端更新請求的處理邏輯有關。

Vote:投票

Leader選舉,當叢集中的機器發現自己無法檢測到Leader機器的時候,就會開始嘗試進行投票。

Quorum:過半機器數

過半機器數 quorum = (n /2 + 1)

演算法分析

進入Leader選舉

當zk叢集中的一臺伺服器出現以下兩種情況之一時,就會開始進入Leader選舉。 * 伺服器初始化啟動 * 伺服器執行期間無法和Leader保持連線

而當一臺機器進入Leader選舉流程時,當前叢集也可能會處於以下兩種狀態: * 叢集本來就已經存在一個Leader * 叢集中確實不存在Leader

如下流程,主要是當Leader不存在的情況下,如何進行選舉: ##### 開始第一次投票 通常情況下兩種情況會導致叢集中不存在Leader,一種情況是在整個伺服器剛剛初始化啟動時,一種是當前Leader伺服器掛掉了。此時叢集所有的機器都處於一種試圖選舉出一個Leader的狀態,我們把這種狀態稱為LOOKING,意思是正在尋找Leader。當一臺伺服器處理LOOKING狀態的時候,那麼它就會向叢集中所有其他機器傳送訊息,我們稱這個訊息為投票。 在這個投票資訊中包含兩個最基本的訊息:所推舉的伺服器的SID和ZXID,分別標識被推舉的伺服器的唯一標識和事務ID,以(SID,ZXID)來標識一次投票資訊。 在第一次投票的時候,由於還無法檢測到叢集中其他機器的狀態資訊,因此每臺機器都是將自己作為被推舉的物件來進行投票。

變更投票

叢集中每臺機器發出自己的投票後,也會接收到來自叢集中其他機器的投票。每臺機器根據一定的規則,來處理收到的其他機器的投票,並以此來決定是否需要變更自己的投票。如下為基本的術語: * vote_sid: 接收到的投票中所推舉Leader伺服器的SID * vote_zxid: 接收到的投票中所推舉Leader伺服器的ZXID * self_sid:當前伺服器自己的SID * self_zxid:當前伺服器自己的ZXID

每次對於收到的投票的處理,都是對一個(vote_sid, vote_zxid) (self_sid, self_zxid)的對比過程。 * 規則1:如果vote_zxid大於self_zxid,就認可當前收到的投票,並再次將該投票傳送出去 * 規則2:如果vote_zxid小於self_zxid,那麼就堅持自己的投票,不做變更 * 規則3:如果vote_zxid等於self_zxid,那麼就對比sid,選擇sid較大者 * 規則4:如果vote_zxid等於self_zxid,並且vote_sid小於self_sid,那麼堅持自己的投票,不做變更。

確定Leader

經過第二次投票後,叢集中每臺機器都會再次收到其他機器的投票,然後統計結果,如果一臺機器收到了超過半數的相同投票,那麼這個投票對應的sid的機器則為Leader。

Leader選舉實現細節

伺服器狀態

在QuorumPeer.ServerState類中列舉了4中伺服器狀態,分別是:LOOKING、FOLLOWING、LEADING和OBSERVING。 * LOOKING:尋找Leader狀態,當伺服器處於該狀態時,它會認為當前叢集中沒有Leader,因此需要進入Leader選舉狀態。 * FOLLOWING:跟隨者狀態,表明當前伺服器角色是FOLLOWER * LEADING:領導者狀態,表明當前伺服器角色是Leader * OBSERVING:觀察者狀態,表明當前伺服器角色是OBSERVER

投票資料結構

在Leader選舉過程中主要通過投票來實現,每個投票包含兩個最基本的資訊:所推舉伺服器的SID和ZXID。


public class Vote {
    private static final Logger LOG = LoggerFactory.getLogger(Vote.class);

    public Vote(long id, 
                    long zxid) {
        this.version = 0x0;
        this.id = id;
        this.zxid = zxid;
        this.electionEpoch = -1;
        this.peerEpoch = -1;
        this.state = ServerState.LOOKING;
    }

QuorumCnxManager:網路IO

ClientCnxn是ZK客戶端中用於處理網路IO的一個管理器。每臺伺服器啟動的時候都會啟動一個QuorumCnxManager,負責每臺伺服器之間的底層Leader選舉過程中的網路通訊。

訊息佇列

  /*
     * Counter to count connection processing threads.
     */
    private AtomicInteger connectionThreadCnt = new AtomicInteger(0);

    /*
     * Mapping from Peer to Thread number
     */
    final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
    final ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
    final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;

    /*
     * Reception queue
     */
    public final ArrayBlockingQueue<Message> recvQueue;

QuprumCnxManager在內部維護了一系列的佇列,用於儲存接受到的、待發送的訊息以及訊息的傳送器。除接收佇列以外,這裡的所有佇列都是按SID分組形成佇列集合。 * recvQueue:訊息接收佇列,用於存放哪些從其他伺服器接收到的訊息 * queueSendMap:訊息傳送佇列,用於儲存那些待發送的訊息。queueSendMap是一個MAP,按照SID分組,分別為叢集中每臺機器分配了一個單獨的佇列,從而保證每臺機器之間的訊息傳送互不影響。 * senderWorkerMap:傳送器集合。每個SendWorker訊息傳送器,都對應一臺遠端ZK伺服器,負責訊息的傳送。同樣,在senderWorkerMap中,也按照SID進行分組。 * lastMessageSent:最近傳送過訊息,在這個集合中,為每個SID保留最近傳送過的一個訊息。

建立連線

 // visible for testing
    public QuorumCnxManager(final long mySid,
                            Map<Long,QuorumPeer.QuorumServer> view,
                            QuorumAuthServer authServer,
                            QuorumAuthLearner authLearner,
                            int socketTimeout,
                            boolean listenOnAllIPs,
                            int quorumCnxnThreadsSize,
                            boolean quorumSaslAuthEnabled,
                            ConcurrentHashMap<Long, SendWorker> senderWorkerMap) {
        this.senderWorkerMap = senderWorkerMap;

        this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
        this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
        this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
        String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
        if(cnxToValue != null){
            this.cnxTO = Integer.parseInt(cnxToValue);
        }

        this.mySid = mySid;
        this.socketTimeout = socketTimeout;
        this.view = view;
        this.listenOnAllIPs = listenOnAllIPs;

        initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
                quorumSaslAuthEnabled);

        // Starts listener thread that waits for connection requests 
        listener = new Listener();
    }
  /**
     * Thread to listen on some port
     */
    public class Listener extends ZooKeeperThread {

        volatile ServerSocket ss = null;

        public Listener() {
            // During startup of thread, thread name will be overridden to
            // specific election address
            super("ListenerThread");
        }

        /**
         * Sleeps on accept().
         */
        @Override
        public void run() {
            int numRetries = 0;
            InetSocketAddress addr;
            while((!shutdown) && (numRetries < 3)){
                try {
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (listenOnAllIPs) {
                        int port = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    setName(view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.toString());
                    ss.bind(addr);
                    while (!shutdown) {
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());

                        // Receive and handle the connection request
                        // asynchronously if the quorum sasl authentication is
                        // enabled. This is required because sasl server
                        // authentication process may take few seconds to finish,
                        // this may delay next peer connection requests.
                        if (quorumSaslAuthEnabled) {
                            receiveConnectionAsync(client);
                        } else {
                            receiveConnection(client);
                        }

                        numRetries = 0;
                    }
                } catch (IOException e) {
                    LOG.error("Exception while listening", e);
                    numRetries++;
                    try {
                        ss.close();
                        Thread.sleep(1000);
                    } catch (IOException ie) {
                        LOG.error("Error closing server socket", ie);
                    } catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping. " +
                                  "Ignoring exception", ie);
                    }
                }
            }
            LOG.info("Leaving listener");
            if (!shutdown) {
                LOG.error("As I'm leaving the listener thread, "
                        + "I won't be able to participate in leader "
                        + "election any longer: "
                        + view.get(QuorumCnxManager.this.mySid).electionAddr);
            }
        }

啟動QuorumCnxManager時,建立一個ServerSocket來監聽Leader選舉埠。開啟埠監聽後,ZK能夠不斷的接收到來自其他伺服器的建立連線,在接收到其他伺服器的請求時,會由receiveConnection函式來處理。為了避免兩臺機器之間重複建立TCP連線,ZK設計了一種規則:只允許最大SID的伺服器主動和其他伺服器建立連線,否則斷開連線。

 /**
     * If this server receives a connection request, then it gives up on the new
     * connection if it wins. Notice that it checks whether it has a connection
     * to this server already or not. If it does, then it sends the smallest
     * possible long value to lose the challenge.
     * 
     */
    public void receiveConnection(final Socket sock) {
        DataInputStream din = null;
        try {
            din = new DataInputStream(
                    new BufferedInputStream(sock.getInputStream()));

            handleConnection(sock, din);
        } catch (IOException e) {
            LOG.error("Exception handling connection, addr: {}, closing server connec