程式人生 > Zookeeper客戶端原始碼分析(一)建立連線

Zookeeper客戶端原始碼分析(一)建立連線

本文基於 ZooKeeper 3.4.14。由於 ZooKeeper 的很多構造方法都是呼叫另一個構造方法,所以分析程式碼時直接分析最終被呼叫的那個構造方法。

demo 程式碼如下:

/**
 * Minimal client used to walk through how the ZooKeeper client establishes a connection.
 *
 * <p>Connects to a local server and reads {@code /data} with a one-shot watcher. It then
 * closes the client, so the session is released immediately. Without the close, the session
 * would linger on the server until the session timeout expires.
 */
public class Life {

    public static void main(String[] args) throws Exception {
        new Life().start();
    }

    private void start() throws Exception {
        // The watcher passed to the constructor becomes the client's default watcher.
        ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 30000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("預設watch被觸發");
            }
        });
        try {
            // A watcher passed with a request is registered only for that path.
            zooKeeper.getData("/data", new Watcher() {
                @Override
                public void process(WatchedEvent event) {
                    System.out.println("獲取資料");
                }
            }, null);
        } finally {
            // Release the session explicitly. try/finally rather than try-with-resources keeps
            // this compatible with 3.4.x, whose ZooKeeper class is not AutoCloseable.
            zooKeeper.close();
        }
    }
}

一、初始化流程

1.1 引數設定

// ZooKeeper.java, line 438.
// The final constructor that the other ZooKeeper constructors delegate to. It installs the
// default watcher, parses the connect string, builds the ClientCnxn and starts its threads.
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 boolean canBeReadOnly) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

    // The watcher given at construction time is stored as watchManager.defaultWatcher.
    // NOTE(review): it is NOT invoked for every request. Which watchers an event reaches is
    // decided by watchManager.materialize() (not shown). Session-state events, such as the
    // SyncConnected event queued with a null path in onConnected(), presumably go to it.
    watchManager.defaultWatcher = watcher;

    // Parse the connect string (e.g. 127.0.0.1:2181/aaa) into concrete server addresses.
    // The trailing "/aaa" is the chroot path, which is prefixed to every path this client
    // uses. For example, zooKeeper.getData("/data") actually reads /aaa/data.
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    // StaticHostProvider supplies server addresses to the client. Per the author, it resolves
    // host names with the standard Java APIs.
    HostProvider hostProvider = new StaticHostProvider(
            connectStringParser.getServerAddresses());
    // getClientCnxnSocket() chooses the ClientCnxnSocket implementation. The default is
    // ClientCnxnSocketNIO; a different one can be named (fully qualified class name) through
    // the zookeeper.clientCnxnSocket setting. ClientCnxnSocket is package-private, so a custom
    // implementation would have to be declared in the same package.
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    // Starts the SendThread and the EventThread.
    cnxn.start();
}

// ClientCnxn.java, line 386.
// The final ClientCnxn constructor. It stores the session parameters, derives the timeouts and
// creates (but does not start) the I/O and event threads.
// NOTE(review): the 7-argument call in the ZooKeeper constructor presumably delegates here with
// an initial sessionId/sessionPasswd. Confirm in ClientCnxn.
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;

    // Per-server connect timeout: the session timeout is divided by the number of servers.
    connectTimeout = sessionTimeout / hostProvider.size();
    // The read timeout is two thirds of the session timeout.
    readTimeout = sessionTimeout * 2 / 3;
    // Whether this client accepts a connection to a read-only server.
    readOnly = canBeReadOnly;

    // Thread that sends requests and processes responses.
    sendThread = new SendThread(clientCnxnSocket);
    // Thread that dispatches watcher events.
    eventThread = new EventThread();
}

// ClientCnxn.java, line 420.
// Starts the I/O thread (SendThread) and then the event-dispatch thread (EventThread).
public void start() {
    sendThread.start();
    eventThread.start();
}

