1. 程式人生 > >Zookeeper原始碼閱讀(十一) ZK Client-Server(3)

Zookeeper原始碼閱讀(十一) ZK Client-Server(3)

前言

上一篇講了client端和server建立連線的初始化和建立過程,這兩個部分主要是和sendthread緊緊相關的,這一篇講一下響應階段,響應階段和sendthread,eventthread都有一定的關係。

獲取響應

其實獲取響應對於sendthread來說就是readresponse方法,在上一篇已經詳細講過了,主要的流程就是:

  1. 反序列化response;
  2. 根據回覆頭來處理,如果是ping,auth和sasl直接處理後返回,不會加入waitingevent佇列;
  3. 如果是server的通知表示是event,加入佇列
  4. 處理pendingqueue裡已經發送的packet。

但是這裡關於ping的處理有點需要再說下。

if (replyHdr.getXid() == -2) {//ping的response,只要能收到就表示能ping通
    // -2 is the xid for pings
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got ping response for sessionid: 0x"
                + Long.toHexString(sessionId)
                + " after "
                + ((System.nanoTime() - lastPingSentNs) / 1000000)
                + "ms");//打log就ok
    }
    return;
}
if (replyHdr.getXid() == -4) {//auth的返回頭
    // -4 is the xid for AuthPacket               
    if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {//是否是驗證失敗,如果失敗了就要加入到等待佇列裡讓eventthread處理
        state = States.AUTH_FAILED;                    
        eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                                                 Watcher.Event.KeeperState.AuthFailed, null) );                                     
    }
    if (LOG.isDebugEnabled()) {
        LOG.debug("Got auth sessionid:0x"
                  + Long.toHexString(sessionId)); //打log
    }
    return;
}

如果之前說的,ping和auth的request不會加入pendingqueue裡,收到回覆後直接處理了。這裡有個地方強調下,client端檢測心跳的機制其實readresponse這裡沒有用,這裡只是收個回覆打個log。真正控制是不是斷開連線的地方就在上一篇發請求的那裡,通過和上一次ping的時間來判斷是否過期。那怎麼去知道上一次ping通是啥時候呢,在client負責和server連線的doIO方法裡有個updateLastHeard()方法,收到server的訊息後便會執行這個方法。

void updateLastHeard() {
    this.lastHeard = now;
}

可以看到這個方法就是更新上次收到的時間的。

to = readTimeout - clientCnxnSocket.getIdleRecv();

to就是預設的readtimeout和當前請求和上一次請求的間隔時間的差值。如果處於連線狀態,則利用和上次ping時間的間隔的比較來判斷是否超時,如果超時就會丟擲異常。其實新的問題就來了,server怎麼知道自己和server處於連線還是斷開連線的狀態呢,這個後面講session再專門說一下。

加入等待佇列的event

經過前面的分析和總結可以知道sendthread把Packet傳送到server後會把部分Packet加入到pendingqueue中等待,而接收到server的回覆後會把event加入到等待佇列中處理,eventthread的主要功能就是處理這些event。首先總結下哪些event會被加入到等待佇列中。

  1. auth驗證失敗的event。
// -4 is the xid for AuthPacket               
if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { //ClientCnxn 756
    state = States.AUTH_FAILED;                    
    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
            Watcher.Event.KeeperState.AuthFailed, null) );                                  
}
  1. watcher被觸發的event
eventThread.queueEvent( we );//ClientCnxn 794
  1. sasl驗證失敗的event
// An authentication error occurred when the SASL client tried to initialize:
// for Kerberos this means that the client failed to authenticate with the KDC.
// This is different from an authentication error that occurs during communication
// with the Zookeeper server, which is handled below.
LOG.warn("SASL configuration failed: " + e + " Will continue connection to Zookeeper server without "
  + "SASL authentication, if Zookeeper server allows it.");
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1012
  Watcher.Event.EventType.None,
  Watcher.Event.KeeperState.AuthFailed, null));
  1. sasl驗證的event
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1094
      Watcher.Event.EventType.None,
      authState,null));
  1. 斷開連線的event
