1. 程式人生 > >ZooKeeper中的會話機制

ZooKeeper中的會話機制

在本文中將對zk的會話機制進行總結

相關的類

  • SessionTracker
  • SessionTrackerImpl

會話狀態

常見的幾種會話狀態如下:

  • CONNECTING,正在連線
  • CONNECTED, 已連線
  • RECONNECTING,正在重連
  • RECONNECTED,已重連
  • CLOSE,會話關閉

連線建立的初始化階段,客戶端的狀態會變成CONNECTING,同時客戶端會從伺服器地址列表中隨機獲取一個ip地址嘗試進行網路連線,知道成功建立連線,這時候,客戶端的狀態就會變成CONNECTED。但是,在通常情況下,由於網路的不可靠性,時常會伴隨這網路中斷的出現,這時候,客戶端和服務端之間會出現斷開連線的情況,一旦出現這種情況,客戶端會嘗試去重新連線服務端,這時,客戶端的狀態會再一次變成CONNECTING,直到重新連線上伺服器後,客戶端的狀態又會再次變成CONNECTED。

因此,在通常情況下,客戶端的會話狀態始終在CONNECTED和CONNECTING之間變化。

出現CLOSE的情況:

  • 會話超時
  • 許可權檢查失敗
  • 客戶端主動退出

會話屬性

會話session是ZooKeeper中的會話實體,代表了一個客戶端的會話。其定義在org.apache.zookeeper.server.SessionTracker.Session中。介面的定義如下:

public interface SessionTracker {
    public static interface Session {
        long getSessionId();
        int
getTimeout(); boolean isClosing(); } }

其實現類為org.apache.zookeeper.server.SessionTrackerImpl.SessionImpl,程式碼如下:

public static class SessionImpl implements Session {
        final long sessionId;
        final int timeout;
        boolean isClosing;
        Object owner;

        public
long getSessionId() { return sessionId; } public int getTimeout() { return timeout; } public boolean isClosing() { return isClosing; } }

從上述程式碼中可以看出,Session主要由以下三個屬性組成:

  • sessionId,這是一個64位的long型整數,代表一個唯一的會話。每次客戶端建立會話的時候,ZooKeeper都會為其分配一個全域性唯一的一個sessionId。
  • timeout,會話的超時時間。客戶端在構造ZooKeeper例項的時候,會為本次會話配置一個會話的超時時間。客戶端在向ZooKeeper伺服器傳送這個超時時間後,服務端會根據自己的配置最終確定本次會話的超時時間。
  • isClosing,這是一個標誌位,表示本次會話是否已經關閉。在服務端的“會話超時檢查”執行緒在檢查到該會話已經失效的時候,會第一時間將這個標誌位置為true,只要這個標誌位為true,那麼服務端就不會在處理該會話的請求了。

會話初始化

在講session的初始化之前,首先先看看SessionTrackerImpl的實現

public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker 
{
    protected final ConcurrentHashMap<Long, SessionImpl> sessionsById = 
            new ConcurrentHashMap<Long, SessionImpl>();
    private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
    private final AtomicLong nextSessionId = new AtomicLong();
    //建構函式
    public SessionTrackerImpl(SessionExpirer expirer,
            ConcurrentMap<Long, Integer> sessionsWithTimeout, int tickTime,
            long serverId, ZooKeeperServerListener listener)
    {
        super("SessionTracker", listener);
        this.expirer = expirer;
        this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
        this.sessionsWithTimeout = sessionsWithTimeout;
        //初始化sessionId
        this.nextSessionId.set(initializeNextSession(serverId));
        for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
            addSession(e.getKey(), e.getValue());
        }
        EphemeralType.validateServerId(serverId);
    }
}

從上面的建構函式中可以看出,對session進行初始化,是由方法initializeNextSession來完成的。那麼,下面我們就來看看該方法的具體實現細節。

public static long initializeNextSession(long id) {
        long nextSid;
        nextSid = (Time.currentElapsedTime() << 24) >>> 8;
        nextSid =  nextSid | (id << 56); 
        if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) {
            ++nextSid;  // this is an unlikely edge case, but check it just in case
        }
        return nextSid;
    }

從上面的初始化方法中可以看出,該方法的入參是一個long的整數id,該id表示的是服務端的機器編號,這個引數是在部署ZooKeeper伺服器的時候,配置在myid檔案中的。

初始化的步驟如下:

  • 生成系統當前時間的時間戳,64位的long型整數
  • 將時間戳左移24位,在無符號右移8位
  • 經過上一步,該時間戳的高8位全部為0,低56位不為0
  • 接著,將機器編號左移56位,那麼機器編號的高8位不為0,低56位全為0
  • 最後,將上面得到的機器編號和時間戳進行或運算
為什麼要這樣做運算?