// The two unbounded queues in ClientCnxn.java. Both are plain LinkedLists; the code that uses
// them synchronizes on the list itself (see primeConnection() and doIO()).
// Packets already written to the server that are still waiting for a reply.
private final LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
// Packets waiting to be written to the server.
private final LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();

1.2 連線zookeeper服務端

1、建立socket連線

// ClientCnxn$SendThread, line 1032.
// Main loop of the I/O thread. This excerpt shows how a socket connection is started whenever
// the client is not connected. The catch clauses of the try block are omitted here.
public void run() {
    // ... (code omitted) ...
    
    while (state.isAlive()) {
        try {
            if (!clientCnxnSocket.isConnected()) {
                // isFirstConnect is cleared in primeConnection() once a connection has been
                // set up. Being here with it false therefore means this is a reconnect, not a
                // concurrent connect in progress. Wait a random 0-999 ms before retrying,
                // presumably so many clients don't reconnect at the same instant.
                if(!isFirstConnect){
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // If the client is closing (or no longer alive), stop reconnecting and leave
                // the loop, which ends the thread.
                if (closing || !state.isAlive()) {
                    break;
                }

                // rwServerAddress is assigned elsewhere (not shown). NOTE(review): presumably,
                // while connected in read-only mode, the client looks for a read/write server
                // and stores it here. The next connection attempt then goes to that server
                // instead of hostProvider.next(). Confirm in SendThread.
                if (rwServerAddress != null) {
                    serverAddress = rwServerAddress;
                    rwServerAddress = null;
                } else {
                    serverAddress = hostProvider.next(1000);
                }
                // Begin a non-blocking connect to the chosen server.
                startConnect(serverAddress);
                clientCnxnSocket.updateLastSendAndHeard();
            }
            
            // ... (code omitted) ...
        }
        
        // ... (code omitted) ...
    }
    
    // ... (code omitted) ...
}

// ClientCnxn$SendThread, line 992.
// Begins connecting to addr. In order, it:
//   - sets the state to CONNECTING;
//   - renames the thread after the target server;
//   - sets up a SASL client when SASL is enabled;
//   - starts the socket connect.
private void startConnect(InetSocketAddress addr) throws IOException {
    saslLoginFailed = false;
    state = States.CONNECTING;

    // Rename the thread so that the "(...)" part of its name shows the server (host:port).
    setName(getName().replaceAll("\\(.*\\)",
                                 "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
    // Create a SASL client if SASL is enabled (the author notes it is enabled by default).
    // If the SASL login fails, the connection still proceeds without SASL: an AuthFailed
    // event is queued and saslLoginFailed is set.
    if (ZooKeeperSaslClient.isEnabled()) {
        try {
            zooKeeperSaslClient = new ZooKeeperSaslClient(SaslServerPrincipal.getServerPrincipal(addr));
        } catch (LoginException e) {
            // An authentication error occurred when the SASL client tried to initialize:
            // for Kerberos this means that the client failed to authenticate with the KDC.
            // This is different from an authentication error that occurs during communication
            // with the Zookeeper server, which is handled below.
            LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
                     + "SASL authentication, if Zookeeper server allows it.");
            eventThread.queueEvent(new WatchedEvent(
                Watcher.Event.EventType.None,
                Watcher.Event.KeeperState.AuthFailed, null));
            saslLoginFailed = true;
        }
    }
    // Log the connection attempt.
    logStartConnect(addr);

    clientCnxnSocket.connect(addr);
}

// ClientCnxnSocketNIO.java, line 284.
// Opens a SocketChannel, registers it with the selector and starts connecting to addr. If
// that fails, the channel is closed and the IOException is rethrown. Afterwards the read state
// is reset for a new connection.
@Override
void connect(InetSocketAddress addr) throws IOException {
    // Create the SocketChannel.
    SocketChannel sock = createSock();
    try {
        registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
    // Not initialized yet: doIO() treats the first complete response on this connection as
    // the connect response (readConnectResult()) and only then sets initialized = true.
    initialized = false;

    /*
     * Reset incomingBuffer
     */
    // Reading restarts from lenBuffer, i.e. at the start of a new message.
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

// ClientCnxnSocketNIO.java, line 259.
// Creates the client SocketChannel with these options:
//   - non-blocking, so it can be used with the selector;
//   - SO_LINGER disabled;
//   - TCP_NODELAY enabled, so small request packets are not delayed by Nagle's algorithm.
SocketChannel createSock() throws IOException {
    SocketChannel sock;
    sock = SocketChannel.open();
    sock.configureBlocking(false);
    sock.socket().setSoLinger(false, -1);
    sock.socket().setTcpNoDelay(true);
    return sock;
}

// ClientCnxnSocketNIO.java, line 274.
// Registers the channel for OP_CONNECT and starts a non-blocking connect.
void registerAndConnect(SocketChannel sock, InetSocketAddress addr) throws IOException {
    // Listen for OP_CONNECT so that doTransport() learns when the connection completes.
    sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
    // Connect to the server. The channel is non-blocking, so connect() usually returns false
    // and the connection is completed later in doTransport() via finishConnect(). It may also
    // return true if the connection is established immediately, e.g. a local connection (see
    // SocketChannel#connect). In that case the session handshake is started right away.
    boolean immediateConnect = sock.connect(addr);
    if (immediateConnect) {
        sendThread.primeConnection();
    }
}

2、傳送連線請求

// ClientCnxn$SendThread, line 1032 (same method as above, different part).
// After the connect logic, every loop iteration hands control to doTransport() for one round of
// selector-driven I/O. The catch clauses of the try block are omitted here.
public void run() {
    // ... (code omitted) ...
    
    while (state.isAlive()) {
        try {
            // The socket-connect code analysed above goes here.
            
            // ... (code omitted) ...
            
            // One round of NIO work: finish connecting, read responses, write queued packets.
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        }
        
        // ... (code omitted) ...
    }
    
    // ... (code omitted) ...
}

// ClientCnxnSocketNIO.java, line 346.
// One round of NIO event handling. It waits up to waitTimeOut ms for ready keys, then handles
// each one. This excerpt shows the OP_CONNECT branch.
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
    // Standard Java NIO selector usage.
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        // OP_CONNECT was registered in registerAndConnect().
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) {
                updateLastSendAndHeard();
                // The connection is now complete, so run the step that registerAndConnect()
                // skipped when connect() returned false: queue the session-establishment
                // request for the server.
                sendThread.primeConnection();
            }
        } 
        
        // ... (code omitted) ...
    }
    
    // ... (code omitted) ...
}

