鎖:分散式的鎖全域性同步,這意味著任何一個時間點不會有兩個客戶端都擁有相同的鎖。

1.可重入鎖Shared Reentrant Lock

    首先我們先看一個全域性可重入的鎖(可以多次獲取,不會被阻塞)。Shared意味著鎖是全域性可見的,客戶端都可以請求鎖。Reentrant和JDK的ReentrantLock類似,意味著同一個客戶端在擁有鎖的同時,可以多次獲取,不會被阻塞。
1.可重入鎖相關類介紹
    它是由類InterProcessMutex來實現。它的主要方法:
  1. // 構造方法
  2. public InterProcessMutex(CuratorFramework client, String path)
  3. public InterProcessMutex(CuratorFramework client, String path, LockInternalsDriver driver)
  4. // 通過acquire獲得鎖,並提供超時機制:
  5. public void acquire() throws Exception
  6. public boolean acquire(long time, TimeUnit unit) throws Exception
  7. // 撤銷鎖
  8. public void makeRevocable(RevocationListener<InterProcessMutex> listener)
  9. public void makeRevocable(final RevocationListener<InterProcessMutex> listener, Executor executor)
錯誤處理:還是強烈推薦你使用ConnectionStateListener處理連線狀態的改變。當連線LOST時你不再擁有鎖。
2.編寫示例程式
    首先讓我們建立一個模擬的共享資源, 這個資源期望只能單執行緒的訪問,否則會有併發問題。
  1. public class FakeLimitedResource
  2. {
  3. private final AtomicBoolean inUse = new AtomicBoolean(false);
  4. // 模擬只能單執行緒操作的資源
  5. public void use() throws InterruptedException
  6. {
  7. if (!inUse.compareAndSet(false, true))
  8. {
  9. // 在正確使用鎖的情況下,此異常不可能丟擲
  10. throw new IllegalStateException("Needs to be used by one client at a time");
  11. }
  12. try
  13. {
  14. Thread.sleep((long) (3 * Math.random()));
  15. }
  16. finally
  17. {
  18. inUse.set(false);
  19. }
  20. }
  21. }
    然後建立一個ExampleClientThatLocks類,它負責請求鎖,使用資源,釋放鎖這樣一個完整的訪問過程。
  1. public class ExampleClientThatLocks
  2. {
  3. private final InterProcessMutex lock;
  4. private final FakeLimitedResource resource;
  5. private final String clientName;
  6. public ExampleClientThatLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName)
  7. {
  8. this.resource = resource;
  9. this.clientName = clientName;
  10. lock = new InterProcessMutex(client, lockPath);
  11. }
  12. public void doWork(long time, TimeUnit unit) throws Exception
  13. {
  14. if (!lock.acquire(time, unit))
  15. {
  16. throw new IllegalStateException(clientName + " 不能得到互斥鎖");
  17. }
  18. try
  19. {
  20. System.out.println(clientName + " 已獲取到互斥鎖");
  21. resource.use(); // 使用資源
  22. Thread.sleep(1000 * 1);
  23. }
  24. finally
  25. {
  26. System.out.println(clientName + " 釋放互斥鎖");
  27. lock.release(); // 總是在finally中釋放
  28. }
  29. }
  30. }
    最後建立主程式來測試:
  1. public class InterProcessMutexExample
  2. {
  3. private static final int QTY = 5;
  4. private static final int REPETITIONS = QTY * 10;
  5. private static final String PATH = "/examples/locks";
  6. public static void main(String[] args) throws Exception
  7. {
  8. final FakeLimitedResource resource = new FakeLimitedResource();
  9. final List<CuratorFramework> clientList = new ArrayList<CuratorFramework>();
  10. for (int i = 0; i < QTY; i++)
  11. {
  12. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  13. client.start();
  14. clientList.add(client);
  15. }
  16. System.out.println("連線初始化完成!");
  17. ExecutorService service = Executors.newFixedThreadPool(QTY);
  18. for (int i = 0; i < QTY; ++i)
  19. {
  20. final int index = i;
  21. Callable<Void> task = new Callable<Void>()
  22. {
  23. @Override
  24. public Void call() throws Exception
  25. {
  26. try
  27. {
  28. final ExampleClientThatLocks example = new ExampleClientThatLocks(clientList.get(index), PATH, resource, "Client " + index);
  29. for (int j = 0; j < REPETITIONS; ++j)
  30. {
  31. example.doWork(10, TimeUnit.SECONDS);
  32. }
  33. }
  34. catch (Throwable e)
  35. {
  36. e.printStackTrace();
  37. }
  38. finally
  39. {
  40. CloseableUtils.closeQuietly(clientList.get(index));
  41. }
  42. return null;
  43. }
  44. };
  45. service.submit(task);
  46. }
  47. service.shutdown();
  48. service.awaitTermination(10, TimeUnit.MINUTES);
  49. System.out.println("OK!");
  50. }
  51. }
