1. 程式人生 > >Day16.高效能RPC設計 學習筆記4 - Zookeeper(轉載)

Day16.高效能RPC設計 學習筆記4 - Zookeeper(轉載)

Zookeeper

ZooKeeper 是一個為分散式應用所設計的分佈的、開源的協調服務。可以解決分散式應用中出現常規問題:

同步配置選舉分散式鎖服務命名分組,記住這些問題雖然zookeeper可以幫助使用者解決,並不意味著使用者不需要寫程式碼。使用者如果想使用zookeeper去解決以上出現的問題,需要使用者巧妙利用Zookeeper的節點特性進行程式設計繼而實現以上功能。

【什麼是分散式鎖?在同一時刻,多個程序只有一個程序,執行某個資源;藉由zookeeper來按序列進行建立會話調控;執行結束後自行結束通話】

Zookeeper特性

zookeeper服務在記憶體中維繫一個類似於目錄結構

的名稱空間。這些所謂的目錄或者是檔案在zookeeper中統稱為znode,所有的znode僅僅是依存關係上類似於作業系統目錄,因為所有的znode節點都可以儲存資料。

在這裡插入圖片描述

ZooKeeper的節點是通過像樹樣的結構來維護的,並且每個節點通過路徑來標識訪問。除此之外,每個節點還擁有些資訊包括:資料、資料長度、建立時間、修改時間等等。從這樣既含有資料,又作為路徑表標的節點的特點中,可以看出,ZooKeeper的節點既可以被看做是個檔案件,又可以被看做是個目錄,它同時具有兩者的特點。通常將zookeeper的節點稱為znode

四個節點|一個特性

四個節點持久節點臨時節點

持久序列臨時序列

持久(PERSISTENT):該節點不會因為建立該節點的會話消失而導致節點銷燬【存資料,且資料持久】

臨時:該節點會因為建立該節點的會話消失而導致節點銷燬【存資料,節點消失】

兩個 序列(PERSISTENT_SEQUENTIAL / EPHEMERAL_SEQUENTIAL):系統會自動為序列節點維繫一份建立的順序編號,用於表示某個節點下子節點的建立順序【有序,節點的資源優先權】

節點監測:zookeeper支援節點監測,當用戶關注的節點的資料或者該節點的子節點發生變更,zookeeper可以及時通知給訂閱該節點的客戶端節點。

ps.臨時節點|臨時序列節點下不能建立子節點【面試點】

一個特性:zookeeper存在節點的註冊和監聽,如果節點發生變更(資料、子節點)zookeeper都可以將變更資訊直接推送給所有訂閱該節點的服務。

zookeeper 叢集 (CP 一致性、分割槽容錯誤)

在這裡插入圖片描述

叢集版

