1. 程式人生 > >ZooKeeper分散式鎖與程式碼實現

ZooKeeper分散式鎖與程式碼實現

1.zk的核心機制之一:分散式鎖        分散式鎖能夠在一組程序之間提供互斥機制,使得在任何時候只有一個程序可以持有鎖。分散式鎖可以用於在大型分散式系統中實現領導者選舉,在任何時間點,持有鎖的那個程序就是系統的領導者。注意:不要將zookeeper自己的領導者選舉和使用ZooKeeper基本操作實現的一般領導者選舉服務混為一談。事實上,zookeepr自己的領導者選舉機制是不對外公開的。         2.鎖的具體實現         程式碼實現一個分散式鎖。 - 客戶端A public class DistributedClient {     // 超時時間     private static final int SESSION_TIMEOUT = 5000;
    // zookeeper server列表     private String hosts = "localhost:4180,localhost:4181,localhost:4182";     private String groupNode = "locks";     private String subNode = "sub";     private ZooKeeper zk;     // 當前client建立的子節點     private String thisPath;     // 當前client等待的子節點     private String waitPath;     private CountDownLatch latch = new CountDownLatch(1);
    /**      * 連線zookeeper      */     public void connectZookeeper() throws Exception {         zk = new ZooKeeper(hosts, SESSION_TIMEOUT, new Watcher() {             public void process(WatchedEvent event) {                 try {                     // 連線建立時, 開啟latch, 喚醒wait在該latch上的執行緒                     if (event.getState() == KeeperState.SyncConnected) {
                        latch.countDown();                     }                     // 發生了waitPath的刪除事件                     if (event.getType() == EventType.NodeDeleted && event.getPath().equals(waitPath)) {                         doSomething();                     }                 } catch (Exception e) {                     e.printStackTrace();                 }             }         });         // 等待連線建立         latch.await();         // 建立子節點         thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,                 CreateMode.EPHEMERAL_SEQUENTIAL);         // wait一小會, 讓結果更清晰一些         Thread.sleep(10);         // 注意, 沒有必要監聽"/locks"的子節點的變化情況         List<String> childrenNodes = zk.getChildren("/" + groupNode, false);         // 列表中只有一個子節點, 那肯定就是thisPath, 說明client獲得鎖         if (childrenNodes.size() == 1) {             doSomething();         } else {             String thisNode = thisPath.substring(("/" + groupNode + "/").length());             // 排序             Collections.sort(childrenNodes);             int index = childrenNodes.indexOf(thisNode);             if (index == -1) {                 // never happened             } else if (index == 0) {                 // inddx == 0, 說明thisNode在列表中最小, 當前client獲得鎖    doSomething();             } else {                 // 獲得排名比thisPath前1位的節點                 this.waitPath = "/" + groupNode + "/" + childrenNodes.get(index - 1);                 // 在waitPath上註冊監聽器, 當waitPath被刪除時, zookeeper會回撥監聽器的process方法                 zk.getData(waitPath, true, new Stat());             }         }     }     private void doSomething() throws Exception {         try {             System.out.println("gain lock: " + thisPath);             Thread.sleep(2000);             // do something         } finally {             System.out.println("finished: " + thisPath);             // 將thisPath刪除, 監聽thisPath的client將獲得通知             // 相當於釋放鎖             zk.delete(this.thisPath, -1);         }     }     public static void main(String[] args) throws Exception {         for (int i = 0; i < 10; i++) {             new Thread() {                 public void run() {                     try {                         DistributedClient dl = new DistributedClient();                         dl.connectZookeeper();                     } catch (Exception e) {                         e.printStackTrace();                     }                 }             }.start();         }         Thread.sleep(Long.MAX_VALUE);     } } - 分散式多程序模式實現: public class DistributedClientMy {      // 超時時間      private static final int SESSION_TIMEOUT = 5000;      // zookeeper server列表      private String hosts = "spark01:2181,spark02:2181,spark03:2181";      private String groupNode = "locks";      private String subNode = "sub";      private boolean haveLock = false;      private ZooKeeper zk;      // 當前client建立的子節點      private volatile String thisPath;      /**       * 連線zookeeper       */      public void connectZookeeper() throws Exception {          zk = new ZooKeeper("spark01:2181", SESSION_TIMEOUT, new Watcher() {               public void process(WatchedEvent event) {                    try {                        // 子節點發生變化                        if (event.getType() == EventType.NodeChildrenChanged && event.getPath().equals("/" + groupNode)) {                             // thisPath是否是列表中的最小節點                             List<String> childrenNodes = zk.getChildren("/" + groupNode, true);                             String thisNode = thisPath.substring(("/" + groupNode + "/").length());                             // 排序                             Collections.sort(childrenNodes);                             if (childrenNodes.indexOf(thisNode) == 0) {                                  doSomething();                                  thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,                                           CreateMode.EPHEMERAL_SEQUENTIAL);                             }                        }                    } catch (Exception e) {                        e.printStackTrace();                    }               }          });          // 建立子節點          thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,                    CreateMode.EPHEMERAL_SEQUENTIAL);          // wait一小會, 讓結果更清晰一些          Thread.sleep(new Random().nextInt(1000));          // 監聽子節點的變化          List<String> childrenNodes = zk.getChildren("/" + groupNode, true);          // 列表中只有一個子節點, 那肯定就是thisPath, 說明client獲得鎖          if (childrenNodes.size() == 1) {               doSomething();               thisPath = zk.create("/" + groupNode + "/" + subNode, null, Ids.OPEN_ACL_UNSAFE,                        CreateMode.EPHEMERAL_SEQUENTIAL);          }      }      /**       * 共享資源的訪問邏輯寫在這個方法中       */      private void doSomething() throws Exception {          try {               System.out.println("gain lock: " + thisPath);               Thread.sleep(2000);               // do something          } finally {               System.out.println("finished: " + thisPath);               // 將thisPath刪除, 監聽thisPath的client將獲得通知               // 相當於釋放鎖               zk.delete(this.thisPath, -1);          }      }      public static void main(String[] args) throws Exception {          DistributedClientMy dl = new DistributedClientMy();          dl.connectZookeeper();          Thread.sleep(Long.MAX_VALUE);      } }