1. 程式人生 > >Zookeeper 原始碼(三)Zookeeper 客戶端原始碼

Zookeeper 原始碼(三)Zookeeper 客戶端原始碼

Zookeeper 原始碼(三)Zookeeper 客戶端原始碼

Zookeeper Client

Zookeeper 客戶端由以下幾個核心元件組成:

說明
Zookeeper Zookeeper 客戶端入口
ClientWatchManager 客戶端 Watcher 管理器
HostProvider 客戶端地址列表管理器
ClientCnxn 客戶端核心執行緒,其內部又包含兩個執行緒,即 SendThread 和 EventThread。前者是一個 IO 執行緒,主要負責 ZooKeeper 客戶端和服務端之間的網路通訊;後者是一個事件執行緒,主要負責對服務端事件進行處理。
ClientCnxnSocketNetty 最底層的通訊 netty

客戶端整體結構如下圖:

Zookeeper 客戶端類圖

一、Zookeeper

客戶端在構造階段建立 ClientCnxn 與服務端連線,後續命令都是通過 ClientCnxn 傳送給服務端。ClientCnxn 是客戶端與服務端通訊的底層介面,它和 ClientCnxnSocketNetty 一起工作提供網路通訊服務。

服務端是 ZookeeperServer 類,收到 ClientCnxn 的請求處理後再通過 ClientCnxn 返回到客戶端。

ClientCnxn 連線時可以同時指定多臺伺服器地址,根據指定的演算法連線一臺伺服器,當某個伺服器發生故障無法連線時,會自動連線到其他的伺服器。實現這一機制的是 StaticHostProvider 類。

(1) 客戶端使用:

ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1", 5000, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getState() == Event.KeeperState.SyncConnected) {
            if (type == Event.EventType.None) {
                // 如果連線建立成功才能繼續執行
                countDownLatch.countDown();
            }
        }
    }
});
countDownLatch.await();

zooKeeper.create(
        "/testRoot",                  // 節點路徑,不允許遞迴建立節點
        "testRoot".getBytes(),        // 節點內容
        ZooDefs.Ids.OPEN_ACL_UNSAFE,  // 節點許可權,一般情況下不用關注
        CreateMode.PERSISTENT);       // 節點型別
}

(2) ZooKeeper 建立

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly,
        HostProvider aHostProvider) throws IOException {
    
    // 1. watcher 儲存在 ZKWatchManager 的 defaultWatcher 中,作為整個會話的預設 watcher
    watchManager = defaultWatchManager();
    watchManager.defaultWatcher = watcher;
   
    // 2. 解析 server 獲取 IP 以及 PORT
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    hostProvider = aHostProvider;

    // 3. 建立 ClientCnxn 例項
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
    cnxn.seenRwServerBefore = true; // since user has provided sessionId
    // 4. 啟動 SendThread 和 EventThread 執行緒,這兩個執行緒均為守護執行緒
    cnxn.start();
}

建立底層通訊 ClientCnxnSocketNIO 或 ClientCnxnSocketNetty

public static final String ZOOKEEPER_CLIENT_CNXN_SOCKET = "zookeeper.clientCnxnSocket";
private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
    String clientCnxnSocketName = System
            .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
    if (clientCnxnSocketName == null) {
        clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
    }
    try {
        return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
                .newInstance();
    } catch (Exception e) {
        IOException ioe = new IOException("Couldn't instantiate "
                + clientCnxnSocketName);
        ioe.initCause(e);
        throw ioe;
    }
}

(3) ClientCnxn 建立

Packet | 所有的請求都會封裝成 packet
outgoingQueue | 即將傳送的請求 packets
pendingQueue | 已經發送等待響應的 packets

ClientCnxn 建立時建立了兩個執行緒 SendThread 和 EventThread,這兩個執行緒都是守護執行緒,主執行緒結束時即關閉執行緒。

public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
        ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
        long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;

    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();
}

SendThread(ClientCnxnSocket clientCnxnSocket) {
    super(makeThreadName("-SendThread()"));
    state = States.CONNECTING;
    this.clientCnxnSocket = clientCnxnSocket;
    setDaemon(true);
}

EventThread() {
    super(makeThreadName("-EventThread"));
    setDaemon(true);
}