1. 程式人生 > >Zookeeper實現分布式集群監控

Zookeeper實現分布式集群監控

rpath 集群 用戶 mave 0ms unsafe 2個 his void

Zookeeepr實現分布式集群監控

Zookeeper中節點有兩種:臨時節點和永久節點

從類型上看節點又可以分為四種節點類型:PERSISTPERSIST_SEQUENTIAL,EPHEMERAL,EPHEMERAL_SEQUENTIAL

臨時節點有一個特點:當創建臨時節點的程序停掉之後,這個臨時節點就會消失。

監視器的特點:可以給zk中的節點註冊監視器,見識這個節點的變化情況。

監視器註冊一次,只能使用一次,多次使用就要多次註冊。

我們利用這個Zookeeper的臨時節點特性+監視器(Watch)來實現分布式集群監控

我們在/monitor(永久節點)下創建臨時節點。

實際上,zookeeper

sdk不是特別好用,很多邊界情況需要用戶自己處理。curator是對Zookeepersdk進行了封裝,所以說使用curator操作Zookeeper更加方便

maven官網找到curator的依賴

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework,支持zookeeper3.4.6-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.10
.0</version> </dependency>

我們通過curator來使用zookeeper,那麽我們就必須知道如何使用curator來連上zookeeper,下面代碼是官方文檔所給出

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

開始我們的代碼

TestCurator.java ,實現功能:創建

Zookeeper臨時節點

 1 package zkdemo;
 2 
 3 import java.net.InetAddress;
 4 
 5 import org.apache.curator.RetryPolicy;
 6 import org.apache.curator.framework.CuratorFramework;
 7 import org.apache.curator.framework.CuratorFrameworkFactory;
 8 import org.apache.curator.retry.ExponentialBackoffRetry;  
 9 
10 import org.apache.zookeeper.CreateMode;
11 import org.apache.zookeeper.ZooDefs.Ids;
12 import org.apache.zookeeper.ZooKeeper;
13 import org.junit.Test;
14 
15 
16 public class TestCurator {
17     
18     @Test
19     public void test1() throws Exception{
20         //1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
21         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
22         String connectString = "djt1:2181,djt2:2181,djt3:2181,djt4:2181,djt5:2181";
23         int sessionTimeoutMs = 5000;//這個值只能在4000-40000ms之間表示鏈接斷開後多長時間臨時節點會小時
24         int connectionTimeoutMs = 3000;//獲取鏈接的超時時間
25         //創建一個zk連接
26         CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs
27                 ,connectionTimeoutMs,retryPolicy);
28         
29         client.start();
30         
31         InetAddress localHost = InetAddress.getLocalHost();
32         String ip = localHost.getHostAddress();
33         
34         client.create().creatingParentsIfNeeded()
35         .withMode(CreateMode.EPHEMERAL)//指定節點類型
36         .withACL(Ids.OPEN_ACL_UNSAFE)//指定設置節點權限信息
37         .forPath("/monitor/"+ip);//指定節點名稱
38         
39         while(true)
40         {
41             ;
42         }
43     }
44 }

ZkNodeWatcher.java 實現功能:註冊watcher,監視節點的變化

 1 package zk;
 2 
 3 import java.util.List;
 4 import java.util.ArrayList;
 5 
 6 import org.apache.curator.RetryPolicy;
 7 import org.apache.curator.framework.CuratorFramework;
 8 import org.apache.curator.framework.CuratorFrameworkFactory;
 9 import org.apache.curator.retry.ExponentialBackoffRetry;
10 import org.apache.zookeeper.WatchedEvent;
11 import org.apache.zookeeper.Watcher;
12 
13 
14 
15 public class ZkNodeWatcher implements Watcher{
16     CuratorFramework client;
17     List<String> childrenList = new ArrayList<String>();
18     List<String> newChildrenList = new ArrayList<String>();
19 
20     public  ZkNodeWatcher(){
21         //1000:表示curator鏈接zk的時候超時時間是多少 3:表示鏈接zk的最大重試次數
22         RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
23         String connectString = "djt1:2181,djt2:2181,djt3:2181,djt4:2181,djt5:2181";
24         int sessionTimeoutMs = 5000;//這個值只能在4000-40000ms之間表示鏈接斷開後多長時間臨時節點會消失
25         int connectionTimeoutMs = 3000;//獲取鏈接的超時時間
26         //創建一個zk連接
27         client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs
28                 ,connectionTimeoutMs,retryPolicy);
29         client.start();
30         
31         //監視monitor節點,獲取下面的所有子節點的變化情況
32         try {
33             childrenList = client.getChildren().usingWatcher(this).forPath("/monitor");
34         } catch (Exception e) {
35             // TODO Auto-generated catch block
36             e.printStackTrace();
37         }
38     }
39         
40     
41 
42     /**
43      * 實現一個zk監視器,監視某個節點的變化情況
44      * 
45      * 這個監視程序需要一直運行
46      * @CPH
47      */
48     
49     public void process(WatchedEvent event) {
50         System.out.println("我被調用了");
51         try {
52             newChildrenList = client.getChildren().usingWatcher(this).forPath("/monitor");
53             for(String ip : childrenList)
54             {
55                 if(!newChildrenList.contains(ip)){
56                     System.out.println("節點消失了"+ip);
57                     //TODO 給管理員發送短信什麽的
58                     
59                 }
60             }
61             
62         for(String ip : newChildrenList){
63             if(!childrenList.contains(ip)){
64                 System.out.println("節點新增"+ip);
65             }
66         }
67         //重要
68         childrenList = newChildrenList;
69         } catch (Exception e) {
70             // TODO Auto-generated catch block
71             e.printStackTrace();
72         }
73         
74         
75         
76     }
77     
78     public void start(){
79         while (true){;}
80         }
81 
82     public static void main(String[] args) {
83         ZkNodeWatcher watcher = new ZkNodeWatcher();
84         watcher.start();
85     }
86 }

我們先開啟Zookeeper集群,啟動/bin/zkCli.sh,然後啟動這2個集群,我們可以看到由對應的ip目錄,這個ip不是虛擬機的ip,而是本地的ip,同時我們console下看到

技術分享

然後暫停TestCurator.java,不一會兒,就會看到

技術分享

這樣,通過Zookeeper實現分布式集群監控的功能就完成了!

Zookeeper實現分布式集群監控