// ClientCnxn$SendThread, line 878.
// Runs once the TCP connection is up. It queues the session handshake at the head of
// outgoingQueue: a ConnectRequest, then any auth packets, then SetWatches packets that
// re-register existing watches. It then enables read/write interest so doIO() can send them.
void primeConnection() throws IOException {
                LOG.info("Socket connection established to "
                     + clientCnxnSocket.getRemoteSocketAddress()
                     + ", initiating session");
    // From now on, losing the connection means a reconnect (run() sleeps briefly first).
    isFirstConnect = false;
    // Reuse the current session id only if a read/write server has been seen before
    // (seenRwServerBefore is set in onConnected()). Otherwise send 0, which the server
    // presumably treats as a request for a new session.
    long sessId = (seenRwServerBefore) ? sessionId : 0;
    ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                                               sessionTimeout, sessId, sessionPasswd);
    synchronized (outgoingQueue) {
        // We add backwards since we are pushing into the front
        // Only send if there's a pending watch
        // TODO: here we have the only remaining use of zooKeeper in
        // this class. It's to be eliminated!
        
        // Explanation of the upstream comment above:
        //  - Every packet here is inserted with addFirst(), so the packet inserted last ends up
        //    at the head of the queue. Inserting SetWatches, then auth, then the ConnectRequest
        //    gives the send order: ConnectRequest, auth packets, SetWatches. All of them go
        //    ahead of any requests already queued.
        //  - SetWatches is sent only if at least one watch is registered.
        //  - zooKeeper is used here only to read the current watch paths.
        // disableAutoWatchReset is false by default (per the author). On every (re)connection
        // the client re-registers all of its current watches with the server via SetWatches.
        // It does NOT mean watches are reset on every request.
        if (!disableAutoWatchReset) {
            List<String> dataWatches = zooKeeper.getDataWatches();
            List<String> existWatches = zooKeeper.getExistWatches();
            List<String> childWatches = zooKeeper.getChildWatches();
            if (!dataWatches.isEmpty()
                || !existWatches.isEmpty() || !childWatches.isEmpty()) {

                // Watch paths are sent with the chroot prefix applied.
                Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                long setWatchesLastZxid = lastZxid;

                // Build SetWatches batches until all three iterators are exhausted.
                while (dataWatchesIter.hasNext()
                       || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                    List<String> dataWatchesBatch = new ArrayList<String>();
                    List<String> existWatchesBatch = new ArrayList<String>();
                    List<String> childWatchesBatch = new ArrayList<String>();
                    int batchLength = 0;

                    // Fill one batch until the total path length reaches
                    // SET_WATCHES_MAX_LENGTH; the last path added may push it slightly over.
                    // No watch is dropped: paths that don't fit stay in the iterators, and the
                    // enclosing while loop puts them into the next batch. A long watch list is
                    // therefore split across several SetWatches packets.
                    while (batchLength < SET_WATCHES_MAX_LENGTH) {
                        final String watch;
                        if (dataWatchesIter.hasNext()) {
                            watch = dataWatchesIter.next();
                            dataWatchesBatch.add(watch);
                        } else if (existWatchesIter.hasNext()) {
                            watch = existWatchesIter.next();
                            existWatchesBatch.add(watch);
                        } else if (childWatchesIter.hasNext()) {
                            watch = childWatchesIter.next();
                            childWatchesBatch.add(watch);
                        } else {
                            break;
                        }
                        batchLength += watch.length();
                    }

                    // Send this batch of watched paths to the server as a SetWatches request,
                    // tagged with the last zxid this client has seen.
                    SetWatches sw = new SetWatches(setWatchesLastZxid,
                                                   dataWatchesBatch,
                                                   existWatchesBatch,
                                                   childWatchesBatch);
                    RequestHeader h = new RequestHeader();
                    h.setType(ZooDefs.OpCode.setWatches);
                    h.setXid(-8);
                    Packet packet = new Packet(h, new ReplyHeader(), sw, null, null);
                    outgoingQueue.addFirst(packet);
                }
            }
        }

        // Queue one auth packet (xid -4) per entry in authInfo.
        for (AuthData id : authInfo) {
            outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                                                                OpCode.auth), null, new AuthPacket(0, id.scheme,
                                                                                                   id.data), null, null));
        }
        // Insert the ConnectRequest last so that it is at the head of the queue and is sent
        // first. It has no RequestHeader.
        // The final argument does not switch the client into read-only mode. It passes the
        // client's readOnly flag (canBeReadOnly from the constructor) along with the request.
        // NOTE(review): presumably Packet serializes this flag after the ConnectRequest to tell
        // the server that this client accepts a read-only connection. Confirm in
        // Packet.createBB().
        outgoingQueue.addFirst(new Packet(null, null, conReq,
                                          null, null, readOnly));
    }
    // Switch the selector interest to read and write, so doIO() sends the queued handshake
    // and can read the reply.
    clientCnxnSocket.enableReadWriteOnly();
    if (LOG.isDebugEnabled()) {
        LOG.debug("Session establishment request sent on "
                  + clientCnxnSocket.getRemoteSocketAddress());
    }
}

