1. 程式人生 > >ZooKeeper學習第五期--ZooKeeper管理分散式環境中的資料

ZooKeeper學習第五期--ZooKeeper管理分散式環境中的資料

引言

本節本來是要介紹ZooKeeper的實現原理,但是ZooKeeper的原理比較複雜,它涉及到了paxos演算法、Zab協議、通訊協議等相關知 識,理解起來比較抽象所以還需要藉助一些應用場景,來幫我們理解。由於內容比較多,一口氣吃不成胖子,得慢慢來一步一個腳印,因此我對後期 ZooKeeper的學習規劃如下:

第一階段:

|---理解ZooKeeper的應用

    |---ZooKeeper是什麼

    |---ZooKeeper能幹什麼

    |---

ZooKeeper 怎麼使用

第二階段:

|---理解ZooKeeper原理準備

    |---瞭解paxos

    |---理解 zab原理

    |---理解選舉/同步流程

第三階段:

    |---深入ZooKeeper原理

        |---分析原始碼

        |---

嘗試開發分散式應用

由於內容較多,而且理解較為複雜,所以每個階段分開來學習和介紹,那麼本文主要介紹的的是第一階段,該階段一般 應該放在前面介紹,但感覺像一些ZooKeeper應用案例,如果沒有一定的ZooKeeper基礎,理解起來也比較抽象, 所以放在這介紹。大家可以對比一下前面的應用程式,來對比理解一下前面的那些應用到底用到ZooKeeper的那些功能,來進一步理解ZooKeeper 的實現理念,由於網上關於這方面的介紹比較多,如果一些可愛的博友對該內容已經比較瞭解,那麼您可以不用往下看了,繼續下一步學習。

一、ZooKeeper產生背景

1.1 分散式的發展

分散式這個概念我想大家並不陌生,但真正實戰開始還要從google說起,很早以前在實驗室中分散式被人提出,可是說是計算機內入行較為複雜學習較為困難的技術,並且市場也並不成熟,因此大規模的商業應用一直未成出現,但從Google 釋出了MapReduce 和DFS 以及Bigtable的論文之後,分散式在計算機界的格局就發生了變化,從架構上實現了分散式的難題,並且成熟的應用在了海量資料儲存和計算上,其叢集的規模也是當前世界上最為龐大的。

以DFS 為基礎的分散式計算框架和key、value 資料高效的解決運算的瓶頸, 而且開發人員不用再寫複雜的分散式程式,只要底層框架完備開發人員只要用較少的程式碼就可以完成分散式程式的開發,這使得開發人員只需要關注業務邏輯的即 可。Google 在業界技術上的領軍地位,讓業界望塵莫及的技術實力,IT 因此也是對Google 所退出的技術十分推崇。在最近幾年中分散式則是成為了海量資料儲存以及計算、高併發、高可靠性、高可用性的解決方案。

1.2 ZooKeeper的產生

眾所周知通常分散式架構都是中心化的設計,就是一個主控機連線多個處理節點。 問題可以從這裡考慮,當主控機失效時,整個系統則就無法訪問了,所以保證系統的高可用性是非常關鍵之處,也就是要保證主控機的高可用性。分散式鎖就是一個 解決該問題的較好方案,多主控機搶一把鎖。在這裡我們就涉及到了我們的重點Zookeeper。

ZooKeeper是什麼,chubby 我想大家都不會陌生的,chubby 是實現Google 的一個分散式鎖的實現,運用到了paxos 演算法解決的一個分散式事務管理的系統。Zookeeper 就是雅虎模仿強大的Google chubby 實現的一套分散式鎖管理系統。同時,Zookeeper 分散式服務框架是Apache Hadoop的一個子專案,它是一個針對大型分散式系統的可靠協調系統,它主要是用來解決分散式應用中經常遇到的一些資料管理問題,可以高可靠的維護元資料。提供的功能包括:配置維護、名字服務、分散式同步、組服務等。ZooKeeper的設計目標就是封裝好複雜易出錯的關鍵服務,將簡單易用的介面和效能高效、功能穩定的系統提供給使用者。

1.3 ZooKeeper的使用

Zookeeper 作為一個分散式的服務框架,主要用來解決分散式叢集中應用系統的一致性問題,它能提供基於類似於檔案系統的目錄節點樹方式的資料儲存,但是 Zookeeper 並不是用來專門儲存資料的,它的作用主要是用來維護和監控你儲存的資料的狀態變化。通過監控這些資料狀態的變化,從而可以達到基於資料的叢集管理,後面將 會詳細介紹 Zookeeper 能夠解決的一些典型問題。

