程式人生 > Spring Boot 2 整合 ZooKeeper(集成 Curator)

springboot2整合zookeeper集成curator

初始化 臨時 except null 取消 turn ted with lose

步驟:

1- pom.xml

<dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

2- yml配置:

zk:
  url: 127.0.0.1:2181
  localPath: /newlock
  timeout: 3000

3- 配置類

package com.test.domi.config;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes a started {@link CuratorFramework} client
 * connected to the ZooKeeper address configured under {@code zk.url}.
 */
@Configuration
public class ZookeeperConf {

    /** ZooKeeper connection string, injected from the {@code zk.url} property. */
    @Value("${zk.url}")
    private String zkUrl;

    /**
     * Builds the Curator client and starts it before handing it to the container.
     *
     * @return a started client using exponential back-off retry
     *         (base sleep 1000 ms, at most 3 retries)
     */
    @Bean
    public CuratorFramework getCuratorFramework() {
        CuratorFramework curator =
                CuratorFrameworkFactory.newClient(zkUrl, new ExponentialBackoffRetry(1000, 3));
        curator.start();
        return curator;
    }
}

4- 使用

package com.test.domi.common.utils.lock;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.zookeeper.CreateMode;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

/**
 * Distributed lock built on ZooKeeper ephemeral-sequential nodes (via Curator).
 *
 * <p>Each acquiring thread creates its own ephemeral-sequential child under
 * {@code lockPath}. The thread whose child sorts first owns the lock; every
 * other thread watches the child immediately before its own and re-checks
 * when that node changes or disappears.
 *
 * <p>The bean is a shared singleton, so the per-acquisition state (own node,
 * predecessor node) is kept per thread. {@link #unlock()} must therefore be
 * called by the same thread that acquired the lock.
 */
@Component("zklock")
public class ZKlock implements Lock {

    @Autowired
    private CuratorFramework zkClient;
    @Value("${zk.localPath}")
    private String lockPath;
    /** Full path of the calling thread's own sequential node, or unset when it has none. */
    private final ThreadLocal<String> currentPath = new ThreadLocal<>();
    /** Full path of the node directly ahead of the calling thread's node, or unset. */
    private final ThreadLocal<String> beforePath = new ThreadLocal<>();

