1. 程式人生 > >Zookeeper Java API (三) zk節點的通知(Watcher)和回撥(CallBack)

Zookeeper Java API (三) zk節點的通知(Watcher)和回撥(CallBack)

Zk中的通知(Watcher)

ZooKeeper中實現對接點的監控,需要實現Watcher介面類,實現其中的process方法

public class WatcherDemo implements Watcher{
    public void process(WatchedEvent event) {

    }
}

WatcherEvent對如下狀態進行監聽
1. ZooKeeper會話狀態:Disconnected,SyncConnected,AuthFailed,ConnectedReadOnly,SaslAuthenticated和Expired
2. 事件型別:NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged和None
3. 事件型別不是None時,返回一個znode路徑

如何設定一個監視點

對節點的建立設定監視點
/**
 * @author Eason
 * @create 2018-04-07 21:41
 **/
public class WatcherDemo implements Watcher {

    public void process(WatchedEvent event) {
        System.out.println("Enter the process method,the event is :"+event);
        Event.EventType type = event.getType();
        switch
(type) { case NodeDeleted: System.out.println("新建節點:" + event.getPath()); } } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { String connectionString = "192.168.1.6:2181,192.168.1.6:2182,192.168.1.6:2183"
; ZooKeeper zooKeeper = new ZooKeeper(connectionString, 15 * 1000, new WatcherDemo(), false); zooKeeper.create("/myEphmeralPath1", "random".getBytes(), OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); TimeUnit.SECONDS.sleep(60); zooKeeper.close(); } }

執行完成後 控制檯輸出

Connected to the target VM, address: '127.0.0.1:50459', transport: 'socket'
Enter the process method,the event is :WatchedEvent state:SyncConnected type:None path:null
Disconnected from the target VM, address: '127.0.0.1:50459', transport: 'socket'

為什麼沒有監控到節點的建立呢?

首先,我們在建立ZK連線時指定了一個Watcher,但是這個Watcher指監控到了ZK連線成功的狀態SyncConnected.這裡同樣有一個問題,為什麼關閉客戶端時沒有監控到Disconnected事件呢.對於這個問題,個人理解是因為當前客戶端失去連線,Watcher失效,在debug模式下使用同一個Watcher可以監控到Disconnected事件

/**
 * @author Eason
 * @create 2018-04-07 21:41
 **/
public class WatcherDemo implements Watcher {

    public void process(WatchedEvent event) {
        System.out.println("Enter the process method,the event is :"+event);
        Event.EventType type = event.getType();
        switch (type) {
            case NodeDeleted:
                System.out.println("新建節點:" + event.getPath());
        }
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        String connectionString = "192.168.1.6:2181,192.168.1.6:2182,192.168.1.6:2183";
        ZooKeeper zooKeeper = new ZooKeeper(connectionString, 15 * 1000, new WatcherDemo(), false);
        ZooKeeper zooKeeper1 = new ZooKeeper(connectionString, 15 * 1000, new WatcherDemo(), false);
        zooKeeper.create("/myEphmeralPath1", "random".getBytes(), OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        zooKeeper.close();
        TimeUnit.SECONDS.sleep(60);
    }
}

控制檯輸出

Enter the process method,the event is :WatchedEvent state:SyncConnected type:None path:null
Enter the process method,the event is :WatchedEvent state:SyncConnected type:None path:null
Enter the process method,the event is :WatchedEvent state:Disconnected type:None path:null
Enter the process method,the event is :WatchedEvent state:Expired type:None path:null

注意的是需要debug模式下才可以看到輸出結果,具體原因不清楚.

現在回到第一個問題,如何對建立節點設定監控呢?
首先看一下create有關的API

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode)
public void create(final String path, byte data[], List<ACL> acl,CreateMode createMode,  StringCallback cb, Object ctx)

可以看到,create方法是沒有Watcher相關方法的,因此,基於判斷沒有就建立的思想,可以使用exist API來設定一個監視點

/**
 * @author Eason
 * @create 2018-04-07 21:41
 **/
public class WatcherDemo01 implements Watcher {

