1. 程式人生 > >第6章 使用ZooKeeper原生Java API進行客戶端開發

第6章 使用ZooKeeper原生Java API進行客戶端開發

使用ZooKeeper原生Java API進行客戶端開發

6-1 建立客戶端與zk服務端的連線

在這裡插入圖片描述


Java客戶端連線zk服務端進行連線:

public class ZKConnect implements Watcher {
		
	final static Logger log = LoggerFactory.getLogger(ZKConnect.class);

	public static final String zkServerPath = "192.168.1.110:2181";
	//如果是叢集的話,連結地址直接用“,”隔開就行了;
//	public static final String zkServerPath = "192.168.1.111:2181,192.168.1.111:2182,192.168.1.111:2183";
public static final Integer timeout = 5000; public static void main(String[] args) throws Exception { /** * 客戶端和zk服務端連結是一個非同步的過程 * 當連線成功後後,客戶端會收的一個watch通知 * * 引數: * connectString:連線伺服器的ip字串, * 比如: "192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181" * 可以是一個ip,也可以是多個ip,一個ip代表單機,多個ip代表叢集 * 也可以在ip後加路徑 * sessionTimeout:超時時間,心跳收不到了,那就超時 * watcher:通知事件,如果有對應的事件觸發,則會收到一個通知;如果不需要,那就設定為null * canBeReadOnly:可讀,當這個物理機節點斷開後,還是可以讀到資料的,只是不能寫, * 此時資料被讀取到的可能是舊資料,此處建議設定為false,不推薦使用 * sessionId:會話的id * sessionPasswd:會話密碼 當會話丟失後,可以依據 sessionId 和 sessionPasswd 重新獲取會話 */
ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnect()); log.warn("客戶端開始連線zookeeper伺服器..."); log.warn("連線狀態:{}", zk.getState()); new Thread().sleep(2000); log.warn("連線狀態:{}", zk.getState()); } @Override public void process(WatchedEvent event) { log.warn("接受到watch通知:{}", event); } }

執行結果:

2018-11-30 17:05:46,643 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:41)] - [WARN] 客戶端開始連線zookeeper伺服器…
2018-11-30 17:05:46,658 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:42)] - [WARN] 連線狀態:CONNECTING
2018-11-30 17:05:46,979 [main-EventThread] [com.imooc.zk.demo.ZKConnect.process(ZKConnect.java:51)] - [WARN] 接受到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-11-30 17:05:48,658 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:46)] - [WARN] 連線狀態:CONNECTED


6-2 zk會話重連機制

/**
 * 
 * @Title: ZKConnectDemo.java
 * @Description: zookeeper 恢復之前的會話連線demo演示
 */
public class ZKConnectSessionWatcher implements Watcher {
	
	final static Logger log = LoggerFactory.getLogger(ZKConnectSessionWatcher.class);

	public static final String zkServerPath = "192.168.1.110:2181";
	public static final Integer timeout = 5000;
	
	public static void main(String[] args) throws Exception {
		
		ZooKeeper zk = new ZooKeeper(zkServerPath, timeout, new ZKConnectSessionWatcher());
		
		long sessionId = zk.getSessionId();
		String ssid = "0x" + Long.toHexString(sessionId);
		System.out.println(ssid);
		byte[] sessionPassword = zk.getSessionPasswd();
		
		log.warn("客戶端開始連線zookeeper伺服器...");
		log.warn("連線狀態:{}", zk.getState());
		new Thread().sleep(1000);
		log.warn("連線狀態:{}", zk.getState());
		
		new Thread().sleep(200);
		
		// 開始會話重連
		log.warn("開始會話重連...");
		
		ZooKeeper zkSession = new ZooKeeper(zkServerPath, 
											timeout, 
											new ZKConnectSessionWatcher(), 
											sessionId, 
											sessionPassword);
		log.warn("重新連線狀態zkSession:{}", zkSession.getState());
		new Thread().sleep(1000);
		log.warn("重新連線狀態zkSession:{}", zkSession.getState());
	}
	
	@Override
	public void process(WatchedEvent event) {
		log.warn("接受到watch通知:{}", event);
	}
}

最主要的就是下面的這幾行程式碼:獲取sessionId,session的密碼:

long sessionId = zk.getSessionId();
byte[] sessionPassword = zk.getSessionPasswd();


