1. 程式人生 > >品味ZooKeeper之Watcher機制_2

品味ZooKeeper之Watcher機制_2

而已 s函數 boolean 搜索樹 stat 數據同步 ron 例如 chain

品味ZooKeeper之Watcher機制

本文思維導圖如下:
技術分享圖片

前言

Watcher機制是zookeeper最重要三大特性數據節點Znode+Watcher機制+ACL權限控制中的其中一個,它是zk很多應用場景的一個前提,比如集群管理、集群配置、發布/訂閱。

Watcher機制涉及到客戶端與服務器(註意,不止一個機器,一般是集群,這裏先認為一個整體分析)的兩者數據通信與消息通信,除此之外還涉及到客戶端的watchManager。

下面正式進入主題。

1.watcher原理框架

技術分享圖片

由圖看出,zk的watcher由客戶端,客戶端WatchManager,zk服務器組成。整個過程涉及了消息通信及數據存儲。

  • zk客戶端向zk服務器註冊watcher的同時,會將watcher對象存儲在客戶端的watchManager。
  • Zk服務器觸發watcher事件後,會向客戶端發送通知,客戶端線程從watchManager中回調watcher執行相應的功能。

註意的是server服務器端一般有多臺共同一起對外提供服務的,裏面涉及到zk專有的ZAB協議(分布式原子廣播協議)。在這先不分析,後面會有單獨一文來介紹,因為ZAB協議是zookeeper的實現精髓,有了zab協議才能使zk真正落地,真正的高可靠,數據同步,適於商用。

技術分享圖片

有木有看到小紅旗?加入小紅旗是一個watcher,當小紅旗被創建並註冊到node1節點(會有相應的API實現)後,就會監聽node1+node_a+node_b或node_a+node_b。這裏兩種情況是因為在創建watcher註冊時會有多種途徑。並且watcher不能監聽到孫節點。註意註意註意,watcher設置後,一旦觸發一次後就會失效,如果要想一直監聽,需要在process回調函數裏重新註冊相同的 watcher

2.通知狀態與事件

public class WatcherTest implements Watcher {
    @Override
    public void process(WatchedEvent event) {
        // TODO Auto-generated method stub
        WatcherTest  w = new WatcherTest();
        ZooKeeper zk = new ZooKeeper(wx.getZkpath(),10000, w);  
    }
    
    public static void main(String[] args){
        WatcherTest  w = new WatcherTest();
        ZooKeeper   zk = new ZooKeeper(wx.getZkpath(), 10000, w);
    }
}

上面例子是把異常處理,邏輯處理等都省掉。watcher的應用很簡單,主要有兩步:繼承 Watcher 接口,重寫 process 回調函數。

當然註冊方式有很多,有默認和重新覆蓋方式,可以一次觸發失效也可以一直有效觸發。這些都可以通過代碼實現。

2.1 KeeperStatus通知狀態

KeeperStatus完整的類名是org.apache.zookeeper.Watcher.Event.KeeperState

2.2 EventType事件類型

EventType完整的類名是org.apache.zookeeper.Watcher.Event.EventType

技術分享圖片

此圖是zookeeper常用的通知狀態與對應事件類型的對應關系。除了客戶端與服務器連接狀態下,有多種事件的變化,其他狀態的事件都是None。這也是符合邏輯的,因為沒有連接服務器肯定不能獲取獲取到當前的狀態,也就無法發送對應的事件類型了。

這裏重點說下幾個重要而且容易迷惑的事件:

  • NodeDataChanged事件
  • 無論節點數據發生變化還是數據版本發生變化都會觸發
  • 即使被更新數據與新數據一樣,數據版本dataVersion都會發生變化
  • NodeChildrenChanged
  • 新增節點或者刪除節點
  • AuthFailed
  • 重點是客戶端會話沒有權限而是授權失敗

客戶端只能收到服務器發過來的相關事件通知,並不能獲取到對應數據節點的原始數據及變更後的新數據。因此,如果業務需要知道變更前的數據或者變更後的新數據,需要業務保存變更前的數據(本機數據結構、文件等)和調用接口獲取新的數據

3.watcher註冊過程

3.1涉及接口

創建zk客戶端對象實例時註冊:

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

通過這種方式註冊的watcher將會作為整個zk會話期間的默認watcher,會一直被保存在客戶端ZK WatchManagerdefaultWatcher 中,如果這個被創建的節點在其它時候被創建watcher並註冊,則這個默認的watcher會被覆蓋。註意註意註意,watcher觸發一次就會失效,不管是創建節點時的 watcher 還是以後創建的 watcher

其他註冊watcher的API:

  • getChildren(String path, Watcher watcher)
  • getChildren(String path, boolean watch)
  • Boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher
  • getData(String path, boolean watch, Stat stat)
  • Boolean watch表示是否使用上下文默認的watcher,即創建zk實例時設置的watcher
  • getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)
  • exists(String path, boolean watch)
  • Boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher
  • exists(String path, Watcher watcher)

舉栗子

技術分享圖片
技術分享圖片
技術分享圖片
技術分享圖片
技術分享圖片

這就是watcher的簡單例子,zk的實際應用集群管理,發布訂閱等復雜功能其實就在這個小例子上拓展的。

3.2客戶端註冊

