1. 程式人生 > >ZooKeeper配置及簡單使用

ZooKeeper配置及簡單使用

安裝並使用ZooKeeper API對Znode進行控制。

Zookeeper配置

下載zookeeper

首先在官網下載zookeeper:

wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz

解壓:

sudo tar xzvf zookeeper-3.4.13.tar.gz -C /usr/local

設定許可權:

chown -R hadoop:hadoop zookeeper

配置環境變數

54189586152

export PATH=
$PATH:/usr/local/zookeeper/bin

修改配置檔案

重新命名:

sudo mv zookeeper-3.4.13/ zookeeper
mv zoo_sample.cfg zoo.cfg

修改zoo.cfg

54189599792

可以改conf檔案,建立多個埠-> 多個server:

54189604085

啟動zookeeper

zkServer.sh start

54189606866

驗證是否啟動

telnet localhost 2181

輸入start,若看到Zookeeper version則說明啟動成功

54189616318

輸入jps看是否啟動:

54189621003

關閉zookeeper

zkServer.sh stop

Zookeeper命令列

參考這裡

進入命令列工具:

zkCli.sh -server 

使用 ls 命令來檢視當前 ZooKeeper 中所包含的內容:

54189647501

下面我們通過 set 命令來對 zk 所關聯的字串進行設定:

54189654067

下面我們將剛才建立的 znode 刪除:

  • delete /zk

刪除節點:

  • rmr /zk

編寫Client程式

建立maven專案

修改pox.xml檔案:

修改為:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>Dase</groupId> <artifactId>1</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.13</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies> </project>

下載JUnitGenerator V2.0

IDEA中,JUnit已經預設下載好,我們需要新增JUnitGenerator V2.0外掛:

54189841366

編寫程式

import java.util.List;

import org.apache.zookeeper.*;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;


public class Simple {

    private static final String connectString = "localhost:2181";
    private static final int sessionTimeout = 2000;

    private ZooKeeper zkClient = null;

    public void init() throws Exception {
        zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 收到事件通知後的回撥函式(應該是我們自己的事件處理邏輯)
                System.out.println(event.getType() + "---" + event.getPath());
                try {
                    zkClient.getChildren("/", true);
                } catch (Exception e) {
                }
            }
        });

    }

    /**
     * 資料的增刪改查
     *
     * @throws InterruptedException
     * @throws KeeperException
     */

    // 建立資料節點到zk中
    public void Create() throws KeeperException, InterruptedException {
        // 引數1:要建立的節點的路徑 引數2:節點大資料 引數3:節點的許可權 引數4:節點的型別
        String nodeCreated = zkClient.create("/scott", "hellozk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        //上傳的資料可以是任何型別,但都要轉成byte[]
    }

    //判斷znode是否存在

    public void Exist() throws Exception{
        Stat stat = zkClient.exists("/scott", false);
        System.out.println(stat==null?"not exist":"exist");


    }

    // 獲取子節點
    public void getChildren() throws Exception {
        List<String> children = zkClient.getChildren("/", true);
        for (String child : children) {
            System.out.println(child);
        }
        Thread.sleep(Long.MAX_VALUE);
    }

    //獲取znode的資料
    public void getData() throws Exception {

        byte[] data = zkClient.getData("/scott", false, null);
        System.out.println(new String(data));

    }

    //刪除znode
    public void deleteZnode() throws Exception {

        //引數2:指定要刪除的版本,-1表示刪除所有版本
        zkClient.delete("/eclipse", -1);


    }
    //刪除znode
    public void setData() throws Exception {

        zkClient.setData("/scott", "imissyou angelababy".getBytes(), -1);
        byte[] data = zkClient.getData("/scott", false, null);
        System.out.println(new String(data));

    }
}

Junit

使用Junit進行單元測試,這裡首先需要建立測試類:

使用快捷鍵alt+insert

54189850912

會自動在test中建立同名檔案:

54190266430

修改該檔案:

package test;

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import org.junit.Before; 
import org.junit.After;

import java.util.List;

/** 
* Simple Tester. 
* 
* @author <Authors name> 
* @since <pre>Nov 11, 2018</pre> 
* @version 1.0 
*/ 
public class SimpleTest {
    private static final String connectString = "localhost:2181";
    private static final int sessionTimeout = 2000;