注意一下這裡的"資料"是有限制的:

(1) 從資料大小來看:我們知道ZooKeeper的資料儲存在一個叫ReplicatedDataBase 的 資料庫中,該資料是一個記憶體資料庫,既然是在記憶體當中,我就應該知道該資料量就應該不會太大,這一點上就與hadoop的HDFS有了很大的區 別,HDFS的資料主要儲存在磁碟上,因此資料儲存主要是HDFS的事,而ZooKeeper主要是協調功能,並不是用來儲存資料的。

(2) 從資料型別來看:正如前面所說的,ZooKeeper的資料在記憶體中,由於記憶體空間的限制,那麼我們就不能在上面隨心所欲的儲存資料,所以ZooKeeper儲存的資料都是我們所關心的資料而且資料量還不能太大,而且還會根據我們要以實現的功能來選擇相應的資料。簡單來說,幹什麼事存什麼資料,ZooKeeper所實現的一切功能,都是由ZK節點的性質和該節點所關聯的資料實現的,至於關聯什麼資料那就要看你幹什麼事了。

例如:

  ① 叢集管理:利用臨時節點特性,節點關聯的是機器的主機名、IP地址等相關資訊,叢集單點故障也屬於該範疇。

  ② 統一命名:主要利用節點的唯一性和目錄節點樹結構。

  ③ 配置管理:節點關聯的是配置資訊。

  ④ 分散式鎖:節點關聯的是要競爭的資源。

二、ZooKeeper應用場景

ZooKeeper是一個高可用的分散式資料管理與系統協調框架。基於對Paxos演算法的實現,使該框架保證了分散式環境中資料的強一致性,也正是 基於這樣的特性,使得zookeeper能夠應用於很多場景。需要注意的是,ZK並不是生來就為這些場景設計,都是後來眾多開發者根據框架的特性,摸索出 來的典型使用方法。因此,我們也可以根據自己的需要來設計相應的場景實現。正如前文所提到的,ZooKeeper 實現的任何功能都離不開ZooKeeper的資料結構,任何功能的實現都是利用"Znode結構特性+節點關聯的資料"來實現的,好吧那麼我們就看一下ZooKeeper資料結構有哪些特性。ZooKeeper資料結構如下圖所示:

圖2.1 ZooKeeper資料結構

Zookeeper 這種資料結構有如下這些特點:

 每個子目錄項如 NameService 都被稱作為 znode,這個 znode 是被它所在的路徑唯一標識,如 Server1 這個 znode 的標識為 /NameService/Server1;

 znode 可以有子節點目錄,並且每個 znode 可以儲存資料,注意 EPHEMERAL 型別的目錄節點不能有子節點目錄;

 znode 是有版本的,每個 znode 中儲存的資料可以有多個版本,也就是一個訪問路徑中可以儲存多份資料;

 znode 可以是臨時節點,一旦建立這個 znode 的客戶端與伺服器失去聯絡,這個 znode 也將自動刪除,Zookeeper 的客戶端和伺服器通訊採用長連線方式,每個客戶端和伺服器通過心跳來保持連線,這個連線狀態稱為 session,如果 znode 是臨時節點,這個 session 失效,znode 也就刪除了;

⑤ znode 的目錄名可以自動編號,如 App1 已經存在,再建立的話,將會自動命名為 App2;

⑥ znode 可以被監控,包括這個目錄節點中儲存的資料的修改,子節點目錄的變化等,一旦變化可以通知設定監控的客戶端,這個是 Zookeeper 的核心特性,Zookeeper 的很多功能都是基於這個特性實現的。

2.1資料釋出與訂閱

(1) 典型場景描述

釋出與訂閱即所謂的配置管理,顧名思義就是將資料釋出到ZK節點上,供訂閱者動態獲取資料,實現配置資訊的集中式管理和動態更新。例如全域性的配置資訊地址列表等就非常適合使用。集中式的配置管理在應用叢集中是非常常見的,一般商業公司內部都會實現一套集中的配置管理中心,應對不同的應用叢集對於共享各自配置的需求,並且在配置變更時能夠通知到叢集中的每一個機器。

(2) 應用

 索引資訊和叢集中機器節點狀態存放在ZK的一些指定節點,供各個客戶端訂閱使用。