[[email protected] ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
[[email protected] ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg

tickTime=2000   
dataDir=/root/zkdata    
clientPort=2181    
initLimit=5
syncLimit=2
server.1=CentOSA:2887:3887    #主從選舉服務埠/資料同步埠
server.2=CentOSB:2887:3887
server.3=CentOSC:2887:3887
[[email protected] ~]# mkdir /root/zkdata
[[email protected] ~]# echo 1 > /root/zkdata/myid
[[email protected] ~]# echo 2 > /root/zkdata/myid
[[email protected] ~]# echo 3 > /root/zkdata/myid

[[email protected] ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh start zoo.cfg
[[email protected] ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh status zoo.cfg
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: [follower|leader] --你可以看到CentOSA/B/C中有一個節點是Leader,其他兩個是follower

單機版

[[email protected] ~]# tar -zxf zookeeper-3.4.6.tar.gz -C /usr/
[[email protected] ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg
tickTime=2000    #監測會話超時2*tickTime
dataDir=/root/zkdata    #zookeeper資料目錄
clientPort=2181    #服務埠
[[email protected] ~]# mkdir /root/zkdata
[[email protected] zookeeper-3.4.6]# ./bin/zkServer.sh start zoo.cfg    #啟動
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[[email protected] zookeeper-3.4.6]# ./bin/zkServer.sh status zoo.cfg    #檢視狀態
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: standalone
[[email protected] ~]# jps
1742 QuorumPeerMain 

[[email protected] zookeeper-3.4.6]# ./bin/zkServer.sh stop zoo.cfg    #關閉
JMX enabled by default
Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

zookeeper節點操作命令

[[email protected] zookeeper-3.4.6]# ./bin/zkCli.sh -server CentOS:2181
Connecting to CentOS:2181
[zk: CentOS:2181(CONNECTED) 0] help
ZooKeeper -server host:port cmd args
    set path data [version]
    ls path [watch]
    ls2 path [watch]
    delete path [version]
    rmr path    #遞迴刪除
    get path [watch]
    create [-s] [-e] path data acl
    quit 
    close 
    connect host:port    #close/connect 關/開連線
[zk: CentOS:2181(CONNECTED) 5] ls /
[baizhi, zookeeper]
[zk: CentOS:2181(CONNECTED) 6] ls2 /      
[baizhi, zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x2
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 2
[zk: CentOS:2181(CONNECTED) 7] set /baizhi "nihao"
cZxid = 0x2
ctime = Wed Nov 14 18:50:18 CST 2018
mZxid = 0x3
mtime = Wed Nov 14 18:51:08 CST 2018
pZxid = 0x2
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0
[zk: CentOS:2181(CONNECTED) 8] get /baizhi
"nihao"
cZxid = 0x2
ctime = Wed Nov 14 18:50:18 CST 2018
mZxid = 0x3
mtime = Wed Nov 14 18:51:08 CST 2018
pZxid = 0x2
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0
[zk: CentOS:2181(CONNECTED) 9] delete /baizhi
[zk: CentOS:2181(CONNECTED) 10] ls /
[zookeeper]
[zk: CentOS:2181(CONNECTED) 11] create -e /baizhi/enode ''
Node does not exist: /baizhi/enode
[zk: CentOS:2181(CONNECTED) 12] create /baizhi ''         
Created /baizhi
[zk: CentOS:2181(CONNECTED) 13] create -e /baizhi/enode ''
Created /baizhi/enode

Java 連線Zookeeper(zkclient|curator-framework)

老版使用zkclient、新版使用curator-framework連線zk的客戶端,方便選舉,分散式鎖

  1. 依賴
<!--訪問zookeeper基礎驅動jar包-->
 <dependency>
     <groupId>org.apache.curator</groupId>
     <artifactId>curator-framework</artifactId>
     <version>2.7.1</version>
</dependency>
<!--curator提供額外功能jars 分散式鎖、選舉等-->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.7.1</version>
</dependency>
<!--其他-->
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.7</version>
</dependency>

參考:http://curator.apache.org/getting-started.html

建立客戶端

private CuratorFramework client;
@Before
public void before(){
    String servers="CentOS:2181";
    RetryPolicy retryPolicy=new RetryNTimes(3,1000);
    //在10秒內,每間隔1秒嘗試一次
    new RetryUntilElapsed(10000,1000);
    //new ExponentialBackoffRetry(1000,3);嘗試三次,每次遞增1秒
    client = CuratorFrameworkFactory.newClient(servers, retryPolicy);
}
@After
public void after(){
    client.close();
}

常規操作

public class CuratorFrameworkDemo {
    private CuratorFramework curatorFramework;
    @Before
    public void before(){
        String servers="CentOS:2181";
     /**
      * RetryPolicy:重試策略
      * new RetryNTimes(3,1000) //重試多少次,間隔時長
      * new RetryUntilElapsed(10000,1000) //多長時間內,間隔時長重試
      * new ExponentialBackoffRetry(1000,10) 增量式,時間間隔重試
      */ 
        RetryPolicy  retryPolicy=new ExponentialBackoffRetry(1000,10);
                //new RetryUntilElapsed(10000,1000);
                //new RetryNTimes(3,1000);
      /**
      * 支援豐富的建立方式 還可以工廠流式建立CuratorFrameworkFactory.builder().xxx.build()
      */
        curatorFramework= CuratorFrameworkFactory.newClient(servers,
                2000
        ,5000,retryPolicy);//引數:訪問服務,會話超時時間,連線超時時間,重試策略
        curatorFramework.start();
    }
    // create  //test
    @Test
    public void testCreate() throws Exception {
        curatorFramework.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)//建立的模式為持久節點、臨時節點EPHEMERAL(會話關閉,無效可見)
                .forPath("/test",SerializationUtils.serialize(new Date()));
    }
    @Test
    public void testSetData() throws Exception {
        curatorFramework.setData()
                .inBackground()
                .forPath("/test", SerializationUtils.serialize(new Integer(100)));
    }
    @Test
    public void testGetData() throws Exception {
        byte[] bytes = curatorFramework.getData()
                .forPath("/test");
        Object obj = SerializationUtils.deserialize(bytes);
        System.out.println(obj.getClass()+" "+obj);
    }

    @Test
    public void testExits() throws Exception {
        Stat stat = curatorFramework.checkExists().forPath("/baizhi");
        System.out.println(stat==null);
    }
    @Test
    public void testDelete() throws Exception {
        curatorFramework.delete()
                .deletingChildrenIfNeeded()//等同rmr,遞迴刪除
                .forPath("/test");
    }
    @Test
    public void testCreateChild() throws Exception {
        curatorFramework.create().creatingParentsIfNeeded().forPath("/test/192.168.0.1:20881","你好".getBytes());
        curatorFramework.create().creatingParentsIfNeeded().forPath("/test/192.168.0.2:20881","hello".getBytes());
    }
    @Test
    public void testGetChildren() throws Exception {
        List<String> nodes = curatorFramework.getChildren().forPath("/test");
        for (String node : nodes) {
            System.out.println(node);
        }
    }

/**
 * 節點檢測,檢測子節點變化【重要】
 */
    @Test
    public void testChildNodeChange() throws Exception {
        PathChildrenCache pcc=new PathChildrenCache(curatorFramework,"/test",true);
        pcc.start();
        //監測子節點變更
        pcc.getListenable().addListener(new PathChildrenCacheListener() {
            public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
 /**
 * .getType() //事件型別(新增CHILD_ADDED 或 刪除CHILD_REMOVED)
 */
                PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
                System.out.println("事件型別:"+type.name());

                ChildData data = pathChildrenCacheEvent.getData();
                System.out.println("資料資訊:"+data.getPath()+" "+new String(data.getData()));
            }
        });
        System.in.read();//保證main函式不退出,模擬掛起阻塞,不然關閉的化看不到臨時節點的那些狀態
        pcc.close();
    }
  /**
 * 集中配置【重要】
 */
    @Test
    public void testNodeDataChange() throws Exception {
        NodeCache nc=new NodeCache(curatorFramework,"/test");
        nc.start();
        nc.getListenable().addListener(new NodeCacheListener() {
            public void nodeChanged() throws Exception {
                if(curatorFramework.checkExists().forPath("test")!=null){
                    byte[] bytes = curatorFramework.getData().forPath("/test");
                    System.out.println("資料變化了:"+SerializationUtils.deserialize(bytes));
                }else{
                    System.out.println("節點被刪除了...");
                }

            }
        });
        System.in.read();
        nc.close();
    }
  `
  • zookeeper分散式鎖【面試題】
    A釋放鎖,B才可能拿到鎖。跨程序
    高併發,例如搶購場景:叢集服務修改變數,服務公用一個變數;為了保證只有一個服務處理變數,加上分散式鎖保證。
    Redis分散式鎖:利用setnx設定鎖,只有del才能解開鎖並再設定鎖。
  • Zookeeper相比Redis分散式鎖的優勢?【面試題】
    zookeeper可以保證臨界資源安全的同時,還能保證順序;其他redis和synchronized都不能保證順序。
    在這裡插入圖片描述
 /**
 * 分散式鎖【重要】
 */
    @Test
    public void testDistributeLock() throws Exception {
        System.out.println("服務b");
        InterProcessMutex lock = new InterProcessMutex(curatorFramework, "/com.test.service.IUserSevice#updateUser1");//鎖標記,動態獲取類名或方法
        if ( lock.acquire(1, TimeUnit.MINUTES) ) {//引數:最多等待時間,單位
            try {
                System.out.println("獲取鎖...");
                Thread.sleep(10000);
                System.out.println("更新臨界資源");
            } finally {
                System.out.println("釋放鎖...");
                lock.release();
            }
        }

    }
  • zookeeper選舉【瞭解】
    思路:會話斷開,建立節點;比較節點大小,最小的節點呼叫leadership方法,沒呼叫的節點成為follower。
    在這裡插入圖片描述

      @Test
      public void testElection() throws IOException {
          LeaderSelectorListener listener = new LeaderSelectorListenerAdapter(){
            /**
            * 方法中,接管老大的程式碼塊,若歸還程式碼塊就不是leader
            */
              public void takeLeadership(CuratorFramework client) throws Exception {
                  // this callback will get called when you are the leader
                  // do whatever leader work you need to and only exit
                  // this method when you want to relinquish leadership
                  System.out.println("我是LeaderB");
                  Thread.sleep(new Random().nextInt(1000));
                  System.out.println("我要退出了,請其他節點自己選舉新的主機");
              }
          };
    
          LeaderSelector selector = new LeaderSelector(curatorFramework, "/hadoop-ha", listener);
          selector.autoRequeue();  // not required, but this is behavior that you will probably expect
          selector.start();
          System.in.read();
      }
      @After
      public void after(){
          //關閉會話
          curatorFramework.close();
      }
    }