時間戳經過這樣的運算之後,高8位全部位0,和機器編號的高8位進行或運算之後,其結果完全取決於機器編號的高8位;同理,低56位由時間戳決定。因此,可以看出,sessionId其實是由機器編號+時間戳唯一決定的。可以保證在單機環境下的唯一性。

之所以右移8位的時候採用無符號,是因為防止前面左移24位的時候,可能出現負數的情況,因此為了消除產生的負數的影響,採用無符號的右移。

會話啟用

在ZooKeeper的設計過程中,只要客戶端有請求傳送到服務端,那麼服務端就會觸發一次會話啟用。以下情況會發生的會話啟用:

  • 客戶端向服務端傳送讀寫請求
  • 如果客戶端在sessionTimeout / 3的時間內都沒有與服務端有過互動,那麼客戶端會主動的向服務端傳送ping請求(心跳檢測),服務端收到請求之後,會觸發一次會話啟用。

與會話啟用相關的方法和類由:

  • org.apache.zookeeper.server.ExpiryQueue
  • SessionTrackerImpl中的touchSession方法

首先我們來看看org.apache.zookeeper.server.ExpiryQueue

   //記錄的是: Session -> 超時時間
    private final ConcurrentHashMap<E, Long> elemMap =
                        new ConcurrentHashMap<E, Long>();
  //記錄的是: 下一個超時時間 -> session的集合                 
    private final ConcurrentHashMap<Long, Set<E>> expiryMap =
                        new ConcurrentHashMap<Long, Set<E>>();
    private final AtomicLong nextExpirationTime = new AtomicLong();
    private final int expirationInterval;
 // 服務端計算超時時間的方法,expirationInterval預設等於tickTime,2000ms
   private long roundToNextInterval(long time) {
        return (time / expirationInterval + 1) * expirationInterval;
    }

從上面的的ExpiryQueue程式碼中可以看出:

  • 分桶策略中的buckets其實就是一個set集合,每個超時時間對應一個會話的集合
  • 維護另外一個map,管理單個session對應的超時時間
  • 相應的計算超時時間的方法

在SessionTrackerImpl中對touchSession方法描述:

synchronized public boolean touchSession(long sessionId, int timeout) {
        SessionImpl s = sessionsById.get(sessionId);

        if (s == null) {
            logTraceTouchInvalidSession(sessionId, timeout);
            return false;
        }

        if (s.isClosing()) {
            logTraceTouchClosingSession(sessionId, timeout);
            return false;
        }
    //會話啟用的主要執行邏輯
        updateSessionExpiry(s, timeout);
        return true;
    }

執行流程如下:

  • 根據sessionId獲取到對應的會話實體
  • 判斷該會話是否已經關閉,如果是的話,那麼就不需要啟用,直接返回
  • 從ExpiryQueue中的elemMap獲取本次會話以前的超時時間prevExpiryTime
  • 計算新的超時時間,計算邏輯為roundToNextInterval方法
  • 根據新的超時時間,將session實體放入到新的超時時間對應的expiryMap,並且設定新的elemMap
  • 將session實體從以前的expiryMap中刪除,並且更新對應的elemMap

會話超時檢查

會話超時檢查是由SessionTracker負責的。程式碼如下:

public void run() {
        try {
            while (running) {
            //sessionExpiryQueue是一個ExpiryQueue物件
                long waitTime = sessionExpiryQueue.getWaitTime();
                if (waitTime > 0) {
                    Thread.sleep(waitTime);
                    continue;
                }
                // sessionExpiryQueue.poll()是獲取expiryMap中超時的會話
                for (SessionImpl s : sessionExpiryQueue.poll()) {
                    setSessionClosing(s.sessionId);
                    expirer.expire(s);
                }
            }
        } catch (InterruptedException e) {
            handleException(this.getName(), e);
        }
        LOG.info("SessionTrackerImpl exited loop!");
    }

整個程式碼的核心邏輯在:

for (SessionImpl s : sessionExpiryQueue.poll()) {
     setSessionClosing(s.sessionId); //將會話的isClosing標誌位置為true
     expirer.expire(s); //會話清理過程, 在後面的文章中會詳細介紹該過程的細節
}

超時檢查的策略:逐個檢查會話bucket中剩下的,超過超時時間的會話。程式碼如下

public Set<E> poll() {
        long now = Time.currentElapsedTime();
        long expirationTime = nextExpirationTime.get();
        if (now < expirationTime) {
            return Collections.emptySet();
        }
        Set<E> set = null;
        long newExpirationTime = expirationTime + expirationInterval;
        if (nextExpirationTime.compareAndSet(
              expirationTime, newExpirationTime)) {
            set = expiryMap.remove(expirationTime);
        }
        if (set == null) {
            return Collections.emptySet();
        }
        return set;
    }