eventThread.queueEvent(new WatchedEvent(//ClientCnxn  1175 1188
        Event.EventType.None,
        Event.KeeperState.Disconnected,
        null));
  1. session過期的event
eventThread.queueEvent(new WatchedEvent(//ClientCnxn 1280
        Watcher.Event.EventType.None,
        Watcher.Event.KeeperState.Expired, null));
  1. 只讀連線的event
KeeperState eventState = (isRO) ?
        KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent( //ClientCnxn 1309
        Watcher.Event.EventType.None,
        eventState, null));

這7中event某種程度上是屬於server通知的訊息,所以必須要eventthread去處理,但是實際上sendthread很多發出去的Packet(如create,getdata等等)也會被加入到等待佇列中,但是這個是有限制的,只有在非同步模式下才會被加入到等待佇列中。

EventThread

EventThread是客戶端ClientCnxn內部的一個事件處理執行緒,負責客戶端的事件處理,並觸發客戶端註冊的Watcher監聽。EventThread中的watingEvents佇列用於臨時存放那些需要被觸發的Object,包括客戶端註冊的Watcher和非同步介面中註冊的回撥器AsyncCallback。同時,EventThread會不斷地從watingEvents中取出Object,識別具體型別(Watcher或AsyncCallback),並分別呼叫process和processResult介面方法來實現對事件的觸發和回撥。

程式碼結構:

域:

private final LinkedBlockingQueue<Object> waitingEvents =
     new LinkedBlockingQueue<Object>();

 /** This is really the queued session state until the event
  * thread actually processes the event and hands it to the watcher.
  * But for all intents and purposes this is the state.
  */
 private volatile KeeperState sessionState = KeeperState.Disconnected;

private volatile boolean wasKilled = false;
private volatile boolean isRunning = false;

這裡最重要的就是waitingEvents,這個FIFO佇列就是之前說的等待佇列。下面說下佇列處理的各種型別的資料。

server的notification加入佇列

public void queueEvent(WatchedEvent event) {
    if (event.getType() == EventType.None
            && sessionState == event.getState()) { //根據事件型別和狀態來判斷,如果事件型別為None且session狀態沒有變化就不加入佇列中
        return;
    }
    sessionState = event.getState(); //獲取session狀態

    // materialize the watchers based on the event
    WatcherSetEventPair pair = new WatcherSetEventPair(//構建路徑和事件(連線狀態和event狀態)的關係,之前介紹過
            watcher.materialize(event.getState(), event.getType(),
                    event.getPath()),
                    event);//根據事件型別做對應的處理
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);//加入佇列,等待處理
}

非同步請求的Packet加入佇列

private void finishPacket(Packet p) {
    if (p.watchRegistration != null) {
        p.watchRegistration.register(p.replyHeader.getErr());
    }

    if (p.cb == null) {//同步模式
        synchronized (p) {
            p.finished = true;
            p.notifyAll();//如果呼叫的是同步的介面,在submitRequest時會wait住,而且同步的介面沒有回撥方法,所以不會加入佇列中。
            //submitRequest裡wait住的部分
            //synchronized (packet) {
            //    while (!packet.finished) {
            //        packet.wait();
            //    }
            //}
        }
    } else {//非同步
        p.finished = true;
        eventThread.queuePacket(p);//非同步介面時把packet加入等待佇列
    }
}

上面的程式碼解釋了為什麼呼叫非同步接口才會把packet加入佇列。

public void queuePacket(Packet packet) {
   if (wasKilled) {//eventThread是否被kill
      synchronized (waitingEvents) {
         if (isRunning) waitingEvents.add(packet);//正在跑就加入佇列
         else processEvent(packet);//如果執行緒沒跑了就直接處理掉
      }
   } else {
      waitingEvents.add(packet);//加入對等佇列
   }
}

這裡有兩個變數wasKilled和isRunning解釋下,它們的操作是在eventthread的run方法中被處理的。