② 系統日誌(經過處理後的)儲存,這些日誌通常2-3天后被清除。

 應用中用到的一些配置資訊集中管理,在應用啟動的時候主動來獲取一次,並且在節點上註冊一個Watcher,以後每次配置有更新,實時通知到應用,獲取最新配置資訊。

 業務邏輯中需要用到的一些全域性變數,比如一些訊息中介軟體的訊息佇列通常有個offset,這個offset存放在zk上,這樣叢集中每個傳送者都能知道當前的傳送進度。

 系統中有些資訊需要動態獲取,並且還會存在人工手動去修改這個資訊。以前通常是暴露出介面,例如JMX介面,有了ZK後,只要將這些資訊存放到ZK節點上即可。

(3) 應用舉例

例如:同一個應用系統需要多臺 PC Server 執行,但是它們執行的應用系統的某些配置項是相同的,如果要修改這些相同的配置項,那麼就必須同時修改每臺執行這個應用系統的 PC Server,這樣非常麻煩而且容易出錯。將配置資訊儲存在 Zookeeper 的某個目錄節點中,然後將所有需要修改的應用機器監控配置資訊的狀態,一旦配置資訊發生變化,每臺應用機器就會收到 Zookeeper 的通知,然後從 Zookeeper 獲取新的配置資訊應用到系統中。ZooKeeper配置管理服務如下圖所示:

圖2.2 配置管理結構圖

Zookeeper很容易實現這種集中式的配置管理,比如將所需要的配置資訊放到/Configuration 節點上,叢集中所有機器一啟動就會通過Client對/Configuration這個節點進行監控【zk.exist("/Configuration″,true)】,並且實現Watcher回撥方法process(),那麼在zookeeper上/Configuration節點下資料發生變化的時候,每個機器都會收到通知,Watcher回撥方法將會被執行,那麼應用再取下資料即可【zk.getData("/Configuration″,false,null)】。

2.2統一命名服務(Name Service)

(1) 場景描述

分散式應用中,通常需要有一套完整的命名規則,既能夠產生唯一的名稱又便於人識別和記住,通常情況下用樹形的名稱結構是一個理想的選擇,樹形的名稱 結構是一個有層次的目錄結構,既對人友好又不會重複。說到這裡你可能想到了 JNDI,沒錯 Zookeeper 的 Name Service 與 JNDI 能夠完成的功能是差不多的,它們都是將有層次的目錄結構關聯到一定資源上,但是Zookeeper的Name Service 更加是廣泛意義上的關聯,也許你並不需要將名稱關聯到特定資源上,你可能只需要一個不會重複名稱,就像資料庫中產生一個唯一的數字主鍵一樣。

(2) 應用

在分散式系統中,通過使用命名服務,客戶端應用能夠根據指定的名字來獲取資源服務的地址,提供者等資訊。被命名的實體通常可以是叢集中的機器,提供的服務地址,程序物件等等,這些我們都可以統稱他們為名字(Name)。其中較為常見的就是一些分散式服務框架中的服務地址列表。 通過呼叫ZK提供的建立節點的API,能夠很容易建立一個全域性唯一的path,這個path就可以作為一個名稱。Name Service 已經是Zookeeper 內建的功能,你只要呼叫 Zookeeper 的 API 就能實現。如呼叫 create 介面就可以很容易建立一個目錄節點。

(3) 應用舉例

阿里開源的分散式服務框架Dubbo中使用ZooKeeper來作為其命名服務,維護全域性的服務地址列表。在Dubbo實現中: 服務提供者在啟動的時候,向ZK上的指定節點/dubbo/${serviceName}/providers目錄下寫入自己的URL地址,這個操作就完成了服務的釋出。 服務消費者啟 動的時候,訂閱/dubbo/${serviceName}/providers目錄下的提供者URL地址, 並向/dubbo/${serviceName} /consumers目錄下寫入自己的URL地址。 注意,所有向ZK上註冊的地址都是臨時節點,這樣就能夠保證服務提供者和消費者能夠自動感應資源的變化。 另外,Dubbo還有針對服務粒度的監控,方法是訂閱/dubbo/${serviceName}目錄下所有提供者和消費者的資訊。

2.3分佈通知/協調(Distribution of notification/coordination)

(1) 典型場景描述

