1. 程式人生 > >Java API 操作 zookeeper

Java API 操作 zookeeper

1.需要的jar包: 1.1 zookeeper的核心包:zookeeper-3.4.5.jar 1.2 log4j的jar包:log4j-1.2.15.jar 1.3 netty的jar包:netty-3.2.2.Final.jar 1.4 slf4j的jar包:slf4j-api-1.6.1.jar 和 slf4j-log4j12-1.6.1.jar 在zookeeper的壓縮檔案裡面就有: https://pan.baidu.com/s/1imnS-78bqSorM5tkI4SkEQ 在這裡插入圖片描述

程式碼:

package com.thp.bigdata;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Before;
import org.junit.Test;

/**
 * zookeeper的客戶端 資料的增刪改查,已經對資料的監聽跟響應
 * 
 * @author tommy
 *
 */
public class SimpleZkClient {

	// zookeeper叢集裡面只要有半數存活就可用
	
	/**
	 * 注意點:  如果是直接使用的 ip地址的話,那麼我測試的時候總會報錯,ConnectionLoss 的錯誤
	 *         在host檔案裡面配置了IP地址 然後使用主機名的話,程式是可以執行的
	 */
	
	// private static final String connectString = "192.168.17.150:2181,192.168.17.151:2181,192.168.17.153:2181";
	
	private static final String connectString = "bd1:2181,bd2:2181,bd3:2181";

	// 超時時間
	private static final int sessionTimeout = 2000;

	ZooKeeper zkClient = null;

	@Before
	public void init() throws IOException {
		zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
			// zookeeper可以根據事件作出相應的響應
			@Override
			public void process(WatchedEvent event) {
				// 收到事件通知後的回撥函式 (應該是我們自己的事件處理邏輯)
				System.out.println(event.getType() + " -- " + event.getPath()); // 事件的型別  --   事件發生的節點
			}
		});
	}

	/**
	 * 資料的增刪改查
	 * 
	 * @throws InterruptedException
	 * @throws KeeperException
	 */

	// 建立資料節點到zookeeper中
	@Test
	public void testCreate() throws KeeperException, InterruptedException {

		// 第一個引數 要建立節點的路徑
		// 第二個引數 data 資料可以是任意的 只需要轉換成byte型別的陣列就可以
		// 第三個引數是 許可權: Ids.CREATOR_ALL_ACL --- 是一個列舉 可以選擇對應的許可權 一般選擇
		// OPEN_ACL_UNSAFE
		// 第四個引數 節點的型別 四種可供選擇
		String nodeCreated = zkClient.create("/eclipse3", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE,
				CreateMode.PERSISTENT);

	}
	
	// 獲取子節點
	@Test
	public void getChildren() throws KeeperException, InterruptedException {
		List<String> children = zkClient.getChildren("/", true);
		for(String child : children) {
			System.out.println(child);
		}
		
		// 睡一段時間 可以檢視到監聽的變化    在zookeeper客戶端 進行修改
		// 注意 :  這個監聽器只監聽一次
		Thread.sleep(Long.MAX_VALUE);   // NodeChildrenChanged -- /
		
	}
}

第二個測試程式碼 可以監聽zookeeper上面節點的變化,但是隻會監聽一次,之後就不再監聽 這樣是不符合業務的需求的,我們應該需要實時的監聽

我們需要改進程式碼:

	@Before
	public void init() throws IOException {
		zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
			// zookeeper可以根據事件作出相應的響應
			@Override
			public void process(WatchedEvent event) {
				// 收到事件通知後的回撥函式 (應該是我們自己的事件處理邏輯)
				System.out.println(event.getType() + " -- " + event.getPath()); // 事件的型別  --   事件發生的節點
				try {
					// getChildren 只是為了 設定這個監聽  相當於又監聽了
					zkClient.getChildren("/", true);
				} catch (KeeperException | InterruptedException e) {
					e.printStackTrace();
				}
			}
		});
	}

在這裡插入圖片描述

判斷節點是否存在:

// 判斷znode是否存在
	@Test
	public void testExist() throws KeeperException, InterruptedException {
		Stat stat = zkClient.exists("/eclipse1", false);
		System.out.println(stat == null ? "not exist" : "exist");
	}
	

刪除節點:

// 刪除znode資料
	@Test
	public void deleteZnode() throws KeeperException, InterruptedException, UnsupportedEncodingException {
		// 引數2是指定刪除的版本  -1 表示刪除所有的版本
		zkClient.delete("/eclipse1", -1);
	}
	

修改節點資料:

// 修改znode資料
	@Test
	public void setData() throws KeeperException, InterruptedException, UnsupportedEncodingException {
		zkClient.setData("/app1", "i lost you".getBytes(), -1);
		byte[] data = zkClient.getData("/app1", false, null);
		System.out.println(new String(data));
	}
	

問題:ConnesctionLoss 連線丟失: 原因: 開啟zookeeper的配置檔案 /root/apps/zookeeper-3.4.5/conf 在這裡插入圖片描述 裡面配置的時候使用的就是主機名,所以我們在使用Java API 的時候連線也需要使用主機名,否則就會連線拒絕 是繫結在名字上的而不是繫結在IP上的,兩端得是一致的