1. 程式人生 > >【Zookeeper學習六】——開源客戶端ZKClient和Curator介紹與應用

【Zookeeper學習六】——開源客戶端ZKClient和Curator介紹與應用

前言

在真正的專案中通常使用的是zkclient和curator,而不是原生的zookeeper客戶端,因為zookeeper原生的客戶端存在一定的侷限性,本篇小編主要講解一下這兩種zookeeper客戶端的使用!

內容

1.1zk原生api不足之處:

  • 超時重連,不支援自動,需要手動操作
  • Watch註冊一次後失效
  • 不支援遞迴建立節點

1.2zkclient

ZkClient是一個開源客戶端,在Zookeeper原生API介面的基礎上進行了包裝,更便於開發人員使用。內部實現了Session超時重連,Watcher反覆註冊等功能。像dubbo等框架對其也進行了整合使用。

雖然ZkClient對原生API進行了封裝,但也有它自身的不足之處:

  • 幾乎沒有參考文件;
  • 異常處理簡化(丟擲RuntimeException);
  • 重試機制比較難用;
  • 沒有提供各種使用場景的實現;
    zkclient簡單程式碼示例:
package com.zk.dev.zkClient.day1;  

import org.I0Itec.zkclient.IZkDataListener;  
import org.I0Itec.zkclient.ZkClient;  
import org.junit.After;  
import org.junit.Before;  
import
org.junit.Test; import java.util.concurrent.TimeUnit; public class ZKTest { private ZkClient zk; private String nodeName = "/test"; @Before public void initTest() { zk = new ZkClient("localhost:2181"); } @After public void dispose() { zk.close(); } @Test
public void testListener() throws InterruptedException { // 監聽指定節點的資料變化 zk.subscribeDataChanges(nodeName, new IZkDataListener() { public void handleDataChange(String s, Object o) throws Exception { System.out.println("node data changed!"); System.out.println("node=>" + s); System.out.println("data=>" + o); System.out.println("--------------"); } public void handleDataDeleted(String s) throws Exception { System.out.println("node data deleted!"); System.out.println("s=>" + s); System.out.println("--------------"); } }); System.out.println("ready!"); // junit測試時,防止執行緒退出 while (true) { TimeUnit.SECONDS.sleep(5); } } @Test public void testUpdateConfig() throws InterruptedException { if (!zk.exists(nodeName)) { //建立永久 zk.createPersistent(nodeName); } zk.writeData(nodeName, "1"); zk.writeData(nodeName, "2"); zk.delete(nodeName); zk.delete(nodeName); zk.writeData("/test/ba", "bbb"); } }

1.3Apache curator

  • Apache開源
  • 解決watcher的註冊一次就失效
  • Api更加簡單易用
  • 提供跟多解決方案並且實現:比如分散式鎖
  • 提供常用的zookeeper工具類
  • 程式設計風格更爽

curator簡單程式碼示例:

package com.zk.dev.zkClient.day1;  

import org.apache.curator.RetryPolicy;  
import org.apache.curator.framework.CuratorFramework;  
import org.apache.curator.framework.CuratorFrameworkFactory;  
import org.apache.curator.framework.recipes.cache.NodeCache;  
import org.apache.curator.framework.recipes.cache.NodeCacheListener;  
import org.apache.curator.framework.recipes.cache.PathChildrenCache;  
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;  
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;  
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;  
import org.apache.curator.retry.ExponentialBackoffRetry;  
import org.apache.zookeeper.CreateMode;  
import org.apache.zookeeper.ZooDefs.Ids;  


public class CuratorUtils {  

    public String connectString = "localhost:2181";  
    CuratorFramework  zkclient = null ;  
    public CuratorUtils(){  
        /** 
         * connectString連線字串中間用分號隔開,sessionTimeoutMs session過期時間,connectionTimeoutMs連線超時時間,retryPolicyc連線重試策略 
         */  
        //CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs, retryPolicy)  
        // fluent風格aip   
  //    CuratorFrameworkFactory.builder().sessionTimeoutMs(5000).connectString(connectString).namespace("/test").build();  
        // 重連策略,沒1一秒重試一次,最大重試次數3次  
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
        zkclient = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(5000).retryPolicy(retryPolicy).namespace("tests").build();  
        zkclient.start();  
    }  
    /** 
     * 遞迴建立節點 
     * @param path 
     * @param data 
     * @throws Exception 
     */  
    public void createNode(String path, byte[] data) throws Exception{  
        zkclient.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(Ids.OPEN_ACL_UNSAFE).forPath(path, data);  
    }  
    /** 
     * 遞迴刪除節點 
     * @param path 
     * @throws Exception 
     */  
    public void delNode(String path) throws Exception{  
        zkclient.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);  
    }   public void zkClose(){  
        zkclient.close();  
    }  
    public void delNodeCallBack(String path) throws Exception{  
        zkclient.delete().guaranteed().deletingChildrenIfNeeded().inBackground(new DeleteCallBack()).forPath(path);  
    }      
    public void dataChanges(String path) throws Exception{  
        final NodeCache  dataWatch =  new NodeCache(zkclient, path);  
        dataWatch.start(true);  
        dataWatch.getListenable().addListener(new NodeCacheListener(){  

            public void nodeChanged() throws Exception {  
                System.out.println("path==>" + dataWatch.getCurrentData().getPath() + "==data==>" + new String(dataWatch.getCurrentData().getData()));  
            }  

        });  
        zkclient.delete().guaranteed().deletingChildrenIfNeeded().inBackground(new DeleteCallBack()).forPath(path);  
    }      
    public void addChildWatcher(String path) throws Exception{  
        final PathChildrenCache pc = new PathChildrenCache(zkclient, path, true);  
        pc.start(StartMode.POST_INITIALIZED_EVENT);  
        System.out.println("節點個數===>" + pc.getCurrentData().size());  
        pc.getListenable().addListener(new  PathChildrenCacheListener() {  

            public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
                System.out.println("事件監聽到"  + event.getData().getPath());  
                if(event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)){  
                    System.out.println("客戶端初始化節點完成"  + event.getData().getPath());  
                }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){  
                    System.out.println("新增節點完成"  + event.getData().getPath());  
                }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)){  
                    System.out.println("刪除節點完成"  + event.getData().getPath());  
                }else if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){  
                    System.out.println("修改節點完成"  + event.getData().getPath());  
                }  
            }  
        });  

    }  
    /*執行執行緒*/  
    public static void main(String[] args) throws Exception{  
        CuratorUtils cu = new CuratorUtils();  
        cu.zkclient.setData().forPath("/aa", "love is not".getBytes());  
        cu.addChildWatcher("/aa");  
        try{  
            Thread.sleep(20000000);  
        }catch(Exception e){};  
    }  
 }  