ZooKeeper中特有watcher註冊與非同步通知機制,能夠很好的實現分散式環境下不同系統之間的通知與協調,實現對資料變更的實時處理。使用方法通常是不同系統都對ZK上同一個znode進行註冊,監聽znode的變化(包括znode本身內容及子節點的),其中一個系統update了znode,那麼另一個系統能夠收到通知,並作出相應處理。

(2) 應用

 另一種心跳檢測機制:檢測系統和被檢測系統之間並不直接關聯起來,而是通過ZK上某個節點關聯,大大減少系統耦合。

② 另一種系統排程模式:某系統由控制檯和推送系統兩部分組成,控制檯的職責是控制推送系統進行相應的推送工作。管理人員在控制檯作的一些操作,實際上是修改了ZK上某些節點的狀態,而ZK就把這些變化通知給他們註冊Watcher的客戶端,即推送系統,於是,作出相應的推送任務。

③ 另一種工作彙報模式:一些類似於任務分發系統,子任務啟動後,到ZK來註冊一個臨時節點,並且定時將自己的進度進行彙報(將進度寫回這個臨時節點),這樣任務管理者就能夠實時知道任務進度。

總之,使用zookeeper來進行分散式通知和協調能夠大大降低系統之間的耦合。

2.4分散式鎖(Distribute Lock)

(1) 場景描述

分散式鎖,這個主要得益於ZooKeeper為我們保證了資料的強一致性,即使用者只要完全相信每時每刻,zk叢集中任意節點(一個zk server)上的相同znode的資料是一定是相同的。鎖服務可以分為兩類,一個是保持獨佔,另一個是控制時序。

保持獨佔,就是所有試圖來獲取這個鎖的客戶端,最終只有一個可以成功獲得這把 鎖。通常的做法是把ZK上的一個znode看作是一把鎖,通過create znode的方式來實現。所有客戶端都去建立 /distribute_lock 節點,最終成功建立的那個客戶端也即擁有了這把鎖。

控制時序,就是所有試圖來獲取這個鎖的客戶端,最終都是會被安排執行,只是有 個全域性時序了。做法和上面基本類似,只是這裡 /distribute_lock 已經預先存在,客戶端在它下面建立臨時有序節點。Zk的父節點(/distribute_lock)維持一份sequence,保證子節點建立的時序性, 從而也形成了每個客戶端的全域性時序。

(2) 應用

共享鎖在同一個程序中很容易實現,但是在跨程序或者在不同 Server 之間就不好實現了。Zookeeper 卻很容易實現這個功能,實現方式也是需要獲得鎖的 Server 建立一個 EPHEMERAL_SEQUENTIAL 目錄節點,然後呼叫 getChildren方法獲取當前的目錄節點列表中最小的目錄節點是不是就是自己建立的目錄節點,如果正是自己建立的,那麼它就獲得了這個鎖,如果不是那麼它就呼叫 exists(String path, boolean watch) 方法並監控 Zookeeper 上目錄節點列表的變化,一直到自己建立的節點是列表中最小編號的目錄節點,從而獲得鎖,釋放鎖很簡單,只要刪除前面它自己所建立的目錄節點就行了。

圖 2.3 ZooKeeper實現Locks的流程圖

程式碼清單1 TestMainClient 程式碼

 

複製程式碼

package org.zk.leader.election;
     
    import org.apache.log4j.xml.DOMConfigurator;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
     
    import java.io.IOException;
     
    /**
     * TestMainClient
     * <p/>
     * Author By: sunddenly工作室
     * Created Date: 2014-11-13
     */
    public class TestMainClient implements Watcher {
        protected static ZooKeeper zk = null;
        protected static Integer mutex;
        int sessionTimeout = 10000;
        protected String root;
        public TestMainClient(String connectString) {
            if(zk == null){
                try {
     
                    String configFile = this.getClass().getResource("/").getPath()+"org/zk/leader/election/log4j.xml";
                    DOMConfigurator.configure(configFile);
                    System.out.println("建立一個新的連線:");
                    zk = new ZooKeeper(connectString, sessionTimeout, this);
                    mutex = new Integer(-1);
                } catch (IOException e) {
                    zk = null;
                }
            }
        }
       synchronized public void process(WatchedEvent event) {
            synchronized (mutex) {
                mutex.notify();
            }
        }
    }

複製程式碼

 

清單 2 Locks 程式碼

 

 

複製程式碼