技術分享圖片
這裏的客戶端註冊主要是把上面第一點的zookeeper原理框架的註冊步驟展開,簡單來說就是zk客戶端在註冊時會先向zk服務器請求註冊,服務器會返回請求響應,如果響應成功則zk服務端把watcher對象放到客戶端的WatchManager管理並返回響應給客戶端。

3.3服務器端註冊

技術分享圖片

FinalRequestProcessor
/**
 * This Request processor actually applies any transaction associated with a
 * request and services any queries. It is always at the end of a
 * RequestProcessor chain (hence the name), so it does not have a nextProcessor
 * member.
 *
 * This RequestProcessor counts on ZooKeeperServer to populate the
 * outstandingRequests member of ZooKeeperServer.
 */
public class FinalRequestProcessor implements RequestProcessor

由源碼註釋得知,FinalRequestProcessor類實際是任何事務請求和任何查詢的的最終處理類。也就是我們客戶端對節點的set/get/delete/create/exists等操作最終都會運行到這裏。

以exists函數為例子:

case OpCode.exists: {
    lastOp = "EXIS";
    // TODO we need to figure out the security requirement for this!
    ExistsRequest existsRequest = new ExistsRequest();
    ByteBufferInputStream.byteBuffer2Record(request.request,
    existsRequest);
    String path = existsRequest.getPath();
    if (path.indexOf(‘\0‘) != -1) {
    throw new KeeperException.BadArgumentsException();
    }
    Stat stat = zks.getZKDatabase().statNode(path, existsRequest
    .getWatch() ? cnxn : null);
    rsp = new ExistsResponse(stat);
    break;
}

existsRequest.getWatch() ? cnxn : null此句是在調用exists API時,判斷是否註冊watcher,若是就返回 cnxncnxn是由此句代碼ServerCnxn cnxn = request.cnxn;創建的。

/**
 * Interface to a Server connection - represents a connection from a client
 * to the server.
 */
public abstract class ServerCnxn implements Stats, Watcher

通過ServerCnxn類的源碼註釋得知,ServerCnxn是維持服務器與客戶端的tcp連接與實現了 watcher。總的來說,ServerCnxn類創建的對象cnxn即包含了連接信息又包含watcher信息。

同時仔細看ServerCnxn類裏面的源碼,發現有以下這個函數,process函數正是watcher的回調函數啊。

public abstract class ServerCnxn implements Stats, Watcher {
    .
    .
    public abstract void process(WatchedEvent event);
    Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null); 
    //getZKDatabase實際上是獲取是在zookeeper運行時的數據庫。請看下面
    .
    .
}
ZKDatabase
/**
 * This class maintains the in memory database of zookeeper
 * server states that includes the sessions, datatree and the
 * committed logs. It is booted up  after reading the logs
 * and snapshots from the disk.
 */
public class ZKDatabase

通過源碼註釋得知ZKDatabase是在zookeeper運行時的數據庫,在FinalRequestProcessor的case exists中會把existsRequest(exists請求傳遞給ZKDatabase)。

/**
 * the datatree for this zkdatabase
 * @return the datatree for this zkdatabase
 */
public DataTree getDataTree() {
return this.dataTree;
}

ZKDatabase裏面有這關鍵的一個函數是從zookeeper運行時展開的節點數型結構中搜索到合適的節點返回。

watchManager
  • Zk服務器端Watcher的管理者
  • 從兩個維度維護watcher
  • watchTable從數據節點的粒度來維護
  • watch2Paths從watcher的粒度來維護
  • 負責watcher事件的觸發

    class WatchManager {
        private final Map<String, Set<Watcher>> watchTable =
        new HashMap<String, Set<Watcher>>();
    
        private final Map<Watcher, Set<String>> watch2Paths = new HashMap<Watcher, Set<String>>();
        Set<Watcher> triggerWatch(String path, EventType type) { return triggerWatch(path, type, null);}
    }
watcher觸發
public Stat setData(String path, byte data[], int version, long zxid,long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n.data;
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix = getMaxPrefixWithQuota(path);
    if(lastPrefix != null) {
      this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
      - (lastdata == null ? 0 : lastdata.length));
    }
    dataWatches.triggerWatch(path, EventType.NodeDataChanged); //觸發事件
    return s;
}

客戶端回調watcher步驟:

  • 反序列化,將孒節流轉換成WatcherEvent對象。因為在Java中網絡傳輸肯定是使用了序列化的,主要是為了節省網絡IO和提高傳輸效率。
  • 處理chrootPath。獲取節點的根節點路徑,然後再搜索樹而已。
  • 還原watchedEvent:把WatcherEvent對象轉換成WatchedEvent。主要是把zk服務器那邊的WatchedEvent事件變為WatcherEvent,標為已watch觸發。
  • 回調Watcher:把WatchedEvent對象交給EventThread線程。EventThread線程主要是負責從客戶端的ZKWatchManager中取出Watcher,並放入waitingEvents隊列中,然後供客戶端獲取。

4.小結

到此,zookeeper的watcher機制基本告一段落了,watcher機制主要是客戶端、zk服務器和watchManager三者的協調合作完成的。這裏只分析了watcher的內容,例如涉及到的ZAB協議等沒有分析,準備把它放在下下文中,下文是zookeeper的ACL訪問控制權限。

品味ZooKeeper之Watcher機制_2