作者:Grey

原文地址: ZooKeeper學習筆記四:使用ZooKeeper實現一個簡單的分散式鎖

前置知識

完成ZooKeeper叢集搭建以及熟悉ZooKeeperAPI基本使用

需求

當多個程序不在同一個系統中,用分散式鎖控制多個程序對資源的訪問。

在單機情況下,可以使用JUC包裡面的工具來進行互斥控制。

但是在分散式系統後,由於分散式系統多執行緒、多程序並且分佈在不同機器上,這將使原單機併發控制鎖策略失效,為了解決這個問題就需要一種跨JVM的互斥機制來控制共享資源的訪問,這就是分散式鎖的由來。

當多個程序不在同一個系統中,就需要用分散式鎖控制多個程序對資源的訪問。

我們可以用ZooKeeper來模擬實現一個簡單的分散式鎖

環境準備

一個zk集權,ip和埠分別為:

  • 192.168.205.145:2181
  • 192.168.205.146:2181
  • 192.168.205.147:2181
  • 192.168.205.148:2181

定義主方法

App.java

  1. public class App {
  2. public static void main(String[] args) {
  3. for (int i = 0; i < 10; i++) {
  4. new Thread(() -> {
  5. ZkLock lock = new ZkLock();
  6. lock.lock(); // 開啟鎖
  7. System.out.println(Thread.currentThread().getName() + " doing work");
  8. lock.release(); // 釋放鎖
  9. }).start();
  10. }
  11. while (true) {
  12. }
  13. }
  14. }

