1. 程式人生 > >Zookeeper源碼閱讀(十二) Seesion(1)

Zookeeper源碼閱讀(十二) Seesion(1)

服務器和客戶端 sync client 消息 sim 建立連接 uri 源碼閱讀 nds

前言

前面三篇主要從client的角度說了下client和server建立連接的過程,這一篇和後面一篇開始看下Zookeeper中非常重要的一個概念:Session,session是zookeeper client和server建立和維護連接的單位(我這個描述感覺有點奇怪 ?? )。

Session狀態

Zookeeper的所有操作基本都是基於session的,如之前提到的wathcer的機制,客戶端請求的順序執行和臨時節點的生命周期。

從我們使用API的角度,session的連接和保持就是客戶端通過實例化Zookeeper對象來與Zookeeper server端創建並保持連接TCP連接的過程。在客戶端與服務器端成功創建了一個連接後,一個會話就被創建了。而在一個會話的生命周期中,session的狀態可能在幾種不同的狀態中切換,而這些狀態可以分為connecting,connected,reconnecting,reconnected,close等。

狀態切換

  1. 客戶端嘗試去連接服務器端(public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException),這時客戶端回去嘗試連接服務器,而session的狀態就變成了connecting。這個過程之前在講sengthread的部分有詳細講過,具體是client會從server(HostProvider)的列表裏逐個嘗試連接;
  2. 由於網絡或程序等原因導致服務器和客戶端斷開連接,此時客戶端會嘗試去重新連接server,則session重新進入connecting狀態;
  3. 重連成功後,session變為connected狀態;
  4. 會話超時,權限檢查失敗或客戶端主動發起斷開連接請求後session變為close狀態。

p.s. 這裏要提一下第一步,在3.2.0版本中增加了chroot後綴(配置時加在後面,不是說這個chroot的功能是後綴,而且恰恰相反,功能是前綴)的設置,在zk的配置中類似配置 "127.0.0.1:4545/app/a" 或者 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"這樣的設置,那麽在zookeeper上所有的節點前都回家再/app/a的前綴。所以連接後在server上的根節點是/app/a。

作用:This feature is particularly useful in multi-tenant environments where each user of a particular ZooKeeper service could be rooted differently. This makes re-use much simpler as each user can code his/her application as if it were rooted at "/", while actual location (say /app/a) could be determined at deployment time.

Zookeeper官方文檔是這樣描述的,就是說在一臺機器或vm上有多個tenant安裝zookeeper,通過在配置中增加這樣的配置,這樣根據節點本身就知道它的具體位置。

Zookeeper官方用下圖來表示狀態的變化:

技術分享圖片

結合在sendthread介紹中說的,在startconnect方法中連接中會把狀態設置為States.CONNECTING,連接成功後,在Onconnected方法裏會把狀態設置為CONNECTED。

在zk的create/exists/getchildren…等等接口內部最後回去submitRequest並把生成的packet放入queue中,在queuePacket方法的cnLossPacket方法中會根據狀態去處理session超時,驗證失敗和連接丟失的問題。

private void conLossPacket(Packet p) {
    if (p.replyHeader == null) {
        return;
    }
    switch (state) {
    case AUTH_FAILED://驗證失敗
        p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
        break;
    case CLOSED://session超時導致close
        p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
        break;
    default://其他原因導致連接丟失
        p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
    }
    finishPacket(p);
}

在創建Session時,需要設置Session Timeout這個重要參數。這是Zookeeper服務允許一個Session在定義它失效之前的時間。如果服務在時間t內不能看到與一個Session關聯的消息,它將定義這個Session失效。如果客戶端在1/3 t時間內沒有聽到任何從服務器過來的消息,它將發送一個心跳消息給服務器。在(2/3)t時間, Zookeeper客戶端開始尋找另一個Zookeeper服務器,並且它有另外的(1/3)t的時間尋找。

會話創建

實體

public interface SessionTracker {
public static interface Session {
    long getSessionId();, 
    int getTimeout();
    boolean isClosing();
}
public static interface SessionExpirer {
    void expire(Session session);

    long getServerId();
}

可以看到,在SessionTracker接口中有兩個內部接口Session和SessionExpirer,可以看到分別和session與session過期有關系。

public static class SessionImpl implements Session {
    SessionImpl(long sessionId, int timeout, long expireTime) {
        this.sessionId = sessionId;
        this.timeout = timeout;
        this.tickTime = expireTime;
        isClosing = false;
    }