    private ZooKeeper zkClient = null;
@Before
public void testInit() throws Exception {
    zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            // 收到事件通知後的回撥函式(應該是我們自己的事件處理邏輯)
            System.out.println(event.getType() + "---" + event.getPath());
            try {
                zkClient.getChildren("/", true);
            } catch (Exception e) {
            }
        }
    });
    String nodeCreated = zkClient.create("/scott", "hellozk".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}


/** 
* 
* Method: Exist() 
* 
*/ 
@Test
public void testExist() throws Exception {
    Stat stat = zkClient.exists("/scott", false);
    System.out.println(stat==null?"not exist":"exist");
} 

/** 
* 
* Method: getChildren() 
* 
*/ 
@Test
public void testGetChildren() throws Exception {
    List<String> children = zkClient.getChildren("/", true);
    for (String child : children) {
        System.out.println(child);
    }
//    Thread.sleep(Long.MAX_VALUE);
} 

/** 
* 
* Method: getData() 
* 
*/ 
@Test
public void testGetData() throws Exception {
    byte[] data = zkClient.getData("/scott", false, null);
    System.out.println(new String(data));
} 

/** 
* 
* Method: deleteZnode() 
* 
*/ 
@Test
public void testDeleteZnode() throws Exception {
    zkClient.delete("/scott", -1);
} 

/** 
* 
* Method: setData() 
* 
*/ 
@Test
public void testSetData() throws Exception {
    zkClient.setData("/scott", "imissyou angelababy".getBytes(), -1);

    byte[] data = zkClient.getData("/scott", false, null);
    System.out.println(new String(data));
} 
} 

其中,@Before表示在測試前執行,注意,JUnit不能指定@test執行順序,如果非要指定,需要對函式名進行重新命名,具體參考這裡

測試

對每個函式進行測試:

如:對Exist函式進行測試:

54190292035

直到每個函式通過為止

體會分散式共享鎖的實現

編寫程式

src/main/java下建立DistributedClientLock

import java.util.Collections;
import java.util.List;
import java.util.Random;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class DistributedClientLock {

    // 會話超時
    private static final int SESSION_TIMEOUT = 2000;
    // zookeeper叢集地址
    private String hosts = "localhost:2181";
    private String groupNode = "locks";
    private String subNode = "sub";
    private boolean haveLock = false;

    private ZooKeeper zk;
    // 記錄自己建立的子節點路徑
    private volatile String thisPath;

    /**
     * 連線zookeeper
     */
    private void connectZookeeper() throws Exception {
        zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {
            public void process(WatchedEvent event) {
                try {

                    // 判斷事件型別,此處只處理子節點變化事件
                    if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {
                        //獲取子節點,並對父節點進行監聽
                        List<String> childrenNodes = zk.getChildren("/" + groupNode, true);
                        String thisNode = thisPath.substring(("/" + groupNode + "/").length());
                        // 去比較是否自己是最小id
                        Collections.sort(childrenNodes);
                        if (childrenNodes.indexOf(thisNode) == 0) {
                            //訪問共享資源處理業務,並且在處理完成之後刪除鎖
                            doSomething();

                            //重新註冊一把新的鎖
                            thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
                                    CreateMode.EPHEMERAL_SEQUENTIAL);
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

        // 1、程式一進來就先註冊一把鎖到zk上
        thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
                CreateMode.EPHEMERAL_SEQUENTIAL);

        // wait一小會,便於觀察
        Thread.sleep(new Random().nextInt(1000));

        // 從zk的鎖父目錄下,獲取所有子節點,並且註冊對父節點的監聽
        List<String> childrenNodes = zk.getChildren("/" + groupNode, true);

        //如果爭搶資源的程式就只有自己,則可以直接去訪問共享資源 
        if (childrenNodes.size() == 1) {
            doSomething();
            thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
        }
    }

    /**
     * 處理業務邏輯,並且在最後釋放鎖
     */
    private void doSomething() throws Exception {
        try {
            System.out.println("gain lock: " + thisPath);
            Thread.sleep(2000);
            // do something
        } finally {
            System.out.println("finished: " + thisPath);
            //釋放鎖
            zk.delete(this.thisPath, -1);
        }
    }

    public static void main(String[] args) throws Exception {
        DistributedClientLock dl = new DistributedClientLock();
        dl.connectZookeeper();
        Thread.sleep(Long.MAX_VALUE);
    }
}

執行

首先需要在命令列中建立Znode:

create /locks locks

54190386676

然後執行程式,程式會拿到鎖、釋放鎖不停交替執行:

54190390309

參考