1. 程式人生 > >zookeeper 共享鎖問題—— 監聽最小目錄節點時,並準備向監聽器列表中註冊監聽器時,最小目錄節點被刪除。

zookeeper 共享鎖問題—— 監聽最小目錄節點時,並準備向監聽器列表中註冊監聽器時,最小目錄節點被刪除。

前言: zookeeper 分佈鎖的原來是 如圖:


            

使用zookeeper 來實現分散式鎖,發現一個問題,當客戶端A執行緒發出命令刪除最小的目錄lock_001被刪除時,並不會立刻刪除,因為命令是通過網路協議傳輸過去的,中間會產生一定的時間段,雖然這個時間段很小很小,但是在這個時間段內會發生一件特殊的事:此時會有另外一個客戶端B執行緒 在A執行緒 沒有傳送到zookeeper伺服器的時候,B執行緒查詢了最小目錄lock_001並且還存在,此時B執行緒準備向監聽器列表新增監聽器lock_001的watcher,就在此時執行緒A的命令執行了,lock_001 目錄刪除了,但是B執行緒還以為lock_001存在,並在監聽器列表新增監聽器lock_001的watcher,然後B執行緒等待進入wait() 等待,直到lock_001 節點被刪除的時候對應的watcher被呼叫,在process中 將B執行緒 notify 喚醒,B執行緒才能執行,可是此時的lock_001已經被刪除了,那麼B執行緒將會一直等待下去。
 
 
 
 按順序啟動:  ShareLock1  ShareLock2  ShareLock3  ShareLock4  : 這4個類的邏輯相同,
 /ElecLock 下面的子節點都是臨時有序的節點。
 
ShareLock1 會向 zookeeper 中的下注冊目錄 /ElecLock/lock_001  並獲得ElecLock中的子節點中的最小節點是否等於自己穿件的節點,如果等於則執行業務邏輯
如果不是則監聽最小的目錄,在監聽器中重新執行這個邏輯判斷
ShareLock2 會向 zookeeper 中的下注冊目錄 /ElecLock/lock_002
ShareLock3 會向 zookeeper 中的下注冊目錄 /ElecLock/lock_003
ShareLock4 會向 zookeeper 中的下注冊目錄 /ElecLock/lock_004

ShareLock2  ShareLock3  ShareLock4 與 ShareLock1 一樣,唯一的區別就是 ShareLock1 中模擬業務邏輯消耗了20秒

  
  在ShareLock1 類中,判斷建立的節點是否是ElecLock 中的最小節點,是最小的,並執行邏輯,模擬業務邏輯的消耗時間是20秒,當執行完以後刪除lock_001,
  ShareLock2  ShareLock3  ShareLock4 中的註冊的監聽器被觸發,同理ShareLock2 是最小的 開始執行,但是此時沒有模擬業務邏輯的消耗時間是20秒,只是簡單的列印一下,業務開始,業務結束,刪除節點lock_002,此時上述所寫的問題出現了,
  ShareLock3,ShareLock4 這兩個下的業務執行緒,因為判斷 節點 lock_003 ,lock_003 不是為最小的,並且開始向 最小的lock_002 註冊監聽器,並且還註冊成功了,但是註冊成功的同時,lock_002也被刪掉了,所以造成ShareLock3,ShareLock4 永久的等待。(本人常事不監聽最小目錄換為監聽父目錄ElecLock,這種情況任然存在。)
  
  
 下面是我的程式碼: 
  
  
  package com.jvm.others.zookeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

/**利用zookeeper實現共享鎖
 * 思想:在zookeeper中的一個Lock目錄下注冊臨時ephemeral節點,每個節點為一把鎖
 * @author pc
 *
 */
public class ShareLock {
    
