1. 程式人生 > >Zookeeper機制詳解與實現

Zookeeper機制詳解與實現

為什麼要有Zookeeper?

      電視裡經常會有一些狗血的設定,隊長和副隊長一起出去執行任務,執行完任務後副隊長回來報到了,但是隊長可能因為天氣原因導致航班延期了,暫時回不來,這個時候副隊長左等右等還等不到隊長回來,而且副隊長擔心隊長如果出事了,下面的隊員沒有人約束,大家可能就會鬆懈下來,副隊長等了一個星期後,自己當隊長了。

      結果過了兩個星期後,隊長回來了,這個時候就產生了連個隊長,這個時候組員就麻煩了,他們不知道聽誰的,假設也有可能寫一份報告時,都給兩位隊長。

      這裡面如果還有一個支援保障小隊,發現隊長還沒回來啊,那麼這個支援保障小隊就去查隊長是不是遇到什麼事了,結果一查,隊長沒出事,平安大吉,那麼支援保障小隊就告訴副隊長,你不用擔心了,他沒事,我們會告知委員會,委員會會選舉出一個隊長,選到你了會正式通知你的。

 

什麼是Zookeeper?

      Zookeeper是一個Apache下的開源分散式協調框架,就是為分散式應用提供協調服務的。

      Zookeeper從設計模式的角度來理解,是一個基於觀察者模式設計的分散式服務管理框架,負責儲存和管理都關心的資料(狀態),然後接受觀察者的註冊,一旦這些資料的狀態發生了變化,Zookeeper就通知已經在Zookeeper上註冊的那些觀察者這做出相應的反應,從而實現叢集中類似Master/Slave管理模式。

 

 

特點

      1、Zookeeper是由一個Leader和多個Follower組成的叢集。

      2、Leader負責進行投票發起和發起決議,更新系統狀態

      3、Follower用於接收客戶端的請求並返回結果,在選舉Leader過程中參與投票。

      4、Follower負責讀請求,寫請求則要提交給Leader決定。

      5、叢集中只要有半數以上節點存活,Zookeeper叢集

就能正常服務。

      6、更新請求順序執行,來自同一個客戶端的更新請求時按照發送順序依次執行。

      7、資料更新的原子性,要麼更新資料成功,要麼失敗。

      8、實時性,在一定時間的範圍內客戶端可以讀到最新的資料

 

 

選舉機制(Leader)

      配置檔案中沒有指定Leader和Follower的話,Zookeeper會內部進行選舉。

      假設:

伺服器1、2、3。

           伺服器1啟動,但是隻有自己一個人,選舉狀態為Looking。

           伺服器2啟動,首先和已經啟動的伺服器1進行通訊,交換選舉資訊,但是伺服器1和2都沒有歷史資料,那麼他們都投票給了自己,但是id值大的伺服器2勝出。

           伺服器3啟動,和伺服器1、2進行通訊,發現已經選舉出了2作為Leader,那麼就成為Follower了,但是如果伺服器2發生網路問題了,一直沒有傳送出去互動資訊,到把訊息傳送出去的時候,發現別人的投票已經處於第二輪,而它發出去的是第一輪,這裡就涉及到邏輯時鐘(時間同步)了,所以則伺服器3成為Leader。

 

 

應用場景

      Zookeeper提供的服務資料釋出訂閱、負載均衡、命中服務、分散式協調/通知、分散式鎖、分散式佇列等功能。

      Zookeeper就是一個監控、通知、輔助(其他).

如:分散式鎖,其他使用者在操作,通知其他伺服器有人在操作,等使用者操作完了就告訴其他伺服器,他操作完了。

 

案例

 

配置檔案

      tickTime伺服器和伺服器之間,或伺服器和客戶端之間傳送心跳訊息的時間間隔,session的最小超時時間為2*tickTime

      initLimitLeader和Follower的初始連線時心跳數(時間=initLimit * tickTime);Follower在啟動的時候,會從Leader同步所有最新的資料,以確定自己對外服務的狀態;投票選舉新Leader的初始化時間;

      syncLimitLeader和Follower的之間的最大等待響應時間(syncLimit * tickTime),超過這個時間,Leader就可以認為這個Follower掛了。

      dataDir資料檔案存放路徑

      clientPort客戶端連線ZK服務端的埠

 

 

資料結構

      Zookeeper的資料結構是一個樹狀化的結構,每個節點稱為Znode,每個ZNode預設儲存1MB的資料可以通過路徑進行唯一標識。(/apps/app1)

 

 