    public void process(WatchedEvent event) {
        System.out.println("Enter the process method,the event is :" + event);
        Event.EventType type = event.getType();
        switch (type) {
            case NodeCreated:
                System.out.println("新建節點:" + event.getPath());
            case NodeDeleted:
                System.out.println("刪除節點:" + event.getPath());
            case NodeDataChanged:
                System.out.println("修改節點:" + event.getPath());
            case NodeChildrenChanged:
                System.out.println("子節點:" + event);
        }
    }

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        WatcherDemo01 watcherDemo = new WatcherDemo01();
        String connectionString = "192.168.1.6:2181,192.168.1.6:2182,192.168.1.6:2183";
        ZooKeeper zooKeeper = new ZooKeeper(connectionString, 15 * 1000, watcherDemo, false);
        String path = "/watcherdemo01";
        String childPath = "/watcherdemo01/child01";

        Stat stat = zooKeeper.exists(path, watcherDemo);
        if (stat == null) {
            zooKeeper.create(path, "random".getBytes(), OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        if (stat != null) {
            zooKeeper.delete(path, stat.getVersion());
        }
        TimeUnit.SECONDS.sleep(5);
        zooKeeper.close();
    }
}

如何設定監控點

下面是對如何設定監控點的總結
- 對於NodeCreated 通過exists API設定
- 對於NodeDeleted 通過exists 和 getData()設定
- 對於NodeDataChanged 通過exists或getData設定
- 對於NodeChildrenChanged 通過getChildren設定

ZooKeeper回撥

首先要區分通知和回撥的區別,通知是ZooKeeper中註冊了監視點的客戶端收到的事件報告訊息,而回調是基於非同步思想的,通過回撥函式來確定操作的完成情況

建立節點的回撥通知
/**
 * @author Eason
 * @create 2018-04-07 22:47
 **/
public class CallBackDemo implements Watcher {
    public static void main(String[] args) throws InterruptedException, IOException {
        String connectionString = "192.168.1.6:2181,192.168.1.6:2182,192.168.1.6:2183";
        ZooKeeper zooKeeper = new ZooKeeper(connectionString, 15 * 1000, new CallBackDemo(), false);
        String path = "/callBackDemo";
        String params = "123";
        zooKeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
            public void processResult(int rc, String path, Object ctx, String name) {
                System.out.println("Object ctx 引數對應於create 方法最後一個引數");
                System.out.println(ctx.toString());
                KeeperException.Code code = KeeperException.Code.get(rc);
                switch (code) {
                    case OK:
                    case NOAUTH:
                    case NONODE:
                    case APIERROR:
                    case NOTEMPTY:
                    case AUTHFAILED:
                    case BADVERSION:
                    case INVALIDACL:
                    case NODEEXISTS:
                    case NOTREADONLY:
                    case SYSTEMERROR:
                    case BADARGUMENTS:
                    case SESSIONMOVED:
                    case UNIMPLEMENTED:
                    case CONNECTIONLOSS:
                    case SESSIONEXPIRED:
                    case INVALIDCALLBACK:
                    case MARSHALLINGERROR:
                    case OPERATIONTIMEOUT:
                    case DATAINCONSISTENCY:
                    case RUNTIMEINCONSISTENCY:
                    case NOCHILDRENFOREPHEMERALS:
                }
            }
        }, params);
        TimeUnit.SECONDS.sleep(10);
    }

    public void process(WatchedEvent event) {

    }
}

注意Object ctx引數,具體狀態嗎不再贅述

相關推薦

Zookeeper Java API () zk節點通知(Watcher)(CallBack)