@Override
public void run() {
   try {
      isRunning = true;
      while (true) {
         Object event = waitingEvents.take();//取出佇列第一個元素
         if (event == eventOfDeath) {//eventOfDeath表示eventthread需要被kill
            wasKilled = true;//設定標誌,但是這裡並沒有被真正kill,表示要被kill
         } else {
            processEvent(event);//不是death標誌就處理
         }
         if (wasKilled)
            synchronized (waitingEvents) {//如果要被kill了,直到佇列被處理完了才會把isRunning狀態設定為false
               if (waitingEvents.isEmpty()) {
                  isRunning = false;
                  break;
               }
            }
      }
   } catch (InterruptedException e) {
      LOG.error("Event thread exiting due to interruption", e);
   }

    LOG.info("EventThread shut down for session: 0x{}",
             Long.toHexString(getSessionId()));
}

對於event的處理都在processevent方法中,這個方法主要處理了watcher被觸發後的執行和各個非同步介面的回撥函式這兩部分的內容。

private void processEvent(Object event) {
   try {
       if (event instanceof WatcherSetEventPair) {//watcher型別
           // each watcher will process the event
           WatcherSetEventPair pair = (WatcherSetEventPair) event;
           for (Watcher watcher : pair.watchers) {
               try {
                   watcher.process(pair.event);//執行watcher的回撥
               } catch (Throwable t) {
                   LOG.error("Error while calling watcher ", t);
               }
           }
       } else {//非同步介面的回撥
           Packet p = (Packet) event;
           int rc = 0;
           String clientPath = p.clientPath;
           if (p.replyHeader.getErr() != 0) {
               rc = p.replyHeader.getErr();
           }
           if (p.cb == null) {
               LOG.warn("Somehow a null cb got to EventThread!");
           } else if (p.response instanceof ExistsResponse
                   || p.response instanceof SetDataResponse
                   || p.response instanceof SetACLResponse) {
               ...
           } else if (p.response instanceof GetDataResponse) {
               ...
           } else if (p.response instanceof GetACLResponse) {
               ...
           } else if (p.response instanceof GetChildrenResponse) {
               ...
           } else if (p.response instanceof GetChildren2Response) {
               ...
           } else if (p.response instanceof CreateResponse) {
               ...
           } else if (p.response instanceof MultiResponse) {
                  ...
           }  else if (p.cb instanceof VoidCallback) {
               ...
           }
       }
   } catch (Throwable t) {
       LOG.error("Caught unexpected throwable", t);
   }
}

根據函式名可以清楚地知道各個非同步介面的回撥都在這裡執行了。

eventThread的death加入佇列

public void queueEventOfDeath() {
    waitingEvents.add(eventOfDeath);
}

eventthread要被kill只有兩種情況:

  1. client和server建立連線沒有連線上或者連線斷開。
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
        byte[] _sessionPasswd, boolean isRO) throws IOException {
    negotiatedSessionTimeout = _negotiatedSessionTimeout;//連線的timeout
    if (negotiatedSessionTimeout <= 0) {//沒有連線上server
        state = States.CLOSED;

        eventThread.queueEvent(new WatchedEvent(
                Watcher.Event.EventType.None,
                Watcher.Event.KeeperState.Expired, null));
        eventThread.queueEventOfDeath();//kill eventthread
  1. 客戶端和server斷開連線時
/**
 * Shutdown the send/event threads. This method should not be called
 * directly - rather it should be called as part of close operation. This
 * method is primarily here to allow the tests to verify disconnection
 * behavior.
 */
public void disconnect() {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Disconnecting client for session: 0x"
                  + Long.toHexString(getSessionId()));//log
    }

    sendThread.close();//sendthread關閉
    eventThread.queueEventOfDeath();//eventthread關閉
}

思考

zk的session機制

參考

《從Paxos到Zookeeper》

http://www.cnblogs.com/leesf456/p/6098255.html

https://www.jianshu.com/p/4a1902a44439