如上,我們設計了一個ZkLock,其中lock方法是鎖定資源,release方法是釋放資源,我們併發了10個執行緒併發訪問來模擬。

  1. public class ZkLock implements AsyncCallback.StringCallback, Watcher, AsyncCallback.StatCallback, AsyncCallback.Children2Callback {
  2. private CountDownLatch latch;
  3. private ZooKeeper zk;
  4. private String identify;
  5. private String lockPath;
  6. private String pathName;
  7. public ZkLock() {
  8. identify = Thread.currentThread().getName();
  9. lockPath = "/lock";
  10. latch = new CountDownLatch(1);
  11. zk = ZookeeperConfig.create(ADDRESS + "/testLock");
  12. }
  13. public void lock() {
  14. try {
  15. zk.create(lockPath, currentThread().getName().getBytes(UTF_8), OPEN_ACL_UNSAFE, EPHEMERAL_SEQUENTIAL, this, currentThread().getName());
  16. latch.await();
  17. } catch (InterruptedException e) {
  18. e.printStackTrace();
  19. }
  20. }
  21. public void release() {
  22. try {
  23. zk.delete(pathName, -1);
  24. System.out.println(identify + " over work....");
  25. } catch (InterruptedException | KeeperException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. @Override
  30. public void processResult(int rc, String path, Object ctx, String name) {
  31. if (null != name) {
  32. // 建立成功
  33. System.out.println(identify + " created " + name);
  34. pathName = name;
  35. zk.getChildren("/", false, this, "dasdfas");
  36. }
  37. }
  38. @Override
  39. public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
  40. sort(children);
  41. int i = children.indexOf(pathName.substring(1));
  42. if (i == 0) {
  43. // 是第一個,獲得鎖,可以執行
  44. System.out.println(identify + " first...");
  45. try {
  46. zk.setData("/", identify.getBytes(UTF_8), -1);
  47. } catch (KeeperException | InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. latch.countDown();
  51. } else {
  52. zk.exists("/" + children.get(i - 1), this, this, "ddsdf");
  53. }
  54. }
  55. @Override
  56. public void process(WatchedEvent event) {
  57. switch (event.getType()) {
  58. case None:
  59. break;
  60. case NodeCreated:
  61. break;
  62. case NodeDeleted:
  63. zk.getChildren("/", false, this, "sdf");
  64. break;
  65. case NodeDataChanged:
  66. break;
  67. case NodeChildrenChanged:
  68. break;
  69. }
  70. }
  71. @Override
  72. public void processResult(int rc, String path, Object ctx, Stat stat) {
  73. }
  74. }

關於上述程式碼的說明,我們規定建立的zk目錄為/testLock,所以我們可以通過zk客戶端在叢集中先把/testLock目錄建好,後續執行緒爭搶的時候,我們只需要建立序列化的臨時節點(以/lock開頭),因為是序列化的,所以我們可以設定讓第一個建立好節點的執行緒搶到鎖,其他的執行緒排隊等待。

所以lock方法實現如下:

  1. zk.create(lockPath, currentThread().getName().getBytes(UTF_8), OPEN_ACL_UNSAFE, EPHEMERAL_SEQUENTIAL, this, currentThread().getName());

lock方法在執行的時候,會有一個回撥,即:當節點建立成功後,會判斷/testLock節點中有沒有已經建立好的且在當前節點之前的節點,有的話,則註冊一個一個對於/testLock目錄的監聽:

  1. @Override
  2. public void processResult(int rc, String path, Object ctx, String name) {
  3. if (null != name) {
  4. // 建立成功
  5. System.out.println(identify + " created " + name);
  6. pathName = name;
  7. zk.getChildren("/", false, this, "dasdfas");
  8. }
  9. }

一旦發現/testLock目錄下已經有節點了,那麼我們拿到/testLock下的所有節點,並排序,取最小的那個節點執行即可:

  1. @Override
  2. public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
  3. sort(children);
  4. int i = children.indexOf(pathName.substring(1));
  5. if (i == 0) {
  6. // 是第一個,獲得鎖,可以執行
  7. System.out.println(identify + " first...");
  8. try {
  9. zk.setData("/", identify.getBytes(UTF_8), -1);
  10. } catch (KeeperException | InterruptedException e) {
  11. e.printStackTrace();
  12. }
  13. latch.countDown();
  14. } else {
  15. zk.exists("/" + children.get(i - 1), this, this, "ddsdf");
  16. }
  17. }

release方法很簡單,只需要把當前執行完畢的節點刪除即可:

  1. public void release() {
  2. try {
  3. zk.delete(pathName, -1);
  4. System.out.println(identify + " over work....");
  5. } catch (InterruptedException | KeeperException e) {
  6. e.printStackTrace();
  7. }
  8. }

執行效果

確保zk中有/testLock這個節點,如果沒有,請先建立一個:

Run App.java

可以看到控制檯輸出:

  1. Thread-5 created /lock0000000000
  2. Thread-4 created /lock0000000001
  3. Thread-1 created /lock0000000002
  4. Thread-9 created /lock0000000003
  5. Thread-6 created /lock0000000004
  6. Thread-2 created /lock0000000005
  7. Thread-3 created /lock0000000006
  8. Thread-0 created /lock0000000007
  9. Thread-8 created /lock0000000008
  10. Thread-7 created /lock0000000009
  11. Thread-5 first...
  12. Thread-5 doing work
  13. Thread-5 over work....
  14. Thread-4 first...
  15. Thread-4 doing work
  16. Thread-4 over work....
  17. Thread-1 first...
  18. Thread-1 doing work
  19. Thread-1 over work....
  20. Thread-9 first...
  21. Thread-9 doing work
  22. Thread-9 over work....
  23. Thread-6 first...
  24. Thread-6 doing work
  25. Thread-6 over work....
  26. Thread-2 first...
  27. Thread-2 doing work
  28. Thread-2 over work....
  29. Thread-3 first...
  30. Thread-3 doing work
  31. Thread-3 over work....
  32. Thread-0 first...
  33. Thread-0 doing work
  34. Thread-0 over work....
  35. Thread-8 first...
  36. Thread-8 doing work
  37. Thread-8 over work....
  38. Thread-7 first...
  39. Thread-7 doing work
  40. Thread-7 over work....