1. 程式人生 > >Zookeeper原始碼閱讀(九) ZK Client-Server(1)

Zookeeper原始碼閱讀(九) ZK Client-Server(1)

前言

Watcher部分的程式碼量總的來說還是比較多的,但是整個邏輯流程還是相對來說比較清晰的。不過還是需要常在腦子裡過一過,zk的watcher的相關的架構的設計還是挺精妙的。

從這一篇起開始說ZK client端-server端互動相關的程式碼,主要是從client本身,client和server的連線和會話以及server端這三個大點來說。這一篇主要說說大致流程和client端的初始化等。

結構

在網上看到了一張圖片非常好的描述了zk工作的大致結構,對理解zk client端以至於整體的程式碼都挺有幫助的,這裡貼出來:

上圖主要描述了ZK Client和Server端互動的過程:

  1. client端把request傳遞到Zookeeper類中(以Packet形式);
  2. Zookeeper類處理request並放入outgoingqueue中(sendthread做的);
  3. sendthread把發出的request移到pendingqueue;
  4. 收到回覆後,sendthread從pendingqueue中取出request,並生成event;
  5. eventthread處理event並觸發watchManager中的watcher,呼叫callback。

Client端程式碼結構

其實client端的很多重要的類在之前說watcher,快照和log的時候就已經接觸了很多了,這裡也是系統地總結下。

其中主要幾個類的功能:

Zookeeper:客戶端核心類之一,也是入口;

ClientCnxn:客戶端連線核心類,包含SendThreadEventThread兩個執行緒。SendThread為I/O執行緒,主要負責Zookeeper客戶端和伺服器之間的網路I/O通訊;EventThread為事件執行緒,主要負責對服務端事件進行處理;

ClientWatchManager:客戶端watcher管理器;

HostProvider:客戶端地址列表管理器。

上圖是Zookeeper類及其相關的類的互動UML圖,可以通過上圖來理解下整個Zookeeper各個功能類之間的關係和協作流程。

主要流程

zk client和server端建立連線從client來說主要分為以下三個階段:

  1. 初始化階段:上面介紹的幾個主要功能類的例項化;
  2. 建立階段:啟動及建立連線;
  3. 響應請求:響應及接收。

逐個介紹:

初始化階段

從上圖中能看出,第一步就是從Zookeeper類的例項化開始,我們選取一個Zookeeper類的構造器開始分析:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
        boolean canBeReadOnly)
    throws IOException
{
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    //設定預設watcher,之前講watcher的時候說過
    watchManager.defaultWatcher = watcher;

    //負責解析配置的server地址串
    //主要有兩個功能:1.加chroot(預設prefix,之前有介紹過);2.讀字串並把多個server地址分開
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    //根據之前的字串解析hostname,ip等,並不一定會按照原來的順序,在構造器中會將順序打散
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    //例項化clientCnxn物件
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    //啟動sendThread和eventThread
    cnxn.start();
}

其實總結下可以看出整個初始化階段分為四步:

  1. 為預設Watcher賦值

  2. 解析並設定Zookeeper伺服器地址列表
  3. 例項化ClientCnxn物件
  4. 啟動clientCnxn物件裡的sendThread和eventThread執行緒。

在這裡講一下構造器中的幾個重要類:

ConnectStringParser
public final class ConnectStringParser {
    private static final int DEFAULT_PORT = 2181;//預設port

    private final String chrootPath;//預設字首

    private final ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();//地址list

ConnectStringParser的構造器很簡單,主要就是解析chrootPath和生成上面的serverAddresses地址列表。

StaticHostProvider

有一張圖很好的形容了StaticHostProvider的工作原理。

在StaticHostProvider類中呼叫next方法會在迴圈佇列中不斷獲取,特別要注意的是這個迴圈佇列本身就已經是打亂過的。

在StaticHostProvider構造器中,把前面ConnectStringParser的server地址會再次解析一遍並生成一個佇列(因為上一步解析的結果有的沒有address),然後就會如下打亂。

Collections.shuffle(this.serverAddresses);
public InetSocketAddress next(long spinDelay) {
    //這個部分主要是迴圈
    ++currentIndex;
    if (currentIndex == serverAddresses.size()) {
        currentIndex = 0;
    }
    
    //如果這一次的server地址和上一次一樣,那麼就睡眠spinDelay時間
    if (currentIndex == lastIndex && spinDelay > 0) {
        try {
            Thread.sleep(spinDelay);
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
        }
    } else if (lastIndex == -1) {//如果是第一次訪問,就不要等待
        // We don't want to sleep on the first ever connect attempt.
        lastIndex = 0;
    }

    return serverAddresses.get(currentIndex);
}

建立階段

其實在sendThread和eventThread兩個執行緒啟動之後,建立和響應階段也就開始了。具體的流程會再後面詳細說,大致的流程是先從hostprovider獲取server地址,然後建立連線並構造請求傳送。

詳細流程:

