1. 程式人生 > >跟著例項學習ZooKeeper的用法: Curator擴充套件庫

跟著例項學習ZooKeeper的用法: Curator擴充套件庫

還記得Curator提供哪幾個元件嗎? 我們不妨回顧一下:

  • Recipes
  • Framework
  • Utilities
  • Client
  • Errors
  • Extensions

前面的例子其實前五個元件都涉及到了, 比如Utilities例子的TestServer, Client裡的CuratorZookeeperClient, Errors裡的ConnectionStateListener等。 還有最後一個元件我們還沒有介紹,那就是Curator擴充套件元件。

Recipes元件包含了豐富的Curator應用的元件。 但是這些並不是ZooKeeper Recipe的全部。 大量的分散式應用已經抽象出了許許多多的的Recipe,其中有些還是可以通過Curator來實現。 如果不斷都將這些Recipe都增加到Recipes中, Recipes會變得越來越大。 為了避免這種狀況, Curator把一些其它的Recipe放在單獨的包中, 命名方式就是curator-x-<name>,比如curator-x-discovery, curator-x-rpc。 本文就是介紹curator-x-discovery。

這是一個服務發現的Recipe。
我們在介紹臨時節點Ephemeral Node的時候就講到, 可以通過臨時節點建立一個服務註冊機制。 服務啟動後建立臨時節點, 服務斷掉後臨時節點就不存在了。 這個擴充套件抽象了這種功能,聽過了一套API,可以實現服務發現機制。

服務類

我們先介紹一下例子中的服務類。 InstanceDetails定義了服務例項的基本資訊,實際中可能會定義更詳細的資訊。

package com.colobu.zkrecipe.discovery;

import org.codehaus.jackson.map.annotate.JsonRootName;

/**
 * In a real application, the Service payload will most likely be more detailed
 * than this. But, this gives a good example.
 */
@JsonRootName("details") public class InstanceDetails { private String description; public InstanceDetails() { this(""); } public InstanceDetails(String description) { this.description = description; } public void setDescription(String description)
{ this.description = description; } public String getDescription() { return description; } }

ExampleServer相當與你在分散式環境中的服務應用。 每個服務應用例項都類似這個類, 應用啟動時呼叫start, 關閉時呼叫close。

package com.colobu.zkrecipe.discovery;

import java.io.Closeable;
import java.io.IOException;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.UriSpec;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;

/**
 * This shows a very simplified method of registering an instance with the
 * service discovery. Each individual instance in your distributed set of
 * applications would create an instance of something similar to ExampleServer,
 * start it when the application comes up and close it when the application
 * shuts down.
 */
public class ExampleServer implements Closeable {
    private final ServiceDiscovery<InstanceDetails> serviceDiscovery;
    private final ServiceInstance<InstanceDetails> thisInstance;

    public ExampleServer(CuratorFramework client, String path, String serviceName, String description) throws Exception {
        // in a real application, you'd have a convention of some kind for the
        // URI layout
        UriSpec uriSpec = new UriSpec("{scheme}://foo.com:{port}");
        thisInstance = ServiceInstance.<InstanceDetails> builder().name(serviceName).payload(new InstanceDetails(description))
                .port((int) (65535 * Math.random())) // in a real application,
                                                        // you'd use a common
                                                        // port
                .uriSpec(uriSpec).build();
        // if you mark your payload class with @JsonRootName the provided
        // JsonInstanceSerializer will work
        JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class);
        serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(path).serializer(serializer)
                .thisInstance(thisInstance).build();
    }

    public ServiceInstance<InstanceDetails> getThisInstance() {
        return thisInstance;
    }

    public void start() throws Exception {
        serviceDiscovery.start();
    }

    @Override
    public void close() throws IOException {
        CloseableUtils.closeQuietly(serviceDiscovery);
    }
}

發現中心

DiscoveryExample提供了增加,刪除,顯示,註冊已有的服務的功能。 注意此處服務註冊是由ExampleServer自己完成的, 這比較符合實際的情況。 實際情況是服務自己起來後主動註冊服務。 但是此處啟動又是由DiscoveryExample來呼叫, 純粹為了演示使用。 你可以根據你自己的情況合理安排服務的註冊和啟動。

random命令提供了一個完全由DiscoveryExample控制的服務。 它負責註冊一個服務並啟動。

呼叫close就關閉了服務。

package com.colobu.zkrecipe.discovery;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.ServiceProvider;
import org.apache.curator.x.discovery.details.JsonInstanceSerializer;
import org.apache.curator.x.discovery.strategies.RandomStrategy;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

public class DiscoveryExample {
    private static final String PATH = "/discovery/example";