這個就是開始進行重連:

ZooKeeper zkSession = new ZooKeeper(zkServerPath, timeout, new ZKConnectSessionWatcher(), sessionId, sessionPassword);


這個就是執行結果:

2018-11-30 17:29:09,845 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:41)] - [WARN] 客戶端開始連線zookeeper伺服器…
2018-11-30 17:29:09,878 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:42)] - [WARN] 連線狀態:CONNECTING
2018-11-30 17:29:09,947 [main-EventThread] [com.imooc.zk.demo.ZKConnect.process(ZKConnect.java:51)] - [WARN] 接受到watch通知:WatchedEvent state:SyncConnected type:None path:null
2018-11-30 17:29:11,879 [main] [com.imooc.zk.demo.ZKConnect.main(ZKConnect.java:46)] - [WARN] 連線狀態:CONNECTED


我們通過四字命令檢視當前的會話數為零:
在這裡插入圖片描述

6-3 同步非同步建立zk節點

/**
 * 
 * @Title: ZKConnectDemo.java
 * @Description: zookeeper 操作demo演示
 */
public class ZKNodeOperator implements Watcher {

	private ZooKeeper zookeeper = null;
	
	public static final String zkServerPath = "192.168.1.110:2181";
	public static final Integer timeout = 5000;
	
	public ZKNodeOperator() {}
	
