1. 程式人生 > >zookeeper開源客戶端Curator典型應用場景之-Master選舉(十)

zookeeper開源客戶端Curator典型應用場景之-Master選舉(十)

在生產環境中,一般要保證服務的高可用,有時候只需要選出一臺機器來執行,其餘機器處於備用狀態,比如,在分散式系統中很常見的一個問題就是定時任務的執行。如果多臺機器同時執行相同的定時任務,業務複雜則可能出現災難性的後果。我使用的是噹噹網的elastic-job分散式定時任務框架,分一片,部署兩臺機器,其中一臺處於備用狀態,只有一臺機器是工作的,當這臺機器宕機了,備用機器才開始工作。

分散式鎖和Master選舉相似點

上一篇部落格講了curator的分散式鎖應用,分散式鎖和 Master選舉有幾種相似點,實際上其實現機制也相近:

同一時刻只有一個獲取鎖 / 只能有一個leader

對於分散式排他鎖來說,任意時刻,只能有一個程序(對於單程序內的鎖是單執行緒)可以獲得鎖。
對於領導選舉來說,任意時刻,只能有一個成功當選為leader。否則就會出現腦裂。

鎖重入 / 確認自己是leader

對於分散式鎖,需要保證獲得鎖的程序在釋放鎖之前可再次獲得鎖,即鎖的可重入性。
對於領導選舉,Leader需要能夠確認自己已經獲得領導權,即確認自己是Leader。

釋放鎖 / 放棄領導權

鎖的獲得者應該能夠正確釋放已經獲得的鎖,並且當獲得鎖的程序宕機時,鎖應該自動釋放,從而使得其它競爭方可以獲得該鎖,從而避免出現死鎖的狀態。
領導應該可以主動放棄領導權,並且當領導所在程序宕機時,領導權應該自動釋放,從而使得其它參與者可重新競爭領導而避免進入無主狀態。

感知鎖釋放 / 感知領導權釋放

當獲得鎖的一方釋放鎖時,其它對於鎖的競爭方需要能夠感知到鎖的釋放,並再次嘗試獲取鎖。
原來的Leader放棄領導權時,其它參與方應該能夠感知該事件,並重新發起選舉流程。

Curator中選舉分為兩種:

Leader Latch和Leader Election

Leader Latch

LeaderLatch方式就是以一種搶佔方式來決定選主,是一種非公平的領導選舉,誰搶到就是誰,會隨機從候選者中選擇一臺作為leader, 選中後除非leader自己 呼叫close()釋放leadership,否則其他的候選者不能成為leader。

選主過程

假設現在又三個zookeeper的客戶端,如下圖所示,同時競爭leader。這三個客戶端同時向zookeeper叢集註冊Ephemeral且Non-sequence型別的節點,路徑都為/zkroot/leader。
在這裡插入圖片描述
如上圖所示,由於是Non-sequence節點,這三個客戶端只會有一個建立成功,其它節點均建立失敗。此時,建立成功的客戶端(即上圖中的Client 1)即成功競選為 Leader 。其它客戶端(即上圖中的Client 2和Client 3)此時勻為 Follower。

放棄領導權

如果Leader打算主動放棄領導權,直接刪除/zkroot/leader節點即可。
如果Leader程序意外宕機,其與Zookeeper間的Session也結束,該節點由於是Ephemeral型別的節點,因此也會自動被刪除。
此時/zkroot/leader節點不復存在,對於其它參與競選的客戶端而言,之前的Leader已經放棄了領導權。

感知領導權的放棄

由上圖可見,建立節點失敗的節點,除了成為 Follower 以外,還會向/zkroot/leader註冊一個 Watch ,一旦 Leader 放棄領導權,也即該節點被刪除,所有的 Follower 會收到通知。

重新選舉

感知到舊 Leader 放棄領導權後,所有的 Follower 可以再次發起新一輪的領導選舉,如下圖所示。
在這裡插入圖片描述
從上圖中可見
新一輪的領導選舉方法與最初的領導選舉方法完全一樣,都是發起節點建立請求,建立成功即為Leader,否則為Follower,且Follower會Watch該節點。
新一輪的選舉結果,無法預測,與它們在第一輪選舉中的順序無關。這也是該方案被稱為非公平模式的原因。

Leader Latch模式總結
  1. Leader Latch實現很簡單,每一輪的選舉演算法都一樣。
  2. 非公平模式,每一次選舉都是隨機,誰搶到就是誰的,假如是第二次選舉,每個 Follower 通過 Watch 感知到節點被刪除的時間不完全一樣,只要有一個 Follower 得到通知即發起競選。
  3. 給zookeeper造成的負載大,假如有上萬個客戶端都參與競選,意味著同時會有上萬個寫請求傳送給 Zookeper。同時一旦 Leader 放棄領導權,Zookeeper 需要同時通知上萬個 Follower,負載較大。