// ClientCnxn$SendThread, line 1032 (same method as above, different part).
// The catch clauses of the try block are omitted here.
public void run() {
    // ... (code omitted) ...
    
    // Second iteration of the while loop; the first iteration started the connection.
    while (state.isAlive()) {
        try {
            // The socket is now connected, so this condition is false (per the author).
            if (!clientCnxnSocket.isConnected()) {
                // ... (code omitted) ...
            }
            
            // ... (code omitted) ...
            
            // This round of doTransport() writes the queued ConnectRequest.
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        }
        
        // ... (code omitted) ...
    }
    
    // ... (code omitted) ...
}

// ClientCnxnSocketNIO.java, line 346.
// Same method as above. This excerpt shows the read/write branch.
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
    // ... (code omitted) ...
    
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            // ... (code omitted) ...
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            // After primeConnection() the key is interested in read/write; a readable or
            // writable key is handed to doIO().
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
        
        // ... (code omitted) ...
    }
    
    // ... (code omitted) ...
}

// ClientCnxnSocketNIO.java, line 61.
// Performs the socket reads and writes for the ready key. This excerpt shows the write path,
// which sends the queued ConnectRequest.
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
    throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    // The read path is omitted here because this step is about sending the request.
    if (sockKey.isReadable()) {
        // ... (code omitted) ...
    }
    if (sockKey.isWritable()) {
        synchronized(outgoingQueue) {
            // Pick the next packet from outgoingQueue that may be sent now.
            // NOTE(review): presumably findSendablePacket holds back ordinary packets while
            // tunneled SASL authentication is in progress. Confirm in ClientCnxnSocket.
            Packet p = findSendablePacket(outgoingQueue,
                                          cnxn.sendThread.clientTunneledAuthenticationInProgress());

            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    // Give every packet with a header a fresh xid, except ping and auth
                    // packets, which keep the xid they were created with. The ConnectRequest
                    // has no header, so it is skipped.
                    if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != OpCode.ping) &&
                        (p.requestHeader.getType() != OpCode.auth)) {
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    p.createBB();
                }
                // Write to the server. A non-blocking write may be partial; the rest of p.bb is
                // written on a later call, which is why p.bb is kept.
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    sentCount++;
                    // Fully written: remove the packet from the send queue.
                    outgoingQueue.removeFirstOccurrence(p);
                    // Packets that expect a matched reply move to pendingQueue to wait for it.
                    // Ping packets, auth packets and the header-less ConnectRequest do not.
                    if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                        synchronized (pendingQueue) {
                            // Add to the queue of packets awaiting a reply.
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                // Nothing left to send: stop listening for writability for now.
                disableWrite();
            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                // On initial connection, write the complete connect request
                // packet, but then disable further writes until after
                // receiving a successful connection response.  If the
                // session is expired, then the server sends the expiration
                // response and immediately closes its end of the socket.  If
                // the client is simultaneously writing on its end, then the
                // TCP stack may choose to abort with RST, in which case the
                // client would never receive the session expired event.  See
                // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
                
                // In short: this branch runs while the session is not yet established
                // (initialized == false) and the ConnectRequest has been fully written. Writing
                // stops until the connect response arrives. Otherwise a server that rejects an
                // expired session and closes the socket could reset the connection (TCP RST),
                // and the "session expired" reply would be lost. Writing is re-enabled in the
                // read path right after readConnectResult().
                disableWrite();
            } else {
                // Just in case
                enableWrite();
            }
        }
    }
}