程式碼也很簡單,生成5個client,每個client重複執行10次 請求鎖--訪問資源--釋放鎖的過程。每個client都在獨立的執行緒中。
結果可以看到,鎖是隨機的被每個例項排他性的使用。
既然是可重入鎖,你可以在一個執行緒中多次呼叫acquire,線上程擁有鎖時它總是返回true。
注意:你不應該在多個執行緒中用同一個InterProcessMutex, 你可以在每個執行緒中都生成一個InterProcessMutex例項,它們的path都一樣,這樣它們可以共享同一個鎖。
3.示例執行結果
    執行結果控制檯如下:
  1. 連線初始化完成!
  2. Client 4 已獲取到互斥鎖
  3. Client 4 釋放互斥鎖
  4. Client 3 已獲取到互斥鎖
  5. Client 3 釋放互斥鎖
  6. ......
  7. Client 2 已獲取到互斥鎖
  8. Client 2 釋放互斥鎖
  9. OK!
    執行時檢視Zookeeper節點資訊如下:

2.不可重入鎖Shared Lock

    這個鎖和上面的相比,就是少了Reentrant的功能,也就意味著它不能在同一個執行緒中重入。這個類是InterProcessSemaphoreMutex使用方法和上面的類類似
    首先我們將上面的例子修改一下,測試一下它的重入。修改ExampleClientThatLocks.doWork,連續兩次acquire:
  1. public void doWork(long time, TimeUnit unit) throws Exception
  2. {
  3. if (!lock.acquire(time, unit))
  4. {
  5. throw new IllegalStateException(clientName + " 不能得到互斥鎖");
  6. }
  7. System.out.println(clientName + " 已獲取到互斥鎖");
  8. if (!lock.acquire(time, unit))
  9. {
  10. throw new IllegalStateException(clientName + " 不能得到互斥鎖");
  11. }
  12. System.out.println(clientName + " 再次獲取到互斥鎖");
  13. try
  14. {
  15. resource.use(); // 使用資源
  16. Thread.sleep(1000 * 1);
  17. }
  18. finally
  19. {
  20. System.out.println(clientName + " 釋放互斥鎖");
  21. lock.release(); // 總是在finally中釋放
  22. lock.release(); // 獲取鎖幾次 釋放鎖也要幾次
  23. }
  24. }
注意:我們也需要呼叫release兩次。這和JDK的ReentrantLock用法一致。如果少呼叫一次release,則此執行緒依然擁有鎖。
上面的程式碼沒有問題,我們可以多次呼叫acquire,後續的acquire也不會阻塞。
但是將上面的InterProcessMutex換成不可重入鎖InterProcessSemaphoreMutex,如果再執行上面的程式碼,結果就會發現執行緒被阻塞在第二個acquire上,直到超時。也就是此鎖不是可重入的。

3.可重入讀寫鎖Shared Reentrant Read Write Lock

    類似JDK的ReentrantReadWriteLock。一個讀寫鎖管理一對相關的鎖。一個負責讀操作,另外一個負責寫操作。讀操作在寫鎖沒被使用時可同時由多個程序使用,而寫鎖在使用時不允許讀(阻塞)。
    此鎖是可重入的。一個擁有寫鎖的執行緒可重入讀鎖,但是讀鎖卻不能進入寫鎖。這也意味著寫鎖可以降級成讀鎖, 比如請求寫鎖 --->讀鎖 ---->釋放寫鎖。從讀鎖升級成寫鎖是不行的。
1.可重入讀寫鎖相關類介紹
    可重入讀寫鎖主要由兩個類實現:InterProcessReadWriteLock、InterProcessMutex。使用時首先建立一個InterProcessReadWriteLock例項,然後再根據你的需求得到讀鎖或者寫鎖,讀寫鎖的型別是InterProcessMutex。

