1. 程式人生 > >zookeeper源碼之客戶端

zookeeper源碼之客戶端

服務端 run t對象 成對 bool .com 操作 code 分享

  zookeeper自身提供了一個簡易的客戶端。主要包括一下幾個模塊:

  1.啟動模塊。

  2.核心執行模塊。

  3.網絡通信模塊。

啟動模塊

  啟動程序,接收和解析命令行。詳見zookeeper源碼之客戶端啟動模塊。

核心執行模塊

  客戶端操作ZooKeeper服務端的核心類,詳見zookeeper源碼之客戶端核心執行模塊。    

類圖

技術分享圖片

ZooKeeper

  ZooKeeper是客戶端操作ZooKeeper服務端的核心類。當用戶向ZooKeeperMain執行相關命令時,最終會交給ZooKeeper執行,其會將用戶請求封裝成對象,然後發送到服務端。內部使用ClientCnxn來提供與服務端的通信。 請求數據會被封裝成RequestHeader、Request對象,相應的返回結果會存儲在Response,ReplyHeader對象。

public String create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath, createMode.isSequential());
        final String serverPath = prependChroot(clientPath);
        RequestHeader h 
= new RequestHeader(); h.setType(ZooDefs.OpCode.create); CreateRequest request = new CreateRequest(); CreateResponse response = new CreateResponse(); request.setData(data); request.setFlags(createMode.toFlag()); request.setPath(serverPath); if (acl != null
&& acl.size() == 0) { throw new KeeperException.InvalidACLException(); } request.setAcl(acl); ReplyHeader r = cnxn.submitRequest(h, request, response, null); if (r.getErr() != 0) { throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath); } if (cnxn.chrootPath == null) { return response.getPath(); } else { return response.getPath().substring(cnxn.chrootPath.length()); } }

ClientCnxn

  為客戶端發送請求到服務端,管理底層IO連接。 將用戶調用的請求對象(RequestHeader、Request)封裝成Packet對象,存入發送隊列。內部有一個線程會不斷讀取發送隊列中的Packet對象,通過NIO將Packet對象發送到服務端,然後將Packet對象放入pending隊列,該線程會不斷讀取服務端的返回信息,並且將結果設置到Packet對象的Response,ReplyHeader對象中。

//等待發送的數據包隊列
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
//發送後等待結果的數據包隊列
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();

class
SendThread extends Thread { boolean doIO() throws InterruptedException, IOException { ...if (!outgoingQueue.isEmpty()) { ByteBuffer pbb = outgoingQueue.getFirst().bb; sock.write(pbb); if (!pbb.hasRemaining()) { sentCount++; Packet p = outgoingQueue.removeFirst(); if (p.header != null && p.header.getType() != OpCode.ping && p.header.getType() != OpCode.auth) { pendingQueue.add(p); } } } } ... }      ... @Override public void run() { ...while (zooKeeper.state.isAlive()) { ...if (doIO()) { lastHeard = now; } ... }
...
}

zookeeper源碼之客戶端