  1. 獲取伺服器地址(從hostprovider中可以獲得),並建立TCP連線;
  2. 構造ConnectRequest請求。前面的TCP連線建立後,client和server的會話並沒有完全建立。SendThread會根據響應的引數構造ConnectRequest,幷包裝成Packet物件放入outgoingqueue中傳送到server端,這就是實際意義上的client和server的一個會話。這部分在之前的watcher傳送時有提到。
  3. ClientCnxnSocket從queue中取出Packet並序列化部分屬性發送到server。

這裡先把幾個基礎且比較重要的部分說下:

sendThread

功能:

  1. 維護client和server的心跳連線,一旦失去連線會立即重連;
  2. 管理了客戶端所有的請求傳送和響應接收操作,其將上層客戶端API操作轉換成相應的請求協議併發送到服務端,並完成對同步呼叫的返回和非同步呼叫的回撥;
  3. 接受請求的返回並傳遞給eventThread去處理。

上面的圖大致描述了outgoingqueue(客戶端請求等待發送的佇列)和pendingQueue(已經發送等待響應處理的佇列)的關係。

EventThread

EventThread是客戶端ClientCnxn內部的一個事件處理執行緒,負責客戶端的事件處理,並觸發客戶端註冊的Watcher監聽。EventThread中的watingEvents佇列用於臨時存放那些需要被觸發的Object,包括客戶端註冊的Watcher和非同步介面中註冊的回撥器AsyncCallback。同時,EventThread會不斷地從watingEvents中取出Object,識別具體型別(Watcher或AsyncCallback),並分別呼叫process和processResult介面方法來實現對事件的觸發和回撥。

Packet

其實之前就已經看過Packet的一些處理了,最重要的就是Packet序列化的時候createBB方法裡只有部分屬性序列化了,包括watcher在內的很多變數都沒有序列化,這也是watcher輕量特性的保證。

outgoingqueue和pendingqueue之前提到了主要的作用,而他們內部放置的物件都是Packet。在傳送時,sendThread從outgoingqueue取出Packet序列化(帶有生成的請求序號XID在請求頭中)併發送,然後這個Packet就被轉移到pendingqueue中,等待響應處理。

響應階段

同樣的,響應階段的程式碼也比較多,後面具體說,這裡說下大致流程:

  1. ClientCnxnSocket接收到響應後,首先判斷客戶端狀態是否初始化,若未初始化,那說明當前客戶端與服務端之間正在進行會話建立並反序列化response,生成ConnectResponse(帶有sessionid),然後會通知sendThread和HostProvider進行相應的設定;
  2. 如果為初始化狀態,且收到的為事件,那麼會反序列化為WatcherEvent,並放到EventThread的等待佇列中;
  3. 如果是常規的請求,如getdata,exists等,那麼會從pendingQueue中取出一個Packet來處理。

思考

Outgoingqueue, pendingQueue和EventThread的event等待佇列關係:

outgoingqueue就是所有要傳送的客戶端的請求,pendingqueue就是傳送過的等待響應的,如果客戶端收到了server端的回覆,就會從pendingqueue中取出請求Packet並處理;而event等待佇列是為了處理server段主動發起的事件,也就是節點發生了change,server主動傳送請求到客戶端,client把這類的通知放到event等待佇列中。

notification event,非notification event

客戶端需要接受伺服器傳送過來的訊息,第一種訊息是類似於Watcher回掉這種的,我們叫做notification,他的特點是伺服器主動傳送訊息給客戶端的,比如客戶端a在資料節點a上設定了getData監聽,當客戶端b修改了節點a後,伺服器主動傳送NodeDataChanged訊息給客戶端a。第二中訊息是類似於create,getData這種,他們向伺服器傳送對應的請求後,然後將請求放進到pendingQueue中,然後等待伺服器的響應,當接受到伺服器的響應後,再從pendingQueue中取出請求,然後進行回掉。

參考

https://www.cnblogs.com/francisYoung/p/5225703.html 可以多看下理解

http://www.cnblogs.com/leesf456/p/6098255.html

https://www.jianshu.com/p/cbad04b12950

《Paxos到ZK》