.

服務端有3種執行方式:leader,follower,observer。leader是領導者,一個ZooKeeper叢集同一時刻最多隻能有一個leader。follower是跟隨者,可以有多個跟隨者。Observer是觀察者,也可以有多個觀察者。

叢集剛開始的時候沒有leader,這時所有的follower會發起選舉過程,選舉出唯一一個leader。只有選舉出了leader之後叢集才能正式工作,被選舉出的機器的角色則從follower轉換成leader,其餘的機器則將自己的leader地址更改為新選舉出的leader地址。observer不參與選舉過程。

正常的叢集對外提供統一服務介面,不管leader、follower、還是observer都可以提供對外服務,對於客戶端來說他們是沒有區別的。

服務端功能可以分成兩大類:選舉相關的和服務相關的,我們先看服務相關的。

服務相關的專門用於響應ZooKeeper客戶端的請求,類名稱為ZooKeeperServer。它又分成3個子類,名稱為:LeaderZooKeeperServer,FollowerZooKeeperServer,ObserverZooKeeperServer。顧名思義它們分別在Leader,Follower,Observer時使用。

當LeaderZooKeeperServer收到客戶端寫事務請求時,先將請求交給Leader類來發起整個叢集的寫事務流程,這時候Leader類給所有Follower傳送PROPOSAL,要求Follower將寫請求持久化到本地磁碟以防丟失,Follower持久化後回覆ACK資訊給Leader,當Leader收集到超過半數的ACK回覆時,給Follower傳送COMMIT包提交事務執行,最終Leader和Follower都會執行該事務。當然最後是由LeaderZooKeeperServer返回結果給客戶端的。

當FollowerZooKeeperServer收到客戶端寫事務請求時,會呼叫Follower類傳送REQUEST訊息給Leader。Leader再給所有Follower傳送PROPOSAL發起寫事務流程,後面的流程和Leader模式下類似。

對於寫操作來說,都是要交給leader然後再開始後續流程的。而對於讀操作來說,由於叢集提供了分散式一致性儲存,就由本地服務端提供服務,而不用區別服務端的角色。

1、怎麼啟動

服務端的啟動是從QuorumPeerMain開始的,它又幹了什麼事呢?

QuorumPeerMain讀取配置資訊然後啟動QuorumPeer,配置資訊由QuorumPeerConfig類負責讀取。然後在ZooKeeperServerMain方法中啟動ZooKeeperServer並等待關閉事件,ZooKeeperServer又啟動ServerCnxn負責和客戶端的網路通訊。

2、ZooKeeperServer

ZooKeeperServer是服務端基類,它使用NettyServerCnxn作為底層Socket通訊機制,NettyServerCnxn收到客戶端請求之後會自動呼叫ZooKeeperServer的prorcessPacket方法來執行客戶端命令,併發送響應包給客戶端。

ZooKeeper服務端定義了3種子類:LeaderZooKeeperServer、FollowerZooKeeperServer和ObserverZooKeeperServer。它們分別用於Leader,Follower和Observer。

ZooKeeper客戶端連線服務端時,可能連線的是Leader,也可能連線的是Follower或者Observer。連線到不同型別的服務端它的命令請求流程是不太一樣的,特別是寫事務請求。

如果連線的是Leader,則Leader會將寫請求生成Proposal包廣播到Follower,通知Follower有寫事務要執行,這時Follower先將寫事務請求儲存到磁碟,然後給Leader傳送ACK確認。Leader在超過半數確認的情況下才會真正開始執行寫命令,寫命令完成後Leader再給Follower傳送COMMIT命令,通知Follower寫操作已完成,這時Follower也會接著呼叫CommitProcessor並且最後呼叫FinalRequestProcessor執行同樣的寫命令,全部完成後Leader和Follower仍然保持相同的ZooKeeper事務,保持叢集資料一致性。

讀操作則不需要這麼複雜,客戶端不管連線的是Leader還是Follower,它傳送的讀命令都會直接執行並返回結果給客戶端,因此讀操作的併發效能要高很多。

