第6章 使用ZooKeeper原生Java API進行客戶端開發
使用ZooKeeper原生Java API進行客戶端開發
- 6-1 建立客戶端與zk服務端的連線
- 6-2 zk會話重連機制
- 6-3 同步非同步建立zk節點
- 6-4 修改zk節點資料
- 6-5 同步非同步刪除zk節點
- 6-6 CountDownLatch的介紹
- 6-7 CountDownLatch程式碼示例
- 6-8 獲取zk節點資料
- 6-9 獲取zk子節點列表
- 6-10 判斷zk節點是否存在
- 6-11 acl - 預設匿名許可權
- 6-12 acl -自定義使用者許可權
- 6-13 acl - ip許可權
6-1 建立客戶端與zk服務端的連線
Java客戶端連線zk服務端進行連線:
public class ZKConnect implements Watcher {
final static Logger log = LoggerFactory.getLogger(ZKConnect.class);
public static final String zkServerPath = "192.168.1.110:2181";
//如果是叢集的話,連結地址直接用“,”隔開就行了;
// public static final String zkServerPath = "192.168.1.111:2181,192.168.1.111:2182,192.168.1.111:2183";
public static final Integer timeout = 5000;
public static void main(String[] args) throws Exception {
/**
* 客戶端和zk服務端連結是一個非同步的過程
* 當連線成功後後,客戶端會收的一個watch通知
*
* 引數:
* connectString:連線伺服器的ip字串,
* 比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181"
* 可以是一個ip,也可以是多個ip,一個ip代表單機,多個ip代表叢集
* 也可以在ip後加路徑
* sessionTimeout:超時時間,心跳收不到了,那就超時
* watcher:通知事件,如果有對應的事件觸發,則會收到一個通知;如果不需要,那就設定為null
* canBeReadOnly:可讀,當這個物理機節點斷開後,還是可以讀到資料的,只是不能寫,
* 此時資料被讀取到的可能是舊資料,此處建議設定為false,不推薦使用
* sessionId:會話的id
* sessionPasswd:會話密碼 當會話丟失後,可以依據 sessionId 和 sessionPasswd 重新獲取會話
*/
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnect());
log.warn("客戶端開始連線zookeeper伺服器...");
log.warn("連線狀態:{}", zk.getState());
new Thread().sleep(2000);
log.warn("連線狀態:{}", zk.getState());
}
@Override
public void process(WatchedEvent event) {
log.warn("接受到watch通知:{}", event);
}
}
執行結果:
2018-11-30 17:05:46,643 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:41)] - [WARN] 客戶端開始連線zookeeper伺服器…
2018-11-30 17:05:46,658 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:42)] - [WARN] 連線狀態:CONNECTING
2018-11-30 17:05:46,979 [main-EventThread] [com.imooc.zk.demo.ZKConnect.process(ZKConnect.java:51)] - [WARN] 接受到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-11-30 17:05:48,658 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:46)] - [WARN] 連線狀態:CONNECTED
6-2 zk會話重連機制
/**
*
* @Title: ZKConnectDemo.java
* @Description: zookeeper 恢復之前的會話連線demo演示
*/
public class ZKConnectSessionWatcher implements Watcher {
final static Logger log = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);
public static final String zkServerPath = "192.168.1.110:2181";
public static final Integer timeout = 5000;
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnectSessionWatcher());
long sessionId = zk.getSessionId();
String ssid = "0x" + Long.toHexString(sessionId);
System.out.println(ssid);
byte[] sessionPassword = zk.getSessionPasswd();
log.warn("客戶端開始連線zookeeper伺服器...");
log.warn("連線狀態:{}", zk.getState());
new Thread().sleep(1000);
log.warn("連線狀態:{}", zk.getState());
new Thread().sleep(200);
// 開始會話重連
log.warn("開始會話重連...");
ZooKeeper zkSession = new ZooKeeper(zkServerPath,
timeout,
new ZKConnectSessionWatcher(),
sessionId,
sessionPassword);
log.warn("重新連線狀態zkSession:{}", zkSession.getState());
new Thread().sleep(1000);
log.warn("重新連線狀態zkSession:{}", zkSession.getState());
}
@Override
public void process(WatchedEvent event) {
log.warn("接受到watch通知:{}", event);
}
}
最主要的就是下面的這幾行程式碼:獲取sessionId,session的密碼:
long sessionId = zk.getSessionId();
byte[] sessionPassword = zk.getSessionPasswd();
這個就是開始進行重連:
ZooKeeper zkSession = new ZooKeeper(zkServerPath, timeout, new ZKConnectSessionWatcher(), sessionId, sessionPassword);
這個就是執行結果:
2018-11-30 17:29:09,845 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:41)] - [WARN] 客戶端開始連線zookeeper伺服器…
2018-11-30 17:29:09,878 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:42)] - [WARN] 連線狀態:CONNECTING
2018-11-30 17:29:09,947 [main-EventThread] [com.imooc.zk.demo.ZKConnect.process(ZKConnect.java:51)] - [WARN] 接受到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-11-30 17:29:11,879 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:46)] - [WARN] 連線狀態:CONNECTED
我們通過四字命令檢視當前的會話數為零:
6-3 同步非同步建立zk節點
/**
*
* @Title: ZKConnectDemo.java
* @Description: zookeeper 操作demo演示
*/
public class ZKNodeOperator implements Watcher {
private ZooKeeper zookeeper = null;
public static final String zkServerPath = "192.168.1.110:2181";
public static final Integer timeout = 5000;
public ZKNodeOperator() {}
public ZKNodeOperator(String connectString) {
try {
zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeOperator());
} catch (IOException e) {
e.printStackTrace();
if (zookeeper != null) {
try {
zookeeper.close();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
/**
*
* @Title: ZKOperatorDemo.java
* @Description: 建立zk節點
*/
public void createZKNode(String path, byte[] data, List<ACL> acls) {
String result = "";
try {
/**
* 同步或者非同步建立節點,都不支援子節點的遞迴建立,非同步有一個callback函式
* 引數:
* path:建立的路徑
* data:儲存的資料的byte[]
* acl:控制權限策略
* Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
* CREATOR_ALL_ACL --> auth:user:password:cdrwa
* createMode:節點型別, 是一個列舉
* PERSISTENT:持久節點
* PERSISTENT_SEQUENTIAL:持久順序節點
* EPHEMERAL:臨時節點
* EPHEMERAL_SEQUENTIAL:臨時順序節點
*/
result = zookeeper.create(path, data, acls, CreateMode.PERSISTENT);
// String ctx = "{'create':'success'}";
// zookeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);
System.out.println("建立節點:\t" + result + "\t成功...");
new Thread().sleep(2000);
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws Exception {
ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
// 建立zk節點
// zkServer.createZKNode("/testnode", "testnode".getBytes(), Ids.OPEN_ACL_UNSAFE);
/**
* 引數:
* path:節點路徑
* data:資料
* version:資料狀態
*/
// Stat status = zkServer.getZookeeper().setData("/testnode", "xyz".getBytes(), 2);
// System.out.println(status.getVersion());
/**
* 引數:
* path:節點路徑
* version:資料狀態
*/
zkServer.createZKNode("/test-delete-node", "123".getBytes(), Ids.OPEN_ACL_UNSAFE);
// zkServer.getZookeeper().delete("/test-delete-node", 2);
String ctx = "{'delete':'success'}";
zkServer.getZookeeper().delete("/test-delete-node", 0, new DeleteCallBack(), ctx);
Thread.sleep(2000);
}
public ZooKeeper getZookeeper() {
return zookeeper;
}
public void setZookeeper(ZooKeeper zookeeper) {
this.zookeeper = zookeeper;
}
@Override
public void process(WatchedEvent event) {
}
}
執行結果:
建立節點: /test-delete-node 成功…
刪除節點/test-delete-node
{‘delete’:‘success’}
6-4 修改zk節點資料
/**
- 引數:
- path:節點路徑
- data:資料
- version:資料狀態
*/
Stat status = zkServer.getZookeeper().setData("/testnode", “xyz”.getBytes(), 2);
System.out.println(status.getVersion());
6-5 同步非同步刪除zk節點
/**
- 引數:
- path:節點路徑
- version:資料狀態
*/
zkServer.createZKNode("/test-delete-node", “123”.getBytes(), Ids.OPEN_ACL_UNSAFE);
zkServer.getZookeeper().delete("/test-delete-node", 2);
我們一定要注意版本號的問題;
當我們需要非同步的呼叫的時候,那我們就要這樣來做:
我們在操作的時候,一定要注意版本號的問題;
6-6 CountDownLatch的介紹
這個時候,就是一個阻塞的狀態:
6-7 CountDownLatch程式碼示例
DangerCenter類:
/**
* 抽象類,用於演示 危險品化工車監控中心 統一檢查
*/
public abstract class DangerCenter implements Runnable {
private CountDownLatch countDown; // 計數器
private String station; // 排程站
private boolean ok; // 排程站針對當前自己的站點進行檢查,是否檢查ok的標誌
public DangerCenter(CountDownLatch countDown, String station) {
this.countDown = countDown;
this.station = station;
this.ok = false;
}
@Override
public void run() {
try {
check();
ok = true;
} catch (Exception e) {
e.printStackTrace();
ok = false;
} finally {
if (countDown != null) {
countDown.countDown();
}
}
}
/**
* 檢查危化品車
* 蒸罐
* 汽油
* 輪胎
* gps
* ...
*/
public abstract void check();
public CountDownLatch getCountDown() {
return countDown;
}
public void setCountDown(CountDownLatch countDown) {
this.countDown = countDown;
}
public String getStation() {
return station;
}
public void setStation(String station) {
this.station = station;
}
public boolean isOk() {
return ok;
}
public void setOk(boolean ok) {
this.ok = ok;
}
}
下面有三個子類:
public class StationBeijingIMooc extends DangerCenter {
public StationBeijingIMooc(CountDownLatch countDown) {
super(countDown, "北京慕課排程站");
}
@Override
public void check() {
System.out.println("正在檢查 [" + this.getStation() + "]...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("檢查 [" + this.getStation() + "] 完畢,可以發車~");
}
}
public class StationJiangsuSanling extends DangerCenter {
public StationJiangsuSanling(CountDownLatch countDown) {
super(countDown, "江蘇三林排程站");
}
@Override
public void check() {
System.out.println("正在檢查 [" + this.getStation() + "]...");
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("檢查 [" + this.getStation() + "] 完畢,可以發車~");
}
}
public class StationShandongChangchuan extends DangerCenter {
public StationShandongChangchuan(CountDownLatch countDown) {
super(countDown, "山東長川排程站");
}
@Override
public void check() {
System.out.println("正在檢查 [" + this.getStation() + "]...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("檢查 [" + this.getStation() + "] 完畢,可以發車~");
}
}
6-8 獲取zk節點資料
/**
*
* @Description: zookeeper 獲取節點資料的demo演示
*/
public class ZKGetNodeData implements Watcher {
private ZooKeeper zookeeper = null;
public static final String zkServerPath = "192.168.1.110:2181";
public static final Integer timeout = 5000;
private static Stat stat = new Stat();
public ZKGetNodeData() {}
public ZKGetNodeData(String connectString) {
try {
zookeeper = new ZooKeeper(connectString, timeout, new ZKGetNodeData());
} catch (IOException e) {
e.printStackTrace();
if (zookeeper != null) {
try {
zookeeper.close();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
private static CountDownLatch countDown = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
/**
* 引數:
* path:節點路徑
* watch:true或者false,註冊一個watch事件
* stat:狀態
*/
byte[] resByte = zkServer.getZookeeper().getData("/imooc", true, stat);
String result = new String(resByte);
System.out.println("當前值:" + result);
countDown.await();
}
@Override
public void process(WatchedEvent event) {
try {
if(event.getType() == EventType.NodeDataChanged){
ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
byte[] resByte = zkServer.getZookeeper().getData("/imooc", false, stat);
String result = new String(resByte);
System.out.println("更改後的值:" + result);
System.out.println("版本號變化dversion:" + stat.getVersion());
countDown.countDown();
} else if(event.getType() == EventType.NodeCreated) {
} else if(event.getType() == EventType.NodeChildrenChanged) {
} else if(event.getType() == EventType.NodeDeleted) {
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public ZooKeeper getZookeeper() {
return zookeeper;
}
public void setZookeeper(ZooKeeper zookeeper) {
this.zookeeper = zookeeper;
}
}
6-9 獲取zk子節點列表
/**
* @Description: zookeeper 獲取子節點資料的demo演示
*/
public class ZKGetChildrenList implements Watcher {
private ZooKeeper zookeeper = null;
public static final String zkServerPath = "192.168.1.110:2181";
public static final Integer timeout = 5000;
public ZKGetChildrenList() {}
public ZKGetChildrenList(String connectString) {
try {
zookeeper = new ZooKeeper(connectString, timeout, new ZKGetChildrenList());
} catch (IOException e) {
e.printStackTrace();
if (zookeeper != null) {
try {
zookeeper.close();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
}
private static CountDownLatch countDown = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
/**
* 引數:
* path:父節點路徑
* watch:true或者false,註冊一個watch事件
*/
// List<String> strChildList = zkServer.getZookeeper().getChildren("/imooc", true);
// for (String s : strChildList) {
// System.out.println(s);
// }
// 非同步呼叫
String ctx = "{'callback':'ChildrenCallback'}";
// zkServer.getZookeeper().getChildren("/imooc", true, new ChildrenCallBack(), ctx);
zkServer.getZookeeper().getChildren("/imooc", true, new Children2CallBack(), ctx);
countDown.await();
}
@Override
public void process(WatchedEvent event) {
try {
if(event.getType()==EventType.NodeChildrenChanged){
System.out.println("NodeChildrenChanged");
ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
List<String> strChildList = zkServer.getZookeeper().getChildren(event.getPath(), false);
for (String s : strChildList) {
System.out.println(s);
}
countDown.countDown();
} else if(event.getType() == EventType.NodeCreated) {
System.out.println("NodeCreated");
} else if(event.getType() == EventType.NodeDataChanged) {
System.out.println("NodeDataChanged");
} else if(event.getType() == EventType.NodeDeleted) {
System.out.println("NodeDeleted");
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public ZooKeeper getZookeeper() {
return zookeeper;
}
public void setZookeeper(ZooKeeper zookeeper) {
this.zookeeper = zookeeper;
}
}
6-10 判斷zk節點是否存在
/**
* @Description: zookeeper 判斷階段是否存在demo
*/
public class ZKNodeExist implements Watcher {
private ZooKeeper zookeeper = null;
public static final String zkServerPath = "192.168.1.110:2181";
public static final Integer timeout = 5000;
public ZKNodeExist() {}
public ZKNodeExist(St