package org.zk.locks;
     
    import org.apache.log4j.Logger;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.Stat;
    import org.zk.leader.election.TestMainClient;
     
    import java.util.Arrays;
    import java.util.List;
     
    /**
     * locks
     * <p/>
     * Author By: sunddenly工作室
     * Created Date: 2014-11-13 16:49:40
     */
    public class Locks extends TestMainClient {
        public static final Logger logger = Logger.getLogger(Locks.class);
        String myZnode;
     
        public Locks(String connectString, String root) {
            super(connectString);
            this.root = root;
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    logger.error(e);
                } catch (InterruptedException e) {
                    logger.error(e);
                }
            }
        }
        void getLock() throws KeeperException, InterruptedException{
            List<String> list = zk.getChildren(root, false);
            String[] nodes = list.toArray(new String[list.size()]);
            Arrays.sort(nodes);
            if(myZnode.equals(root+"/"+nodes[0])){
                doAction();
            }
            else{
                waitForLock(nodes[0]);
            }
        }
        void check() throws InterruptedException, KeeperException {
            myZnode = zk.create(root + "/lock_" , new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            getLock();
        }
        void waitForLock(String lower) throws InterruptedException, KeeperException {
            Stat stat = zk.exists(root + "/" + lower,true);
            if(stat != null){
                mutex.wait();
            }
            else{
                getLock();
            }
        }
        @Override
        public void process(WatchedEvent event) {
            if(event.getType() == Event.EventType.NodeDeleted){
                System.out.println("得到通知");
                super.process(event);
                doAction();
            }
        }
        /**
         * 執行其他任務
         */
        private void doAction(){
            System.out.println("同步佇列已經得到同步,可以開始執行後面的任務了");
        }
     
        public static void main(String[] args) {
            String connectString = "localhost:2181";
     
            Locks lk = new Locks(connectString, "/locks");
            try {
                lk.check();
            } catch (InterruptedException e) {
                logger.error(e);
            } catch (KeeperException e) {
                logger.error(e);
            }
        }
    }

複製程式碼

 

2.5 叢集管理(Cluster Management)

(1) 典型場景描述

叢集機器監控

這通常用於那種對叢集中機器狀態,機器線上率有較高要求的場景,能夠快速對叢集中機器變化作出響應。這樣的場景中,往往有一個監控系統,實時檢測集 群機器是否存活。過去的做法通常是:監控系統通過某種手段(比如ping)定時檢測每個機器,或者每個機器自己定時向監控系統彙報"我還活著"。 這種做法可行,但是存在兩個比較明顯的問題:

① 叢集中機器有變動的時候,牽連修改的東西比較多。

 有一定的延時。

利用ZooKeeper中兩個特性,就可以實施另一種叢集機器存活性監控系統:

① 客戶端在節點 x 上註冊一個Watcher,那麼如果 x 的子節點變化了,會通知該客戶端。

② 建立EPHEMERAL型別的節點,一旦客戶端和伺服器的會話結束或過期,那麼該節點就會消失。

Master選舉:

Master選舉則是zookeeper中最為經典的使用場景了,在分散式環境中,相同的業務應用分佈在不同的機器上,有些業務邏輯,例如一些耗時的計算,網路I/O處,往往只需要讓整個叢集中的某一臺機器進行執行,其餘機器可以共享這個結果,這樣可以大大減少重複勞動,提高效能,於是這個master選舉便是這種場景下的碰到的主要問題。

利用ZooKeeper中兩個特性,就可以實施另一種叢集中Master選舉:

① 利用ZooKeeper的強一致性,能夠保證在分散式高併發情況下節點建立的全域性唯一性,即:同時有多個客戶端請求建立 /Master 節點,最終一定只有一個客戶端請求能夠建立成功。利用這個特性,就能很輕易的在分散式環境中進行叢集選舉了。

另外,這種場景演化一下,就是動態Master選舉。這就要用到 EPHEMERAL_SEQUENTIAL型別節點的特性了,這樣每個節點會自動被編號。允許所有請求都能夠建立成功,但是得有個建立順序,每次選取序列號最小的那個機器作為Master 。

(2) 應用