    public static void main(String[] args) throws Exception {
        // This method is scaffolding to get the example up and running
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        ServiceDiscovery<InstanceDetails> serviceDiscovery = null;
        Map<String, ServiceProvider<InstanceDetails>> providers = Maps.newHashMap();
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            JsonInstanceSerializer<InstanceDetails> serializer = new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class);
            serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class).client(client).basePath(PATH).serializer(serializer).build();
            serviceDiscovery.start();
            processCommands(serviceDiscovery, providers, client);
        } finally {
            for (ServiceProvider<InstanceDetails> cache : providers.values()) {
                CloseableUtils.closeQuietly(cache);
            }
            CloseableUtils.closeQuietly(serviceDiscovery);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static void processCommands(ServiceDiscovery<InstanceDetails> serviceDiscovery, Map<String, ServiceProvider<InstanceDetails>> providers,
            CuratorFramework client) throws Exception {
        // More scaffolding that does a simple command line processor
        printHelp();
        List<ExampleServer> servers = Lists.newArrayList();
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            boolean done = false;
            while (!done) {
                System.out.print("> ");
                String line = in.readLine();
                if (line == null) {
                    break;
                }
                String command = line.trim();
                String[] parts = command.split("\\s");
                if (parts.length == 0) {
                    continue;
                }
                String operation = parts[0];
                String args[] = Arrays.copyOfRange(parts, 1, parts.length);
                if (operation.equalsIgnoreCase("help") || operation.equalsIgnoreCase("?")) {
                    printHelp();
                } else if (operation.equalsIgnoreCase("q") || operation.equalsIgnoreCase("quit")) {
                    done = true;
                } else if (operation.equals("add")) {
                    addInstance(args, client, command, servers);
                } else if (operation.equals("delete")) {
                    deleteInstance(args, command, servers);
                } else if (operation.equals("random")) {
                    listRandomInstance(args, serviceDiscovery, providers, command);
                } else if (operation.equals("list")) {
                    listInstances(serviceDiscovery);
                }
            }
        } finally {
            for (ExampleServer server : servers) {
                CloseableUtils.closeQuietly(server);
            }
        }
    }

    private static void listRandomInstance(String[] args, ServiceDiscovery<InstanceDetails> serviceDiscovery,
            Map<String, ServiceProvider<InstanceDetails>> providers, String command) throws Exception {
        // this shows how to use a ServiceProvider
        // in a real application you'd create the ServiceProvider early for the
        // service(s) you're interested in
        if (args.length != 1) {
            System.err.println("syntax error (expected random <name>): " + command);
            return;
        }
        String serviceName = args[0];
        ServiceProvider<InstanceDetails> provider = providers.get(serviceName);
        if (provider == null) {
            provider = serviceDiscovery.serviceProviderBuilder().serviceName(serviceName).providerStrategy(new RandomStrategy<InstanceDetails>()).build();
            providers.put(serviceName, provider);
            provider.start();
            Thread.sleep(2500); // give the provider time to warm up - in a real
                                // application you wouldn't need to do this
        }
        ServiceInstance<InstanceDetails> instance = provider.getInstance();
        if (instance == null) {
            System.err.println("No instances named: " + serviceName);
        } else {
            outputInstance(instance);
        }
    }

    private static void listInstances(ServiceDiscovery<InstanceDetails> serviceDiscovery) throws Exception {
        // This shows how to query all the instances in service discovery
        try {
            Collection<String> serviceNames = serviceDiscovery.queryForNames();
            System.out.println(serviceNames.size() + " type(s)");
            for (String serviceName : serviceNames) {
                Collection<ServiceInstance<InstanceDetails>> instances = serviceDiscovery.queryForInstances(serviceName);
                System.out.println(serviceName);
                for (ServiceInstance<InstanceDetails> instance : instances) {
                    outputInstance(instance);
                }
            }
        } finally {
            CloseableUtils.closeQuietly(serviceDiscovery);
        }
    }

    private static void outputInstance(ServiceInstance<InstanceDetails> instance) {
        System.out.println("\t" + instance.getPayload().getDescription() + ": " + instance.buildUriSpec());
    }

    private static void deleteInstance(String[] args, String command, List<ExampleServer> servers) {
        // simulate a random instance going down
        // in a real application, this would occur due to normal operation, a
        // crash, maintenance, etc.
        if (args.length != 1) {
            System.err.println("syntax error (expected delete <name>): " + command);
            return;
        }
        final String serviceName = args[0];
        ExampleServer server = Iterables.find(servers, new Predicate<ExampleServer>() {
            @Override
            public boolean apply(ExampleServer server) {
                return server.getThisInstance().getName().endsWith(serviceName);
            }
        }, null);
        if (server == null) {
            System.err.println("No servers found named: " + serviceName);
            return;
        }
        servers.remove(server);
        CloseableUtils.closeQuietly(server);
        System.out.println("Removed a random instance of: " + serviceName);
    }

    private static void addInstance(String[] args, CuratorFramework client, String command, List<ExampleServer> servers) throws Exception {
        // simulate a new instance coming up
        // in a real application, this would be a separate process
        if (args.length < 2) {
            System.err.println("syntax error (expected add <name> <description>): " + command);
            return;
        }
        StringBuilder description = new StringBuilder();
        for (int i = 1; i < args.length; ++i) {
            if (i > 1) {
                description.append(' ');
            }
            description.append(args[i]);
        }
        String serviceName = args[0];
        ExampleServer server = new ExampleServer(client, PATH, serviceName, description.toString());
        servers.add(server);
        server.start();
        System.out.println(serviceName + " added");
    }

    private static void printHelp() {
        System.out.println("An example of using the ServiceDiscovery APIs. This example is driven by entering commands at the prompt:\n");
        System.out.println("add <name> <description>: Adds a mock service with the given name and description");
        System.out.println("delete <name>: Deletes one of the mock services with the given name");
        System.out.println("list: Lists all the currently registered services");
        System.out.println("random <name>: Lists a random instance of the service with the given name");
        System.out.println("quit: Quit the example");
        System.out.println();
    }
}

其它擴充套件

其它兩個擴充套件Curator RPC Proxy(curator-x-rpc)擴充套件和Service Discovery Server(curator-x-discovery-server)是為了橋接非Java應用的擴充套件,本系列將不再介紹了。感興趣的朋友可以看下面的 文件。 Curator Service Discovery Curator RPC Proxy