1. 程式人生 > >基於Curator操作ZooKeeper(一)-基本操作

基於Curator操作ZooKeeper(一)-基本操作

Java原生API操作ZooKeeper可參看:

Java原生API操作Zookeeper(一)

Java原生API操作Zookeeper(二)

相關內容:

基於Curator操作ZooKeeper(二)-Watcher操作

基於Curator操作ZooKeeper(二)-Watcher操作-補充TreeCache

Java原生操作API有以下幾個不足之處:

  • 超時重連,不支援自動,必須要手動實現;
  • Watcher註冊一次後只能使用一次;
  • 不支援遞迴建立節點;

一般企業操作ZooKeeper的客戶端都會使用Apache Curator。和ZkClient一樣,Curator解決了很ZooKeeper客戶端非常底層的細節開發工作,包括連線重連、反覆註冊Watcher和NodeExistsException異常等,目前已經成為了Apache的頂級專案,是全世界範圍內使用最廣泛的ZooKeeper客戶端之一。

除了封裝一些開發人員不需要特別關注的底層細節之外,Curator還在ZooKeeperAPI的基礎上進行了包裝,提供了一套易用性和可讀性更強的Fluent風格的客戶端API框架。除此之外,Curator中還提供了ZooKeeper各種應用場景(Recipe,如共享鎖服務、Master選舉機制和分散式計數器等)的抽象封裝。

引入相關的依賴

       <dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.11</version>
     
        </dependency>

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>4.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>4.0.0</version>
        </dependency>

建立客戶端

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorOperator {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";

	/**
	 * 例項化zk客戶端
	 */
	public CuratorOperator() {
		/**
		 * 同步建立zk示例,原生api是非同步的
		 * 
		 * curator連結zookeeper的重試策略:
		 *
		 * 1>ExponentialBackoffRetry【推薦】
			 * baseSleepTimeMs:初始sleep時間(ms)
			 * maxRetries:最大重試次數,超過時間就不連結了
			 * maxSleepMs:最大重試時間(ms)
		 *
		 * 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
		   當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
		   可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
		   maxRetries引數控制了最大重試次數,以避免無限制的重試。
		 */
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		/**
		 * curator連結zookeeper的策略:
		 * 2>RetryNTimes【推薦】
			 * n:重試的次數
			 * sleepMsBetweenRetries:每次重試間隔的時間(ms)
		 */
//		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		
		/**
		 * curator連結zookeeper的策略:
		 * 3>RetryOneTime
		 * sleepMsBetweenRetry:只重試一次,重試間隔的時間
		 */
//		RetryPolicy retryPolicy2 = new RetryOneTime(3000);
		
		/**
		 * 4>
		 * 永遠重試,不推薦使用
		 */
//		RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
		
		/**
		 * curator連結zookeeper的策略:
		 * 5>RetryUntilElapsed
		 * maxElapsedTimeMs:最大重試時間
		 * sleepMsBetweenRetries:每次重試間隔
		 * 重試時間超過maxElapsedTimeMs後,就不再重試
		 */
//		RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

		//建立客戶端
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				.build();
		/**
		 * CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
		   CuratorFramework的sta rt)方法來完成會話的建立。
		 */
		client.start();
	}
	
	/**
	 * 
	 * @Description: 關閉zk客戶端連線
	 */
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator cto = new CuratorOperator();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));
		
		Thread.sleep(3000);
		
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

執行結果:

namespace

在3.2.0及其之後版本的ZooKeeper中,添加了“Chroot”特性,該特性允許每個客戶端為自己設定一個名稱空間 ( Namespace )。如果一個ZooKeeper客戶端設定了Chroot,那麼該客戶端對伺服器的任何操作,都將會被限制在其自己的名稱空間下。

舉個例子來說,如果我們希望為應用X分配/apps/X下的所有子節點,那麼該應用可以將其所有ZooKeeper客戶端的Chroot設定為/apps/X的。一旦設定了Chroot之後,那麼對這個客戶端來說,所有的節點路徑都以/apps/X為根節點,它和ZooKeeper發起的所有請求中相關的節點路徑,都將是一個相對路徑—相對於/apps/X的路徑。例如通過ZooKeeper客戶端API建立點/test_chroot,那麼實際上在服務端被建立的節點是/apps/X/test_ chroot,通過設定Chroot,我們能夠將一個客戶端應用與ZooKeeper服務端
的一棵子樹相對應,在那些多個應用共用一個ZooKeeper叢集的場景下,這對於實現不同應用之間的相互隔離非常有幫助。

