1. 程式人生 > >zookeeper(4)——zookeeper原生java api基本使用介紹

zookeeper(4)——zookeeper原生java api基本使用介紹

本節介紹

本節裡面,將會利用zookeeper官方的原生java api進行連線,然後演示一些建立、刪除、修改、查詢節點的操作。其實有兩個比較好用的第三方客戶端zkclient和curator,可以彌補原生api的很多不足,使用也比較簡便,但是這兩個東西也都是在原生api的基礎上封裝的,所以瞭解原生api的使用方法還是很有必要的,後面有時間的話會跟大家介紹一下另外兩個客戶端。

zookeeper 原生api 使用介紹

先給大家貼一張圖,裡面介紹了zookeeper原生java api的一些基礎用法,可以先簡單瞭解一下,下面會寫一個demo程式對這幾個方法進行逐個介紹。

準備

建立一個maven工程zkTest,然後新增zookeeper、log4j的maven依賴。

<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.4.6</version>
        </dependency>
<dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.15</version>
</dependency>

將下面的log4j.properties配置檔案放到src/main/resources下面即可

log4j.rootLogger = info,stdout,D
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern =  %d %t [%c{2}] %L [%-5p] %m%n
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = ../logs/log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = info
log4j.appender.D.DatePattern= '_'yyyy-MM-dd'.log'
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %d %t [%c{2}] %L [%-5p] %m%n

 zookeeper客戶端處理類