2.編寫示例程式
    示例程式仍使用上面的FakeLimitedResource、InterProcessMutexExample類
  1. public class ExampleClientReadWriteLocks
  2. {
  3. private final InterProcessReadWriteLock lock;
  4. private final InterProcessMutex readLock;
  5. private final InterProcessMutex writeLock;
  6. private final FakeLimitedResource resource;
  7. private final String clientName;
  8. public ExampleClientReadWriteLocks(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName)
  9. {
  10. this.resource = resource;
  11. this.clientName = clientName;
  12. lock = new InterProcessReadWriteLock(client, lockPath);
  13. readLock = lock.readLock();
  14. writeLock = lock.writeLock();
  15. }
  16. public void doWork(long time, TimeUnit unit) throws Exception
  17. {
  18. // 注意只能先得到寫鎖再得到讀鎖,不能反過來!!!
  19. if (!writeLock.acquire(time, unit))
  20. {
  21. throw new IllegalStateException(clientName + " 不能得到寫鎖");
  22. }
  23. System.out.println(clientName + " 已得到寫鎖");
  24. if (!readLock.acquire(time, unit))
  25. {
  26. throw new IllegalStateException(clientName + " 不能得到讀鎖");
  27. }
  28. System.out.println(clientName + " 已得到讀鎖");
  29. try
  30. {
  31. resource.use(); // 使用資源
  32. Thread.sleep(1000 * 1);
  33. }
  34. finally
  35. {
  36. System.out.println(clientName + " 釋放讀寫鎖");
  37. readLock.release();
  38. writeLock.release();
  39. }
  40. }
  41. }
    在這個類中我們首先請求了一個寫鎖,然後降級成讀鎖。執行業務處理,然後釋放讀寫鎖。修改InterProcessMutexExample類中的ExampleClientThatLocks為ExampleClientReadWriteLocks然後執行示例。
3.示例執行結果
    執行結果控制檯:
  1. 連線初始化完成!
  2. Client 1 已得到寫鎖
  3. Client 1 已得到讀鎖
  4. Client 1 釋放讀寫鎖
  5. ......
  6. Client 3 已得到寫鎖
  7. Client 3 已得到讀鎖
  8. Client 3 釋放讀寫鎖
  9. OK!
    此時檢視Zookeeper資料節點如下:

4.訊號量Shared Semaphore

    一個計數的訊號量類似JDK的Semaphore。JDK中Semaphore維護的一組許可(permits),而Cubator中稱之為租約(Lease)
    有兩種方式可以決定semaphore的最大租約數。第一種方式是有使用者給定的path決定。第二種方式使用SharedCountReader類。
    如果不使用SharedCountReader,沒有內部程式碼檢查程序是否假定有10個租約而程序B假定有20個租約。 所以所有的例項必須使用相同的numberOfLeases值.
1.訊號量實現類說明
主要類有:
  • InterProcessSemaphoreV2 - 訊號量實現類
  • Lease - 租約(單個訊號)
  • SharedCountReader - 計數器,用於計算最大租約數量
    這次呼叫acquire會返回一個租約物件。客戶端必須在finally中close這些租約物件,否則這些租約會丟失掉。但是,如果客戶端session由於某種原因比如crash丟掉,那麼這些客戶端持有的租約會自動close,這樣其它客戶端可以繼續使用這些租約。
租約還可以通過下面的方式返還:
  1. public void returnLease(Lease lease)
  2. public void returnAll(Collection<Lease> leases)
    注意一次你可以請求多個租約,如果Semaphore當前的租約不夠,則請求執行緒會被阻塞。同時還提供了超時的過載方法。
  1. public Lease acquire() throws Exception
  2. public Collection<Lease> acquire(int qty) throws Exception
  3. public Lease acquire(long time, TimeUnit unit) throws Exception
  4. public Collection<Lease> acquire(int qty, long time, TimeUnit unit) throws Exception