3、接收服務端的連線回覆

// ClientCnxn$SendThread, line 1032 (same method as above, different part).
// The catch clauses of the try block are omitted here.
public void run() {
    // ... (code omitted) ...
    
    while (state.isAlive()) {
        try {
            // ... (code omitted) ...
            
            // This round of doTransport() reads the server's connect response.
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        }
        
        // ... (code omitted) ...
    }
    
    // ... (code omitted) ...
}

// ClientCnxnSocketNIO.java, line 346.
// Same method as above. The connect response makes the key readable, so doIO() is called.
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
    // ... (code omitted) ...
    
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            // ... (code omitted) ...
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
        
        // ... (code omitted) ...
    }
    
    // ... (code omitted) ...
}

// ClientCnxnSocketNIO.java, line 61.
// Same method as above. This excerpt shows the read path that handles the connect response.
void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue, ClientCnxn cnxn)
    throws InterruptedException, IOException {
    SocketChannel sock = (SocketChannel) sockKey.channel();
    if (sock == null) {
        throw new IOException("Socket is null!");
    }
    if (sockKey.isReadable()) {
        int rc = sock.read(incomingBuffer);
        // A negative count means end-of-stream: the server closed the connection.
        if (rc < 0) {
            throw new EndOfStreamException(
                "Unable to read additional data from server sessionid 0x"
                + Long.toHexString(sessionId)
                + ", likely server has closed socket");
        }
        // Act only once incomingBuffer has been filled completely.
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                // ... (code omitted) ...
            } else if (!initialized) {
                // initialized was reset to false in connect(), so the first complete message
                // on this connection is the connect response.
                readConnectResult();
                enableRead();
                // Writes were paused after the ConnectRequest was sent (see the write path).
                // Resume them if there is a packet that may be sent now.
                if (findSendablePacket(outgoingQueue,
                                       cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                    // Since SASL authentication has completed (if client is configured to do so),
                    // outgoing packets waiting in the outgoingQueue can now be sent.
                    enableWrite();
                }
                // Go back to lenBuffer so that the next read starts a new message.
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
                // The session is established; later messages take the normal path.
                initialized = true;
            } else {
                // ... (code omitted) ...
            }
        }
    }
    if (sockKey.isWritable()) {
         // ... (code omitted) ...
    }
}

