1. 程式人生 > >zookeeper 原始碼:客戶端連線過程

zookeeper 原始碼:客戶端連線過程

可能我們直接使用 zookeeper 的機會並不多,甚至都不會直接去使用,但是 zookeeper 作為分散式協調框架,在如今分散式系統領域有著非常大的作用,很多流行框架都有使用它,如分散式治理框架 dubbo,大資料領域中的 hbase,所以瞭解 zookeeper 是非常有必要的。 此篇文章是從原始碼的角度去了解下底層是怎麼連線 zookeeper 的,肯定感覺很奇怪,一個連線操作有啥好了解的,但是你看了此篇文章,zookeeper 一個簡單的連線操作其實做了很多事情。我們在使用 zookeeper 的時候,一般都是以如下方式去連線 zookeeper 叢集:

public ZooKeeper connect
(String connStr) throws IOException { return new ZooKeeper(connStr, 3000, new Watcher() { @Override public void process(WatchedEvent watchedEvent) { if (Event.KeeperState.SyncConnected == watchedEvent.getState()) { countDownLatch.countDown(); System.
out.println("connect zk..."); } } }); }

是的,直接 new Zookeeper() 的方式去連線 zookeeper 叢集,這裡需要注意的是zookeeper 連線是非同步操作。那麼其建構函式做了什麼事情呢?

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly)
        throws IOException
{
    LOG.info
("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher); // ZKWatchManager 管理監聽器(watcher)以及處理由客戶端(ClientCnxn)產生的事件 watchManager.defaultWatcher = watcher; // 解析 zookeeper 叢集地址字串,但並不對地址進行解析 ConnectStringParser connectStringParser = new ConnectStringParser( connectString); // 解析地址 HostProvider hostProvider = new StaticHostProvider( connectStringParser.getServerAddresses()); // 建立連線物件,但是並不真正建立連線,而是在需要的時候才建立 cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, watchManager, getClientCnxnSocket(), canBeReadOnly); cnxn.start(); }

其中最重要的程式碼是最後兩行,我們先看其中第一行語句會做些什麼。

/**
 * 建立 ClientCnxn 物件(連線物件),呼叫它的建構函式之後,隨後就得呼叫它的  start() 方法 
 *
 * @param chrootPath - the chroot of this client. Should be removed from this Class in ZOOKEEPER-838
 * @param hostProvider
 *                zookeeper 叢集地址列表
 * @param sessionTimeout
 *                連線超時時間
 * @param zooKeeper
 *                同此 ClientCnxn 關聯的 Zookeeper 物件
 * @param watcher 對連線進行監聽的監聽器
 * @param clientCnxnSocket
 *                網路套接字(支援 nio / netty)
 * @param sessionId 回話 id
 * @param sessionPasswd 回話密碼
 * @param canBeReadOnly
 *                此連線只可以進行讀操作
 * @throws IOException
 */
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
            ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
            long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;
	// 計算連線超時時間和讀操作超時時間
    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;
	
	// 執行緒物件,兩個都為守護執行緒
	// 設定狀態為 CONNECTING
    sendThread = new SendThread(clientCnxnSocket);
    eventThread = new EventThread();
}

可以看到此建構函式的引數眾多,我們需要關注的只有幾個,一個 Zookeeper 物件,一個代表網路套接字的 ClientCnxnSocket 物件,一個是 sessionId。其他的可以忽略。而此建構函式貌似沒有做什麼事情,就是簡單賦值,但是其中有兩個執行緒物件很終於,一個是 sendThread,一個是 eventThread,從名字上感覺 sendThread 執行緒專門負責網路的連線和讀取操作,eventThread 執行緒專門負責對事件的處理。在這裡沒有看到對這兩個執行緒進行啟動,而方法註釋有說在呼叫它之後隨後就呼叫 ClientCnxn 物件的 start() 方法。

public void start() {
    sendThread.start();
    eventThread.start();
}

這個方法就是啟動剛才說的兩個執行緒,我們首先看看 SendThread 物件的 run() 方法。

@Override
public void run() {
	// 對 ClientCnxnSocket 物件進行一些初始化操作
    clientCnxnSocket.introduce(this,sessionId);
    // 設定當前時間
    clientCnxnSocket.updateNow();
    // 設定最近傳送時間和心跳時間
    clientCnxnSocket.updateLastSendAndHeard();
    int to;
    // 最近 ping 的時間
    long lastPingRwServer = System.currentTimeMillis();
    // 時間間隔
    final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
    // state != CLOSED && state != AUTH_FAILED
    while (state.isAlive()) {
        try {
        	// 如果還沒有建立連線
            if (!clientCnxnSocket.isConnected()) {
                if(!isFirstConnect){
                		// 不是第一次建立連線的回話,先隨意睡眠一會
                    try {
                        Thread.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected exception", e);
                    }
                }
                // 不重複建立連線(關閉回話時)
                if (closing || !state.isAlive()) {
                    break;
                }
                // 敲黑板語句,開始連線
                startConnect();
                clientCnxnSocket.updateLastSendAndHeard();
            }
			// 已經建立連線
            if (state.isConnected()) {
                // determine whether we need to send an AuthFailed event.
                if (zooKeeperSaslClient != null) {
                    boolean sendAuthEvent = false;
                    if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                        try {
                            zooKeeperSaslClient.initialize(ClientCnxn.this);
                        } catch (SaslException e) {
                           LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        }
                    }
                    KeeperState authState = zooKeeperSaslClient.getKeeperState();
                    if (authState != null) {
                        if (authState == KeeperState.AuthFailed) {
                            // An authentication error occurred during authentication with the Zookeeper Server.
                            state = States.AUTH_FAILED;
                            sendAuthEvent = true;
                        } else {
                            if (authState == KeeperState.SaslAuthenticated) {
                                sendAuthEvent = true;
                            }
                        }
                    }

                    if (sendAuthEvent == true) {
                        eventThread.queueEvent(new WatchedEvent(
                              Watcher.Event.EventType.None,
                              authState,null));
                    }
                }
                to = readTimeout - clientCnxnSocket.getIdleRecv();
            } else {
                to = connectTimeout - clientCnxnSocket.getIdleRecv();
            }
            
            if (to <= 0) {
                throw new SessionTimeoutException(
                        "Client session timed out, have not heard from server in "
                                + clientCnxnSocket.getIdleRecv() + "ms"
                                + " for sessionid 0x"
                                + Long.toHexString(sessionId));
            }
            // 已建立連線
            if (state.isConnected()) {
            	//1000(1 second) is to prevent race condition missing to send the second ping
            	//also make sure not to send too many pings when readTimeout is small 
                int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                		((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                	// 傳送心跳
                    sendPing();
                    clientCnxnSocket.updateLastSend();
                } else {
                    if (timeToNextPing < to) {
                        to = timeToNextPing;
                    }
                }
            }

            // If we are in read-only mode, seek for read/write server
            if (state == States.CONNECTEDREADONLY) {
                long now = System.currentTimeMillis();
                int idlePingRwServer = (int) (now - lastPingRwServer);
                if (idlePingRwServer >= pingRwTimeout) {
                    lastPingRwServer = now;
                    idlePingRwServer = 0;
                    pingRwTimeout =
                        Math.min(2*pingRwTimeout, maxPingRwTimeout);
                    pingRwServer();
                }
                to = Math.min(to, pingRwTimeout - idlePingRwServer);
            }
			// 重點
            clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this);
        } catch (Throwable e) {
            if (closing) {
                if (LOG.isDebugEnabled()) {
                    // closing so this is expected
                    LOG.debug("An exception was thrown while closing send thread for session 0x"
                            + Long.toHexString(getSessionId())
                            + " : " + e.getMessage());
                }
                break;
            } else {
                // this is ugly, you have a better way speak up
                if (e instanceof SessionExpiredException) {
                    LOG.info(e.getMessage() + ", closing socket connection");
                } else if (e instanceof SessionTimeoutException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof EndOfStreamException) {
                    LOG.info(e.getMessage() + RETRY_CONN_MSG);
                } else if (e instanceof RWServerFoundException) {
                    LOG.info(e.getMessage());
                } else {
                    LOG.warn(
                            "Session 0x"
                                    + Long.toHexString(getSessionId())
                                    + " for server "
                                    + clientCnxnSocket.getRemoteSocketAddress()
                                    + ", unexpected error"
                                    + RETRY_CONN_MSG, e);
                }
                cleanup();
                if (state.isAlive()) {
                    eventThread.queueEvent(new WatchedEvent(
                            Event.EventType.None,
                            Event.KeeperState.Disconnected,
                            null));
                }
                clientCnxnSocket.updateNow();
                clientCnxnSocket.updateLastSendAndHeard();
            }
        }
    } // ending while
    cleanup();
    clientCnxnSocket.close();
    if (state.isAlive()) {
        eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                Event.KeeperState.Disconnected, null));
    }
    ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                             "SendThread exitedloop.");
}