2.編寫示例程式
  1. public class InterProcessSemaphoreExample
  2. {
  3. private static final int MAX_LEASE = 10;
  4. private static final String PATH = "/examples/locks";
  5. public static void main(String[] args) throws Exception
  6. {
  7. FakeLimitedResource resource = new FakeLimitedResource();
  8. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  9. client.start();
  10. InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
  11. Collection<Lease> leases = semaphore.acquire(5);
  12. System.out.println("獲取租約數量:" + leases.size());
  13. Lease lease = semaphore.acquire();
  14. System.out.println("獲取單個租約");
  15. resource.use();
  16. Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
  17. System.out.println("獲取租約,如果為空則超時: " + leases2);
  18. System.out.println("釋放租約");
  19. semaphore.returnLease(lease);
  20. System.out.println("釋放集合中的所有租約");
  21. semaphore.returnAll(leases);
  22. client.close();
  23. System.out.println("OK!");
  24. }
  25. }
首先我們先獲得了5個租約,接著請求了一個租約,因為semaphore還有5個租約,所以請求可以滿足,返回一個租約,還剩4個租約。
然後再請求一個租約,因為租約不夠,阻塞到超時,還是沒能滿足,返回結果為null。
3.示例執行結果
    執行結果控制檯如下:
  1. 獲取租約數量:5
  2. 獲取單個租約
  3. 獲取租約,如果為空則超時: null
  4. 釋放租約
  5. 釋放集合中的所有租約
  6. OK!
    此時檢視Zookeeper資料節點如下:

注意:上面所講的4種鎖都是公平鎖(fair)。從ZooKeeper的角度看,每個客戶端都按照請求的順序獲得鎖。相當公平。

5.多鎖物件 Multi Shared Lock

    Multi Shared Lock是一個鎖的容器。當呼叫acquire,所有的鎖都會被acquire,如果請求失敗,所有的鎖都會被release。同樣呼叫release時所有的鎖都被release(失敗被忽略)。基本上,它就是組鎖的代表,在它上面的請求釋放操作都會傳遞給它包含的所有的鎖。
1.主要類說明
主要涉及兩個類:
  • InterProcessMultiLock - 對所物件實現類
  • InterProcessLock - 分散式鎖介面類
它的建構函式需要包含的鎖的集合,或者一組ZooKeeper的path。用法和Shared Lock相同。
  1. public InterProcessMultiLock(CuratorFramework client, List<String> paths)
  2. public InterProcessMultiLock(List<InterProcessLock> locks)
2.編寫示例程式
  1. public class InterProcessMultiLockExample
  2. {
  3. private static final String PATH1 = "/examples/locks1";
  4. private static final String PATH2 = "/examples/locks2";
  5. public static void main(String[] args) throws Exception
  6. {
  7. FakeLimitedResource resource = new FakeLimitedResource();
  8. CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", new ExponentialBackoffRetry(1000, 3));
  9. client.start();
  10. InterProcessLock lock1 = new InterProcessMutex(client, PATH1); // 可重入鎖
  11. InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2); // 不可重入鎖
  12. InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
  13. if (!lock.acquire(10, TimeUnit.SECONDS))
  14. {
  15. throw new IllegalStateException("不能獲取多鎖");
  16. }
  17. System.out.println("已獲取多鎖");
  18. System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
  19. System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
  20. try
  21. {
  22. resource.use(); // 資源操作
  23. }
  24. finally
  25. {
  26. System.out.println("釋放多個鎖");
  27. lock.release(); // 釋放多鎖
  28. }
  29. System.out.println("是否有第一個鎖: " + lock1.isAcquiredInThisProcess());
  30. System.out.println("是否有第二個鎖: " + lock2.isAcquiredInThisProcess());
  31. client.close();
  32. System.out.println("OK!");
  33. }
  34. }
新建一個InterProcessMultiLock,包含一個重入鎖和一個非重入鎖。呼叫acquire後可以看到執行緒同時擁有了這兩個鎖。呼叫release看到這兩個鎖都被釋放了。
注意:再重申一遍,強烈推薦使用ConnectionStateListener監控連線的狀態。
3.示例執行結果
    執行結果控制檯如下:
  1. 已獲取多鎖
  2. 是否有第一個鎖: true
  3. 是否有第二個鎖: true
  4. 釋放多個鎖
  5. 是否有第一個鎖: false
  6. 是否有第二個鎖: false
  7. OK!
    此時檢視Zookeeper資料節點如下:

-------------------------------------------------------------------------------------------------------------------------------