使用過程
相關的類

LeaderLatch
構造LeaderLatch ,構造方法如下:

public LeaderLatch(CuratorFramework client, String latchPath);
public LeaderLatch(CuratorFramework client, String latchPath, String id);
啟動

通過start()方法啟動之後,再等待幾秒鐘後,Curator會自動從中選舉出Leader。

public void start() throws Exception;

可以呼叫例項的hasLeadership()判斷該例項是否為leader。

public boolean hasLeadership();
嘗試獲取leadership

呼叫await()方法會使執行緒一直阻塞到獲得leadership為止。

public void await() throws InterruptedException, EOFException;
public boolean await(long timeout, TimeUnit unit) throws InterruptedException;
釋放leadership

只能通過close()釋放leadership, 只有leader將leadership釋放時,其他的候選者才有機會被選為leader

public void close() throws IOException;
public synchronized void close(CloseMode closeMode) throws IOException;
示例程式碼
public class TestLeaderLatch {

  private static final String PATH = "/demo/leader";
  /** 5個客戶端 */
  private static final Integer CLIENT_COUNT = 5;

  public static void main(String[] args) throws Exception {
    //5個執行緒,5個客戶端
    ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);
    for (int i = 0; i < CLIENT_COUNT ; i++) {
      final int index = i;
      service.submit(new Runnable() {
        @Override
        public void run() {
          try {
            new TestLeaderLatch().schedule(index);
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      });
    }
    //休眠50秒之後結束main方法
    Thread.sleep(30 * 1000);
    service.shutdownNow();
  }

  private void schedule(int thread) throws Exception {

    //獲取一個client
    CuratorFramework client = this.getClient(thread);
    //獲取一個latch
    LeaderLatch latch = new LeaderLatch(client, PATH,String.valueOf(thread));

    //給latch新增監聽,在
    latch.addListener(new LeaderLatchListener() {

      @Override
      public void notLeader() {
        //如果不是leader
        System.out.println("Client [" + thread + "] I am the follower !");
      }

      @Override
      public void isLeader() {
        //如果是leader
        System.out.println("Client [" + thread + "] I am the leader !");
      }
    });

    //開始選取 leader
    latch.start();

    //每個執行緒 休眠時間不一樣,但是最大不能超過 main方法中的那個休眠時間,那個是50秒 到時候main方法結束 會中斷休眠時間
    Thread.sleep(2 * (thread + 5) * 1000);
    if (latch != null) {
      //釋放leadership
      //CloseMode.NOTIFY_LEADER 節點狀態改變時,通知LeaderLatchListener
      latch.close(LeaderLatch.CloseMode.NOTIFY_LEADER);
    }
    if (client != null) {
      client.close();
    }
    System.out.println("Client [" + latch.getId() + "] Server closed...");
  }

  private CuratorFramework getClient(final int thread) {
    RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
    // Fluent風格建立
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("192.168.58.42:2181")
        .sessionTimeoutMs(1000000)
        .connectionTimeoutMs(3000)
        .retryPolicy(rp)
        .build();
    client.start();
    System.out.println("Client [" + thread + "] Server connected...");
    return client;
  }

}

程式執行,輸出以下結果:
Client [3] Server connected…
Client [2] Server connected…
Client [4] Server connected…
Client [0] Server connected…
Client [1] Server connected…
Client [1] I am the leader !
Client [0] Server closed…
Client [1] I am the follower !
Client [1] Server closed…
Client [2] I am the leader !
Client [2] I am the follower !
Client [2] Server closed…
Client [4] I am the leader !
Client [3] Server closed…
Client [4] I am the follower !
Client [4] Server closed…

在上面的程式中,啟動了5個zookeeper客戶端,程式會隨機選中其中一個作為leader。通過註冊監聽的方式來判斷自己是否成為leader。呼叫close()方法釋放當前領導權。有可能優先close的並不是leader節點,但是當leader節點close的時候,可以繼續在已有的節點中重新選舉leader節點。

LeaderElection

上面講了怎麼使用LeaderLatch方式進行master選舉,Curator提供了兩種選舉,一種是LeaderLatch,提供的另一種Leader選舉策略是Leader Election。

跟LeaderLatch選舉策略相比,LeaderElection選舉策略不同之處在於每個例項都能公平獲取領導權,而且當獲取領導權的例項在釋放領導權之後,該例項還有機會再次獲取領導權。

另外,選舉出來的leader不會一直佔有領導權,當 takeLeadership(CuratorFramework client) 方法執行結束之後會自動釋放領導權。LeaderElection屬於公平的選舉方式,通過LeaderSelectorListener可以對領導權進行控制, 在適當的時候釋放領導權,這樣每個節點都有可能獲得領導權。 而LeaderLatch則一直持有leadership, 除非呼叫close方法,否則它不會釋放領導權。

選主過程

如下圖所示,LeaderElection選舉中,各客戶端均建立/zkroot/leader節點,且其型別為Ephemeral與Sequence。
在這裡插入圖片描述
由於是Sequence型別節點,故上圖中三個客戶端均建立成功,只是序號不一樣。此時,每個客戶端都會判斷自己建立成功的節點的序號是不是當前最小的。如果是,則該客戶端為 Leader,否則即為 Follower。

在上圖中,Client1 建立的節點序號為1 ,Client2建立的節點序號為2,Client3建立的節點序號為3。由於最小序號為 1 ,且該節點由Client1建立,故Client 1為 Leader 。

放棄領導權

Leader 如果主動放棄領導權,直接刪除其建立的節點即可。
如果 Leader 所在程序意外宕機,其與 Zookeeper 間的 Session 結束,由於其建立的節點為Ephemeral型別,故該節點自動被刪除。

感知領導權的放棄

與LeaderLatch方式不同,每個 Follower 並非都 Watch 由 Leader 創建出來的節點,而是 Watch 序號剛好比自己序號小的節點。
在上圖中,總共有 1、2、3 共三個節點,因此Client 2 Watch /zkroot/leader1,Client 3 Watch /zkroot/leader2。(注:序號應該是10位數字,而非一位數字,序號最大為int最大值)。
一旦Leader棄權或者宕機,/zkroot/leader1被刪除,Client2可得到通知。此時Client3由於 Watch 的是/zkroot/leader2,故不會得到通知。

重新選舉

Client2得到/zkroot/leader1被刪除的通知後,不會立即成為新的 Leader 。而是先判斷自己的序號2是不是當前最小的序號。在該場景下,其序號確為最小。因此Client 2成為新的 Leader 。
在這裡插入圖片描述
這裡要注意,如果在Client1放棄領導權之前,Client2就宕機了,Client3會收到通知。此時Client3不會立即成為Leader,而是要先判斷自己的序號3是否為當前最小序號。很顯然,由於Client1建立的/zkroot/leader1還在,因此Client 3不會成為新的 Leader ,並向Client2序號2 前面的序號,也即 1 建立 Watch。該過程如下圖所示。
在這裡插入圖片描述

LeaderElection模式總結
  1. 擴充套件性好,每個客戶端都只Watch 一個節點且每次節點被刪除只須通知一個客戶端
  2. 舊 Leader 放棄領導權時,其它客戶端根據競選的先後順序(也即節點序號)成為新 Leader,這也是公平模式的由來。
  3. 延遲相對非公平模式要高,因為它必須等待特定節點得到通知才能選出新的 Leader。
使用過程
相關的類

LeaderSelector
LeaderSelectorListener
LeaderSelectorListenerAdapter
CancelLeadershipException

使用方法 建立 LeaderSelector
public LeaderSelector(CuratorFramework client, String leaderPath, LeaderSelectorListener listener);
public LeaderSelector(CuratorFramework client, String leaderPath, ExecutorService executorService, LeaderSelectorListener listener);
public LeaderSelector(CuratorFramework client, String leaderPath, CloseableExecutorService executorService, LeaderSelectorListener listener);
啟動
leaderSelector.start();

一旦啟動,如果獲取了leadership的話,takeLeadership()會被呼叫,只有當leader釋放了leadership的時候,takeLeadership()才會返回。

釋放

呼叫close()釋放 leadership

leaderSelector.close();
示例程式碼
LeaderSelectorListener的實現類

實現LeaderSelectorListener 或者 繼承LeaderSelectorListenerAdapter,用於定義獲取領導權後的業務邏輯:

public class CustomLeaderSelectorListenerAdapter extends LeaderSelectorListenerAdapter implements Closeable {

