1. 程式人生 > >zookeeper原始碼閱讀分析筆記--客戶端服務端通訊機制以及session超時、過期處理

zookeeper原始碼閱讀分析筆記--客戶端服務端通訊機制以及session超時、過期處理

    這兩天看了一下zookeeper的相關的原始碼,版本基於3.4.5,程式碼結構還是比較清晰的;
這裡重點分析一下zookeeper client和server端之間的通訊以及相關的異常處理機制。  
1、客戶端
  客戶端幾個主要的類為Zookeeper、ClientCnxn、SendThread、ClientCnxnSocketNIO。
客戶端通過Zookeeper相關的API和server進行同步或者非同步通訊,包括create、delete、exists、getChildren、getData、setData、sync等,
在實現中zookeeper可以呼叫ClientCnxn.submit同步等待返回結果或者呼叫ClientCnxn.queuePacket把訊息放到outgoing佇列中非同步傳送並提供callback進行非同步回撥;
ClientCnxn負責管理client端的socket IO,維護zookeeper server列表,透明的在server之間連線切換;ClientCnxn有一個SendThread後臺執行緒,
負責對outgoing queue中的訊息傳送和server端返回訊息接收;ClientCnxnSocketNIO代表一個server端的NIO socket 長連線連線,負責和server端底層進行通訊。
SendThread服務於queueQueue 進行package的傳送接收,並啟動後臺執行緒心跳建立連線;pendingQueue用於存放已經發送出去,未回覆的包,在收到回覆後,
從pendingQueue佇列中刪除;SendThread從socket讀取服務端返回的結果後,通過readResponse對訊息進行處理,根據返回的replyHeader中xid進行相應的後續處理,
當xid=-2時候,表示這是一個ping的返回,當xid=-4,代表這是一個auth 的返回,當xid=-1時候,代表這是一個zookeeper節點變化導致的通知watch執行的訊息返回,
返回的訊息用watchEvent包裝,傳送到EventThread中的waitingEvents佇列中,EventThread後臺執行緒從佇列中拉取訊息執行watcher中的process邏輯。

   客戶端和服務端之間的NIO socket連線模型這裡就不多說了,以前的NIO 系列blog中對這些有比較多的闡述,
這裡說一下客戶端和服務端的長連線的session失效、超時、連線斷掉的問題,客戶端又是如何處理的;
ClientCnxn包括這幾個變數,
private int connectTimeout;
private volatile int negotiatedSessionTimeout;
private int readTimeout;
private final int sessionTimeout;在client連線到server後,server返回給client確認資訊(包括伺服器返回給客戶端的真實的timeout時間--negotiatedSessionTimeout),
client read結果(SendThread.run-->ClientCnxnSocketNIO.doTransport-->doIO),設定相關的timeout引數。
在這裡初始化,
   sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
比如,
           readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
這些timeout代表client端在超時的這段時間裡,沒有讀到從server端返回的訊息(比如傳送ping 資料到server,server給返回資訊)
在ClientCnxnSocketNIO.doTransport中,進行select(waitTimeOut)操作,先updateNow
如果有socket可以讀資料,則讀資料後,updateLastRec,沒資料可讀的話(不更新updateLastRec),下次SendThread.run迴圈,就可能會出現讀超時。
在SendThread.run迴圈中判斷是否超時
                           to = readTimeout - clientCnxnSocket.getIdleRecv();//(now - lastHeard);
                    } else {
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    if (to <= 0) {
//超時
                        throw new SessionTimeoutException(
                                "Client session timed out, have not heard from server in "
                                        + clientCnxnSocket.getIdleRecv() + "ms"
                                        + " for sessionid 0x"
                                        + Long.toHexString(sessionId));
                    }
client不斷的傳送sendPing()心跳,以維持在server端的session有效。
   if (state.isConnected()) {
                        int timeToNextPing = readTimeout / 2
                                - clientCnxnSocket.getIdleSend();
                        if (timeToNextPing <= 0) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }
在SendThread.run迴圈中,client傳送sendPing()心跳,以維持在server端的session有效。
   if (state.isConnected()) {
                        int timeToNextPing = readTimeout / 2
                                - clientCnxnSocket.getIdleSend();
                        if (timeToNextPing <= 0) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }
如果客戶端訪問服務端而服務端認為客戶端已經超時了或者服務端宕機時,客戶端會呼叫SendThread.cleanup操作,銷燬sockect,
把sockKey設定為null,這樣在SendThread.run的while迴圈中會判斷isConn,進而重新連線一個server.
參見SendThread.run中的異常處理部分
      if (e instanceof SessionExpiredException) {//從服務端丟擲的
                            LOG.info(e.getMessage() + ", closing socket connection");
                        } else if (e instanceof SessionTimeoutException) {//客戶端丟擲的
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof EndOfStreamException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof RWServerFoundException) {
                            LOG.info(e.getMessage());
                        cleanup();//在cleanup中銷燬sockect,把sockKey設定為null,這樣在SendThread.run的while迴圈中會判斷isConn,進而重新連線一個server.
         } 
ClientWatchManager管理客戶端所有得watcher,並進行分類,dataWatches、existWatches、childWatches。


2、服務端
NIOServerCnxnFactory,服務端進行NIO操作(Select,Channel)的類,每建立一個客戶端連線,就生成一個NIOServerCnxn,
這裡說一下session的超時時間設定,
socket channel收到訊息建立連線請求的時候,NIOServerCnxn.readPayload-->NIOServerCnxnreadConnectRequest()--->zkServer.processConnectRequest(this, incomingBuffer);
ZookeeperServer,可見如果客戶端發來的sessionTimeout超過min-max這個範圍,server會自動擷取為min或max
   minSessionTimeout 單位毫秒。預設2倍tickTime
   maxSessionTimeout 單位毫秒。預設20倍tickTime
  (tickTime也是一個配置項。是Server內部控制時間邏輯的最小時間單位)

public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
        int minSessionTimeout = getMinSessionTimeout();
        if (sessionTimeout < minSessionTimeout) {
            sessionTimeout = minSessionTimeout;
        }
        int maxSessionTimeout = getMaxSessionTimeout();
        if (sessionTimeout > maxSessionTimeout) {
            sessionTimeout = maxSessionTimeout;
        }
        cnxn.setSessionTimeout(sessionTimeout);
SessionTracker儲存客戶端session的列表,判斷session是否過期,一種是在處理服務端請求的packg,一種是LearnerHandler.run後臺執行緒執行,
會呼叫SessionTracker.touchSession(ServerCnxn cnxn)進行判斷是否超時間
   void touch(ServerCnxn cnxn) throws MissingSessionException {
        if (cnxn == null) {
            return;
        }
        long id = cnxn.getSessionId();
        int to = cnxn.getSessionTimeout();
        if (!sessionTracker.touchSession(id, to)) {//無論client是重連,還是其他,超過session timeout時候沒連上,就表示session過期了
            throw new MissingSessionException(
                    "No session with sessionid 0x" + Long.toHexString(id)
                    + " exists, probably expired and removed");
        }
    }

如果session過期就刪除session資訊,包括這個會話建立的臨時節點和註冊的Watcher