1. 程式人生 > >(四)、ZooKeeper Java示例

(四)、ZooKeeper Java示例

利用ZooKeeper簡單的建立節點資訊。專案使用的是Maven工程。工程結構如下:


POM檔案

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.lyh.zk</groupId>
  <artifactId>ZooKeeper01-javaClient</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>ZooKeeper01-javaClient</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
		<groupId>junit</groupId>
		<artifactId>junit</artifactId>
		<version>4.1</version>
		<scope>test</scope>
    </dependency>
    <dependency>
	    <groupId>org.apache.zookeeper</groupId>
	    <artifactId>zookeeper</artifactId>
	    <version>3.4.8</version>
	</dependency>
	<dependency>
		<groupId>com.netflix.curator</groupId>
		<artifactId>curator-framework</artifactId>
		<version>1.2.3</version>
	</dependency>
	<dependency>
		<groupId>com.netflix.curator</groupId>
		<artifactId>curator-recipes</artifactId>
		<version>1.2.3</version>
	</dependency>
	<dependency>
		<groupId>com.netflix.curator</groupId>
		<artifactId>curator-test</artifactId>
		<version>1.2.3</version>
	</dependency>
	<dependency>
		<groupId>com.netflix.curator</groupId>
		<artifactId>curator-x-discovery</artifactId>
		<version>1.2.3</version>
	</dependency>
  </dependencies>
</project>
建立ZooKeeper客戶端,MyZooKeeper實現Watcher介面。
package com.lyh.zk;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;

/**
 * ZooKeeper監聽
 * @author liuyuehu
 */
public class MyZooKeeper implements Watcher {
	
	Logger logger = Logger.getLogger(MyZooKeeper.class);
	
	protected CountDownLatch countDownLatch = new CountDownLatch(1);
	//快取時間
	private static final int SESSION_TIME = 2000;
	public static ZooKeeper zooKeeper = null;

	/**
	 * 監控所有被觸發的事件
	 */
	public void process(WatchedEvent event) {
		logger.info("收到事件通知:" + event.getState() );
		if(event.getState()==KeeperState.SyncConnected){  
            countDownLatch.countDown();  
        } 
	}
	
	/**
     * <p>連線Zookeeper</p>
     * <pre>
     *     [關於connectString伺服器地址配置]
     *     格式: 192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181
     *     這個地址配置有多個ip:port之間逗號分隔,底層操作
     *     ConnectStringParser connectStringParser =  new ConnectStringParser(“192.168.1.1:2181,192.168.1.2:2181,192.168.1.3:2181”);
     *     這個類主要就是解析傳入地址列表字串,將其它儲存在一個ArrayList中
     *     ArrayList<InetSocketAddress> serverAddresses = new ArrayList<InetSocketAddress>();
     *     接下去,這個地址列表會被進一步封裝成StaticHostProvider物件,並且在執行過程中,一直是這個物件來維護整個地址列表。
     *     ZK客戶端將所有Server儲存在一個List中,然後隨機打亂(這個隨機過程是一次性的),並且形成一個環,具體使用的時候,從0號位開始一個一個使用。
     *     因此,Server地址能夠重複配置,這樣能夠彌補客戶端無法設定Server權重的缺陷,但是也會加大風險。
     *     [客戶端和服務端會話說明]
     *     ZooKeeper中,客戶端和服務端建立連線後,會話隨之建立,生成一個全域性唯一的會話ID(Session ID)。
     *     伺服器和客戶端之間維持的是一個長連線,在SESSION_TIMEOUT時間內,伺服器會確定客戶端是否正常連線(客戶端會定時向伺服器傳送heart_beat,伺服器重置下次SESSION_TIMEOUT時間)。
     *     因此,在正常情況下,Session一直有效,並且ZK叢集所有機器上都儲存這個Session資訊。
     *     在出現網路或其它問題情況下(例如客戶端所連線的那臺ZK機器掛了,或是其它原因的網路閃斷),客戶端與當前連線的那臺伺服器之間連線斷了,
     *     這個時候客戶端會主動在地址列表(例項化ZK物件的時候傳入構造方法的那個引數connectString)中選擇新的地址進行連線。
     *     [會話時間]
     *     客戶端並不是可以隨意設定這個會話超時時間,在ZK伺服器端對會話超時時間是有限制的,主要是minSessionTimeout和maxSessionTimeout這兩個引數設定的。
     *     如果客戶端設定的超時時間不在這個範圍,那麼會被強制設定為最大或最小時間。 預設的Session超時時間是在2 * tickTime ~ 20 * tickTime
     * </pre>
     * @param connectString  Zookeeper服務地址
     * @param sessionTimeout Zookeeper連線超時時間
     */
	public void connect(String hosts){     
        try {
        	if(zooKeeper == null){
        		// ZK客戶端允許我們將ZK伺服器的所有地址都配置在這裡
    			zooKeeper = new ZooKeeper(hosts,SESSION_TIME,this);     
    			// 使用CountDownLatch.await()的執行緒(當前執行緒)阻塞直到所有其它擁有
    			//CountDownLatch的執行緒執行完畢(countDown()結果為0)
    			countDownLatch.await();
        	}
		} catch (IOException e) {
			logger.error("連線建立失敗,發生 InterruptedException , e " + e.getMessage(), e);
		} catch (InterruptedException e) {
			logger.error( "連線建立失敗,發生 IOException , e " + e.getMessage(), e );
		}     
	}   
	
