1. 程式人生 > >ZooKeeper原始碼學習筆記(1)--client端解析

ZooKeeper原始碼學習筆記(1)--client端解析

前言

ZooKeeper是一個相對簡單的分散式協調服務,通過閱讀原始碼我們能夠更進一步的清楚分散式的原理。

環境

ZooKeeper 3.4.9

入口函式

bin/zkCli.sh中,我們看到client端的真實入口其實是一個org.apache.zookeeper.ZooKeeperMain的Java類

"$JAVA" "-Dzookeeper.log.dir=${ZOO_LOG_DIR}" "-Dzookeeper.root.logger=${ZOO_LOG4J_PROP}" \
     -cp "$CLASSPATH" $CLIENT_JVMFLAGS $JVMFLAGS
\ org.apache.zookeeper.ZooKeeperMain "[email protected]"

通過原始碼走讀,看到在ZooKeeperMain中主要由兩部分構成

connectToZK(cl.getOption("server"));

while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
  executeLine(line);
}
  1. 構造一個ZooKeeper物件,同ZooKeeperServer進行建立通訊連線
  2. 通過反射呼叫jline.ConsoleReader
    類,對終端輸入進行讀取,然後通過解析單行命令,呼叫ZooKeeper介面。

如上所述,client端其實是對 zookeeper.jar 的簡單封裝,在構造出一個ZooKeeper物件後,通過解析使用者輸入,呼叫 ZooKeeper 介面和 Server 進行互動。

ZooKeeper 類

剛才我們看到 client 端同 ZooKeeper Server 之間的互動其實是通過 ZooKeeper 物件進行的,接下來我們詳細深入到 ZooKeeper 類中,看看其和服務端的互動邏輯。

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean
canBeReadOnly) throws IOException { ConnectStringParser connectStringParser = new ConnectStringParser(connectString); HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); }

在 ZooKeeper的構造方法中,可以看到 ZooKeeper 中使用 Server 的伺服器地址構建了一個 ClientCnxn 類,在這個類中,系統新建了兩個執行緒

sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();

其中,SendThread 負責將ZooKeeper的請求資訊封裝成一個Packet,傳送給 Server ,並維持同Server的心跳,EventThread負責解析通過通過SendThread得到的Response,之後傳送給Watcher::processEvent進行詳細的事件處理。

Client 時序圖

如上圖所示,Client中在終端輸入指令後,會被封裝成一個Request請求,通過submitRequest,進一步被封裝成Packet包,提交給SendThread處理。

SendThread通過doTransportPacket傳送給Server,並通過readResponse獲取結果,解析成一個Event,再將Event加入EventThread的佇列中等待執行。

EventThread通過processEvent消費佇列中的Event事件。

SendThread

SendThread 的主要作用除了將Packet包傳送給Server之外,還負責維持Client和Server之間的心跳,確保 session 存活。

現在讓我們從原始碼出發,看看SendThread究竟是如何執行的。

SendThread是一個執行緒類,因此我們進入其run()方法,看看他的啟動流程。

while (state.isAlive()) {
  if (!clientCnxnSocket.isConnected()) {
    // 啟動和server的socket連結
    startConnect();
  }
  // 根據上次的連線時間,判斷是否超時
  if (state.isConnected()) {
    to = readTimeout - clientCnxnSocket.getIdleRecv();
  } else {
    to = connectTimeout - clientCnxnSocket.getIdleRecv();
  }
  if (to <= 0) {
    throw new SessionTimeoutException(warnInfo);
  }
  // 傳送心跳包
  if (state.isConnected()) {
    if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
      sendPing();
      clientCnxnSocket.updateLastSend();
    }
  }
  // 將指令資訊傳送給 Server
  clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
}

從上面的程式碼中,可以看出SendThread的主要任務如下:
1. 建立同 Server 之間的 socket 連結
2. 判斷連結是否超時
3. 定時傳送心跳任務
4. 將ZooKeeper指令傳送給Server

與 Server 的長連結

ZooKeeper通過獲取ZOOKEEPER_CLIENT_CNXN_SOCKET變數構造了一個ClientCnxnSocket物件,預設情況下是ClientCnxnSocketNIO

String clientCnxnSocketName = System
                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
if (clientCnxnSocketName == null) {
  clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
}

ClientCnxnSocketNIO::connect中我們可以看到這裡同Server之間建立了一個socket連結。

SocketChannel sock = createSock();
registerAndConnect(sock, addr);

超時與心跳

SendThread::run中,可以看到針對連結是否建立分別有readTimeoutconnetTimeout 兩種超時時間,一旦發現連結超時,則丟擲異常,終止 SendThread

