1. 程式人生 > >分散式鎖實現的正確開啟方式

分散式鎖實現的正確開啟方式

## 一、分散式鎖概述 ### 1.1、分散式鎖作用 1)在分散式系統環境下,一個方法在同一時間只能被一個機器的一個執行緒執行 2)具備高可用、高效能的獲取鎖與釋放鎖 3)具備鎖失效機制,防止死鎖 4)具備非阻塞鎖(沒有獲取到鎖將直接返回獲取鎖失敗)或堵塞鎖特性(根據業務需求考慮) ### 1.2、分散式鎖應用場景 1)庫存扣減與增加 分散式鎖保證庫存扣減不會超賣,庫存增加不會造成庫存資料不準確 2)積分抵現 防止積分扣減出現溢位的情況 3)會員禮品核銷 防止禮品核銷多次 ### 1.3、實現方式 1)使用Redis,基於setnx命令或其他。 2)使用ZooKeeper,基於臨時有序節點。 3)使用MySQL,基於唯一索引 ## 二、基於Zookeeper實現分散式鎖 ### 2.1、Zookeeper特性介紹 1)有序節點 假如當前有一個父節點為/lock,我們可以在這個父節點下面建立子節點;zookeeper提供了一個可選的有序特性,例如我們可以建立子節點“/lock/node-”並且指明有序,那麼zookeeper在生成子節點時會根據當前的子節點數量自動新增整數序號,也就是說如果是第一個建立的子節點,那麼生成的子節點為/lock/node-0000000000,下一個節點則為/lock/node-0000000001,依次類推。 2)臨時節點 客戶端可以建立一個臨時節點,在會話結束或者會話超時後,zookeeper會自動刪除該節點。 3)事件監聽 在讀取資料時,我們可以同時對節點設定事件監聽,當節點資料或結構變化時,zookeeper會通知客戶端。當前zookeeper有如下四種事件:**節點建立、節點刪除、節點資料修改、子節點變更**。 ### 2.2、Zookeeper分散式鎖實現(方式一) #### 2.2.1、實現原理 1)客戶端連線zookeeper,並在父節點(/lock)下建立臨時的且有序的子節點,第一個客戶端對應的子節點為/lock/lock-1,第二個為/lock/lock-2,以此類推。 2)客戶端獲取/lock下的子節點列表,**判斷自己建立的子節點是否為當前子節點列表中序號最小的子節點**,如果是則認為獲得鎖,否則監聽/lock的子節點變更訊息,**獲得子節點變更通知後重復此步驟直至獲得鎖**; 3)執行業務程式碼; 4)完成業務流程後,刪除對應的子節點釋放鎖。 #### 2.2.2、實現程式碼 **1.基於curator的zookeeper分散式鎖實現** ``` public static void main(String[] args) throws Exception { //建立zookeeper的客戶端 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient("10.21.41.181:2181,10.21.42.47:2181,10.21.49.252:2181", retryPolicy); client.start(); //建立分散式鎖, 鎖空間的根節點路徑為/curator/lock InterProcessMutex mutex = new InterProcessMutex(client, "/curator/lock"); mutex.acquire(); //獲得了鎖, 進行業務流程 System.out.println("Enter mutex"); //完成業務流程, 釋放鎖 mutex.release(); //關閉客戶端 client.close(); } ``` 2.實現方式二 1)定義變數 ``` /** * Zookeeper客戶端 */ private ZooKeeper zookeeper; /** * 鎖的唯一標識 */ private String lockId; /** * 與Zookeeper建立會話的訊號量 */ private CountDownLatch connectedLatch; /** * 建立分散式鎖的過程中,開始和等待請求建立分散式鎖的訊號標誌 */ private CountDownLatch creatingLatch; /** * 分散式鎖路徑字首 */ private String locksRootPath = "/locks"; /** * 排在當前節點前面一位的節點的路徑 */ private String waitNodeLockPath; /** * 為了獲得鎖,本次建立的節點的路徑 */ private String currentNodeLockPath; ``` 2)建構函式 ``` public ZookeeperTempOrderLock(String lockId) { this.lockId = lockId; try { // 會話超時時間 int sessionTimeout = 30000; // zookeeper = new ZooKeeper("192.168.0.93:2181", sessionTimeout, this); connectedLatch.await(); } catch (IOException ioe) { log.error("與Zookeeper建立連線時出現異常", ioe); } catch (InterruptedException ite) { log.error("等待與Zookeeper會話建立完成時出現異常", ite); } } ``` 3)實現Zookeeper的watcher ``` @Override public void process(WatchedEvent event) { if (Event.KeeperState.SyncConnected == event.getState()) { connectedLatch.countDown(); } if (creatingLatch != null) { creatingLatch.countDown(); } } ``` 4)獲取分散式鎖 ``` /** * 獲取鎖 */ public void acquireDistributedLock() { try { while(!tryLock()) { // 等待前一項服務釋放鎖的等待時間 不能超過一次Zookeeper會話的時間 long waitForPreviousLockRelease = 30000; waitForLock(waitNodeLockPath, waitForPreviousLockRelease); } } catch (InterruptedException | KeeperException e) { log.error("等待上鎖的過程中出現異常", e); } } public boolean tryLock() { try { // 建立順序臨時節點 currentNodeLockPath = zookeeper.create(locksRootPath + "/" + lockId, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 檢視剛剛建立的節點是不是最小節點 // 比如針對於這個同名節點,之前有其它服務曾申請建立過,因此Zookeeper中臨時順序節點形如: // /locks/10000000000, /locks/10000000001, /locks/10000000002 List nodePaths = zookeeper.getChildren(locksRootPath, false); Collections.sort(nodePaths); if(currentNodeLockPath.equals(locksRootPath + "/" + nodePaths.get(0))) { // 如果是最小節點,則代表獲取到鎖 return true; } // 如果不是最小節點,則找到比自己小1的節點 (緊挨著自己) int previousLockNodeIndex = -1; for (int i = 0; i < nodePaths.size(); i++) { if(currentNodeLockPath.equals(locksRootPath + "/" + nodePaths.get(i))) { previousLockNodeIndex = i-1; break; } } this.waitNodeLockPath = nodePaths.get(previousLockNodeIndex); } catch (KeeperException | InterruptedException e) { log.error("建立臨時順序節點失敗", e); } return false; } ``` 6)等待其他服務釋放鎖 ``` /** * 等待其他服務釋放鎖 * 實際上就是在等待前一個臨時節點被刪除 * * @param nodePath 希望被刪除的節點的相對路徑 * @param waitTime 等待時長 單位:毫秒 */ private boolean waitForLock(String nodePath, long waitTime) throws KeeperException, InterruptedException { Stat stat = zookeeper.exists(locksRootPath + "/" + nodePath, true); if (stat != null) { this.creatingLatch = new CountDownLatch(1); this.creatingLatch.await(waitTime, TimeUnit.MILLISECONDS); this.creatingLatch = null; } return true; } ``` 7)釋放分散式鎖 ``` /** * 釋放鎖 * 實際上就是刪除當前建立的臨時節點 */ public void releaseLock() { log.info("準備刪除的節點路徑: " + currentNodeLockPath); try { zookeeper.delete(currentNodeLockPath, -1); currentNodeLockPath = null; zookeeper.close(); } catch (Exception e) { log.error("刪除節點失敗", e); } } ``` ### 2.3、Zookeeper分散式鎖實現(方式二) #### 2.3.1、實現原理 假設有兩個服務A、B希望獲得同一把鎖,執行過程大致如下: 1)服務A向zookeeper申請獲得鎖,該請求將嘗試在zookeeper內建立一個臨時節點(ephemeral znode),如果沒有同名的臨時節點存在,則znode建立成功,標誌著服務A成功的獲得了鎖。 2) 服務B向zookeeper申請獲得鎖,同樣嘗試在zookeeper內建立一個臨時節點(名稱必須與服務A的相同),由於**同名znode已經存在,因此請求被拒絕。接著,服務B會在zk中註冊一個監聽器,用於監聽臨時節點被刪除的事件**。 3) 若服務A主動向zk發起請求釋放鎖,或者服務A宕機、斷開與zk的網路連線,zk會將服務A(建立者)建立的臨時節點刪除。而刪除事件也將立刻被監聽器捕獲到,並反饋給服務B。最後,服務B再次向zookeeper申請獲得鎖。 #### 2.3.2、實現程式碼 **基於臨時節點實現Zookeeper分散式鎖** 多個服務如果想競爭同一把鎖,那就向Zookeeper發起建立臨時節點的請求,若能成功建立則獲得鎖,否則藉助監聽器,當監聽到鎖被其它服務釋放(臨時節點被刪除),則自己再請求建立臨時節點,反覆這幾個步驟直到成功建立臨時節點或者與zookeeper建立的會話超時。 步驟: 1)定義變數 ``` /** * 與Zookeeper成功建立連線的訊號標誌 */ private CountDownLatch connectedSemaphore = new CountDownLatch(1); /** * 建立分散式鎖的過程中,開始和等待請求建立分散式鎖的訊號標誌 */ private CountDownLatch creatingSemaphore; /** * Zookeeper客戶端 */ private ZooKeeper zookeeper; /** * 分散式鎖的過期時間 單位:毫秒 */ private static final Long DISTRIBUTED_KEY_OVERDUE_TIME = 30000L; ``` 2)建構函式 ``` public ZookeeperLock() { try { this.zookeeper = new ZooKeeper("192.168.0.93:2181", 5000, new ZookeeperWatcher()); try { connectedSemaphore.await(); } catch (InterruptedException ite) { log.error("等待Zookeeper成功建立連線的過程中,執行緒丟擲異常", ite); } log.info("與Zookeeper成功建立連線"); } catch (Exception e) { log.error("與Zookeeper建立連線時出現異常", e); } } ``` 3)獲取分散式鎖 實際上就是在嘗試建立臨時節點znode create(final String path, byte data[], List acl,CreateMode createMod) path: 從根節點"/"到當前節點的全路徑 data: 當前節點儲存的資料 (由於這裡只是藉助臨時節點的建立來實現分散式鎖,因此無需儲存資料) acl: Access Control list 訪問控制列表 主要涵蓋許可權模式(Scheme)、授權物件(ID)、授予的許可權(Permission)這三個方面 OPEN_ACL_UNSAFE 完全開放的訪問控制 對當前節點進行操作時,無需考慮ACL許可權控制 createMode: 節點建立的模式 EPHEMERAL(臨時節點) 當建立節點的客戶端與zk斷開連線後,臨時節點將被刪除 EPHEMERAL_SEQUENTIAL(臨時順序節點) PERSISTENT(持久節點) PERSISTENT_SEQUENTIAL(持久順序節點) ``` public boolean acquireDistributeLock(Long lockId) { String path = "/product-lock-" + lockId; try { zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); log.info("ThreadId=" + Thread.currentThread().getId() + "建立臨時節點成功"); return true; } catch (Exception e) { // 若臨時節點已存在,則會丟擲異常: NodeExistsException while (true) { // 相當於給znode註冊了一個監聽器,檢視監聽器是否存在 try { Stat stat = zookeeper.exists(path, true); if (stat != null) { this.creatingSemaphore = new CountDownLatch(1); this.creatingSemaphore.await(DISTRIBUTED_KEY_OVERDUE_TIME, TimeUnit.MILLISECONDS); this.creatingSemaphore = null; } zookeeper.create(path, "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); return true; } catch (Exception ex) { log.error("ThreadId=" + Thread.currentThread().getId() + ",檢視臨時節點時出現異常", ex); } } } } ``` 4)釋放分散式鎖 ``` public void releaseDistributedLock(Long lockId) { String path = "/product-lock-" + lockId; try { // 第二個引數version是資料版本 每次znode內資料發生變化,都會使version自增,但由於分散式鎖建立的臨時znode沒有存資料,因此version=-1 zookeeper.delete(path, -1); log.info("成功釋放分散式鎖, lockId=" + lockId + ", ThreadId=" + Thread.currentThread().getId()); } catch (Exception e) { log.error("釋放分散式鎖失敗,lockId=" + lockId, e); } } ``` 5)建立Zookeeper的watcher 不論是zk客戶端與伺服器連線成功,還是刪除節點,watcher監聽到的事件都是SyncConnected ``` private class ZookeeperWatcher implements Watcher { @Override public void process(WatchedEvent event) { log.info("接收到事件: " + event.getState() + ", ThreadId=" + Thread.currentThread().getId()); if (Event.KeeperState.SyncConnected == event.getState()) { connectedSemaphore.countDown(); } if (creatingSemaphore != null) { creatingSemaphore.countDown(); } } } ``` 6)main方式運用 建立了兩個執行緒,其中第一個執行緒先執行,且持有鎖5秒鐘才釋放鎖,第二個執行緒後執行,當且僅當第一個執行緒釋放鎖(刪除臨時節點)後,第二個執行緒才能成功獲取鎖。 ``` public static void main(String[] args) throws InterruptedException{ long lockId = 20200730; new Thread(() ->
{ ZookeeperLock zookeeperLock = new ZookeeperLock(); System.out.println("ThreadId1=" + Thread.currentThread().getId()); System.out.println("ThreadId=" + Thread.currentThread().getId() + "獲取到分散式鎖: " + zookeeperLock.acquireDistributeLock(lockId)); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { log.error("ThreadId=" + Thread.currentThread().getId() + "暫停時出現異常", e); } zookeeperLock.releaseDistributedLock(lockId); }).start(); TimeUnit.SECONDS.sleep(1); new Thread(() ->
{ ZookeeperLock zookeeperLock = new ZookeeperLock(); System.out.println("ThreadId2=" + Thread.currentThread().getId()); System.out.println("ThreadId=" + Thread.currentThread().getId() + "獲取到分散式鎖: " + zookeeperLock.acquireDistributeLock(lockId)); }).start(); } ``` ## 三、基於Redis實現分散式鎖 ### 3.1、普通常見實現方式 #### 3.1.1、實現程式碼 ``` public String deductStock() { String lockKey = "product_001"; try { /*Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "aaa"); //jedis.setnx stringRedisTemplate.expire(lockKey, 30, TimeUnit.SECONDS); //設定超時*/ //為解決原子性問題將設定鎖和設定超時時間合併 Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "aaa", 10, TimeUnit.SECONDS); //未設定成功,當前key已經存在了,直接返回錯誤 if (!result) { return "error_code"; } //業務邏輯實現,扣減庫存 .... } catch (Exception e) { e.printStackTrace(); }finally { stringRedisTemplate.delete(lockKey); } return "end"; } ``` #### 3.2.2、問題分析 上述程式碼可以看到,當前鎖的失效時間為10s,如果當前扣減庫存的業務邏輯執行需要15s時,高併發時會出現問題: - 執行緒1,首先執行到10s後,鎖(product_001)失效 - 執行緒2,在第10s後同樣進入當前方法,此時加上鎖(product_001) - 當執行到15s時,執行緒1刪除執行緒2加的鎖(product_001) - 執行緒3,可以加鎖 .... 如此迴圈,實際鎖已經沒有意義 #### 3.2.3、解決方案 定義一個子執行緒,定時去檢視**是否存在主執行緒的持有當前鎖**,如果**存在則為其延長過期時間**。 ### 3.2、基於Redission實現方式 #### 3.2.1、Redission簡介 Jedis是Redis的Java實現的客戶端,其API提供了比較全面的Redis命令的支援。Redission也是Redis的客戶端,相比於Jedis功能簡單。Jedis簡單使用阻塞的I/O和redis互動,**Redission通過Netty支援非阻塞I/O**。 Redission封裝了鎖的實現,其繼承了java.util.concurrent.locks.Lock的介面,讓我們像操作我們的本地Lock一樣去操作Redission的Lock。 常用API: ``` RLock redissonLock = redission.getLock(); redissionLock.lock(30,TmieUnit.SECONDS);加鎖並設定鎖的存活時間 redissionLock.unLock();解鎖 ``` #### 3.2.2、實現原理 ![](https://img2020.cnblogs.com/blog/999804/202103/999804-20210309114453413-1096973407.png) - 多個執行緒去執行lock操作,僅有一個執行緒能夠加鎖成功,其它執行緒迴圈阻塞。 - 加鎖成功,鎖超時時間**預設30s**,並開啟後臺執行緒(子執行緒),加鎖的後臺會**每隔10秒**去檢測執行緒持有的鎖是否存在,還在的話,就延遲鎖超時時間,重新設定為30s,即**鎖延期**。 - 對於原子性,Redis分散式鎖底層藉助**Lua指令碼實現鎖的原子性**。鎖延期是通過在底層用Lua進行延時,延時檢測時間是對超時時間timeout /3。 **1)簡單實現程式碼:** ``` public String deductStockRedission() { String lockKey = "product_001"; RLock rlock = redission.getLock(lockKey); try { rlock.lock(); //業務邏輯實現,扣減庫存 .... } catch (Exception e) { e.printStackTrace(); } finally { rlock.unlock(); } return "end"; } ``` **2)分析Redission適用原因:** 1)redisson所有指令都通過lua指令碼執行,redis支援lua指令碼原子性執行 2)redisson設定一個key的預設過期時間為30s,如果某個客戶端持有一個鎖超過了30s怎麼辦? redisson中有一個**watchdog的概念,翻譯過來就是看門狗**,它會在你獲取鎖之後,每隔10秒幫你把key的超時時間設為30s 這樣的話,就算一直持有鎖也不會出現key過期了,其他執行緒獲取到鎖的問題了。保證了沒有死鎖發生 **3)Redisson的可重入鎖** Redis**儲存鎖的資料型別是** **Hash**型別 Hash資料型別的**key值包含了當前執行緒資訊**。 ![](https://img2020.cnblogs.com/blog/999804/202103/999804-20210309114536319-1514297211.png) #### 3.2.3、問題分析及對應方案 **1)主從同步問題** **問題分析:**  當主Redis加鎖了,開始執行執行緒,若還未將鎖通過非同步同步的方式同步到從Redis節點,主節點就掛了,此時會把某一臺從節點作為新的主節點,此時別的執行緒就可以加鎖了,這樣就出錯了,怎麼辦? **解決方案:** ​ 1)採用zookeeper代替Redis   由於zk叢集的特點,其支援的是CP。而Redis叢集支援的則是AP。 ​ 2)採用RedLock ![](https://img2020.cnblogs.com/blog/999804/202103/999804-20210309114604197-1331495068.png) 假設有3個redis節點,這些節點之間既沒有主從,也沒有叢集關係。客戶端用相同的key和隨機值在3個節點上請求鎖,請求鎖的超時時間應小於鎖自動釋放時間。當在2個(超過半數)redis上請求到鎖的時候,才算是真正獲取到了鎖。如果沒有獲取到鎖,則把部分已鎖的redis釋放掉。 ``` public String deductStockRedlock() { String lockKey = "product_001"; //TODO 這裡需要自己例項化不同redis例項的redission客戶端連線,這裡只是虛擬碼用一個redisson客戶端簡化了 RLock rLock1 = redisson.getLock(lockKey); RLock rLock2 = redisson.getLock(lockKey); RLock rLock3 = redisson.getLock(lockKey); // 向3個redis例項嘗試加鎖 RedissonRedLock redLock = new RedissionRedLock(rLock1, rLock2, rLock3); boolean isLock; try { // 500ms拿不到鎖, 就認為獲取鎖失敗。10000ms即10s是鎖失效時間。 isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS); System.out.println("isLock = " + isLock); if (isLock) { //業務邏輯處理 ... } } catch (Exception e) { } finally { // 無論如何, 最後都要解鎖 redLock.unlock(); } } ``` 不太推薦使用。如果考慮高可用併發推薦使用Redisson,考慮一致性推薦使用zookeeper。 2)**提高併發:分段鎖** 由於Redission實際上就是**將並行的請求,轉化為序列請求**。這樣就降低了併發的響應速度,為了解決這一問題,可以將鎖進行分段處理:例如秒殺商品001,原本存在1000個商品,可以將其分為20段,為每段分配50個商品。 比如: ​ 將庫存進行分段,放入redis中,例如1000庫存,可分10段放入Redis ​ key的設計可以為Product:10001:0 | Product:10001:1 .... ​ Redis底層叢集,將根據key,計算器槽位,放入不同節點中 **參考文章:** https://blog.csdn.net/miaomiao19971215/article/details/107564197 https://www.cnblogs.com/bbgs-xc/p/14412646.html#_label1_0 https://www.cnblogs.com/wei57960/p/14059772.html https://www.cnblogs.com/jay-huaxiao/p/14503018.html