sendThread 方法體很長,各種 if 判斷,我們假設是第一次開始建立連線,那麼首先關注的一行程式碼是:startConnect(),我們看看 SendThread 類的 startConnect() 方法是怎麼開始建立連線的。

private void startConnect() throws IOException {
	// 狀態設定為 CONNECTING
    state = States.CONNECTING;

    InetSocketAddress addr;
    if (rwServerAddress != null) {
        addr = rwServerAddress;
        rwServerAddress = null;
    } else {
    	// 獲取下一個可連線的服務端
        addr = hostProvider.next(1000);
    }
	
	// 設定執行緒名
    setName(getName().replaceAll("\\(.*\\)",
            "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
    if (ZooKeeperSaslClient.isEnabled()) {
        try {
            String principalUserName = System.getProperty(
                    ZK_SASL_CLIENT_USERNAME, "zookeeper");
            zooKeeperSaslClient =
                new ZooKeeperSaslClient(
                        principalUserName+"/"+addr.getHostName());
        } catch (LoginException e) {
            // An authentication error occurred when the SASL client tried to initialize:
            // for Kerberos this means that the client failed to authenticate with the KDC.
            // This is different from an authentication error that occurs during communication
            // with the Zookeeper server, which is handled below.
            LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
              + "SASL authentication, if Zookeeper server allows it.");
            eventThread.queueEvent(new WatchedEvent(
              Watcher.Event.EventType.None,
              Watcher.Event.KeeperState.AuthFailed, null));
            saslLoginFailed = true;
        }
    }
    logStartConnect(addr);
	
	// 使用套接字建立連線
    clientCnxnSocket.connect(addr);
}

注意最後一行程式碼,真正去使用套接字建立遠端連線,這裡我們拿 nio 的實現 ClientCnxnSocketNIO 為例進行說明。

@Override
void connect(InetSocketAddress addr) throws IOException {
	// 建立 SocketChannel 
    SocketChannel sock = createSock();
    try {
    	// 往 Selector 註冊 SocketChannel,註冊的 key 為 SelectionKey.OP_CONNECT
       registerAndConnect(sock, addr);
    } catch (IOException e) {
        LOG.error("Unable to open socket to " + addr);
        sock.close();
        throw e;
    }
    initialized = false;

    /*
     * Reset incomingBuffer
     */
    lenBuffer.clear();
    incomingBuffer = lenBuffer;
}

此方法並沒有改變客戶端的連線狀態,還是 CONNECTING 狀態,因此接下來需要注意 前面 run() 方法中的程式碼是:clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue, ClientCnxn.this) 。