    static ZooKeeper keeper = null;
    static String parentNode="/ElecLock";
    static{
        try {
            keeper = new ZooKeeper("192.168.17.7:2181", 3000000, new Watcher() {  //監聽與伺服器建立連線後的回撥函式process
                
                @Override
                public void process(WatchedEvent event) {
                    //不處理邏輯
                    System.out.println(this.toString()+"-----------預設監聽器---------------");
                    
                }
            });
            
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    @Test
    public void RegisterLock() throws Exception {
         
        
        
        //建立鎖節點,在建立臨時有序節點時,他會在設定的節點名稱 拼接序列數字 如  lock_0000000007,來生成 真正要建立的節點名稱
        String create = keeper.create("/ElecLock/lock_", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL );
        callBack(create);
        
        
        System.out.println(create); 
        
    }

    //重複監聽或者執行業務邏輯
    private void callBack(final String create) throws KeeperException,
            InterruptedException {
        //獲得所有鎖節點
        List<String> children = keeper.getChildren(parentNode, false);
        String[] array = children.toArray(new String[children.size()]);
        
        Arrays.sort(array);
        //獲得最小的鎖節點
        String minNode = array[0];
        minNode=parentNode+"/"+minNode;
        //判斷當前建立的鎖節點是否為最小鎖節點
        if(minNode.equals(create)){
            //如果是進行業務操作
            synchronized (keeper) {
                System.out.println("業務邏輯  開始  執行");
                Thread.sleep(20000);// 模擬執行邏輯的時間
                keeper.notify();  
                System.out.println("業務邏輯  開始  完畢");
                System.out.println("刪除此節點"+ create);
                keeper.delete(create, -1 );
            }
            
        }else{
            //如果不是進行等待並且監聽當前最小節點(即鎖庫parentNode)
            //如果節點不存在,則返回null, 並且watcher 不回被觸發
            Stat exists = keeper.exists(minNode, new Watcher() {
                
                @Override
                public void process(WatchedEvent watchedevent) {
                    try {
                        System.out.println("重新監聽");
                        callBack(create);
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
            
             
            /**
             *  高併發注意這種情況,當我們獲得了zookeeper中的資料最小的minNode,在我們還沒有執行到keeper.exists(minNode, new Watcher()時
             *  此時 minNode 正好被其他業務的執行緒執行了,並且此節點已經被刪除。我們需要對 exists 進行判斷,如果為null,我們應該重新呼叫執行callBack 方法
             *  可以避免 這種特殊情況
             *  
             *  第二種特殊情況: 當我們獲得了zookeeper中的資料最小的minNode,在執行到keeper.exists(minNode, new Watcher()時,並判斷當前minNode節點存在,準備向註冊監聽器列表中註冊監聽器時,
             *  minNode 節點在這一時刻被刪除了,此時程式還以為minNode 節點存在,並且向註冊監聽器列表中註冊事件,當註冊完畢時,並返回結果,此時程式等待 minNode 被刪除時 監聽器被執行,
             *  但是 minNode 早已經不存在,所以,此時的執行緒會一直處於等待的狀態。並且業務邏輯永遠不會被執行。
             */
            System.out.println("最小節點"+minNode+(exists ==null?"節點不存在":exists));
            if(exists == null){ //如果當前最小的節點不存在了,則重新執行callBack
                callBack(create);
            }
            
            synchronized (keeper) {
                System.out.println(this.toString()+" wait end  業務邏輯執行  等待");
                keeper.wait();
                System.out.println(this.toString()+" wait end   執行緒 結束");
            }
            
        }
    }
    
    
    public static void main(String[] args) throws Exception {
        
        ShareLock shareLock = new ShareLock();
        shareLock.RegisterLock();
        
    }
    
}

package com.jvm.others.zookeeper;

import java.util.Arrays;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

public class ShareLock2 {
    static ZooKeeper keeper = null;
    static String parentNode="/ElecLock";
    static{
        try {
            keeper = new ZooKeeper("192.168.17.7:2181", 3000000, new Watcher() {  //監聽與伺服器建立連線後的回撥函式process
                
                @Override
                public void process(WatchedEvent event) {
                    //不處理邏輯
                    System.out.println(this.toString()+"-----------預設監聽器---------------");
                }
            });
            
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    @Test
    public void RegisterLock() throws Exception {
         // TODO Auto-generated method stub
       
        
        
        
        //建立鎖節點,在建立臨時有序節點時,他會在設定的節點名稱 拼接序列數字 如  lock_0000000007,來生成 真正要建立的節點名稱
        String create = keeper.create("/ElecLock/lock_", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL );
        callBack(create);
        
        
        System.out.println(create); 
        
    }

    //重複監聽或者執行業務邏輯
    private void callBack(final String create) throws KeeperException,
            InterruptedException {
        //獲得所有鎖節點
        List<String> children = keeper.getChildren(parentNode, false);
        String[] array = children.toArray(new String[children.size()]);
        
        Arrays.sort(array);
        //獲得最小的鎖節點
        String minNode = array[0];
        minNode=parentNode+"/"+minNode;
        System.out.println("當前最小的節點是: " + minNode);
        //判斷當前建立的鎖節點是否為最小鎖節點
        if(minNode.equals(create)){
            //如果是進行業務操作
            synchronized (keeper) {
                System.out.println("業務邏輯  開始  執行");
                keeper.notify();  
                System.out.println("業務邏輯  開始  完畢");
                System.out.println("刪除此節點"+ create);
                keeper.delete(create, -1 );
            }
            
        }else{
            //如果不是進行等待並且監聽當前最小節點(即鎖庫parentNode)
            //如果節點不存在,則返回null, 並且watcher 不回被觸發
            Stat exists = keeper.exists(minNode, new Watcher() {
                
                @Override
                public void process(WatchedEvent watchedevent) {
                    try {
                        System.out.println("重新監聽");
                        callBack(create);
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
            
        
            /**
             *  高併發注意這種情況,當我們獲得了zookeeper中的資料最小的minNode,在我們還沒有執行到keeper.exists(minNode, new Watcher()時
             *  此時 minNode 正好被其他業務的執行緒執行了,並且此節點已經被刪除。我們需要對 exists 進行判斷,如果為null,我們應該重新呼叫執行callBack 方法
             *  可以避免 這種特殊情況
             */
            System.out.println("最小節點"+minNode+(exists ==null?"節點不存在":exists));
            if(exists == null){ //如果當前最小的節點不存在了,則重新執行callBack
                callBack(create);
            }
            synchronized (keeper) {
                System.out.println(this.toString()+" wait end  業務邏輯執行  等待");
                keeper.wait();
                System.out.println(this.toString()+" wait end   執行緒 結束");
            }
            
        }
    }
    
    
    public static void main(String[] args) throws Exception {
        
        ShareLock2 shareLock = new ShareLock2();
        shareLock.RegisterLock();
        
    }
}

package com.jvm.others.zookeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

/**利用zookeeper實現共享鎖
 * 思想:在zookeeper中的一個Lock目錄下注冊臨時ephemeral節點,每個節點為一把鎖
 * @author pc
 *
 */
public class ShareLock3 {
    
    static ZooKeeper keeper = null;
    static String parentNode="/ElecLock";
    static{
        try {
            keeper = new ZooKeeper("192.168.17.7:2181", 3000000, new Watcher() {  //客戶端與伺服器建立連線後註冊監聽器並回調函式process,和預設執行監聽器的回掉函式,如keeper.getChildren(parentNode, true); 這裡的true指 使用預設監聽器,如果parentNode 下發生了 增加刪除 會觸發預設監聽器                                
                
                @Override
                public void process(WatchedEvent event) {
                    //不處理邏輯
                    System.out.println(this.toString()+"--------------預設監聽器------------");
                }
            });
            
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    @Test
    public void RegisterLock() throws Exception {
        
        
        
        //建立鎖節點,在建立臨時有序節點時,他會在設定的節點名稱 拼接序列數字 如  lock_0000000007,來生成 真正要建立的節點名稱
        String create = keeper.create("/ElecLock/lock_", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL );
        callBack(create);
        
        
        System.out.println(create); 
        
    }

    //重複監聽或者執行業務邏輯
    private void callBack(final String create) throws KeeperException,
            InterruptedException {
        //獲得所有鎖節點
        List<String> children = keeper.getChildren(parentNode, false);
        String[] array = children.toArray(new String[children.size()]);
        
        Arrays.sort(array);
        //獲得最小的鎖節點
        String minNode = array[0];
        minNode=parentNode+"/"+minNode;
        System.out.println("當前最小的節點是: " + minNode);
        //判斷當前建立的鎖節點是否為最小鎖節點
        if(minNode.equals(create)){
            //如果是進行業務操作
            synchronized (keeper) {
                System.out.println("業務邏輯  開始  執行");
                keeper.notify();  
                System.out.println("業務邏輯  開始  完畢");
                System.out.println("刪除此節點"+ create);
                keeper.delete(create, -1 );
            }
            
        }else{
            //如果不是進行等待並且監聽當前最小節點(即鎖庫parentNode)
            //如果節點不存在,則返回null, 並且watcher 不回被觸發,而卻會發生下面所說的情況。
            Stat exists = keeper.exists(minNode, new Watcher() {
                
                @Override
                public void process(WatchedEvent watchedevent) {
                    try {
                        System.out.println("重新監聽");
                        callBack(create);
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
            
            
            /**
             *  高併發注意這種情況1,當我們獲得了zookeeper中的資料最小的minNode,在我們還沒有執行到keeper.exists(minNode, new Watcher()時
             *  此時 minNode 正好被其他業務的執行緒執行了,並且此節點已經被刪除。我們需要對 exists 進行判斷,如果為null,我們應該重新呼叫執行callBack 方法
             *  可以避免 這種特殊情況
             *  
             *  
             */
            System.out.println("最小節點"+minNode+(exists ==null?"節點不存在":exists));
            if(exists == null){ //如果當前最小的節點不存在了,則重新執行callBack
                callBack(create);
            }
            
            synchronized (keeper) {
                System.out.println(this.toString()+" wait end  業務邏輯執行  等待");
                keeper.wait();
                System.out.println(this.toString()+" wait end   執行緒 結束");
            }
            
        }
    }
    
    
    public static void main(String[] args) throws Exception {
        
        ShareLock3 shareLock = new ShareLock3();
        shareLock.RegisterLock();
        
    }
    
}

package com.jvm.others.zookeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;

/**利用zookeeper實現共享鎖
 * 思想:在zookeeper中的一個Lock目錄下注冊臨時ephemeral節點,每個節點為一把鎖
 * @author pc
 *
 */
public class ShareLock4 {
    
    static ZooKeeper keeper = null;
    static String parentNode="/ElecLock";
    static{
        try {
            keeper = new ZooKeeper("192.168.17.7:2181", 3000000, new Watcher() {  //監聽與伺服器建立連線後的回撥函式process
                
                @Override
                public void process(WatchedEvent event) {
                    //不處理邏輯
                    System.out.println(this.toString()+"-----------預設監聽器---------------");
                }
            });
            
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    
    @Test
    public void RegisterLock() throws Exception {
         // TODO Auto-generated method stub
         
        
        
        
        //建立鎖節點,在建立臨時有序節點時,他會在設定的節點名稱 拼接序列數字 如  lock_0000000007,來生成 真正要建立的節點名稱
        String create = keeper.create("/ElecLock/lock_", "lock".getBytes(), Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL );
        callBack(create);
        
        
        System.out.println(create); 
        
    }

    //重複監聽或者執行業務邏輯
    private void callBack(final String create) throws KeeperException,
            InterruptedException {
        //獲得所有鎖節點
        List<String> children = keeper.getChildren(parentNode, false);
        String[] array = children.toArray(new String[children.size()]);
        
        Arrays.sort(array);
        //獲得最小的鎖節點
        String minNode = array[0];
        minNode=parentNode+"/"+minNode;
        System.out.println("當前最小的節點是: " + minNode);
        //判斷當前建立的鎖節點是否為最小鎖節點
        if(minNode.equals(create)){
            //如果是進行業務操作
            synchronized (keeper) {
                System.out.println("業務邏輯  開始  執行");
                keeper.notify();  
                System.out.println("業務邏輯  開始  完畢");
                System.out.println("刪除此節點"+ create);
                keeper.delete(create, -1 );
            }
            
        }else{
            //如果不是進行等待並且監聽當前最小節點(即鎖庫parentNode)
            //如果節點不存在,則返回null, 並且watcher 不回被觸發
            // exists 只判斷當前節點(不包含子節點)在 刪除增加更新時才會觸發 watcher
            Stat exists = keeper.exists(minNode, new Watcher() {
                
                @Override
                public void process(WatchedEvent watchedevent) {
                    try {
                        System.out.println("重新監聽");
                        callBack(create);
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                }
            });
          
            /**
             *  高併發注意這種情況,當我們獲得了zookeeper中的資料最小的minNode,在我們還沒有執行到keeper.exists(minNode, new Watcher()時
             *  此時 minNode 正好被其他業務的執行緒執行了,並且此節點已經被刪除。我們需要對 exists 進行判斷,如果為null,我們應該重新呼叫執行callBack 方法
             *  可以避免 這種特殊情況
             *  
             *  第二種特殊情況: 當我們獲得了zookeeper中的資料最小的minNode,在執行到keeper.exists(minNode, new Watcher()時,並判斷當前minNode節點存在,準備向註冊監聽器列表中註冊監聽器時,
             *  minNode 節點在這一時刻被刪除了,此時程式還以為minNode 節點存在,並且向註冊監聽器列表中註冊事件,當註冊完畢時,並返回結果,此時程式等待 minNode 被刪除時 監聽器被執行,
             *  但是 minNode 早已經不存在,所以,此時的執行緒會一直處於等待的狀態。並且業務邏輯永遠不會被執行。
             */
            System.out.println("最小節點"+minNode+(exists ==null?"節點不存在":exists));
            if(exists == null){ //如果當前最小的節點不存在了,則重新執行callBack
                callBack(create);
            }
            
            synchronized (keeper) {
                System.out.println(this.toString()+" wait end  業務邏輯執行  等待");
                keeper.wait();
                System.out.println(this.toString()+" wait end   執行緒 結束");
            }
            
        }
    }
    
    
    public static void main(String[] args) throws Exception {
        
        ShareLock4 shareLock = new ShareLock4();
        shareLock.RegisterLock();
        
    }
    
}

  歡迎大神指點小弟。