    final long sessionId;
    final int timeout;
    long tickTime;
    boolean isClosing;

    Object owner;

    public long getSessionId() { return sessionId; }
    public int getTimeout() { return timeout; }
    public boolean isClosing() { return isClosing; }
}

在SessionTrackerImpl類中有Session接口的實現類,此類也代表了一個真正的session對象。可以看到SessionImpl類中有幾個變量:

sessionId:會話ID,用來標識一個唯一會話。每次客戶端和server連接創建新會話時,zk會為其分別一個全局唯一的ID;

timeout:在創建zookeeper對象時傳入的參數,客戶端向server發送了這個參數後,服務器會根據timeout時間來判斷session的狀態;

ticktime:下次會話超時的時間點,大約為當前時間+timeout,具體之後詳細解釋;

isclosing:表明一個會話是否已經被關閉,如果一個會話已經被標記為closing,server便不會處理來自此session的請求。

SessionId生成策略

在ZookeeperServer的processConnectRequest方法中有對客戶端建立連接請求的處理:

if (sessionId != 0) {//sessionId已經存在
    long clientSessionId = connReq.getSessionId();
    LOG.info("Client attempting to renew session 0x"
            + Long.toHexString(clientSessionId)
            + " at " + cnxn.getRemoteSocketAddress());
    serverCnxnFactory.closeSession(sessionId);
    cnxn.setSessionId(sessionId);
    reopenSession(cnxn, sessionId, passwd, sessionTimeout);//重新打開session
} else {
    LOG.info("Client attempting to establish new session at "
            + cnxn.getRemoteSocketAddress());
    createSession(cnxn, passwd, sessionTimeout);//新建session
}
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    long sessionId = sessionTracker.createSession(timeout);
synchronized public long createSession(int sessionTimeout) {
    addSession(nextSessionId, sessionTimeout);
    return nextSessionId++;//每次取過之後nextSessionId+1
}
synchronized public void addSession(long id, int sessionTimeout) {
    sessionsWithTimeout.put(id, sessionTimeout);
    if (sessionsById.get(id) == null) {
        SessionImpl s = new SessionImpl(id, sessionTimeout, 0);//新建sessionImpl對象

可以看到在每次新建session是建立在已經保存的nextSessionId的基礎上的。然後看一下nextSessionId的初始化:

public static long initializeNextSession(long id) {
    long nextSid = 0;
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    nextSid =  nextSid | (id <<56);
    return nextSid;
}

initializeNextSession方法在zookeeperserver啟動時的startup方法中,startup方法會初始化SessionTrackerImpl變量,此時nextSessionId會被初始化。

這裏用到了Time.currentElapsedTime()方法去獲得當前的時間,是一個64位的值。但是在之前的版本中用的是System.currentTimeMillis() 方法。為什麽要用新的方法替代原來的值,事實上在正常情況下都不會有問題,但是如果有人修改了系統的時間,那麽原來的方法就可能有問題。

至於nextSid生成的算法:系統時間先左移24位然後無符號右移8位然後和myid文件中的唯一id值左移56位生成的值做或操作,這樣可以生產一個64位的唯一ID,然後後面的session基於這個值遞增獲得。這也是為什麽在myid文件中配置唯一id時必須要小於256的原因。

SessionTracker

sessiontracker的作用就是server用來管理會話的,它負責了session的創建,管理和刪除,整個session的生命周期都在sessiontracker的管理之下。每個session在sessiontracker內都分成三份保存。

public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(SessionTrackerImpl.class);

    HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>(); 

    HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();//

    ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
    long nextSessionId = 0;//下一次session的id
    long nextExpirationTime;//最近的超時時間

    int expirationInterval;//超時檢查間隔
static class SessionSet {
    HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
}

sessionsById是根據session的id來管理session實體的屬性;而sessionSets則是根據下次超時時間來歸檔回話,便於會話管理和超時審查;sessionsWithTimeout是線程安全的,它也是按照id來保存session的超時時間,sessionsWithTimeout和zk的內存數據庫相通,會定期同步到快照中。

思考

這一篇主要說了些宏觀的概念和session id的生成機制,比較泛,但是是下一篇的基礎。

參考

https://zookeeper.apache.org/doc/r3.3.6/zookeeperProgrammers.html

https://blog.csdn.net/jeff_fangji/article/details/43916359

https://www.jianshu.com/p/594129a44814

http://www.cnblogs.com/leesf456/p/6103870.html

https://xt00002003.iteye.com/blog/2302392

Zookeeper源碼閱讀(十二) Seesion(1)