    /**
     * Tries to take the lock without waiting.
     *
     * <p>If the lock cannot be taken, the node created for this attempt is
     * removed again so that it does not stay queued and block later waiters.
     *
     * @return {@code true} if the calling thread now owns the lock, {@code false}
     *         if it is held by someone else or ZooKeeper could not be reached
     */
    @Override
    public boolean tryLock() {
        try {
            if (attemptAcquire()) {
                return true;
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
        releaseNode();
        return false;
    }

    /**
     * Blocks until the lock is acquired. Interrupts do not abort the wait; the
     * interrupt status is restored before returning.
     *
     * @throws IllegalStateException if a ZooKeeper operation fails
     */
    @Override
    public void lock() {
        boolean interrupted = false;
        try {
            while (true) {
                try {
                    acquire(-1L);
                    return;
                } catch (InterruptedException e) {
                    // Remember the interrupt, clear the flag so the retry can block again.
                    interrupted = true;
                    Thread.interrupted();
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Releases the lock by deleting the calling thread's node and forgetting it,
     * so that the next acquisition creates a fresh node. Does nothing when the
     * calling thread has no node.
     */
    @Override
    public void unlock() {
        releaseNode();
    }

    /**
     * Blocks until the lock is acquired or the thread is interrupted. On
     * interruption the queued node is removed.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws IllegalStateException if a ZooKeeper operation fails
     */
    @Override
    public void lockInterruptibly() throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        try {
            acquire(-1L);
        } catch (InterruptedException e) {
            // Clear the flag so the cleanup call is not aborted immediately.
            Thread.interrupted();
            releaseNode();
            throw e;
        }
    }

    /**
     * Tries to acquire the lock, waiting at most the given time.
     *
     * @param time maximum time to wait; values {@code <= 0} mean a single attempt
     * @param unit unit of {@code time}
     * @return {@code true} if the lock was acquired, {@code false} on timeout or
     *         when ZooKeeper could not be reached
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }
        try {
            if (acquire(Math.max(0L, unit.toNanos(time)))) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.interrupted();
            releaseNode();
            throw e;
        } catch (IllegalStateException e) {
            // acquire() has already removed the node; report the failure as "not acquired".
            return false;
        }
        releaseNode();
        return false;
    }

    /**
     * Conditions are not supported by this lock.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Condition newCondition() {
        throw new UnsupportedOperationException("ZKlock does not support conditions");
    }

    /**
     * Repeats attempt-then-wait until the lock is acquired or the timeout elapses.
     *
     * @param timeoutNanos maximum wait in nanoseconds, or a negative value to wait forever
     * @return {@code true} once acquired, {@code false} if the timeout elapsed first
     */
    private boolean acquire(long timeoutNanos) throws InterruptedException {
        final boolean timed = timeoutNanos >= 0;
        final long deadline = timed ? System.nanoTime() + timeoutNanos : 0L;
        try {
            while (!attemptAcquire()) {
                String predecessor = beforePath.get();
                if (predecessor == null) {
                    // Own node had vanished; the next attempt creates a new one.
                    continue;
                }
                long remaining = -1L;
                if (timed) {
                    remaining = deadline - System.nanoTime();
                    if (remaining <= 0) {
                        return false;
                    }
                }
                waiForLock(predecessor, remaining);
            }
            return true;
        } catch (InterruptedException e) {
            throw e;
        } catch (Exception e) {
            releaseNode();
            throw new IllegalStateException("Failed to acquire ZooKeeper lock under " + lockPath, e);
        }
    }

    /**
     * Performs one non-blocking acquisition step: makes sure the calling thread
     * has a sequential node, then checks whether that node sorts first.
     *
     * @return {@code true} if the calling thread's node is the first child;
     *         otherwise {@code false}, with {@code beforePath} set to the
     *         predecessor (or cleared if the own node no longer exists)
     */
    private boolean attemptAcquire() throws Exception {
        ensureRootExists();

        String ownPath = currentPath.get();
        if (ownPath == null) {
            ownPath = zkClient.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(lockPath + "/");
            currentPath.set(ownPath);
        }

        List<String> children = zkClient.getChildren().forPath(lockPath);
        Collections.sort(children);
        int ownIndex = children.indexOf(ownPath.substring(lockPath.length() + 1));
        if (ownIndex < 0) {
            // Our node is not among the children any more: forget it.
            currentPath.remove();
            beforePath.remove();
            return false;
        }
        if (ownIndex == 0) {
            beforePath.remove();
            System.out.println("當前線程獲得鎖" + ownPath);
            return true;
        }
        // Watch only the node directly ahead of ours.
        beforePath.set(lockPath + "/" + children.get(ownIndex - 1));
        return false;
    }

    /**
     * Creates the lock root node when it is missing. Best effort: a failure here
     * (for instance another thread creating the root at the same moment) is
     * ignored, because a genuinely missing root makes the following child
     * creation fail anyway.
     */
    private void ensureRootExists() {
        try {
            if (zkClient.checkExists().forPath(lockPath) == null) {
                System.out.println("初始化根節點==========>" + lockPath);
                zkClient.create().creatingParentsIfNeeded().forPath(lockPath);
            }
            System.out.println("當前線程" + Thread.currentThread().getName() + "初始化根節點" + lockPath);
        } catch (Exception ignored) {
            if (ignored instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Deletes the calling thread's node (if any) and clears its per-thread state.
     */
    private void releaseNode() {
        String ownPath = currentPath.get();
        currentPath.remove();
        beforePath.remove();
        if (ownPath == null) {
            return;
        }
        try {
            zkClient.delete().guaranteed().deletingChildrenIfNeeded().forPath(ownPath);
        } catch (Exception ignored) {
            // guaranteed(): if the delete did not succeed, Curator keeps retrying it
            // in the background for as long as the session is valid.
        }
    }

    /**
     * Blocks while the predecessor node still exists, until it changes or is
     * deleted, or until the timeout elapses.
     *
     * @param predecessor  full path of the node to watch
     * @param timeoutNanos maximum wait in nanoseconds, or a negative value to wait forever
     */
    private void waiForLock(final String predecessor, long timeoutNanos) throws Exception {
        final CountDownLatch changed = new CountDownLatch(1);
        final NodeCache nodeCache = new NodeCache(zkClient, predecessor);
        try {
            // Register the listener before starting so no event can be missed.
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() {
                    changed.countDown();
                    // Current data is null once the node has been deleted.
                    ChildData current = nodeCache.getCurrentData();
                    String content = (current == null || current.getData() == null)
                            ? null
                            : new String(current.getData(), StandardCharsets.UTF_8);
                    System.out.println(predecessor + "節點監聽事件觸發,重新獲得節點內容為:" + content);
                }
            });
            nodeCache.start(true);
            // Only block while the predecessor still exists.
            if (zkClient.checkExists().forPath(predecessor) != null) {
                if (timeoutNanos < 0) {
                    changed.await();
                } else {
                    changed.await(timeoutNanos, TimeUnit.NANOSECONDS);
                }
            }
        } finally {
            // Waiting is over: drop the watch before re-checking the queue.
            try {
                nodeCache.close();
            } catch (IOException ignored) {
                // Nothing useful can be done if closing the cache fails.
            }
        }
    }

}

5- 調用demo

package com.test.domi.controller;

import com.test.domi.common.utils.ZkUtil;
import com.test.domi.common.utils.lock.ZKlock;
import org.I0Itec.zkclient.ZkClient;
import org.apache.curator.framework.CuratorFramework;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Demo REST controller that exercises {@link ZKlock} from several threads.
 */
@RestController
@RequestMapping("/zk")
public class ZKController {

    @Autowired
    private CuratorFramework zkClient;

    private String url = "127.0.0.1:2181";
    private int timeout = 3000;
    private String lockPath = "/testl";
    @Autowired
    private ZKlock zklock;
    private int k = 1;

    /**
     * Starts ten threads that each acquire and then immediately release the
     * lock. The request does not wait for those threads to finish.
     *
     * @return always {@code true}
     */
    @GetMapping("/lock")
    public Boolean getLock() throws Exception {
        // One stateless task shared by every worker thread.
        Runnable lockRoundTrip = () -> {
            zklock.lock();
            zklock.unlock();
        };

        int started = 0;
        while (started < 10) {
            new Thread(lockRoundTrip).start();
            started++;
        }
        return true;
    }
}

springboot2整合zookeeper集成curator