@Override
void doTransport(int waitTimeOut, List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue,
                 ClientCnxn cnxn)
        throws IOException, InterruptedException {
    selector.select(waitTimeOut);
    Set<SelectionKey> selected;
    synchronized (this) {
        selected = selector.selectedKeys();
    }
    // Everything below and until we get back to the select is
    // non blocking, so time is effectively a constant. That is
    // Why we just have to do this once, here
    updateNow();
    for (SelectionKey k : selected) {
        SocketChannel sc = ((SocketChannel) k.channel());
        if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
            if (sc.finishConnect()) { // 注意此處
                updateLastSendAndHeard();
                sendThread.primeConnection();
            }
        } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
            doIO(pendingQueue, outgoingQueue, cnxn);
        }
    }
    if (sendThread.getZkState().isConnected()) {
        synchronized(outgoingQueue) {
            if (findSendablePacket(outgoingQueue,
                    cnxn.sendThread.clientTunneledAuthenticationInProgress()) != null) {
                enableWrite();
            }
        }
    }
    selected.clear();
}

此方法中,如果我們是建立連線的話,有個方法呼叫需要注意,就是 sc.finishConnect(),在前面 connect() 方法中有對一個方法進行呼叫:registerAndConnect(sock, addr),它裡面配置了 SocketChannel 為非阻塞模式,並呼叫了 SocketChannel 類的 connect() 方法,**如果 SocketChannel 在非阻塞模式下,此時呼叫 connect(),該方法可能在連線建立之前就返回了。為了確定連線是否建立,可以呼叫 finishConnect() 的方法。**因此,這裡 finishConnect() 方法呼叫要麼返回 true,要麼就是丟擲異常。返回 true 的話,就說明跟服務端已經建立了連線,可以傳送資料了,我們看看 primeConnection() 方法的邏輯。

void primeConnection() throws IOException