zookeeper客戶端操作及JAVA程式碼操作CURD
總結:zookeeper相當於一個遠端平臺,我們可以將資料放在上面,他是一個樹形結構,每一個節點稱為一個Znode
資料模型:每個節點或稱為目錄,都可以存放資料並且存放節點,有雙重功能
重點watch機制:
ZooKeeper 中,引入了 Watcher 機制來實現這種分散式的通知功能 。
總的來說可以概括 Watcher 為以下三個過程:客戶端向服務端註冊 Watcher、
服務端事件發生觸發 Watcher、客戶端回撥 Watcher 得到觸發事件情況
首先在需要搭建一個zookeeper叢集,非常簡單,只需要修改tar包裡面的配置檔案的資料存放位置 ,另外在存放資料位置的資料夾下面新建一個myid使用者存放編號,在配置檔案中配置3個server.1=XXX:2181 即可
好了,步入正題,開始說明zookeer的相關操作
第一部分:linux搭建的zookeeper叢集中利用客戶端操作CURD
zookeeper安裝目錄中有一個客戶端/export/servers/zookeeper-3.4.5-cdh5.14.0/bin zkCli.sh
啟動zkCli.sh 直接輸入zkCli.sh即可 ,注意客戶端要執行,zookeeper伺服器必須執行才可以啊,否則報錯
輸入help可得到客戶端所有的命令
建立節點
create [-s] [-e] path data acl s表示臨時 e表示序號遞增
查詢節點
ls path [watch] 後面可以跟watch監聽某個節點
修改節點
set path data [version]
刪除節點
delete path [version]
遞迴刪除
rmr path
獲取某個節點的詳細資訊
get path [watch]
第二部分: java程式碼控制zookeeper的增刪改查及watch機制監控
pom.xml檔案
引入3個依賴,curator-framework curator-recipes google-collections
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hadoopdemo</artifactId>
<groupId>cn.itcast.hadoop</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>demo01_zk</artifactId>
<repositories>
<repository>
<id>cloudera</id>
<url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
</repository>
</repositories>
<dependencies>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.12.0</version>
</dependency>
<dependency>
<groupId>com.google.collections</groupId>
<artifactId>google-collections</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>5.1</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
</project>
新增操作
/**
* @Description:建立新節點
* @:Param
* @:Return
*/
@Test
public void createNode() throws Exception {
//1 建立客戶端
String conneciton="node01:2181,node02:2181,node03:2181";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 1);
CuratorFramework client = CuratorFrameworkFactory.newClient(conneciton,3000,3000, retryPolicy);
//2 開啟客戶端
client.start();
//3 建立節點(createMode可以建立多種形式的)
client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/demo01/world");
client.close();
}
修改操作
/**
* @Description:修改節點資料
* @:Param
* @:Return
*/
@Test
public void updateNode() throws Exception {
//建立物件
String connetion = "node01:2181,node02:2181,node03:2181";
RetryPolicy retryPolicy=new ExponentialBackoffRetry(5000,1);
CuratorFramework client = CuratorFrameworkFactory.newClient(connetion, retryPolicy);
client.start();
//修改
client.setData().forPath("/demo01/world","您好啊".getBytes());
client.close();
}
查詢操作
/**
* @Description:查詢
* @:Param
* @:Return
*/
@Test
public void selectNode() throws Exception {
//建立物件
String connection ="node01:2181,node02:2181,node03:2181";
RetryPolicy retryPolicy= new ExponentialBackoffRetry(5000,1);
CuratorFramework client = CuratorFrameworkFactory.newClient(connection, retryPolicy);
client.start();
//查詢
byte[] bytes = client.getData().forPath("/demo01/world");
System.out.println(new java.lang.String(bytes));
client.close();
}
刪除操作
/**
* @Description:刪除
* @:Param
* @:Return
*/
@Test
public void deleteNode() throws Exception {
//建立
String connction="node01:2181,node02:2181,node03:2181";
RetryPolicy retryPolicy=new RetryForever(5000);
CuratorFramework client = CuratorFrameworkFactory.newClient(connction, retryPolicy);
client.start();
//刪除
client.delete().forPath("/hello5");
client.close();
}
watch機制(重點,跟上面的套路是一樣的,獲取客戶端client再操作)
/**
* @Description: watch機制
* @:Param
* @:Return
*/
@Test
public void watchNode() throws Exception {
String connction="node01:2181,node02:2181,node03:2181";
RetryPolicy retryPolicy= new RetryForever(5000);
CuratorFramework client = CuratorFrameworkFactory.newClient(connction, retryPolicy);
client.start();
//watch
//設定節點cache
TreeCache treeCache = new TreeCache(client,"/");
//設定監聽器
treeCache.getListenable().addListener(new TreeCacheListener() {
public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
ChildData data = event.getData();
if(data!=null){
//switch 判斷各種可能性
switch (event.getType()){
case NODE_ADDED:
System.out.println("節點新增:路徑:"+data.getPath()+"資料:"+new String(data.getData()));
break;
case NODE_REMOVED:
System.out.println("節點刪除,路徑:"+data.getPath()+"資料:"+new String(data.getData()));
break;
case NODE_UPDATED:
System.out.println("節點修改,路徑:"+data.getPath()+"資料:"+new String(data.getData()));
break;
}
}
}
});
//開始監聽
treeCache.start();
//讓程式不結束,一直監聽
Thread.sleep(900000000);
}
這樣如果zookeeper發生增刪改操作,那麼watch機制就會及時知道,並且打印出結果