節點型別

      短暫(ephemeral):客戶端和伺服器端斷開後,這種型別的節點就會刪除。

      短暫序號:建立這種節點的時候,會自動的加上一個遞增序號。

      持久(persistent):客戶端和伺服器端斷開連線,這種型別的節點不會被刪除。

      持久序號:建立這種節點的時候,會自動的加上一個遞增序號。

 

 

常用命令

應用程式命令

啟動伺服器端:/bin/zkServer.sh start

關閉伺服器端:/bin/zkServer.sh stop

啟動客戶端:bin/zkCli.sh

 

 

客戶端命令

退出客戶端:quit

檢視節點下的子節點:ls /

節點下的子節點變化監聽:ls /test watch

      檢視節點下的子節點詳細資訊:ls2 /

      建立普通節點:create /test “data”

      建立普通節點(序號):create -s /test “data”

      建立短暫節點:create -e /testE “data”

      建立短暫節點(序號):create -e -s /testES "data"

      修改節點資料值:set /test “new data”

      獲取節點值:get /test

      節點值變化監聽(監聽一次,註冊一次):get /test watch

      刪除節點:delete /test

      遞迴刪除節點:rmr /test

      檢視節點詳情:stat /test

           czxid引起這個znode建立的zxid,建立節點的事務zxid

ctimeznode被建立的毫秒數(從1970開始)

mzxidznode最後更新的zxid

mtimeznode最後修改的毫秒數(從1970開始)

pZxidznode最後更新的子節點zxid

cversionznode子節點的變化號,子節點修改次數

dataversionznode資料變化號

aclVersionznode訪問控制列表的變化號

ephemeralOwner如果是臨時節點,這個znode擁有者的sesion id,如果不是臨時節點就為0

dataLengthznod的資料長度

numChildrenznode子節點數量

 

Java – API(模擬Zookeeper監控)

ZkClient

public class ZkClient {

    //連線地址

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";

   

    //會話超時時間

    private int sessionTimeout = 2000;

   

    private ZooKeeper zooKeeper;

   

    /**

     * 初始化

     * @author AimSpeed

     * @Title init

     * @throws IOException void 

     * @date 2018128 下午12:29:13

     */

    public void init() throws IOException {

        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

            @Override

            public void process(WatchedEvent event) {

                System.out.println("伺服器下線 - Listener" + event.getType() + " --- " + event.getPath());

                //持續監聽伺服器

                try {

                    zooKeeper.getChildren("/servers", true);

                } catch (KeeperException | InterruptedException e) {

                    e.printStackTrace();

                }

            }

        });

    }

   

    /**

     * 監聽伺服器

     * @author AimSpeed

     * @Title watcherServer

     * @throws KeeperException

     * @throws InterruptedException void 

     * @date 2018128 下午12:30:54

     */

    public void watcherServer() throws KeeperException, InterruptedException {

        List<String> children = zooKeeper.getChildren("/servers", true);

        for (String string : children) {

            System.out.println("Registered Server:" + string);

        }

        Thread.sleep(Long.MAX_VALUE);

    }

   

    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {

        ZkClient zkClient = new ZkClient();

        zkClient.init();

        zkClient.watcherServer();

    }

}

 

ZkServer

public class ZkServer {

   

    //連線

    private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";

   

    //會話超時時間

    private  int sessionTimeout = 2000;

   

    //客戶端

    private ZooKeeper zooKeeper;

 

    /**

     * 初始化

     * @author AimSpeed

     * @Title init

     * @throws IOException void 

     * @date 2018128 下午12:21:22

     */

    public void init() throws IOException {

        zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {

            @Override

            public void process(WatchedEvent event) {

                System.out.println("Listene" + event.getType() + "---" + event.getPath());

            }

        });

    }

   

    /**

     * 註冊伺服器

     * @author AimSpeed

     * @throws InterruptedException

     * @throws KeeperException

     * @Title register  void 

     * @date 2018128 下午12:22:33

     */

    public void register(String num) throws KeeperException, InterruptedException {

        String create = zooKeeper.create("/servers/server", ("server" + num).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

        System.out.println(create);

    }

   

    public static void main(String[] args) throws KeeperException, InterruptedException, IOException {

        ZkServer zkServer = new ZkServer();

        zkServer.init();

        zkServer.register(args[0]);

        Thread.sleep(Long.MAX_VALUE);

    }

   

}

有興趣可以加QQ群討論:963022008