在搜尋系統中,如果叢集中每個機器都生成一份全量索引,不僅耗時,而且不能保證彼此間索引資料一致。因此讓叢集中的Master來迚行全量索引的生 成,然後同步到叢集中其它機器。另外,Master選丼的容災措施是,可以隨時迚行手動挃定master,就是說應用在zk在無法獲取master資訊 時,可以通過比如http方式,向一個地方獲取master。  在Hbase中,也是使用ZooKeeper來實現動態HMaster的選舉。在Hbase實現中,會在ZK上儲存一些ROOT表的地址和HMaster 的地址,HRegionServer也會把自己以臨時節點(Ephemeral)的方式註冊到Zookeeper中,使得HMaster可以隨時感知到各 個HRegionServer的存活狀態,同時,一旦HMaster出現問題,會重新選丼出一個HMaster來執行,從而避免了HMaster的單點問 題的存活狀態,同時,一旦HMaster出現問題,會重新選丼出一個HMaster來執行,從而避免了HMaster的單點問題。

(3) 應用舉例

叢集監控:

應用叢集中,我們常常需要讓每一個機器知道叢集中或依賴的其他某一個叢集中哪些機器是活著的,並且在叢集機器因為宕機,網路斷鏈等原因能夠不在人工 介入的情況下迅速通知到每一個機器,Zookeeper 能夠很容易的實現叢集管理的功能,如有多臺 Server 組成一個服務叢集,那麼必須要一個"總管"知道當前叢集中每臺機器的服務狀態,一旦有機器不能提供服務,叢集中其它叢集必須知道,從而做出調整重新分配服 務策略。同樣當增加叢集的服務能力時,就會增加一臺或多臺 Server,同樣也必須讓"總管"知道,這就是ZooKeeper的叢集監控功能。

圖2.4 叢集管理結構圖

比如我在zookeeper伺服器端有一個znode叫/Configuration,那麼叢集中每一個機器啟動的時候都去這個節點下建立一個EPHEMERAL型別的節點,比如server1建立/Configuration /Server1,server2建立/Configuration /Server1,然後Server1和Server2都watch /Configuration 這個父節點,那麼也就是這個父節點下資料或者子節點變化都會通知對該節點進行watch的客戶端。因為EPHEMERAL型別節點有一個很重要的特性,就 是客戶端和伺服器端連線斷掉或者session過期就會使節點消失,那麼在某一個機器掛掉或者斷鏈的時候,其對應的節點就會消 失,然後叢集中所有對/Configuration進行watch的客戶端都會收到通知,然後取得最新列表即可。

Master選舉:

Zookeeper 不僅能夠維護當前的叢集中機器的服務狀態,而且能夠選出一個"總管",讓這個總管來管理叢集,這就是 Zookeeper 的另一個功能 Leader Election。Zookeeper 如何實現 Leader Election,也就是選出一個 Master Server。和前面的一樣每臺 Server 建立一個 EPHEMERAL 目錄節點,不同的是它還是一個 SEQUENTIAL 目錄節點,所以它是個 EPHEMERAL_SEQUENTIAL 目錄節點。之所以它是 EPHEMERAL_SEQUENTIAL 目錄節點,是因為我們可以給每臺 Server 編號,我們可以選擇當前是最小編號的 Server 為 Master,假如這個最小編號的 Server 死去,由於是 EPHEMERAL 節點,死去的 Server 對應的節點也被刪除,所以當前的節點列表中又出現一個最小編號的節點,我們就選擇這個節點為當前 Master。這樣就實現了動態選擇 Master,避免了傳統意義上單 Master 容易出現單點故障的問題

清單 3 Leader Election程式碼

複製程式碼

package org.zk.leader.election;
     
    import org.apache.log4j.Logger;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.Stat;
     
    import java.net.InetAddress;
    import java.net.UnknownHostException;
     
    /**
     * LeaderElection
     * <p/>
     * Author By: sunddenly工作室
     * Created Date: 2014-11-13
     */
    public class LeaderElection extends TestMainClient {
        public static final Logger logger = Logger.getLogger(LeaderElection.class);
     
        public LeaderElection(String connectString, String root) {
            super(connectString);
            this.root = root;
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    logger.error(e);
                } catch (InterruptedException e) {
                    logger.error(e);
                }
            }
        }
     
        void findLeader() throws InterruptedException, UnknownHostException, KeeperException {
            byte[] leader = null;
            try {
                leader = zk.getData(root + "/leader", true, null);
            } catch (KeeperException e) {
                if (e instanceof KeeperException.NoNodeException) {
                    logger.error(e);
                } else {
                    throw e;
                }
            }
            if (leader != null) {
                following();
            } else {
                String newLeader = null;
                byte[] localhost = InetAddress.getLocalHost().getAddress();
                try {
                    newLeader = zk.create(root + "/leader", localhost, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
                } catch (KeeperException e) {
                    if (e instanceof KeeperException.NodeExistsException) {
                        logger.error(e);
                    } else {
                        throw e;
                    }
                }
                if (newLeader != null) {
                    leading();
                } else {
                    mutex.wait();
                }
            }
        }
     
        @Override
        public void process(WatchedEvent event) {
            if (event.getPath().equals(root + "/leader") && event.getType() == Event.EventType.NodeCreated) {
                System.out.println("得到通知");
                super.process(event);
                following();
            }
        }
     
        void leading() {
            System.out.println("成為領導者");
        }
     
        void following() {
            System.out.println("成為組成員");
        }
     
        public static void main(String[] args) {
            String connectString = "localhost:2181";
     
            LeaderElection le = new LeaderElection(connectString, "/GroupMembers");
            try {
                le.findLeader();
            } catch (Exception e) {
                logger.error(e);
            }
        }
    }