Zk中的通知(Watcher) ZooKeeper中實現對接點的監控,需要實現Watcher介面類,實現其中的process方法 public class WatcherDemo implements Watcher{ public void p

Zookeeper Java API (二) zk節點內容的操作節點的刪除

獲取節點內容 獲取節點內容的方法定義 public byte[] getData(String path, boolean watch, Stat stat) 其中: - path:要獲取內容的節點路徑 - watch:是否設定監控,也就是是否需要

ZooKeeper Java API

state exce 處理 art throws 枚舉類 event exception 類型 org.apache.zookeeper.ZookeeperZookeeper 是在 Java 中客戶端主類,負責建立與 zookeeper 集群的會話, 並提供方法進行操作。o

zookeeper java api(2)

    這裡介紹其他的API對zookeeper的操作。 同步方式獲取子節點資料 public static void getChildrenSync() throws KeeperException, InterruptedException {

zookeeper java api(1)

1 Zookeeper安裝以及啟動     這裡我已經進行了安裝,並且啟動了Zookeeper。埠是2182 2 Zookeeper config tickTime=2000 initLimit=10 syncLimit=5 dataDir=D://zook

Zookeeper --- Java API簡單例項

1、簡單使用 import java.util.List; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeep

使用ZooKeeper Java API程式設計

https://www.cnblogs.com/IcanFixIt/p/7882107.htmlhttps://blog.csdn.net/qiushisoftware/article/details/79043379https://blog.csdn.net/wo54107

Zookeeper Java API呼叫

public class ZooKeeperTest implements Watcher{ //public final static String zkServerPath = "12.45.67.80:2181"; //叢集,使用逗號分隔 public final

Java API操作ZK node

建立會話 建立簡單連線 /** * 測試建立Zk會話 * Created by liuhuichao on 2017/7/25. */ public class ZooKeeper_C

Java API訪問ZK的許可權控制

無許可權訪問結點 /** * 對於ZK的授權訪問 * Created by liuhuichao on 2017/7/27. */ public class AutoSample {

【Kafka】使用非自帶zookeeper Java API 例子

這裡沒有使用kafka自帶的zk。1.啟動zk:zk下載解壓至任意資料夾。新建kafka-zk-csdn資料夾。這裡啟動包含三個節點的zk偽叢集,進入kafka-zk-csdn資料夾,新建zk1,zk2和zk3資料夾。kafka-zk-csdn:--zk1    --data

JAVA基礎--JAVA API常見對象(包裝類正則)12

hexstring 需要 java基礎 test 指定 ffffff 9.png .com ring 一、基本類型包裝類   1.基本類型包裝類介紹       8種基本類型:    byte short int long float

Effective Java版——15. 使類成員的可訪問性最小化

control 常見 以及 操作 數據表示 potential info 四大 access Tips 《Effective Java, Third Edition》一書英文版已經出版,這本書的第二版想必很多人都讀過,號稱Java四大名著之一,不過第二版2009年出版,到

JAVA實驗 抽象類的繼承介面的實現 (多型)

題目要求: 1.編寫一個ComputerAverage抽象類,類中有一個抽象方法求平均分average,可以有引數。定義Gymnastics類和School類,它們都是ComputerAverage的子類,Gymnastics類中計算選手的平均成績的方法是去掉一個最低分

Java執行緒之非同步(Callback)

●介紹      有時候執行一個任務需要很長時間,單執行緒下執行緒會處於阻塞狀態。這個時候我們會考慮一種非阻塞的處理模式。非阻塞任務在任何程式語言裡都必不可少,Java也不例外。多執行緒就是一個很好的解決辦法。     

python中的Redis鍵空間通知(過期

介紹 Redis是一個記憶體資料結構儲存庫,用於快取,高速資料攝取,處理訊息佇列,分散式鎖定等等。 使用Redis優於其他記憶體儲存的優點是Redis提供永續性和資料結構,如列表,集合,有序集和雜湊。 在本文中,我想簡要介紹一下Redis鍵空間通知。我將解釋鍵空間通知是什麼,並演示如何配置Redis以接

Java內部類(2): 用法

為什麼需要內部類 一般來說,內部類繼承自某個類或實現了某個介面,內部類程式碼操作建立它的外部類的物件,可以說,內部類提供了某種進入外部類的視窗,而且內部類自己本身實現了有效的程式碼隱藏。 內部類必須要回答的一個問題是:如果只是需要一個介面的引用,為什麼外部類

.net mvc ()微信開放平臺授權

微信開放平臺授權回撥,儲存授權資訊: /// <summary> /// 微信開放平臺授權回撥 /// </summary> public ActionResult AuthCallBack()

Java-13】抽象類、介面、

抽象類、介面目的是設計與實現分離,是繼承與重寫的應用,父類只作為一個模板,然後生成每一個子類,每一個子類中要對父類中所有方法都重寫。抽象類、介面是多型的深化,都是為了統一處理子類! 回撥其實就是多型、抽象類、介面的應用,所謂回撥是指一段程式中有一個未定執行語句,這個語句可以是任意一個子類中任意函

APP微信支付(java後臺_統一下單

微信支付Java後臺1.微信配置資訊 global.properties2.方法wxpay用於生成預支付訂單資訊        方法notifyWeiXinPay用於微信支付成功後的回撥, 注意: 在手機端使用微信支付成功後,微信伺服器會根據提供的回撥地址進行回撥, para