package com.wkp.test.zookeeper.base;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class ZookeeperBase {
    
    private static Logger logger=Logger.getLogger(ZookeeperBase.class);

    private ZooKeeper zookeeper;
    private String connectString;//連線zk服務端的ip:port,如果多個格式為 ip:port,ip:port......
    private int sessionTimeout;//客戶端連線超時時間  單位 ms
    /** 訊號量,阻塞程式執行,用於等待zookeeper連線成功,傳送成功訊號 */
    private CountDownLatch countDown=new CountDownLatch(1);
    
    public ZookeeperBase(String connectString,int sessionTimeout) throws IOException{
        this.connectString=connectString;
        this.sessionTimeout=sessionTimeout;
        
        zookeeper=new ZooKeeper(this.connectString, this.sessionTimeout, new Watcher() {
            
            public void process(WatchedEvent event) {
                //影響的路徑
                String path = event.getPath();
                //獲取事件的狀態
                KeeperState state = event.getState();
                //獲取事件的型別
                EventType type = event.getType();
                if(KeeperState.SyncConnected.equals(state)){
                    if(EventType.None.equals(type)){
                        //連線建立成功,則釋放訊號量,讓阻塞的程式繼續向下執行
                        countDown.countDown();
                        logger.info("zk建立連線成功========");
                    }
                }
            }
        });
    }
    
    public ZooKeeper getZkClient() throws InterruptedException{
        //計數器到達0之前,一直阻塞,只有當訊號量被釋放,才會繼續向下執行
        countDown.await();
        return zookeeper;
    }
    
    public void closeClient() throws InterruptedException{
        if(zookeeper!=null){
            zookeeper.close();
            logger.info("zk關閉連線成功=======");
        }
    }
    
    /**
     * 建立節點
     * @param path 節點path
     * @param data 節點資料
     * @return
     * @throws InterruptedException
     * @throws KeeperException
     */
    public String createNode(String path,String data) throws InterruptedException, KeeperException{
        ZooKeeper zk = this.getZkClient();
        String str = zk.create(path, data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        return str;
    }
    
    /**
     * 更新節點資料
     * @param path 節點path
     * @param data 節點資料
     * @return
     * @throws InterruptedException
     * @throws KeeperException
     */
    public Stat updateNode(String path,String data) throws InterruptedException, KeeperException{
        ZooKeeper zk = this.getZkClient();
        Stat stat = zk.setData(path, data.getBytes(),-1);
        return stat;
    }
    
    /**
     * 獲得節點資料
     * @param path 節點path
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String getData(String path) throws KeeperException, InterruptedException{
        ZooKeeper zk = this.getZkClient();
        byte[] data = zk.getData(path, false, null);
        return new String(data);
    }
    
    /**
     * 獲取當前節點的子節點(不包含孫子節點)
     * @param path 父節點path
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public List<String> getChildren(String path) throws KeeperException, InterruptedException{
        ZooKeeper zk = this.getZkClient();
        List<String> list = zk.getChildren(path, false);
        return list;
    }
    
    /**
     * 判斷節點是否存在
     * @param path
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public Stat exists(String path) throws KeeperException, InterruptedException{
        ZooKeeper zk = this.getZkClient();
        Stat stat = zk.exists("/", false);
        return stat;
    }
    
    /**
     * 刪除節點
     * @param path
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void deleteNode(String path) throws InterruptedException, KeeperException{
        ZooKeeper zk = this.getZkClient();
        zk.delete(path, -1);
        logger.info("刪除 "+path+" 節點成功");
    }
    
    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        ZookeeperBase base = new ZookeeperBase("192.168.74.4:2181,192.168.74.5:2181,192.168.74.6:2181", 2000);
        ZooKeeper zkClient = base.getZkClient();
        System.out.println("sessionId:"+zkClient.getSessionId()+",sessionTimeOut:"+zkClient.getSessionTimeout());
        base.closeClient();
    }
}

1、建立客戶端連線

ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

 我們看到Zookeeper的構造方法需要三個引數,第一個connectString是連線字串,格式為"ip:port,ip:port......",例如上面程式碼中的"192.168.74.4:2181,192.168.74.5:2181,192.168.74.6:2181",sessionTimeout為連線超時時間,單位毫秒,watcher是一個監聽物件,zookeeper中對資料的變更操作都會有對應的事件,如果對節點註冊了監聽,當節點狀態發生變化時就會被註冊的監聽物件監控到(關於zookeeper的事件監聽機制下一節會有專門的介紹)。

我們看到程式碼中用到了一個多執行緒輔助類CountDownLatch,CountDownLatch是一個同步輔助類,構造方法初始化計數器值為1,當CountDownLatch值為0之前,其await()方法會一直阻塞,countDown()方法會將計數器的值減1(關於CountDownLatch的詳細使用方法可以自行查詢一下)。我們看到獲取zookeeper客戶端物件的方法中呼叫了CountDownLatch的await方法,就會一直等待watcher物件中監聽的連線事件被監聽到了,並且呼叫countDown方法後await方法才會被放行,這時候才可以獲取到zookeeper客戶端物件。至於為什麼要使用這個同步計數器呢,其實是因為java中new Zookeeper建立物件立馬就會返回了,而客戶端連線到服務端是耗時的,這個時候並沒有真正的連線成功,如果這個時候拿zk客戶端物件去做操作會報錯,所以我們要等待連線建立成功的時候才能使用客戶端物件。

通過zkClient.getSessionId()可以獲取到當前連線物件的ID,執行main方法會看到控制檯出現下面結果(去掉了環境變數的無用日誌輸出)

2018-09-16 16:06:33,215 main [zookeeper.ZooKeeper] 438 [INFO ] Initiating client connection, connectString=192.168.74.4:2181,192.168.74.5:2181,192.168.74.6:2181 sessionTimeout=2000 [email protected]
2018-09-16 16:06:33,342 main-SendThread(192.168.74.6:2181) [zookeeper.ClientCnxn] 975 [INFO ] Opening socket connection to server 192.168.74.6/192.168.74.6:2181. Will not attempt to authenticate using SASL (unknown error)
2018-09-16 16:06:33,347 main-SendThread(192.168.74.6:2181) [zookeeper.ClientCnxn] 852 [INFO ] Socket connection established to 192.168.74.6/192.168.74.6:2181, initiating session
2018-09-16 16:06:33,431 main-SendThread(192.168.74.6:2181) [zookeeper.ClientCnxn] 1235 [INFO ] Session establishment complete on server 192.168.74.6/192.168.74.6:2181, sessionid = 0x265e106a0720002, negotiated timeout = 4000
2018-09-16 16:06:33,438 main-EventThread [base.ZookeeperBase] 45 [INFO ] zk建立連線成功========
sessionId:172791579301511170,sessionTimeOut:4000
2018-09-16 16:06:33,462 main [zookeeper.ZooKeeper] 684 [INFO ] Session: 0x265e106a0720002 closed
2018-09-16 16:06:33,463 main-EventThread [zookeeper.ClientCnxn] 512 [INFO ] EventThread shut down
2018-09-16 16:06:33,464 main [base.ZookeeperBase] 61 [INFO ] zk關閉連線成功=======

2、建立節點

我們呼叫createNode方法建立節點,path為 "/testRoot",value為 "rootValue"

        ZookeeperBase base = new ZookeeperBase("192.168.74.4:2181,192.168.74.5:2181,192.168.74.6:2181", 2000);
        String result = base.createNode("/testRoot", "rootValue");
        System.out.println(result);
        base.closeClient();

然後會看到控制檯輸出了剛才建立的節點路徑 "/testRoot",假如我們再執行一次上面的建立節點的程式的話,會報org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists for /testRoot,由此可見不能重複建立節點,否則會報節點已經存在的異常。

建立節點的方法create(String path, byte[] data, List<ACL> acl, CreateMode createMode),需要接收四個引數,path是節點路徑,data是節點值,ACL是zookeeper的許可權相關的,我們檢視上面createNode()方法中傳的是Ids.OPEN_ACL_UNSAFE,表示開放所有許可權(具體的下一節做詳細介紹),createMode表示節點的型別,我們檢視CreateMode的原始碼發現其是一個列舉類,裡面定義了四種節點型別,從前到後分別是持久化節點、持久化有序節點、臨時節點、臨時有序節點。

接下來我們嘗試建立一個a節點下的b節點即 "/a/b",此時a節點還沒有建立

        ZookeeperBase base = new ZookeeperBase("192.168.74.4:2181,192.168.74.5:2181,192.168.74.6:2181", 2000);
        String result = base.createNode("/a/b", "test");
        System.out.println(result);
        base.closeClient();

此時控制檯會報org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /a/b,然後我們嘗試給testRoot節點建立一個子節點 children 即 "/testRoot/children",

ZookeeperBase base = new ZookeeperBase("192.168.74.4:2181,192.168.74.5:2181,192.168.74.6:2181", 2000);
        String result = base.createNode("/testRoot/children", "childrenValue");
        System.out.println(result);

經執行發現children節點建立成功,由此可見:zookeeper的原生java api是不會自動遞迴建立節點的,即當父節點不存在事不可以建立子節點(下面的方法我沒有寫具體的呼叫,您可以自己參照上面程式碼呼叫驗證哦)

3、更新節點

更新節點資料呼叫的方法是 setData(String path, byte[] data, int version),前兩個引數跟前面建立節點的一樣這裡就不解釋了,最後一個version引數代表希望變更的節點的版本號(前面的章節有介紹過,隨著變更節點版本號也會變更),如果版本號與節點實際版本號不對應會報異常(org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /testRoot/children),如果version傳遞值為 "-1",則表示忽略版本號限制。

4、判斷節點是否存在

  判斷節點是否存在呼叫 exists(String path, boolean watch) 方法,如果存在會返回對應節點,第二個引數是一個布林型別,true表示需要對當前節點進行監聽,false表示不需要監聽,具體的會在下一節介紹。

5、獲取節點資料

獲取節點資料方法getData(String path, boolean watch, Stat stat)

6、獲取當前節點的所有子節點資料

呼叫方法getChildren(String path, boolean watch)會返回所有子節點資料的集合,注意是返回所有子節點的資料,不會返回孫子節點的資料,如果想獲取當前節點下的所有子節點、孫子節點、重孫子節點......需要自己寫程式進行遞迴獲取。

7、刪除節點

刪除節點呼叫方法 delete(String path, int version) ,注意刪除節點是不能遞迴刪除節點,即如果當前節點有子節點,必須先把子節點刪除,才可以刪除父節點,否則會報 org.apache.zookeeper.KeeperException$NotEmptyException: KeeperErrorCode = Directory not empty for /testRoot

下一節會介紹一下zookeeper的監聽機制即本節中的watcher,還會對zookeeper的ACL做個簡單介紹。

相關推薦

zookeeper(4)——zookeeper原生java api基本使用介紹

本節介紹 本節裡面,將會利用zookeeper官方的原生java api進行連線,然後演示一些建立、刪除、修改、查詢節點的操作。其實有兩個比較好用的第三方客戶端zkclient和curator,可以彌補原生api的很多不足,使用也比較簡便,但是這兩個東西也都是在原生api的

第6章 使用ZooKeeper原生Java API進行客戶端開發

使用ZooKeeper原生Java API進行客戶端開發 6-1 建立客戶端與zk服務端的連線 6-2 zk會話重連機制 6-3 同步非同步建立zk節點 6-4 修改zk節點資料 6-5 同步非同步刪除zk節點 6-6 CountDownL

Elasticsearch java api 基本搜索部分詳解

ocl 全部 條件 index mod data trace 服務器ip sin 版權聲明:本文非原創文章,轉載出處:http://blog.csdn.net/molong1208/article/details/50512149 一、所使用版本的介紹 使用的是ela

Elasticsearch 5.4.3實戰--Java API調用:搜索

fas ack 聚合 turn result ets str iterator ise ES有多種查詢方式,我自己的業務是需要對多個字段進行查詢,具體實現類代碼如下。 1 package com.cs99lzzs.elasticsearch.service.imp

06_zookeeper原生Java API使用

edev 集群 err main .com 連接狀態 com 通知 bool 【Zookeeper構造方法概述】 /** * 客戶端和zk服務端的連接是一個異步的過程 * 當連接成功後,客戶端會收到一個watch通知 *

4、軟體測試的基本介紹

1.1 軟體測試行業基本介紹一、為什麼需要軟體測試1.一款軟體從無到有會經歷很多的開發階段由不同的人來參與開發,所以最終產出的軟體功能可能會存在問題。因此為了保證軟體的功能是可用的,我們必須要進行測試。2.當前的軟體件行業已經不在是功能為王了,使用者不僅僅只盯著軟體的功能是否滿足需求.還會對軟體是否容易上手.

elasticsearch的java api基本操作

1.新增依賴 預設elasticsearch的配置已經沒有問題了,本文使用elasticsearch版本為6.4.2,依賴如下: <dependency> <groupId>org.elasticsearch</g

HBase Java API 基本操作

學完hbase shell API的基本操作之後,可以通過Java API 對hbase基本操作實現一把。 基本概念 java類 對應資料模型 HBaseConfiguration HBase配置類 HBaseAdmin HBase管理A

大資料實時計算Spark學習筆記(4)—— Spak核心 API 模組介紹

1 Spark 介紹 1.1 Spark 特點 速度:在記憶體中儲存中間結果 支援多種語言 內建 80+ 的運算元 高階分析:MR,SQL/ Streaming/Mlib/Graph 1.2 Spark 模組 core : 通用執行

Elasticsearch java api 基本搜尋部分詳解

一、所使用版本的介紹 使用的是elasticsearch2.1.0版本,在此只是簡單介紹搜尋部分的api使用 二、簡單的搜尋 使用api的時候,基本上可以將DSL搜尋的所有情況均寫出來,在此給出一個最簡單搜尋的全部的過程以及程式碼,之後將對不同的搜尋只是針對函式進行介紹 (

Elasticsearch java api 基本使用之增、刪、改、查

主要參考el的java官方文件:https://www.elastic.co/guide/en/elasticsearch/client/java-api/1.7/generate.html 一篇部落格:http://www.cnblogs.com/huangfox/p/3

zookeeper 基本命令和java api

1.Zookeeper命令工具 啟動zookeeper服務後,連線到zookeeper服務: zkServer -server localhost:2181 執行結果如下: Connecting to localhost:2181 2018-06-10 03:5

Zookeeper使用--Java API(轉)

包括 acl logs 新的 ack max false pen strong [轉載自作者leesf,原文鏈接] 操作示例   1 創建節點   創建節點有異步和同步兩種方式。無論是異步或者同步,Zookeeper都不支持遞歸調用,即無法在父節點不存在的情況下

使用ZooKeeper提供的Java API操作ZooKeeper

zookeeper 服務協調框架 分布式 集群 Java API 建立客戶端與zk服務端的連接 我們先來創建一個普通的maven工程,然後在pom.xml文件中配置zookeeper依賴: <dependencies> <dependency>

使用Java API操作zookeeper的acl權限

zookeeper Java API 分布式 服務協調框架 ACL權限 默認匿名權限 ZooKeeper提供了如下幾種驗證模式(scheme): digest:Client端由用戶名和密碼驗證,譬如user:password,digest的密碼生成方式是Sha1摘要的base64形式 a

ZooKeeper Java API

state exce 處理 art throws 枚舉類 event exception 類型 org.apache.zookeeper.ZookeeperZookeeper 是在 Java 中客戶端主類,負責建立與 zookeeper 集群的會話, 並提供方法進行操作。o

zookeeperzookeeper基本命令及通過Java操作zk

接上講,這節主要講一下zookeeper的常用命令和如何使用java操作zk. 首先連線zookeeper 客戶端: #進入zookeeper安裝目錄下bin目錄,啟動zk客戶端 cd /usr/local/zookeeper/bin ./zkCli.sh 然後不知道zk的常用命令?沒

zookeeper java api(2)

    這裡介紹其他的API對zookeeper的操作。 同步方式獲取子節點資料 public static void getChildrenSync() throws KeeperException, InterruptedException {

zookeeper java api(1)

1 Zookeeper安裝以及啟動     這裡我已經進行了安裝,並且啟動了Zookeeper。埠是2182 2 Zookeeper config tickTime=2000 initLimit=10 syncLimit=5 dataDir=D://zook

Java API 操作 zookeeper

1.需要的jar包: 1.1 zookeeper的核心包:zookeeper-3.4.5.jar 1.2 log4j的jar包:log4j-1.2.15.jar 1.3 netty的jar包:netty-3.2.2.Final.jar 1.4 slf4j的jar