1. 程式人生 > >ZooKeeper連接並創建節點以及實現分布式鎖操作節點排序輸出最小節點Demo

ZooKeeper連接並創建節點以及實現分布式鎖操作節點排序輸出最小節點Demo

false bytes roc 永遠 err index public nal kde

class LockThread implements Runnable {
    private DistributedLock lock;
    

    public LockThread(int threadId,CountDownLatch latch) throws Exception {
        this.lock = new DistributedLock(threadId,latch);
    }

    @Override
    public void run() {
        //每一個線程對象啟動後都應該創建一個臨時的節點信息
        try
{ this.lock.handle();//進行具體的操作處理 } catch (Exception e) { e.printStackTrace(); } } } public class TestDistributedLock { public static void main(String[] args) throws Exception { CountDownLatch latch = new CountDownLatch(10); for (int i = 0; i < 10; i++) {
new Thread(new LockThread(i,latch)).start();; } //Thread.sleep(Long.MAX_VALUE);//為了保證可以觀察到所有的臨時節點信息,保證此處先不進行關閉 latch.await(); System.out.println("************* 所有的線程對象操作完畢 *************"); }
public class DistributedLock {//建立一個描述分布式鎖的程序處理類
    public static final String CONNECTION_RUL = "192.168.12.121:2181,192.168.12.122:2181";

    
public static final int SESSION_TIMEOUT = 2000;//設置連接超時時間 public static final String AUTH_INFO = "zkuser:mldnjava";//進行連接的授權信息 public static final String GROUPNODE = "/mldn-lock";//根節點 public static final String SUNBODE = GROUPNODE + "/lockthread-";//子節點 private CountDownLatch latch = null; //本操作的主要目的是為了在取得zookeeper連接之後才能進行後續的處理 private CountDownLatch connectLatch = new CountDownLatch(1); private ZooKeeper zkClient = null; //建立Zookeeper程序控制類 private String selfPath; //保存每次創建的臨時節點信息 private String waitPath; //保存下一個要進行處理的節點 private int threadId = 0; /** 進行一些初始化操作使用 * * @param threadId 隨意給定一個編號信息 * @param latch 進行線程同步處理 * @throws Exception */ public DistributedLock(int threadId, CountDownLatch latch) throws Exception { this.threadId = threadId;//保存每一個線程對象自己的ID信息 this.latch = latch; this.connectionZookeeper();//進行節點的連接 } public void handle() throws Exception {//具體業務處理 this.createSubNode();//創建臨時節點操作 } public void handleSuccess() throws Exception {//表示取得鎖之後進行的處理 if (this.zkClient.exists(this.selfPath, false) == null) { return;//如果當前節點不存在 } this.handleCallback();//執行具體的業務操作 //如果某一個節點操作完畢了,那麽應該立即刪除掉該節點,否則獲得的最小節點永遠都是該節點 this.zkClient.delete(selfPath, -1); this.releaseZookeeper();//釋放連接 this.latch.countDown();//進行減減的操作 } public void handleCallback() throws Exception {//取得分布式鎖之後的目的是要進行具體的操作 Thread.sleep(200);//實現一個延遲處理 System.out .println("****** Thread-" + this.threadId + "獲得操作權,進行具體的業務操作"); } public boolean checkMinPath() throws Exception {//進行最小節點的判斷 List<String> childen = this.zkClient.getChildren(GROUPNODE, false);//取得所有的節點信息 Collections.sort(childen); //進行所有節點的排序,這樣最小的節點就拍到最上面 int index = childen .indexOf(this.selfPath.substring(GROUPNODE.length() + 1)); switch (index) { case 0: { return true; //已經確定好當前的節點為最小節點 } case -1: { return false; //該節點可能已經消失了 } default: {//表示該節點不屬於最小節點,那麽應該向後繼續排查 this.waitPath = GROUPNODE + "/" + childen.get(index - 1);//獲得下一個節點 try { this.zkClient.getData(waitPath, true, new Stat());//取得下一個節點的數據 return false; //本節點不是當前的操作的最小節點 } catch (Exception e) {//如果出現了異常,則表示該節點不存在 if (this.zkClient.exists(waitPath, false) == null) { return this.checkMinPath();//繼續向後檢測 } else { throw e; } } } } } public void createSubNode() throws Exception {//每一個線程對象的啟動都要求創建一個節點信息 this.zkClient.create(SUNBODE, ("Thread-" + this.threadId).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("【Thread-" + this.threadId + "、創建新的臨時節點】" + this.selfPath); //當節點創建完成之後就需要進行最小節點的檢測 if (this.checkMinPath()) {//如果當前的節點為整個項目的最小節點 this.handleSuccess();//進行鎖後的具體操作 } } public void connectionZookeeper() throws Exception {//連接zookeeper服務 this.zkClient = new ZooKeeper(CONNECTION_RUL, SESSION_TIMEOUT, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == EventType.None) {//第一次連接zookeeper的時候會出現none DistributedLock.this.connectLatch.countDown();//表示已經連接成功 } else { //要處理刪除節點操作,並且要確定下一個節點是已經準備出來的節點信息 if (event.getType() == EventType.NodeDeleted && event.getPath().equals( DistributedLock.this.waitPath)) { try { if (DistributedLock.this.checkMinPath()) {//如果當前的節點為整個項目的最小節點 DistributedLock.this.handleSuccess();//進行鎖後的具體操作 } } catch (Exception e) { e.printStackTrace(); } } } } }); this.zkClient.addAuthInfo("digest", AUTH_INFO.getBytes());//進行授權認證 if (this.zkClient.exists(GROUPNODE, false) == null) { this.zkClient.create(GROUPNODE, "LOCKDEMO".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } this.connectLatch.await();//等待連接後才執行後續的功能 } public void releaseZookeeper() {//進行zookeeper的連接釋放 if (this.zkClient != null) { try { this.zkClient.close(); } catch (InterruptedException e) { e.printStackTrace(); } } }

ZooKeeper連接並創建節點以及實現分布式鎖操作節點排序輸出最小節點Demo