節點的增刪改查(同步)

新增節點

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;

public class CuratorOperator {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";

	/**
	 * 例項化zk客戶端
	 */
	public CuratorOperator() {
		/**
		 * 同步建立zk示例,原生api是非同步的
		 * 
		 * curator連結zookeeper的重試策略:
		 *
		 * 1>ExponentialBackoffRetry【推薦】
			 * baseSleepTimeMs:初始sleep時間(ms)
			 * maxRetries:最大重試次數,超過時間就不連結了
			 * maxSleepMs:最大重試時間(ms)
		 *
		 * 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
		   當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
		   可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
		   maxRetries引數控制了最大重試次數,以避免無限制的重試。
		 */
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		/**
		 * curator連結zookeeper的策略:
		 * 2>RetryNTimes【推薦】
			 * n:重試的次數
			 * sleepMsBetweenRetries:每次重試間隔的時間(ms)
		 */
//		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		
		/**
		 * curator連結zookeeper的策略:
		 * 3>RetryOneTime
		 * sleepMsBetweenRetry:只重試一次,重試間隔的時間
		 */
//		RetryPolicy retryPolicy2 = new RetryOneTime(3000);
		
		/**
		 * 4>
		 * 永遠重試,不推薦使用
		 */
//		RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
		
		/**
		 * curator連結zookeeper的策略:
		 * 5>RetryUntilElapsed
		 * maxElapsedTimeMs:最大重試時間
		 * sleepMsBetweenRetries:每次重試間隔
		 * 重試時間超過maxElapsedTimeMs後,就不再重試
		 */
//		RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

		//建立客戶端
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace
				.namespace("testCRUD")
				.build();
		/**
		 * CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
		   CuratorFramework的sta rt)方法來完成會話的建立。
		 */
		client.start();
	}
	
	/**
	 * 
	 * @Description: 關閉zk客戶端連線
	 */
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator cto = new CuratorOperator();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

		//建立節點
		String nodePath = "/dongguabai/test";
        byte[] data = "abcd".getBytes();
        cto.client.create()
                .creatingParentContainersIfNeeded()   //遞迴建立節點
                .withMode(CreateMode.PERSISTENT)      //節點模式
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
                .forPath(nodePath,data); //不指定內容,則內容為空