// ClientCnxnSocket.java, line 118.
// Deserializes the ConnectResponse from incomingBuffer and reads the optional read-only flag.
// It then passes the negotiated timeout, session id, password and read-only flag to
// SendThread.onConnected().
void readConnectResult() throws IOException {
    // At TRACE level, dump the raw response bytes in hex. Negative bytes are printed
    // sign-extended (e.g. ffffff80), because Integer.toHexString receives an int.
    if (LOG.isTraceEnabled()) {
        StringBuilder buf = new StringBuilder("0x[");
        for (byte b : incomingBuffer.array()) {
            buf.append(Integer.toHexString(b) + ",");
        }
        buf.append("]");
        LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
                  + buf.toString());
    }
    ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ConnectResponse conRsp = new ConnectResponse();
    conRsp.deserialize(bbia, "connect");

    // read "is read-only" flag
    boolean isRO = false;
    try {
        isRO = bbia.readBool("readOnly");
    } catch (IOException e) {
        // this is ok -- just a packet from an old server which
        // doesn't contain readOnly field
        LOG.warn("Connected to an old server; r-o mode will be unavailable");
    }

    this.sessionId = conRsp.getSessionId();
    sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                           conRsp.getPasswd(), isRO);
}

// ClientCnxn$SendThread, line 1271.
// Handles the server's connect response.
//   - Non-positive negotiated timeout: the session is treated as expired and the client shuts
//     down (SessionExpiredException is thrown).
//   - Otherwise: the timeouts are recomputed from the negotiated value and the session
//     id/password are stored. The state becomes CONNECTED (or CONNECTEDREADONLY), and a
//     SyncConnected (or ConnectedReadOnly) event is queued.
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
    // The session timeout is negotiated between client and server; the server's value wins.
    negotiatedSessionTimeout = _negotiatedSessionTimeout;
    // A non-positive timeout means the server rejected the session as expired. Steps taken:
    // mark the client CLOSED; queue an Expired event, then the event-of-death (EventThread
    // stops once its queue drains); finally throw SessionExpiredException.
    if (negotiatedSessionTimeout <= 0) {
        state = States.CLOSED;

        eventThread.queueEvent(new WatchedEvent(
            Watcher.Event.EventType.None,
            Watcher.Event.KeeperState.Expired, null));
        eventThread.queueEventOfDeath();

        String warnInfo;
        warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
            + Long.toHexString(sessionId) + " has expired";
        LOG.warn(warnInfo);
        throw new SessionExpiredException(warnInfo);
    }
    // A read/write client ended up on a read-only server. This is only logged.
    if (!readOnly && isRO) {
        LOG.error("Read/write client got connected to read-only server");
    }
    // Recompute the timeouts from the negotiated value, using the constructor's formulas.
    readTimeout = negotiatedSessionTimeout * 2 / 3;
    connectTimeout = negotiatedSessionTimeout / hostProvider.size();
    hostProvider.onConnected();
    sessionId = _sessionId;
    sessionPasswd = _sessionPasswd;
    // Mark the client as connected (read-only if the server reported read-only mode).
    state = (isRO) ?
        States.CONNECTEDREADONLY : States.CONNECTED;
    // Remember that a read/write server has been seen. From then on, primeConnection()
    // reuses sessionId when reconnecting.
    seenRwServerBefore |= !isRO;
    LOG.info("Session establishment complete on server "
             + clientCnxnSocket.getRemoteSocketAddress()
             + ", sessionid = 0x" + Long.toHexString(sessionId)
             + ", negotiated timeout = " + negotiatedSessionTimeout
             + (isRO ? " (READ-ONLY mode)" : ""));
    KeeperState eventState = (isRO) ?
        KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
    // Queue a session-state event (type None, null path) for EventThread. In the demo, this
    // is presumably what fires the default watcher.
    eventThread.queueEvent(new WatchedEvent(
        Watcher.Event.EventType.None,
        eventState, null));
}