    /** 客戶端名稱 */
    private String name;
    /** leaderSelector */
    private LeaderSelector leaderSelector;
    /** 原子性的 用來記錄獲取 leader的次數 */
    public AtomicInteger leaderCount = new AtomicInteger(1);

    public CustomLeaderSelectorListenerAdapter(CuratorFramework client,String path,String name ){
        this.name = name;
        this.leaderSelector = new LeaderSelector(client, path, this);

        /**
         * 自動重新排隊
         * 該方法的呼叫可以確保此例項在釋放領導權後還可能獲得領導權
         */
        leaderSelector.autoRequeue();
    }

    /**
     * 啟動  呼叫leaderSelector.start()
     * @throws IOException
     */
    public void start() throws IOException {
        leaderSelector.start();
    }

    /**
     * 獲取領導權之後執行的業務邏輯,執行完自動放棄領導權
     * @param client
     * @throws Exception
     */
    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = 2;
        System.out.println(name + "成為當前leader" + " 共成為leader的次數:" + leaderCount.getAndIncrement() + "次");
        try{
            //模擬業務邏輯執行2秒
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        }catch ( InterruptedException e ){
            System.err.println(name + "已被中斷");
            Thread.currentThread().interrupt();
        }finally{
            System.out.println(name + "放棄領導權");
        }
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }
}
多個客戶端測試
public class TestLeaderElection {