ZooKeeper客戶端和服務端之間資料互動流程示意圖如下:

 

它們之間採用Netty框架作為底層Socket連線,NettyServerFactory是服務端連線管理池,為每個客戶端建立一個NettyServer,NettyServer接收到客戶端命令請求之後會呼叫ZooKeeperServer的processPacket方法處理該客戶端請求。

2.1 processPacket

當ZooKeeperServer的底層NettyServerCnxn接收到客戶端的Request時,該方法被NettyServerCnzx的主接收執行緒呼叫。ZooKeeperServer然後開始處理Request。

該方法的主要程式碼:

public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
h.deserialize(bia, "header");
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                  h.getType(), incomingBuffer, cnxn.getAuthInfo());
    si.setOwner(ServerCnxn.me);
    setLocalSessionFlag(si);
submitRequest(si);
}

最後一行的submitRequest方法呼叫firstProcessor開始處理Request,firstProcessor是處理請求包的入口方法,在Leader/Follower等不同的模式下其firstProcessor是不同的。主要程式碼如下:

public void submitRequest(Request si) {
    if (firstProcessor == null) {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type);
        if (validpacket) {
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
         } 
    }
}

2.2 processTxn

處理客戶端的寫操作請求,通過ZKDatabase類實現具體邏輯。

該方法由FinalRequestProcessor呼叫,客戶端讀操作不在這裡實現,而是放在FinalRequestProcessor中直接程式碼實現。當然讀操作最終也是通過ZkDatabase的相關方法實現具體邏輯的。

處理寫操作請求,寫操作涉及到叢集狀態同步,和讀操作有很大的區別,寫操作提供了統一的入口方法processTxn來處理。

該方法的主要程式碼如下:

private ProcessTxnResult processTxn(Request request,TxnHeader hdr, Record txn) {
    ProcessTxnResult rc;
    int opCode = request != null ? request.type : hdr.getType();
    long sessionId = request != null ? request.sessionId : hdr.getClientId();
    if (hdr != null) {
        rc = getZKDatabase().processTxn(hdr, txn);
    } else {
        rc = new ProcessTxnResult();
    }
    if (opCode == OpCode.createSession) {
        if (hdr != null && txn instanceof CreateSessionTxn) {
           CreateSessionTxn cst = (CreateSessionTxn) txn;
           sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
        } else if (request != null && request.isLocalSession()) {
           request.request.rewind();
           int timeout = request.request.getInt();
           request.request.rewind();
           sessionTracker.addSession(request.sessionId, timeout);
        } else {
           LOG.warn("*****>>>>> Got " + txn.getClass() + " " + txn.toString());
        }
    } else if (opCode == OpCode.closeSession) {
      sessionTracker.removeSession(sessionId);
    }
    return rc;
}

2.3 NettyServerCnxn

NettyServerCnxn是ZooKeeper客戶端和服務端底層通訊的介面,採用Netty元件作為Socket介面。客戶端提交的命令會通過NettyServerCnxn到達服務端,然後觸發NettyServerCnxn中的receiveMessage方法,該方法再呼叫ZooKeeperServer的processPacket方法處理接收到的請求包。其中程式碼片段如下:

public void receiveMessage(ChannelBuffer message) {
     if (initialized) {
           zks.processPacket(this, bb);
           if (zks.shouldThrottle(outstandingCount.incrementAndGet())) {
                 disableRecvNoWait();
           }
      } else {
           zks.processConnectRequest(this, bb);
           initialized = true;
      }
 }

當客戶端第一次連線到服務端時,會呼叫ZooKeeperServer的processConnectRequest方法處理;其他情況下會呼叫ZooKeeperServer的processPacket方法處理客戶端請求。具體的處理邏輯可參見ZooKeeperServer及其子類(LeaderZooKeeperServer、FollowerZooKeeperServer、ObserverZooKeeperServer)