	/**
	 * 關閉連線
	 */
	public void close(){     
        try {
        	if(zooKeeper != null){
        		zooKeeper.close();
        	}
		} catch (InterruptedException e) {
			logger.error("release connection error ," + e.getMessage() ,e);
		}     
    }  
}
建立一個類,ZKOpeate集中操作ZooKeeper的節點資訊。
package com.lyh.zk;

import java.util.List;

import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;

public class ZKOperate {
	
	Logger logger = Logger.getLogger(MyZooKeeper.class);

	MyZooKeeper myZooKeeper = new MyZooKeeper();
	
	/**
     * <p>建立zNode節點, String create(path<節點路徑>, data[]<節點內容>, List(ACL訪問控制列表), CreateMode<zNode建立型別>) </p><br/>
     * <pre>
     *     節點建立型別(CreateMode)
     *     1、PERSISTENT:持久化節點
     *     2、PERSISTENT_SEQUENTIAL:順序自動編號持久化節點,這種節點會根據當前已存在的節點數自動加 1
     *     3、EPHEMERAL:臨時節點客戶端,session超時這類節點就會被自動刪除
     *     4、EPHEMERAL_SEQUENTIAL:臨時自動編號節點
     * </pre>
     * @param path zNode節點路徑
     * @param data zNode資料內容
     * @return 建立成功返回true, 反之返回false.
     */
	public boolean createZNode(String path,String data){
		try {
			String zkPath = MyZooKeeper.zooKeeper.create(path, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
			logger.info("ZooKeeper建立節點成功,節點地址:" + zkPath);
			return true;
		} catch (KeeperException e) {
			logger.error("建立節點失敗:" + e.getMessage() + ",path:" + path  ,e);
		} catch (InterruptedException e) {
			logger.error("建立節點失敗:" + e.getMessage() + ",path:" + path  ,e);
		}
		return false;
	}
	
	 /**
     * <p>刪除一個zMode節點, void delete(path<節點路徑>, stat<資料版本號>)</p><br/>
     * <pre>
     *     說明
     *     1、版本號不一致,無法進行資料刪除操作.
     *     2、如果版本號與znode的版本號不一致,將無法刪除,是一種樂觀加鎖機制;如果將版本號設定為-1,不會去檢測版本,直接刪除.
     * </pre>
     * @param path zNode節點路徑
     * @return 刪除成功返回true,反之返回false.
     */
	public boolean deteleZNode(String path){
		try {
			MyZooKeeper.zooKeeper.delete(path, -1);
			logger.info("ZooKeeper刪除節點成功,節點地址:" + path);
			return true;
		} catch (InterruptedException e) {
			logger.error("刪除節點失敗:" + e.getMessage() + ",path:" + path ,e);
		} catch (KeeperException e) {
			logger.error("刪除節點失敗:" + e.getMessage() + ",path:" + path  ,e);
		}
		return false;
	}
	
	/**
     * <p>更新指定節點資料內容, Stat setData(path<節點路徑>, data[]<節點內容>, stat<資料版本號>)</p>
     * <pre>
     *     設定某個znode上的資料時如果為-1,跳過版本檢查
     * </pre>
     * @param path zNode節點路徑
     * @param data zNode資料內容
     * @return 更新成功返回true,返回返回false
     */
	public boolean updateZNodeData(String path,String data){
		try {
			Stat stat = MyZooKeeper.zooKeeper.setData(path, data.getBytes(), -1);
			logger.info( "更新資料成功, path:" + path + ", stat: " + stat );
			return true;
		} catch (KeeperException e) {
			logger.error("更新節點資料失敗:" + e.getMessage() + ",path:" + path ,e);
		} catch (InterruptedException e) {
			logger.error("更新節點資料失敗:" + e.getMessage() + ",path:" + path ,e);
		}
		return false;
	}
	
	/**
     * <p>讀取指定節點資料內容,byte[] getData(path<節點路徑>, watcher<監視器>, stat<資料版本號>)</p>
     * @param path zNode節點路徑
     * @return 節點儲存的值,有值返回,無值返回null
     */
    public String readData( String path ){
        String data = null;
        try {
            data = new String( MyZooKeeper.zooKeeper.getData( path, false, null ) );
            logger.info( "讀取資料成功, path:" + path + ", content:" + data);
        } catch (KeeperException e) {
        	logger.error( "讀取資料失敗,發生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
        	logger.error( "讀取資料失敗,發生InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return  data;
    }
    
    /**
     * <p>獲取某個節點下的所有子節點,List getChildren(path<節點路徑>, watcher<監視器>)該方法有多個過載</p>
     * @param path zNode節點路徑
     * @return 子節點路徑集合 說明,這裡返回的值為節點名
     * <pre>
     *     eg.
     *     /node
     *     /node/child1
     *     /node/child2
     *     getChild( "node" )戶的集合中的值為["child1","child2"]
     * </pre>
     * @throws KeeperException
     * @throws InterruptedException
     */
    public List<String> getChild( String path ){
        try{
            List<String> list=MyZooKeeper.zooKeeper.getChildren( path, false );
            if(list.isEmpty()){
            	logger.info( "中沒有節點" + path );
            }
            return list;
        }catch (KeeperException e) {
            logger.error( "讀取子節點資料失敗,發生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
        	logger.error( "讀取子節點資料失敗,發生InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return null;
    }
    
    /**
     * <p>判斷某個zNode節點是否存在, Stat exists(path<節點路徑>, watch<並設定是否監控這個目錄節點,這裡的 watcher 是在建立 ZooKeeper 例項時指定的 watcher>)</p>
     * @param path zNode節點路徑
     * @return 存在返回true,反之返回false
     */
    public boolean isExists( String path ){
        try {
            Stat stat = MyZooKeeper.zooKeeper.exists( path, false );
            return null != stat;
        } catch (KeeperException e) {
            logger.error( "讀取資料失敗,發生KeeperException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        } catch (InterruptedException e) {
        	logger.error( "讀取資料失敗,發生InterruptedException! path: " + path
                    + ", errMsg:" + e.getMessage(), e );
        }
        return false;
    }
}
客戶端測試:ZKClientTest.java
package com.lyh.zk;

import java.util.List;

/**
 * 客戶端測試
 * @author liuyuehu
 */
public class ZKClientTest {

	public static void main(String[] args) {
		 // 定義父子類節點路徑
        String rootPath = "/TestZookeeper";
        String child1Path = rootPath + "/hello1";
        String child2Path = rootPath + "/word1";

        //ZKOperate操作API
        ZKOperate zkWatchAPI = new ZKOperate();

        // 連線zk伺服器
        MyZooKeeper zooKeeper = new MyZooKeeper();
        zooKeeper.connect("127.0.0.1:2181");

        // 建立節點資料
        if ( zkWatchAPI.createZNode(rootPath, "<父>節點資料" ) ) {
            System.out.println( "節點[" + rootPath + "]資料內容[" + zkWatchAPI.readData( rootPath ) + "]" );
        }
        // 建立子節點, 讀取 + 刪除
        if ( zkWatchAPI.createZNode( child1Path, "<父-子(1)>節點資料" ) ) {
            System.out.println( "節點[" + child1Path + "]資料內容[" + zkWatchAPI.readData( child1Path ) + "]" );
            zkWatchAPI.deteleZNode(child1Path);
            System.out.println( "節點[" + child1Path + "]刪除值後[" + zkWatchAPI.readData( child1Path ) + "]" );
        }

        // 建立子節點, 讀取 + 修改
        if ( zkWatchAPI.createZNode(child2Path, "<父-子(2)>節點資料" ) ) {
            System.out.println( "節點[" + child2Path + "]資料內容[" + zkWatchAPI.readData( child2Path ) + "]" );
            zkWatchAPI.updateZNodeData(child2Path, "<父-子(2)>節點資料,更新後的資料" );
            System.out.println( "節點[" + child2Path+ "]資料內容更新後[" + zkWatchAPI.readData( child2Path ) + "]" );
        }

        // 獲取子節點
        List<String> childPaths = zkWatchAPI.getChild(rootPath);
        if(null != childPaths){
            System.out.println( "節點[" + rootPath + "]下的子節點數[" + childPaths.size() + "]" );
            for(String childPath : childPaths){
                System.out.println(" |--節點名[" +  childPath +  "]");
            }
        }
        // 判斷節點是否存在
        System.out.println( "檢測節點[" + rootPath + "]是否存在:" + zkWatchAPI.isExists(rootPath)  );
        System.out.println( "檢測節點[" + child1Path + "]是否存在:" + zkWatchAPI.isExists(child1Path)  );
        System.out.println( "檢測節點[" + child2Path + "]是否存在:" + zkWatchAPI.isExists(child2Path)  );


        zooKeeper.close();
	}
}