1. 程式人生 > >Zookeeper客戶端Curator使用介紹

Zookeeper客戶端Curator使用介紹

簡介

Curator是Netflix公司開源的一套zookeeper客戶端框架,解決了很多Zookeeper客戶端非常底層的細節開發工作,包括連線重連、反覆註冊Watcher和NodeExistsException異常等等。Patrixck Hunt(Zookeeper)以一句“Guava is to Java that Curator to Zookeeper”給Curator予高度評價。
引子和趣聞:
Zookeeper名字的由來是比較有趣的,下面的片段摘抄自《從PAXOS到ZOOKEEPER分散式一致性原理與實踐》一書:
Zookeeper最早起源於雅虎的研究院的一個研究小組。在當時,研究人員發現,在雅虎內部很多大型的系統需要依賴一個類似的系統進行分散式協調,但是這些系統往往存在分散式單點問題。所以雅虎的開發人員就試圖開發一個通用的無單點問題的分散式協調框架。在立項初期,考慮到很多專案都是用動物的名字來命名的(例如著名的Pig專案),雅虎的工程師希望給這個專案也取一個動物的名字。時任研究院的首席科學家Raghu Ramakrishnan開玩笑說:再這樣下去,我們這兒就變成動物園了。此話一出,大家紛紛表示就叫動物園管理員吧——因為各個以動物命名的分散式元件放在一起,雅虎的整個分散式系統看上去就像一個大型的動物園了,而Zookeeper正好用來進行分散式環境的協調——於是,Zookeeper的名字由此誕生了。

Curator無疑是Zookeeper客戶端中的瑞士軍刀,它譯作”館長”或者”管理者”,不知道是不是開發小組有意而為之,筆者猜測有可能這樣命名的原因是說明Curator就是Zookeeper的館長(腦洞有點大:Curator就是動物園的園長)。
Curator包含了幾個包:
curator-framework:對zookeeper的底層api的一些封裝
curator-client:提供一些客戶端的操作,例如重試策略等
curator-recipes:封裝了一些高階特性,如:Cache事件監聽、選舉、分散式鎖、分散式計數器、分散式Barrier等
Maven依賴(使用curator的版本:2.12.0,對應Zookeeper的版本為:3.4.x,如果跨版本會有相容性問題,很有可能導致節點操作失敗):

引用依賴:

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId
>
<artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>

可重入共享鎖—Shared Reentrant LocK

Shared意味著鎖是全域性可見的, 客戶端都可以請求鎖。 Reentrant和JDK的ReentrantLock類似,即可重入, 意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。 它是由類InterProcessMutex來實現。

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

不可重入共享鎖—Shared Lock

這個鎖和上面的InterProcessMutex相比,就是少了Reentrant的功能,也就意味著它不能在同一個執行緒中重入。這個類是InterProcessSemaphoreMutex,使用方法和InterProcessMutex類似

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥鎖");
        }
        System.out.println(clientName + " 已獲取到互斥鎖");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥鎖");
        }
        System.out.println(clientName + " 再次獲取到互斥鎖");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // 獲取鎖幾次 釋放鎖也要幾次
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

訊號量—Shared Semaphore

一個計數的訊號量類似JDK的Semaphore。 JDK中Semaphore維護的一組許可(permits),而Curator中稱之為租約(Lease)。 有兩種方式可以決定semaphore的最大租約數。第一種方式是使用者給定path並且指定最大LeaseSize。第二種方式使用者給定path並且使用SharedCountReader類。如果不使用SharedCountReader, 必須保證所有例項在多程序中使用相同的(最大)租約數量,否則有可能出現A程序中的例項持有最大租約數量為10,但是在B程序中持有的最大租約數量為20,此時租約的意義就失效了。

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

首先我們先獲得了5個租約, 最後我們把它還給了semaphore。 接著請求了一個租約,因為semaphore還有5個租約,所以請求可以滿足,返回一個租約,還剩4個租約。 然後再請求一個租約,因為租約不夠,阻塞到超時,還是沒能滿足,返回結果為null(租約不足會阻塞到超時,然後返回null,不會主動丟擲異常;如果不設定超時時間,會一致阻塞)。

參考資料:
《從PAXOS到ZOOKEEPER分散式一致性原理與實踐》

總結

最近搞 區塊鏈專案,用java寫的上層邏輯做資料現樓,需要用到zookeeper鎖做分散式功能,故研究使用了下,希望都能有所搜獲。