在沒有超時的情況下,如果判斷距離上次心跳時間超過了1/2個超時時間,會再次傳送心跳資料,避免訪問超時。

傳送 ZooKeeper 指令

在時序圖中,我們看到從終端輸入指令後,我們會將其解析成一個Packet 包,等待SendThread進行傳送。

ZooKeeper::create為例

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.create);
CreateRequest request = new CreateRequest();
CreateResponse response = new CreateResponse();
request.setData(data);
request.setFlags(createMode.toFlag());
request.setPath(serverPath);
if (acl != null && acl.size() == 0) {
    throw new KeeperException.InvalidACLException();
}
request.setAcl(acl);
ReplyHeader r = cnxn.submitRequest(h, request, response, null);

在這裡create指令,被封裝成了一個 CreateRequest,通過submitRequest被轉成了一個Packet

public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration);
    synchronized (packet) {
        while (!packet.finished) {
            packet.wait();
        }
    }
    return r;
}

Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration) {
    Packet packet = null;
    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    synchronized (outgoingQueue) {
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);
        }
    }
    sendThread.getClientCnxnSocket().wakeupCnxn();
    return packet;
}

submitRequest中,我們進一步看到Request被封裝成一個Packet包,並加入SendThread::outgoingQueue佇列中,等待執行。

Note:在這裡我們還看到,ZooKeeper方法中所謂的同步方法其實就是在Packet被提交到SendThread之後,陷入一個while迴圈,等待處理完成後再跳出的過程

SendThread::runwhile迴圈中,ZooKeeper通過doTransport將存放在outgoingQueue中的Packet包傳送給 Server。

void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn) {
    if (sockKey.isReadable()) {
        // 讀取response資訊
        sendThread.readResponse(incomingBuffer);
    }
    if (sockKey.isWritable()) {
        Packet p = findSendablePacket(outgoingQueue, cnxn.sendThread.clientTunneledAuthenticationInProgress());
        sock.write(p.bb);
    }
}

doIO傳送socket資訊之前,先從socket中獲取返回資料,通過readResonse進行處理。

void readResponse(ByteBuffer incomingBuffer) throws IOException {
     ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
     BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
     ReplyHeader replyHdr = new ReplyHeader();
     replyHdr.deserialize(bbia, "header");
     if (replyHdr.getXid() == -1) {
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");
        WatchedEvent we = new WatchedEvent(event);
        eventThread.queueEvent( we );
     }
}

readReponse中,通過解析資料,我們可以得到WatchedEvent物件,並將其壓入EventThread的訊息佇列,等待分發

EventThread

public void run() {
    while (true) {
        Object event = waitingEvents.take();
        if (event == eventOfDeath) {
            wasKilled = true;
        } else {
            processEvent(event);
        }
}

EventThread中通過processEvent對佇列中的事件進行消費,並分發給不同的Watcher

watch事件註冊和分發

通常在ZooKeeper中,我們會為指定節點新增一個Watcher,用於監聽節點變化情況,以ZooKeeper:exist為例

// the watch contains the un-chroot path
WatchRegistration wcb = null;
if (watcher != null) {
    wcb = new ExistsWatchRegistration(watcher, clientPath);
}

final String serverPath = prependChroot(clientPath);

RequestHeader h = new RequestHeader();
h.setType(ZooDefs.OpCode.exists);
ExistsRequest request = new ExistsRequest();
request.setPath(serverPath);
request.setWatch(watcher != null);
SetDataResponse response = new SetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);

程式碼的大致邏輯和create類似,但是對wathcer做了一層ExistWatchRegistration的包裝,當packet物件完成請求之後,呼叫register方法,根據不同包裝的WatchRegistration將watch註冊到不同watch列表中,等待回撥。

if (p.watchRegistration != null) {
    p.watchRegistration.register(p.replyHeader.getErr());
}

在 ZooKeeper 中一共有三種類型的WatchRegistration,分別對應DataWatchRegistration,ChildWatchRegistration,ExistWatchRegistration。 並在ZKWatchManager類中根據每種型別的WatchRegistration,分別有一張map表負責存放。

private final Map<String, Set<Watcher>> dataWatches =
            new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
            new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
            new HashMap<String, Set<Watcher>>();

EventThread::processEvent 時,根據event的所屬路徑,從三張map中獲取對應的watch列表進行訊息通知及處理。

總結

client 端的原始碼分析就到此為止了。

ZooKeeper Client 的原始碼很簡單,擁有三個獨立執行緒分別對命令進行處理,分發和響應操作,在保證各個執行緒相互獨立的基礎上,儘可能避免了多執行緒操作中出現鎖的情況。