        Thread.sleep(3000);
		
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

有個要注意的地方是:

由於在ZooKeeper中規定了所有非葉子節點必須為持久節點,呼叫上面這個API之後,只有path引數對應的資料節點是臨時節點,其父節點均為持久節點。

查詢某個節點下的資料:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public class CuratorOperator {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";

	/**
	 * 例項化zk客戶端
	 */
	public CuratorOperator() {
		/**
		 * 同步建立zk示例,原生api是非同步的
		 * 
		 * curator連結zookeeper的重試策略:
		 *
		 * 1>ExponentialBackoffRetry【推薦】
			 * baseSleepTimeMs:初始sleep時間(ms)
			 * maxRetries:最大重試次數,超過時間就不連結了
			 * maxSleepMs:最大重試時間(ms)
		 *
		 * 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
		   當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
		   可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
		   maxRetries引數控制了最大重試次數,以避免無限制的重試。
		 */
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		/**
		 * curator連結zookeeper的策略:
		 * 2>RetryNTimes【推薦】
			 * n:重試的次數
			 * sleepMsBetweenRetries:每次重試間隔的時間(ms)
		 */
//		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		
		/**
		 * curator連結zookeeper的策略:
		 * 3>RetryOneTime
		 * sleepMsBetweenRetry:只重試一次,重試間隔的時間
		 */
//		RetryPolicy retryPolicy2 = new RetryOneTime(3000);
		
		/**
		 * 4>
		 * 永遠重試,不推薦使用
		 */
//		RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
		
		/**
		 * curator連結zookeeper的策略:
		 * 5>RetryUntilElapsed
		 * maxElapsedTimeMs:最大重試時間
		 * sleepMsBetweenRetries:每次重試間隔
		 * 重試時間超過maxElapsedTimeMs後,就不再重試
		 */
//		RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

		//建立客戶端
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace:
				.namespace("testCRUD")
				.build();
		/**
		 * CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
		   CuratorFramework的sta rt)方法來完成會話的建立。
		 */
		client.start();
	}
	
	/**
	 * 
	 * @Description: 關閉zk客戶端連線
	 */
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator cto = new CuratorOperator();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

		//建立節點
		String nodePath = "/dongguabai/test";
        byte[] data = "abcd".getBytes();
        cto.client.create()
                .creatingParentContainersIfNeeded()   //遞迴建立節點
                .withMode(CreateMode.PERSISTENT)      //節點模式
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
                .forPath(nodePath,data); //不指定內容,則內容為空

        //獲取節點
        byte[] bytes = cto.client.getData().forPath(nodePath);
        System.out.println("第一次獲取節點資料為:"+new String(bytes));

        Stat stat = new Stat();
        byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("第二次獲取節點資料為:"+new String(bytes1));
        System.out.println("獲取的Stat為:"+ JsonUtil.toJSON(stat));


        Thread.sleep(3000);
		
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

輸出結果:

查詢子節點:

資料準備:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

import java.util.List;

public class CuratorOperator {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";

	/**
	 * 例項化zk客戶端
	 */
	public CuratorOperator() {
		/**
		 * 同步建立zk示例,原生api是非同步的
		 * 
		 * curator連結zookeeper的重試策略:
		 *
		 * 1>ExponentialBackoffRetry【推薦】
			 * baseSleepTimeMs:初始sleep時間(ms)
			 * maxRetries:最大重試次數,超過時間就不連結了
			 * maxSleepMs:最大重試時間(ms)
		 *
		 * 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
		   當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
		   可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
		   maxRetries引數控制了最大重試次數,以避免無限制的重試。
		 */
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		/**
		 * curator連結zookeeper的策略:
		 * 2>RetryNTimes【推薦】
			 * n:重試的次數
			 * sleepMsBetweenRetries:每次重試間隔的時間(ms)
		 */
//		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		
		/**
		 * curator連結zookeeper的策略:
		 * 3>RetryOneTime
		 * sleepMsBetweenRetry:只重試一次,重試間隔的時間
		 */
//		RetryPolicy retryPolicy2 = new RetryOneTime(3000);
		
		/**
		 * 4>
		 * 永遠重試,不推薦使用
		 */
//		RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
		
		/**
		 * curator連結zookeeper的策略:
		 * 5>RetryUntilElapsed
		 * maxElapsedTimeMs:最大重試時間
		 * sleepMsBetweenRetries:每次重試間隔
		 * 重試時間超過maxElapsedTimeMs後,就不再重試
		 */
//		RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

		//建立客戶端
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace:
				.namespace("testCRUD")
				.build();
		/**
		 * CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
		   CuratorFramework的sta rt)方法來完成會話的建立。
		 */
		client.start();
	}
	
	/**
	 * 
	 * @Description: 關閉zk客戶端連線
	 */
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator cto = new CuratorOperator();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

        String nodePath = "/dongguabai/a";
        //建立節點
       /* byte[] data = "abcd".getBytes();
        cto.client.create()
                .creatingParentContainersIfNeeded()   //遞迴建立節點
                .withMode(CreateMode.PERSISTENT)      //節點模式
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
                .forPath(nodePath,data); //不指定內容,則內容為空*/

        //獲取節點
       /* byte[] bytes = cto.client.getData().forPath(nodePath);
        System.out.println("第一次獲取節點資料為:"+new String(bytes));

        Stat stat = new Stat();
        byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("第二次獲取節點資料為:"+new String(bytes1));
        System.out.println("獲取的Stat為:"+ JsonUtil.toJSON(stat));
*/
        //獲取子節點
        List<String> list = cto.client.getChildren().forPath(nodePath);
        System.out.println("開始列印子節點:");
        list.forEach(result-> System.out.println(result));
        System.out.println("列印結束!");

        //修改節點
      /*  Stat stat = cto.client.setData().forPath(nodePath,"new1".getBytes());
        System.out.println("第一次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));

        Stat stat1 = cto.client.setData().withVersion(stat.getVersion()).forPath(nodePath, "new2".getBytes());
        System.out.println("第二次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));*/

        //刪除節點
       /* Stat stat = new Stat();
        byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("獲取節點資料為:"+new String(bytes1));
        cto.client.delete()
                .guaranteed()  //防止網路抖動,只要客戶端會話有效,那麼Curator 會在後臺持續進行刪除操作,直到節點刪除成功
                .deletingChildrenIfNeeded()  //如果有子節點會刪除,注意除非人為刪除namespace,否則namespace不會刪除
                .withVersion(stat.getVersion())
                .forPath(nodePath);*/

        Thread.sleep(3000);
		
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

修改節點:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;

public class CuratorOperator {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";

	/**
	 * 例項化zk客戶端
	 */
	public CuratorOperator() {
		/**
		 * 同步建立zk示例,原生api是非同步的
		 * 
		 * curator連結zookeeper的重試策略:
		 *
		 * 1>ExponentialBackoffRetry【推薦】
			 * baseSleepTimeMs:初始sleep時間(ms)
			 * maxRetries:最大重試次數,超過時間就不連結了
			 * maxSleepMs:最大重試時間(ms)
		 *
		 * 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
		   當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
		   可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
		   maxRetries引數控制了最大重試次數,以避免無限制的重試。
		 */
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		/**
		 * curator連結zookeeper的策略:
		 * 2>RetryNTimes【推薦】
			 * n:重試的次數
			 * sleepMsBetweenRetries:每次重試間隔的時間(ms)
		 */
//		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		
		/**
		 * curator連結zookeeper的策略:
		 * 3>RetryOneTime
		 * sleepMsBetweenRetry:只重試一次,重試間隔的時間
		 */
//		RetryPolicy retryPolicy2 = new RetryOneTime(3000);
		
		/**
		 * 4>
		 * 永遠重試,不推薦使用
		 */
//		RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
		
		/**
		 * curator連結zookeeper的策略:
		 * 5>RetryUntilElapsed
		 * maxElapsedTimeMs:最大重試時間
		 * sleepMsBetweenRetries:每次重試間隔
		 * 重試時間超過maxElapsedTimeMs後,就不再重試
		 */
//		RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

		//建立客戶端
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace:
				.namespace("testCRUD")
				.build();
		/**
		 * CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
		   CuratorFramework的sta rt)方法來完成會話的建立。
		 */
		client.start();
	}
	
	/**
	 * 
	 * @Description: 關閉zk客戶端連線
	 */
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator cto = new CuratorOperator();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

        String nodePath = "/dongguabai/test";
        //建立節點
       /* byte[] data = "abcd".getBytes();
        cto.client.create()
                .creatingParentContainersIfNeeded()   //遞迴建立節點
                .withMode(CreateMode.PERSISTENT)      //節點模式
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
                .forPath(nodePath,data); //不指定內容,則內容為空*/

        //獲取節點
       /* byte[] bytes = cto.client.getData().forPath(nodePath);
        System.out.println("第一次獲取節點資料為:"+new String(bytes));

        Stat stat = new Stat();
        byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("第二次獲取節點資料為:"+new String(bytes1));
        System.out.println("獲取的Stat為:"+ JsonUtil.toJSON(stat));*/

        //修改節點
        Stat stat = cto.client.setData().forPath(nodePath,"new1".getBytes());
        System.out.println("第一次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));

        Stat stat1 = cto.client.setData().withVersion(stat.getVersion()).forPath(nodePath, "new2".getBytes());
        System.out.println("第二次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));



        Thread.sleep(3000);
		
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

輸出結果:

刪除節點:

執行程式之前:

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;

public class CuratorOperator {

	public CuratorFramework client = null;
	public static final String zkServerPath = "192.168.220.135:2181,192.168.220.136:2181,192.168.220.137:2181";

	/**
	 * 例項化zk客戶端
	 */
	public CuratorOperator() {
		/**
		 * 同步建立zk示例,原生api是非同步的
		 * 
		 * curator連結zookeeper的重試策略:
		 *
		 * 1>ExponentialBackoffRetry【推薦】
			 * baseSleepTimeMs:初始sleep時間(ms)
			 * maxRetries:最大重試次數,超過時間就不連結了
			 * maxSleepMs:最大重試時間(ms)
		 *
		 * 給定一個初始sleep時間base5leep丁imeMs,在這個基礎上結合重試次數,通過以下公式計算出當前需要sleep的時間:
		   當前sleep時間=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
		   可以看出,隨著重試次數的增加,計算出的sleep時間會越來越大。如果該sleep時間在maxSleepMs的範圍之內,那麼就使用該sleep時間,否則使用maxSleepMs。另外,
		   maxRetries引數控制了最大重試次數,以避免無限制的重試。
		 */
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
		
		/**
		 * curator連結zookeeper的策略:
		 * 2>RetryNTimes【推薦】
			 * n:重試的次數
			 * sleepMsBetweenRetries:每次重試間隔的時間(ms)
		 */
//		RetryPolicy retryPolicy = new RetryNTimes(3, 5000);
		
		/**
		 * curator連結zookeeper的策略:
		 * 3>RetryOneTime
		 * sleepMsBetweenRetry:只重試一次,重試間隔的時間
		 */
//		RetryPolicy retryPolicy2 = new RetryOneTime(3000);
		
		/**
		 * 4>
		 * 永遠重試,不推薦使用
		 */
//		RetryPolicy retryPolicy3 = new RetryForever(retryIntervalMs)
		
		/**
		 * curator連結zookeeper的策略:
		 * 5>RetryUntilElapsed
		 * maxElapsedTimeMs:最大重試時間
		 * sleepMsBetweenRetries:每次重試間隔
		 * 重試時間超過maxElapsedTimeMs後,就不再重試
		 */
//		RetryPolicy retryPolicy4 = new RetryUntilElapsed(2000, 3000);

		//建立客戶端
		client = CuratorFrameworkFactory.builder()  //builder
				.connectString(zkServerPath)
				.sessionTimeoutMs(10000)  //session超時時間
				.retryPolicy(retryPolicy)  //重試策略
				//namespace:
				.namespace("testCRUD")
				.build();
		/**
		 * CuratorFrameworkFactory工廠在創建出一個客戶端CuratorFramework例項之後,實質上並沒有完成會話的建立,而是需要呼叫
		   CuratorFramework的sta rt)方法來完成會話的建立。
		 */
		client.start();
	}
	
	/**
	 * 
	 * @Description: 關閉zk客戶端連線
	 */
	public void closeZKClient() {
		if (client != null) {
			this.client.close();
		}
	}
	
	public static void main(String[] args) throws Exception {
		// 例項化
		CuratorOperator cto = new CuratorOperator();
		boolean isZkCuratorStarted = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted ? "連線中" : "已關閉"));

        String nodePath = "/dongguabai/test";
        //建立節點
       /* byte[] data = "abcd".getBytes();
        cto.client.create()
                .creatingParentContainersIfNeeded()   //遞迴建立節點
                .withMode(CreateMode.PERSISTENT)      //節點模式
                .withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //ACL
                .forPath(nodePath,data); //不指定內容,則內容為空*/

        //獲取節點
       /* byte[] bytes = cto.client.getData().forPath(nodePath);
        System.out.println("第一次獲取節點資料為:"+new String(bytes));

        Stat stat = new Stat();
        byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("第二次獲取節點資料為:"+new String(bytes1));
        System.out.println("獲取的Stat為:"+ JsonUtil.toJSON(stat));*/

        //修改節點
      /*  Stat stat = cto.client.setData().forPath(nodePath,"new1".getBytes());
        System.out.println("第一次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));

        Stat stat1 = cto.client.setData().withVersion(stat.getVersion()).forPath(nodePath, "new2".getBytes());
        System.out.println("第二次獲取節點資料為:"+new String(cto.client.getData().forPath(nodePath)));*/

        //刪除節點
        Stat stat = new Stat();
        byte[] bytes1 = cto.client.getData().storingStatIn(stat).forPath(nodePath);
        System.out.println("獲取節點資料為:"+new String(bytes1));
        cto.client.delete()
                .guaranteed()  //防止網路抖動,只要客戶端會話有效,那麼Curator 會在後臺持續進行刪除操作,直到節點刪除成功
                .deletingChildrenIfNeeded()  //如果有子節點會刪除,注意除非人為刪除namespace,否則namespace不會刪除
                .withVersion(stat.getVersion())
                .forPath(nodePath);

        Thread.sleep(3000);
		
		cto.closeZKClient();
		boolean isZkCuratorStarted2 = cto.client.isStarted();
		System.out.println("當前客戶的狀態:" + (isZkCuratorStarted2 ? "連線中" : "已關閉"));
	}
	
}

執行結果:

判斷節點是否存在:

其他相關資料:

https://blog.csdn.net/en_joker/article/details/78778917

https://www.cnblogs.com/LiZhiW/p/4926385.html?utm_source=tuicool&utm_medium=referral