複製程式碼

2.6 佇列管理

Zookeeper 可以處理兩種型別的佇列:

 當一個佇列的成員都聚齊時,這個佇列才可用,否則一直等待所有成員到達,這種是同步佇列。

② 佇列按照 FIFO 方式進行入隊和出隊操作,例如實現生產者和消費者模型。

(1) 同步佇列用 Zookeeper 實現的實現思路如下:

建立一個父目錄 /synchronizing,每個成員都監控標誌(Set Watch)位目錄 /synchronizing/start 是否存在,然後每個成員都加入這個佇列,加入佇列的方式就是建立 /synchronizing/member_i 的臨時目錄節點,然後每個成員獲取 / synchronizing 目錄的所有目錄節點,也就是 member_i。判斷 i 的值是否已經是成員的個數,如果小於成員個數等待 /synchronizing/start 的出現,如果已經相等就建立 /synchronizing/start。

用下面的流程圖更容易理解:

圖 2.5 同步佇列流程圖

 

清單 4 Synchronizing 程式碼

複製程式碼

package org.zk.queue;
     
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.List;
     
    import org.apache.log4j.Logger;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.data.Stat;
    import org.zk.leader.election.TestMainClient;
     
    /**
     * Synchronizing
     * <p/>
     * Author By: sunddenly工作室
     * Created Date: 2014-11-13
     */
    public class Synchronizing extends TestMainClient {
        int size;
        String name;
        public static final Logger logger = Logger.getLogger(Synchronizing.class);
     
        /**
         * 建構函式
         *
         * @param connectString 伺服器連線
         * @param root 根目錄
         * @param size 佇列大小
         */
        Synchronizing(String connectString, String root, int size) {
            super(connectString);
            this.root = root;
            this.size = size;
     
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    logger.error(e);
                } catch (InterruptedException e) {
                    logger.error(e);
                }
            }
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                logger.error(e);
            }
     
        }
     
        /**
         * 加入佇列
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
     
        void addQueue() throws KeeperException, InterruptedException{
            zk.exists(root + "/start",true);
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
            synchronized (mutex) {
                List<String> list = zk.getChildren(root, false);
                if (list.size() < size) {
                    mutex.wait();
                } else {
                    zk.create(root + "/start", new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                }
            }
        }
     
        @Override
        public void process(WatchedEvent event) {
            if(event.getPath().equals(root + "/start") && event.getType() == Event.EventType.NodeCreated){
                System.out.println("得到通知");
                super.process(event);
                doAction();
            }
        }
     
        /**
         * 執行其他任務
         */
        private void doAction(){
            System.out.println("同步佇列已經得到同步,可以開始執行後面的任務了");
        }
     
        public static void main(String args[]) {
            //啟動Server
            String connectString = "localhost:2181";
            int size = 1;
            Synchronizing b = new Synchronizing(connectString, "/synchronizing", size);
            try{
                b.addQueue();
            } catch (KeeperException e){
                logger.error(e);
            } catch (InterruptedException e){
                logger.error(e);
            }
        }
    }

複製程式碼

(2) FIFO 佇列用 Zookeeper 實現思路如下:

實現的思路也非常簡單,就是在特定的目錄下建立 SEQUENTIAL 型別的子目錄 /queue_i,這樣就能保證所有成員加入佇列時都是有編號的,出佇列時通過 getChildren( ) 方法可以返回當前所有的佇列中的元素,然後消費其中最小的一個,這樣就能保證 FIFO。

