前言

分散式訊號量,之前在 Redisson 中也介紹過,Redisson 的訊號量是將計數維護在 Redis 中的,那現在來看一下 Curator 是如何基於 ZooKeeper 實現訊號量的。

使用 Demo

  1. public class CuratorDemo {
  2. public static void main(String[] args) throws Exception {
  3. String connectString = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183";
  4. RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
  5. CuratorFramework client = CuratorFrameworkFactory
  6. .builder()
  7. .connectString(connectString)
  8. .retryPolicy(retryPolicy)
  9. .build();
  10. client.start();
  11. InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/semaphores/semaphore_01", 3);
  12. for (int i = 0; i < 10; i++) {
  13. new Thread(() -> {
  14. try {
  15. System.out.println(Thread.currentThread() + " 執行緒 start - " + LocalTime.now());
  16. Lease lease = semaphore.acquire();
  17. System.out.println(Thread.currentThread() + " 執行緒 execute - " + LocalTime.now());
  18. Thread.sleep(3000);
  19. System.out.println(Thread.currentThread() + " 執行緒 over -" + LocalTime.now());
  20. semaphore.returnLease(lease);
  21. } catch (Exception e) {
  22. }
  23. }).start();
  24. }
  25. Thread.sleep(1000000);
  26. }
  27. }

控制檯輸出資料如下:

原始碼

獲取憑證

核心原始碼:InterProcessSemaphoreV2#internalAcquire1Lease

這裡僅介紹大概邏輯,有興趣的小夥伴可以自行閱讀原始碼。

lock 是 InterProcessMutexInterProcessSemaphoreV2 訊號量,也是藉助於最基礎的加鎖。

通過圖也可以看出,使用 InterProcessSemaphoreV2 時,會先建立 /semaphores/semaphore_01 路徑,並在路徑下建立 locks 節點。也就是 /semaphores/semaphore_01/locks 路徑下,有 10 個臨時順序節點。

緊接著會在 /semaphores/semaphore_01 路徑下建立 leases 節點,所以建立鎖的臨時順序節點之後,會緊接著在 /semaphores/semaphore_01/leases 下建立臨時順序節點。

/semaphores/semaphore_01/leases 節點進行監聽,同時獲取 /semaphores/semaphore_01/leases 下面的子節點數量。

  1. 如果子節點數量小於等於訊號量計數,則直接結束迴圈;
  2. 如果大於,則會進入 wait 等待喚醒。

釋放憑證

釋放憑證就是呼叫 Lease 的 close 方法,刪除節點,這樣 /semaphores/semaphore_01/leases 上的監聽器就會觸發,然後其他執行緒獲取憑證。

互斥鎖

互斥鎖 InterProcessSemaphoreMutex,不支援重入,其他的和可重入鎖並沒有什麼區別。就是基於 InterProcessSemaphoreV2 實現的。

就是把計數的值 maxLeases 設定為了 1。

總結

訊號量 InterProcessSemaphoreV2 其實是通過判斷節點下的子節點數量來實現控制訊號量,同時內部加鎖是基於可重入鎖 InterProcessMutex 實現的。

互斥鎖 InterProcessSemaphoreMutex 則是將訊號量的技術設定為 1 來實現互斥功能。

相關推薦