	public ZKNodeOperator(String connectString) {
		try {
			zookeeper = new ZooKeeper(connectString, timeout, new ZKNodeOperator());
		} catch (IOException e) {
			e.printStackTrace();
			if (zookeeper != null) {
				try {
					zookeeper.close();
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * 
	 * @Title: ZKOperatorDemo.java
	 * @Description: 建立zk節點
	 */
	public void createZKNode(String path, byte[] data, List<ACL> acls) {
		
		String result = "";
		try {
			/**
			 * 同步或者非同步建立節點,都不支援子節點的遞迴建立,非同步有一個callback函式
			 * 引數:
			 * path:建立的路徑
			 * data:儲存的資料的byte[]
			 * acl:控制權限策略
			 * 			Ids.OPEN_ACL_UNSAFE --> world:anyone:cdrwa
			 * 			CREATOR_ALL_ACL --> auth:user:password:cdrwa
			 * createMode:節點型別, 是一個列舉
			 * 			PERSISTENT:持久節點
			 * 			PERSISTENT_SEQUENTIAL:持久順序節點
			 * 			EPHEMERAL:臨時節點
			 * 			EPHEMERAL_SEQUENTIAL:臨時順序節點
			 */
			result = zookeeper.create(path, data, acls, CreateMode.PERSISTENT);
			
//			String ctx = "{'create':'success'}";
//			zookeeper.create(path, data, acls, CreateMode.PERSISTENT, new CreateCallBack(), ctx);
			
			System.out.println("建立節點:\t" + result + "\t成功...");
			new Thread().sleep(2000);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public static void main(String[] args) throws Exception {
		ZKNodeOperator zkServer = new ZKNodeOperator(zkServerPath);
		
		// 建立zk節點
//		zkServer.createZKNode("/testnode", "testnode".getBytes(), Ids.OPEN_ACL_UNSAFE);
		
		/**
		 * 引數:
		 * path:節點路徑
		 * data:資料
		 * version:資料狀態
		 */
//		Stat status  = zkServer.getZookeeper().setData("/testnode", "xyz".getBytes(), 2);
//		System.out.println(status.getVersion());
		
		/**
		 * 引數:
		 * path:節點路徑
		 * version:資料狀態
		 */
		zkServer.createZKNode("/test-delete-node", "123".getBytes(), Ids.OPEN_ACL_UNSAFE);
//		zkServer.getZookeeper().delete("/test-delete-node", 2);
		
		String ctx = "{'delete':'success'}";
		zkServer.getZookeeper().delete("/test-delete-node", 0, new DeleteCallBack(), ctx);
		Thread.sleep(2000);
	}

	public ZooKeeper getZookeeper() {
		return zookeeper;
	}
	public void setZookeeper(ZooKeeper zookeeper) {
		this.zookeeper = zookeeper;
	}

	@Override
	public void process(WatchedEvent event) {
	}
}

執行結果:

建立節點: /test-delete-node 成功…
刪除節點/test-delete-node
{‘delete’:‘success’}


6-4 修改zk節點資料

/**

  • 引數:
  • path:節點路徑
  • data:資料
  • version:資料狀態
    */
    Stat status = zkServer.getZookeeper().setData("/testnode", “xyz”.getBytes(), 2);
    System.out.println(status.getVersion());

6-5 同步非同步刪除zk節點

/**

  • 引數:
  • path:節點路徑
  • version:資料狀態
    */
    zkServer.createZKNode("/test-delete-node", “123”.getBytes(), Ids.OPEN_ACL_UNSAFE);
    zkServer.getZookeeper().delete("/test-delete-node", 2);

我們一定要注意版本號的問題;
當我們需要非同步的呼叫的時候,那我們就要這樣來做:
在這裡插入圖片描述
我們在操作的時候,一定要注意版本號的問題;


6-6 CountDownLatch的介紹

在這裡插入圖片描述


這個時候,就是一個阻塞的狀態:
在這裡插入圖片描述


6-7 CountDownLatch程式碼示例

DangerCenter類:

/**
 * 抽象類,用於演示 危險品化工車監控中心 統一檢查
 */
public abstract class DangerCenter implements Runnable {

	private CountDownLatch countDown;		// 計數器
	private String station;					// 排程站
	private boolean ok;						// 排程站針對當前自己的站點進行檢查,是否檢查ok的標誌
	
	public DangerCenter(CountDownLatch countDown, String station) {
		this.countDown = countDown;
		this.station = station;
		this.ok = false;
	}

	@Override
	public void run() {
		try {
			check();
			ok = true;
		} catch (Exception e) {
			e.printStackTrace();
			ok = false;
		} finally {
			if (countDown != null) {
				countDown.countDown();
			}
		}
	}

	/**
	 * 檢查危化品車
	 * 蒸罐
	 * 汽油
	 * 輪胎
	 * gps
	 * ...
	 */
	public abstract void check();

	public CountDownLatch getCountDown() {
		return countDown;
	}
	public void setCountDown(CountDownLatch countDown) {
		this.countDown = countDown;
	}
	public String getStation() {
		return station;
	}
	public void setStation(String station) {
		this.station = station;
	}
	public boolean isOk() {
		return ok;
	}
	public void setOk(boolean ok) {
		this.ok = ok;
	}
	
}

下面有三個子類:

public class StationBeijingIMooc extends DangerCenter {

	public StationBeijingIMooc(CountDownLatch countDown) {
		super(countDown, "北京慕課排程站");
	}

	@Override
	public void check() {
		System.out.println("正在檢查 [" + this.getStation() + "]...");
		
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		System.out.println("檢查 [" + this.getStation() + "] 完畢,可以發車~");
	}

}
public class StationJiangsuSanling extends DangerCenter {

	public StationJiangsuSanling(CountDownLatch countDown) {
		super(countDown, "江蘇三林排程站");
	}

	@Override
	public void check() {
		System.out.println("正在檢查 [" + this.getStation() + "]...");
		
		try {
			Thread.sleep(1500);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		System.out.println("檢查 [" + this.getStation() + "] 完畢,可以發車~");
	}

}
public class StationShandongChangchuan extends DangerCenter {

	public StationShandongChangchuan(CountDownLatch countDown) {
		super(countDown, "山東長川排程站");
	}

	@Override
	public void check() {
		System.out.println("正在檢查 [" + this.getStation() + "]...");
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		System.out.println("檢查 [" + this.getStation() + "] 完畢,可以發車~");
	}

}

6-8 獲取zk節點資料

在這裡插入圖片描述


/**
 * 
 * @Description: zookeeper 獲取節點資料的demo演示
 */
public class ZKGetNodeData implements Watcher {

	private ZooKeeper zookeeper = null;
	
	public static final String zkServerPath = "192.168.1.110:2181";
	public static final Integer timeout = 5000;
	private static Stat stat = new Stat();
	
	public ZKGetNodeData() {}
	
	public ZKGetNodeData(String connectString) {
		try {
			zookeeper = new ZooKeeper(connectString, timeout, new ZKGetNodeData());
		} catch (IOException e) {
			e.printStackTrace();
			if (zookeeper != null) {
				try {
					zookeeper.close();
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
			}
		}
	}
	
	private static CountDownLatch countDown = new CountDownLatch(1);
	
	public static void main(String[] args) throws Exception {
	
		ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
		
		/**
		 * 引數:
		 * path:節點路徑
		 * watch:true或者false,註冊一個watch事件
		 * stat:狀態
		 */
		byte[] resByte = zkServer.getZookeeper().getData("/imooc", true, stat);
		String result = new String(resByte);
		System.out.println("當前值:" + result);
		countDown.await();
	}
	
	@Override
	public void process(WatchedEvent event) {
		try {
			if(event.getType() == EventType.NodeDataChanged){
				ZKGetNodeData zkServer = new ZKGetNodeData(zkServerPath);
				byte[] resByte = zkServer.getZookeeper().getData("/imooc", false, stat);
				String result = new String(resByte);
				System.out.println("更改後的值:" + result);
				System.out.println("版本號變化dversion:" + stat.getVersion());
				countDown.countDown();
			} else if(event.getType() == EventType.NodeCreated) {
				
			} else if(event.getType() == EventType.NodeChildrenChanged) {
				
			} else if(event.getType() == EventType.NodeDeleted) {
				
			} 
		} catch (KeeperException e) { 
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public ZooKeeper getZookeeper() {
		return zookeeper;
	}
	public void setZookeeper(ZooKeeper zookeeper) {
		this.zookeeper = zookeeper;
	}
}

6-9 獲取zk子節點列表

/**
 * @Description: zookeeper 獲取子節點資料的demo演示
 */
public class ZKGetChildrenList implements Watcher {

	private ZooKeeper zookeeper = null;
	
	public static final String zkServerPath = "192.168.1.110:2181";
	public static final Integer timeout = 5000;
	
	public ZKGetChildrenList() {}
	
	public ZKGetChildrenList(String connectString) {
		try {
			zookeeper = new ZooKeeper(connectString, timeout, new ZKGetChildrenList());
		} catch (IOException e) {
			e.printStackTrace();
			if (zookeeper != null) {
				try {
					zookeeper.close();
				} catch (InterruptedException e1) {
					e1.printStackTrace();
				}
			}
		}
	}
	
	private static CountDownLatch countDown = new CountDownLatch(1);
	
	public static void main(String[] args) throws Exception {
	
		ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
		
		/**
		 * 引數:
		 * path:父節點路徑
		 * watch:true或者false,註冊一個watch事件
		 */
//		List<String> strChildList = zkServer.getZookeeper().getChildren("/imooc", true);
//		for (String s : strChildList) {
//			System.out.println(s);
//		}
		
		// 非同步呼叫
		String ctx = "{'callback':'ChildrenCallback'}";
//		zkServer.getZookeeper().getChildren("/imooc", true, new ChildrenCallBack(), ctx);
		zkServer.getZookeeper().getChildren("/imooc", true, new Children2CallBack(), ctx);
		
		countDown.await();
	}
	
	@Override
	public void process(WatchedEvent event) {
		try {
			if(event.getType()==EventType.NodeChildrenChanged){
				System.out.println("NodeChildrenChanged");
				ZKGetChildrenList zkServer = new ZKGetChildrenList(zkServerPath);
				List<String> strChildList = zkServer.getZookeeper().getChildren(event.getPath(), false);
				for (String s : strChildList) {
					System.out.println(s);
				}
				countDown.countDown();
			} else if(event.getType() == EventType.NodeCreated) {
				System.out.println("NodeCreated");
			} else if(event.getType() == EventType.NodeDataChanged) {
				System.out.println("NodeDataChanged");
			} else if(event.getType() == EventType.NodeDeleted) {
				System.out.println("NodeDeleted");
			} 
		} catch (KeeperException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

	public ZooKeeper getZookeeper() {
		return zookeeper;
	}
	public void setZookeeper(ZooKeeper zookeeper) {
		this.zookeeper = zookeeper;
	}
	
}

6-10 判斷zk節點是否存在

/**
 * @Description: zookeeper 判斷階段是否存在demo
 */
public class ZKNodeExist implements Watcher {

	private ZooKeeper zookeeper = null;
	
	public static final String zkServerPath = "192.168.1.110:2181";
	public static final Integer timeout = 5000;
	
	public ZKNodeExist() {}
	
	public ZKNodeExist(St