// ClientCnxn$EventThread, line 466.
// Queues an event together with the set of watchers it should be delivered to.
public void queueEvent(WatchedEvent event) {
    // Drop a session-state event (type None) if the session state has not changed since the
    // previous one.
    if (event.getType() == EventType.None
        && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();

    // Ask the watch manager which watchers should receive this event.
    WatcherSetEventPair pair = new WatcherSetEventPair(
        watcher.materialize(event.getState(), event.getType(),
                            event.getPath()),
        event);
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);
}

// ClientCnxn$EventThread, line 500.
// Event loop: takes queued events one at a time and processes them. Once the event-of-death
// marker has been taken, the loop keeps draining and exits when waitingEvents is empty.
public void run() {
    try {
        isRunning = true;
        while (true) {
            // Presumably waitingEvents is a BlockingQueue, so take() waits for the next event.
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event);
            }
            // Note: this unbraced if guards the whole synchronized block below.
            if (wasKilled)
                synchronized (waitingEvents) {
                if (waitingEvents.isEmpty()) {
                    isRunning = false;
                    break;
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }

    LOG.info("EventThread shut down for session: 0x{}",
             Long.toHexString(getSessionId()));
}

// ClientCnxn$EventThread, line 526.
// Delivers one queued item. For a WatcherSetEventPair, each watcher's process() is called in
// turn. A Throwable from one watcher is logged and does not prevent the remaining watchers
// from being called.
private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
        } else {
            // ... (code omitted) ...
        }
    } catch (Throwable t) {
        LOG.error("Caught unexpected throwable", t);
    }
}

至此,連線的程式碼分析完畢

但是程式碼中還有一些地方看不懂,後續深入研究、搞明白之後會再回來補充。