  private static final String PATH = "/demo/leader";
  /** 3個客戶端 */
  private static final Integer CLIENT_COUNT = 3;

  public static void main(String[] args) throws Exception {
    ExecutorService service = Executors.newFixedThreadPool(CLIENT_COUNT);

    for (int i = 0; i < CLIENT_COUNT; i++) {
      final int index = i;
      service.submit(new Runnable() {
        @Override
        public void run() {
          try {
            new TestLeaderElection().schedule(index);
          } catch (Exception e) {
            e.printStackTrace();
          }
        }
      });
    }

    Thread.sleep(30 * 1000);
    service.shutdownNow();
  }

  private void schedule(final int thread) throws Exception {
    CuratorFramework client = this.getClient(thread);
    CustomLeaderSelectorListenerAdapter leaderSelectorListener =
        new CustomLeaderSelectorListenerAdapter(client, PATH, "Client #" + thread);
    leaderSelectorListener.start();
  }

  private CuratorFramework getClient(final int thread) {
    RetryPolicy rp = new ExponentialBackoffRetry(1000, 3);
    // Fluent風格建立
    CuratorFramework client = CuratorFrameworkFactory.builder()
        .connectString("192.168.58.42:2181")
        .sessionTimeoutMs(1000000)
        .connectionTimeoutMs(3000)
        .retryPolicy(rp)
        .build();
    client.start();
    System.out.println("Client [" + thread + "] Server connected...");
    return client;
  }

}

執行程式,輸出以下內容:
Client [0] Server connected…
Client [1] Server connected…
Client [2] Server connected…
Client #2成為當前leader 共成為leader的次數:1次
Client #2放棄領導權
Client #0成為當前leader 共成為leader的次數:1次
Client #0放棄領導權
Client #1成為當前leader 共成為leader的次數:1次
Client #1放棄領導權
Client #2成為當前leader 共成為leader的次數:2次
Client #2放棄領導權
Client #0成為當前leader 共成為leader的次數:2次
Client #0放棄領導權
Client #1成為當前leader 共成為leader的次數:2次
Client #1放棄領導權
Client #2成為當前leader 共成為leader的次數:3次
Client #2放棄領導權
Client #0成為當前leader 共成為leader的次數:3次
Client #0放棄領導權
Client #1成為當前leader 共成為leader的次數:3次
Client #1放棄領導權
Client #2成為當前leader 共成為leader的次數:4次
Client #2放棄領導權
Client #0成為當前leader 共成為leader的次數:4次
Client #0放棄領導權
Client #1成為當前leader 共成為leader的次數:4次
Client #1放棄領導權

上面只是簡單測試程式碼,並沒有關閉client等操作,每個例項在獲取領導權後,如果 takeLeadership(CuratorFramework client) 方法執行結束,將會釋放其領導權。而且獲取領導權 也是按照 Client #2, Client #0 ,Client #1 順序來的,正好驗證了它的公平性。

LeaderSelectorListener類繼承了ConnectionStateListener。一旦LeaderSelector啟動,它會向curator客戶端新增監聽器。 使用LeaderSelector必須時刻注意連線的變化。一旦出現連線問題如SUSPENDED,curator例項必須確保它可能不再是leader,直至它重新收到RECONNECTED。如果LOST出現,curator例項不再是leader並且其takeLeadership()應該直接退出。

推薦的做法是,如果發生SUSPENDED或者LOST連線問題,最好直接拋CancelLeadershipException,此時,leaderSelector例項會嘗試中斷並且取消正在執行takeLeadership()方法的執行緒。
建議擴充套件LeaderSelectorListenerAdapter, LeaderSelectorListenerAdapter中已經提供了推薦的處理方式 。

參考