1. 程式人生 > >跟著例項學習ZooKeeper的用法: 計數器

跟著例項學習ZooKeeper的用法: 計數器

這一篇文章我們將學習使用Curator來實現計數器。 顧名思義,計數器是用來計數的, 利用ZooKeeper可以實現一個叢集共享的計數器。 只要使用相同的path就可以得到最新的計數器值, 這是由ZooKeeper的一致性保證的。Curator有兩個計數器, 一個是用int來計數,一個用long來計數。

SharedCount

這個類使用int型別來計數。 主要涉及三個類。

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount代表計數器, 可以為它增加一個SharedCountListener,當計數器改變時此Listener可以監聽到改變的事件,而SharedCountReader可以讀取到最新的值, 包括字面值和帶版本資訊的值VersionedValue。

例子程式碼:

package com.colobu.zkrecipe.counter;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import
org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.SharedCountListener; import org.apache.curator.framework.recipes.shared.SharedCountReader; import
org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.test.TestingServer; import com.google.common.collect.Lists; public class SharedCounterExample implements SharedCountListener{ private static final int QTY = 5; private static final String PATH = "/examples/counter"; public static void main(String[] args) throws IOException, Exception { final Random rand = new Random(); SharedCounterExample example = new SharedCounterExample(); try (TestingServer server = new TestingServer()) { CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3)); client.start(); SharedCount baseCount = new SharedCount(client, PATH, 0); baseCount.addListener(example); baseCount.start(); List<SharedCount> examples = Lists.newArrayList(); ExecutorService service = Executors.newFixedThreadPool(QTY); for (int i = 0; i < QTY; ++i) { final SharedCount count = new SharedCount(client, PATH, 0); examples.add(count); Callable<Void> task = new Callable<Void>() { @Override public Void call() throws Exception { count.start(); Thread.sleep(rand.nextInt(10000)); System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))); return null; } }; service.submit(task); } service.shutdown(); service.awaitTermination(10, TimeUnit.MINUTES); for (int i = 0; i < QTY; ++i) { examples.get(i).close(); } baseCount.close(); } } @Override public void stateChanged(CuratorFramework arg0, ConnectionState arg1) { System.out.println("State changed: " + arg1.toString()); } @Override public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception { System.out.println("Counter's value is changed to " + newCount); } }

在這個例子中,我們使用baseCount來監聽計數值(addListener方法)。 任意的SharedCount, 只要使用相同的path,都可以得到這個計數值。 然後我們使用5個執行緒為計數值增加一個10以內的隨機數。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

這裡我們使用trySetCount去設定計數器。 第一個引數提供當前的VersionedValue,如果期間其它client更新了此計數值, 你的更新可能不成功, 但是這時你的client更新了最新的值,所以失敗了你可以嘗試再更新一次。 而setCount是強制更新計數器的值。

注意計數器必須start,使用完之後必須呼叫close關閉它。

在這裡再重複一遍前面講到的, 強烈推薦你監控ConnectionStateListener, 儘管我們的有些例子沒有監控它。 在本例中SharedCountListener擴充套件了ConnectionStateListener。 這一條針對所有的Curator recipes都適用,後面的文章中就不專門提示了。

DistributedAtomicLong

再看一個Long型別的計數器。 除了計數的範圍比SharedCount大了之外, 它首先嚐試使用樂觀鎖的方式設定計數器, 如果不成功(比如期間計數器已經被其它client更新了), 它使用InterProcessMutex方式來更新計數值。 還記得InterProcessMutex是什麼嗎? 它是我們前面跟著例項學習ZooKeeper的用法: 分散式鎖 講的分散式可重入鎖。 這和上面的計數器的實現有顯著的不同。

可以從它的內部實現DistributedAtomicValue.trySet中看出端倪。

    AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

此計數器有一系列的操作:

  • get(): 獲取當前值
  • increment(): 加一
  • decrement(): 減一
  • add(): 增加特定的值
  • subtract(): 減去特定的值
  • trySet(): 嘗試設定計數值
  • forceSet(): 強制設定計數值

必須檢查返回結果的succeeded(), 它代表此操作是否成功。 如果操作成功, preValue()代表操作前的值, postValue()代表操作後的值。

我們下面的例子中使用5個執行緒對計數器進行加一操作,如果成功,將操作前後的值打印出來。

package com.colobu.zkrecipe.counter;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;

import com.google.common.collect.Lists;

public class DistributedAtomicLongExample {
    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            List<DistributedAtomicLong> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        try {
                            //Thread.sleep(rand.nextInt(1000));
                            AtomicValue<Long> value = count.increment();
                            //AtomicValue<Long> value = count.decrement();
                            //AtomicValue<Long> value = count.add((long)rand.nextInt(20));
                            System.out.println("succeed: " + value.succeeded());
                            if (value.succeeded())
                                System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                        } catch (Exception e) {
                            e.printStackTrace();
                        }

                        return null;
                    }
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        }

    }

}