1. 程式人生 > >跟著例項學習ZooKeeper的用法: Leader選舉

跟著例項學習ZooKeeper的用法: Leader選舉

ZooKeeper官方給出了使用zookeeper的幾種用途

  • Leader Election
  • Barriers
  • Queues
  • Locks
  • Two-phased Commit
  • 其它應用如Name Service, Configuration, Group Membership

在實際使用ZooKeeper開發中,我們最常用的是Apache Curator。 它由Netflix公司貢獻給Apache,目前版本2.7.0。 相信你在使用ZK API開發時會遇到讓人頭疼的幾個問題,ZK連線管理、SESSION失效等一些異常問題的處理,Curator替我們解決了這些問題,通過對ZK連線狀態的監控來做出相應的重連等操作,並觸發事件。 更好的地方是Curator對ZK的一些應用場景提供了非常好的實現,而且有很多擴充,這些都符合ZK使用規範。 它的主要元件為:

  • Recipes, ZooKeeper的系列recipe實現, 基於 Curator Framework.
  • Framework, 封裝了大量ZooKeeper常用API操作,降低了使用難度, 基於Zookeeper增加了一些新特性,對ZooKeeper連結的管理,對連結丟失自動重新連結。
  • Utilities,一些ZooKeeper操作的工具類包括ZK的叢集測試工具路徑生成等非常有用,在Curator-Client包下org.apache.curator.utils。
  • Client,ZooKeeper的客戶端API封裝,替代官方 ZooKeeper class,解決了一些繁瑣低階的處理,提供一些工具類。
  • Errors,異常處理, 連線異常等
  • Extensions,對curator-recipes的擴充套件實現,拆分為 curator-:stuck_out_tongue_closed_eyes:iscovery和 curator-:stuck_out_tongue_closed_eyes:iscovery-server提供基於RESTful的Recipes WEB服務.

Recipe 詞典的意思是食譜,配方,美食菜譜,烹飪法, 延伸用法:某項計劃或步驟來取得預先給定的結果。 在計算機領域沒有合適的漢語對應,如果把ZooKeeper看成菜的話,recipe就相當於菜譜, 比如麻婆豆腐, 宮保雞丁。

由於內容較多, 將會分成多篇文章進行介紹。

除了ZK 的”Two-phased Commit”的recipe外, Curator提供了全部的ZK的recipe, 而且分類更詳細。 這篇文章將會以例項的方式介紹這些Recipe。 一旦你領會了這些Recipe,就可以在專案中很好的使用ZooKeeper的強大威力。

leader選舉

在分散式計算中, leader election是很重要的一個功能, 這個選舉過程是這樣子的: 指派一個程序作為組織者,將任務分發給各節點。 在任務開始前, 哪個節點都不知道誰是leader或者coordinator. 當選舉演算法開始執行後, 每個節點最終會得到一個唯一的節點作為任務leader. 除此之外, 選舉還經常會發生在leader意外宕機的情況下,新的leader要被選舉出來。

Curator 有兩種選舉recipe, 你可以根據你的需求選擇合適的。

Leader latch

首先我們看一個使用LeaderLatch類來選舉的例子。 它的建構函式如下:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

必須啟動LeaderLatch: leaderLatch.start(); 一旦啟動, LeaderLatch會和其它使用相同latch path的其它LeaderLatch交涉,然後隨機的選擇其中一個作為leader。 你可以隨時檢視一個給定的例項是否是leader:

public boolean hasLeadership()

類似JDK的CountDownLatch, LeaderLatch在請求成為leadership時有block方法:

public void await()
          throws InterruptedException,
                 EOFException
Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.

public boolean await(long timeout,
                     TimeUnit unit)
             throws InterruptedException

一旦不使用LeaderLatch了,必須呼叫close方法。 如果它是leader,會釋放leadership, 其它的參與者將會選舉一個leader。

異常處理 LeaderLatch例項可以增加ConnectionStateListener來監聽網路連線問題。 當 SUSPENDED 或 LOST 時, leader不再認為自己還是leader.當LOST 連線重連後 RECONNECTED,LeaderLatch會刪除先前的ZNode然後重新建立一個. LeaderLatch使用者必須考慮導致leadershi丟失的連線問題。 強烈推薦你使用ConnectionStateListener。