下面是生產者和消費者這種佇列形式的示例程式碼

清單 5 FIFOQueue 程式碼

複製程式碼

import org.apache.log4j.Logger;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.ZooDefs;
    import org.apache.zookeeper.data.Stat;
     
    import java.nio.ByteBuffer;
    import java.util.List;
     
    /**
     * FIFOQueue
     * <p/>
     * Author By: sunddenly工作室
     * Created Date: 2014-11-13
     */
    public class FIFOQueue extends TestMainClient{
        public static final Logger logger = Logger.getLogger(FIFOQueue.class);
     
        /**
         * Constructor
         *
         * @param connectString
         * @param root
         */
        FIFOQueue(String connectString, String root) {
            super(connectString);
            this.root = root;
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    logger.error(e);
                } catch (InterruptedException e) {
                    logger.error(e);
                }
            }
        }
        /**
         * 生產者
         *
         * @param i
         * @return
         */
     
        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);
            return true;
        }
     
     
        /**
         * 消費者
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            if(tempValue < min) min = tempValue;
                        }
                        byte[] b = zk.getData(root + "/element" + min,false, stat);
                        zk.delete(root + "/element" + min, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();
                        return retvalue;
                    }
                }
            }
        }
     
        @Override
        public void process(WatchedEvent event) {
            super.process(event);
        }
     
        public static void main(String args[]) {
            //啟動Server
            TestMainServer.start();
            String connectString = "localhost:"+TestMainServer.CLIENT_PORT;
     
            FIFOQueue q = new FIFOQueue(connectString, "/app1");
            int i;
            Integer max = new Integer(5);
     
            System.out.println("Producer");
            for (i = 0; i < max; i++)
                try{
                    q.produce(10 + i);
                } catch (KeeperException e){
                    logger.error(e);
                } catch (InterruptedException e){
                    logger.error(e);
                }
     
            for (i = 0; i < max; i++) {
                try{
                    int r = q.consume();
                    System.out.println("Item: " + r);
                } catch (KeeperException e){
                    i--;
                    logger.error(e);
                } catch (InterruptedException e){
                    logger.error(e);
                }
            }
     
        }
    }

複製程式碼

三、ZooKeeper實際應用

假設我們的叢集有:

(1) 20個搜尋引擎的伺服器:每個負責總索引中的一部分的搜尋任務。

 搜尋引擎的伺服器中的15個伺服器現在提供搜尋服務。

 5個伺服器正在生成索引。

這20個搜尋引擎的伺服器,經常要讓正在提供搜尋服務的伺服器停止提供服務開始生成索引,或生成索引的伺服器已經把索引生成完成可以搜尋提供服務了。

(2) 一個總伺服器:負責向這20個搜尋引擎的伺服器發出搜尋請求併合並結果集。

(3) 一個備用的總伺服器:負責當總伺服器宕機時替換總伺服器。

(4) 一個web的cgi:向總伺服器發出搜尋請求。

使用Zookeeper可以保證:

(1) 總伺服器:自動感知有多少提供搜尋引擎的伺服器,並向這些伺服器發出搜尋請求。

(2) 備用的總伺服器:宕機時自動啟用備用的總伺服器。

(3) web的cgi:能夠自動地獲知總伺服器的網路地址變化。

(4) 實現如下:

① 提供搜尋引擎的伺服器都在Zookeeper中建立znode,zk.create("/search/nodes/node1""hostname".getBytes()Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);

② 總伺服器可以從Zookeeper中獲取一個znode的子節點的列表,zk.getChildren("/search/nodes", true);

 總伺服器遍歷這些子節點,並獲取子節點的資料生成提供搜尋引擎的伺服器列表;

 當總伺服器接收到子節點改變的事件資訊,重新返回第二步;

 總伺服器在Zookeeper中建立節點,zk.create("/search/master", "hostname".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateFlags.EPHEMERAL);

⑥ 備用的總伺服器監控Zookeeper中的"/search/master"節點。當這個znode的節點資料改變時,把自己啟動變成總伺服器,並把自己的網路地址資料放進這個節點。

 web的cgi從Zookeeper中"/search/master"節點獲取總伺服器的網路地址資料,並向其傳送搜尋請求。

 web的cgi監控Zookeeper中的"/search/master"節點,當這個znode的節點資料改變時,從這個節點獲取總伺服器的網路地址資料,並改變當前的總伺服器的網路地址。