curator結合spring:

pom中引入:

 <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>${curator.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>${curator.version}</version>
            </dependency>
            <!-- curator(zookeeper的客戶端)包 -->
            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-client</artifactId>
                <version>${curator.version}</version>
            </dependency>
  <!--zookeeper重試策略-->
    <bean id="retryPolicy" class="org.apache.curator.retry.RetryNTimes">
      <!--重試次數-->
      <constructor-arg index="0" value="10"></constructor-arg>
     <!--每次的間隔-->
      <constructor-arg index="1" value="5000"></constructor-arg>
  </bean>

    <!--zookeeper客戶端-->
    <bean id="client" class="org.apache.curator.framework.CuratorFrameworkFactory" factory-method="newClient" init-method="start">
        <constructor-arg index="0" value="192.168.25.131:2181"/>
        <!--session timeout 會話超時時間-->
        <constructor-arg index="1" value="10000"/>
        <!--建立連線的超時時間-->
        <constructor-arg index="2" value="5000"/>
        <!--重試策略-->
        <constructor-arg index="3" ref="retryPolicy"/>
    </bean>

參考部落格

相關推薦

Zookeeper學習——開源客戶ZKClientCurator介紹應用

前言 在真正的專案中通常使用的是zkclient和curator,而不是原生的zookeeper客戶端,因為zookeeper原生的客戶端存在一定的侷限性,本篇小編主要講解一下這兩種zookeeper客戶端的使用! 內容 1.1zk原生api不足之

CTP學習筆記CTP客戶開發指南 學習筆記一

1、組播行情        使用函式CreateFtdcMdApi 建立CThostFtdcMdApi 的例項。其中第一個引數是本地流檔案生成的目錄。流檔案是行情介面或交易介面在本地生成的流檔案,字尾名為.con。流檔案中記錄著客戶端收到的所有的資料流的數量。第二個引數描述

Zookeeper 開源客戶 ZkClient 版本 api介紹示例

ZkClient是由Datameer的工程師開發的開源客戶端,對Zookeeper的原生API進行了包裝,實現了超時重連、Watcher反覆註冊等功能。 ZKClient版本及原始碼 maven依賴 ZkClient目前有兩個不同artifactId的系列。  其中最早

Zookeeper學習(三) 客戶原生API

safe ima call proc string 過程 心跳 current catch 前言 在這篇博客裏我會主要總結下兩個部分的操作: 在安裝ZooKeeper的機器上利用ZKClient連接Zookeeper的集群,然後利用相應的命令做一些簡單的操作。相信很多沒有

機器學習貝葉斯NB

程式碼先貼上,後續總結 from numpy import * # 過濾網站的惡意留言 侮辱性:1 非侮辱性:0 # 建立一個實驗樣本 def loadDataSet(): postingList = [['my','dog','has','flea','problems','h

Linux學習使用者管理

環境  虛擬機器:VMware 10   Linux版本:CentOS-6.5-x86_64   客戶端:Xshell4  FTP:Xftp4 一、增加刪除使用者或組新增使用者useradd scott修改使用者密碼passwd scott(root使用者可以修改所有使用者密碼,普通使用者只可以修改自己的)

Katalon學習Katalon IE的配置

為了在IE上執行自動化測試,您需要以下設定: IE 7或以上版本: 對於所有可用區域,啟用保護模式必須相同(選中/未選中)。 要訪問此設定,請在Windows控制面板中選擇Internet選項卡,然後切換到Security選項卡: 此外,必須為IE 10或更高版本禁用增強保護模

Zookeeper(七)開源客戶

經過上面兩節部落格的介紹,朋友們應該會開始簡單地使用ZooKeeper了。 在這一偏文章中,我們將圍繞ZkClient和Curator這兩個開源的ZooKeeper客戶端產品,再來進一步看看如何更好地使用ZooKeeper。 ZkClient ZkClient是Github

Zookeeper開源客戶ZkClient

ZkClient是由Datameer的工程師開發的開源客戶端,對Zookeeper的原生API進行了包裝,實現了超時重連、Watcher反覆註冊等功能。 ZKClient版本及原始碼 maven依賴 ZkClient目前有兩個不同artifactI

ZooKeeper(四)-- 第三方客戶 ZkClient的使用

實現 kafka int java pid () config obj 9.1 前言   zkClient主要做了兩件事情:     一件是在session loss和session expire時自動創建新的ZooKeeper實例進行重連。     另一件是將一次性wat

OS學習筆記一 處理器、記憶體指令

我們已經知道,處理器是一臺電子計算機的核心,它會在振盪器脈衝的激勵下,從記憶體中獲取指令,併發起一系列由該指令所定義的操作。當這些操作結束之後,它接著再取下一條指令。通常情況下,這個過程是連續不斷、迴圈往復的。 1、暫存器和算數邏輯部件 電子計算機能能做很多事情。計算天氣預報,看

python學習筆記35:爬蟲基礎相關產品API(和風天氣)使用例項

學習《Python3爬蟲、資料清洗與視覺化實戰》時自己的一些實踐。 在網站URL後面跟robots.txt一般就可以看到網站允許和禁止爬取的資源。 GET請求獲取響應內容 最基本的爬蟲。 import requests ''' 中國旅遊網 /www.cntour.

python學習筆記45:認識Matplotlibpyecharts資料視覺化

學習《Python3爬蟲、資料清洗與視覺化實戰》時自己的一些實踐。 Matplotlib資料視覺化 資料準備 import pandas as pd import matplotlib.pyplot as plt df = pd.read_csv("E:/Data/p

spring學習-day2IOC-DI-scope-setter構造器注入

【補充】 這是早就寫了的文章,如今有新的理解,想補充完成。 1.scope 2.IOC 3.DI 4.setter注入和構造器注入 5.init和destory 【打頭說明】 IOC說的是控制反轉,意思就是讓spring來建立物件,竟然讓spring來建立物件

python學習筆記列表、元組字典的迭代

在python中,列表和元組的迭代是通過for....in....來完成的; >>> a=[1,2,3,4,5,6,7] >>> for index in a: ... print(index) ... 1 2 3 4 5

Java學習筆記靜態巢狀類內部類

public class Outer { int outer_x = 100;     class Inner{       public int y = 10;       private int z = 9;       int m = 5;       publ

ML學習筆記25:PCA及繪製降維恢復示意圖

主成分分析 簡述 主成分分析意在學習一個對映 U r

日常學習筆記2019/1/(4,7)(SSM再熟悉網頁傳值)

學生管理系統(SSM簡易版)總結 可以用來再次熟悉SSM https://www.jianshu.com/p/6a594fbea51d 頁面傳值 示例:點選a頁面的p標籤,將p標籤內的學號傳遞到b頁面,之後b頁面根據得到的值,再進行ajax資料請求,顯示此學號的詳情。 a

機器學習trickBatch-Normalization的理解研究

Batch-Normalization概述 15年穀歌發表了一篇文章,名字是《Batch Normalization: Accelerating Deep Network Training by Reducing Internal Covariate Shif

Spring學習筆記Spring中Application ContextServlet Context的區別

1. Servlet Context It is initilized when an servlet application is deployed. Servlet Context holds all the configurations (init