下面看例子:

package com.colobu.zkrecipe.leaderelection;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

import com.google.common.collect.Lists;

public class LeaderLatchExample {
    private static final int CLIENT_QTY = 10;
    private static final String PATH = "/examples/leader";

    public static void main(String[] args) throws Exception {

        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; ++i) {
                CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                clients.add(client);
                LeaderLatch example = new LeaderLatch(client, PATH, "Client #" + i);
                examples.add(example);
                client.start();
                example.start();
            }

            Thread.sleep(20000);

            LeaderLatch currentLeader = null;
            for (int i = 0; i < CLIENT_QTY; ++i) {
                LeaderLatch example = examples.get(i);
                if (example.hasLeadership())
                    currentLeader = example;
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();
            examples.get(0).await(2, TimeUnit.SECONDS);
            System.out.println("Client #0 maybe is elected as the leader or not although it want to be");
            System.out.println("the new leader is " + examples.get(0).getLeader().getId());

            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderLatch exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

首先我們建立了10個LeaderLatch,啟動後它們中的一個會被選舉為leader。 因為選舉會花費一些時間,start後並不能馬上就得到leader。 通過hasLeadership檢視自己是否是leader, 如果是的話返回true。 可以通過.getLeader().getId()可以得到當前的leader的ID。 只能通過close釋放當前的領導權。 await是一個阻塞方法, 嘗試獲取leader地位,但是未必能上位。

Leader Election

Curator還提供了另外一種選舉方法。 注意涉及以下四個類:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

重要的是LeaderSelector類,它的建構函式為:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

類似LeaderLatch,必須start: leaderSelector.start(); 一旦啟動,當例項取得領導權時你的listener的takeLeadership()方法被呼叫. 而takeLeadership()方法只有領導權被釋放時才返回。 當你不再使用LeaderSelector例項時,應該呼叫它的close方法。

異常處理 LeaderSelectorListener類繼承ConnectionStateListener.LeaderSelector必須小心連線狀態的 改變. 如果例項成為leader, 它應該相應SUSPENDED 或 LOST. 當 SUSPENDED 狀態出現時, 例項必須假定在重新連線成功之前它可能不再是leader了。 如果LOST狀態出現, 例項不再是leader, takeLeadership方法返回.

重要: 推薦處理方式是當收到SUSPENDED 或 LOST時丟擲CancelLeadershipException異常. 這會導致LeaderSelector例項中斷並取消執行takeLeadership方法的異常. 這非常重要, 你必須考慮擴充套件LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter提供了推薦的處理邏輯。

這個例子摘自官方。 首先建立一個ExampleClient類, 它繼承LeaderSelectorListenerAdapter, 它實現了takeLeadership方法:

package com.colobu.zkrecipe.leaderelection;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


public class ExampleClient extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public ExampleClient(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

你可以在takeLeadership進行任務的分配等等,並且不要返回,如果你想要要此例項一直是leader的話可以加一個死迴圈。 leaderSelector.autoRequeue();保證在此例項釋放領導權之後還可能獲得領導權。 在這裡我們使用AtomicInteger來記錄此client獲得領導權的次數, 它是”fair”, 每個client有平等的機會獲得領導權。

測試程式碼:

package com.colobu.zkrecipe.leaderelection;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.test.TestingServer;
import org.apache.curator.utils.CloseableUtils;

import com.google.common.collect.Lists;

public class LeaderSelectorExample {
    private static final int CLIENT_QTY = 10;
    private static final String PATH = "/examples/leader";

    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<ExampleClient> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; ++i) {
                CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                clients.add(client);
                ExampleClient example = new ExampleClient(client, PATH, "Client #" + i);
                examples.add(example);
                client.start();
                example.start();
            }

            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (ExampleClient exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

與LeaderLatch, 通過LeaderSelectorListener可以對領導權進行控制, 在適當的時候釋放領導權,這樣每個節點都有可能獲得領導權。 而LeaderLatch一根筋到死, 除非呼叫close方法,否則它不會釋放領導權。

參考文件