1. 程式人生 > >輕鬆構建微服務之分散式鎖

輕鬆構建微服務之分散式鎖

>微信公眾號:核心小王子 關注可瞭解更多關於資料庫,JVM核心相關的知識; 如果你有任何疑問也可以加我pigpdong[^1]

前言

在多執行緒情況下訪問資源,我們需要加鎖來保證業務的正常進行,JDK中提供了很多併發控制相關的工具包,來保證多執行緒下可以高效工作,同樣在分散式環境下,有些互斥操作我們可以藉助分散式鎖來實現兩個操作不能同時執行,必須等到另外一個任務結束了把鎖釋放了才能獲取鎖然後執行,因為跨JVM我們需要一個第三方系統來協助實現分散式鎖,一般我們可以用 資料庫,redis,zookeeper,etcd等來實現.

要實現一把分散式鎖,我們需要先分析下這把鎖有哪些特性

  • 1.在分散式叢集中,也就是不同的JVM中,相互有衝突的方法,可以是不同JVM相同例項內的同一個方法,也可以是不同方法,也就是不同業務間的隔離和同一個業務操作不能並行執行,而分散式鎖需要保證這兩個方法在同一時間只能有一個執行.

  • 2.這把鎖最好是可重入的,因為不可重入的鎖很容易出現死鎖

  • 3.獲取鎖和釋放鎖的效能要很高

  • 4.支援獲取鎖的時候可以阻塞等待,以及等待時間

  • 5.獲取鎖後支援設定一個期限,超過這個期限可以自動釋放,防止程式沒有自己釋放的情況

  • 6.這是一把輕量鎖,對業務侵入小

  • 7.易用

資料庫實現分散式鎖

由於資料庫的鎖無能是在效能高可用上都不及其他方式,這裡我們簡單介紹下可能的方案

  • 1.獲取鎖的時候,往資料庫裡插入一條記錄,可以根據方法名作唯一鍵約束,其他執行緒獲取鎖的時候無法插入所以會等待,釋放鎖的時候刪除,這種方式不支援可重入
  • 2.根據資料庫的排他鎖 for update實現,當commit的時候釋放,這種方式如果鎖不釋放就會一直佔有一個connection,而且加鎖導致效能低
  • 3.將每一個鎖作為表裡的一條記錄,這個記錄加一個狀態,每次獲取鎖的時候都update status = 1 where status = -1,這種類似CAS的方式可以解決排他鎖效能低.但是mysql是一個單點,而且和業務系統關聯,因為兩個業務方可能屬於不同系統不同資料庫,如果做到不和業務關聯還需要增加一次RPC請求,將鎖業務抽為一個單獨系統,不夠輕量

redis的分散式鎖

SET resource_name my_random_value NX PX 30000
  • SET NX 只會在key不存在的時候給key賦值,當多個程序同時爭搶鎖資源的時候,會下發多個SET NX只會有一個返回成功,並且SET NX對外是一個原子操作
  • PX 設定過期時間,代表這個key的存活時間,也就是獲取到的鎖只會佔有這麼長,超過這個時間將會自動釋放
  • my_random_value 一般是全域性唯一值,這個隨機數一般可以用時間戳加隨機數,這種方式在多機器例項上可能不唯一,如果需要保證絕對唯一可以採用UUID,但是效能會有影響,這個值的用途會在鎖釋放的時候用到

我們可以看看下面獲取分散式鎖的使用場景,假設我們釋放鎖,直接del這個key

if (!redisComponent.acquireLock(lockKey) {
    LOGGER.warn(">>分散式併發鎖獲取失敗");
    return ;
}

try {
      // do  business  ...
} catch (BusinessException e) {
      // exception handler  ...
} finally {
  redisComponent.releaseLock(lockKey);
}

  • 1.程序A獲取到鎖,超時時間為1分鐘
  • 2.1分鐘時間到,程序A還沒有處理完,鎖自動釋放了
  • 3.程序B獲取到鎖,開始進行業務處理
  • 4.程序A處理結束,釋放鎖,這個時候將程序B獲取到的鎖釋放了
  • 5.程序C獲取到鎖,開始業務處理,程序B還沒有處理結束,結果B和C開始並行處理,發生併發

為了解決以上問題,我們可以在釋放鎖的時候,判斷下鎖是否存在,這樣程序A在釋放鎖的時候就不會將程序B加的鎖釋放了, 或者通過以下方式,將過期時間做為value儲存在對應的key中,釋放鎖的時候,判斷當前時間是否小於過期時間,只有小於當前時間才處理,我們也可以在進行del操作的時候判斷下對應的value是否相等,這個時候就需要在del操作的時候傳人 my_random_value

下面我們看下redis實現分散式鎖java程式碼實現,我們採用在del的時候判斷下當前時間是否小於過期時間

 public boolean acquireLock(String lockKey, long expired) {

        ShardedJedis jedis = null;

        try {

            jedis = pool.getResource();
            String value = String.valueOf(System.currentTimeMillis() + expired + 1);
            int tryTimes = 0;

            while (tryTimes++ < 3) {

                /*
                 *  1. 嘗試鎖
                 *  setnx : set if not exist
                 */
                if (jedis.setnx(lockKey, value).equals(1L)) {
                    return true;
                }

                /*
                 * 2. 已經被別的執行緒鎖住,判斷是否失效
                 */
                String oldValue = jedis.get(lockKey);
                if (StringUtils.isBlank(oldValue)) {
                    /*
                     * 2.1 value存的是超時時間,如果為空有2種情況
                     *      1. 異常資料,沒有value 或者 value為空字元
                     *      2. 鎖恰好被別的執行緒釋放了
                     * 此時需要嘗試重新嘗試,為了避免出現情況1時導致死迴圈,只重試3次
                     */
                    continue;
                }

                Long oldValueL = Long.valueOf(oldValue);
                if (oldValueL < System.currentTimeMillis()) {
                    /*
                     * 已超時,重新嘗試鎖
                     *
                     * Redis:getSet 操作步驟:
                     *      1.獲取 Key 對應的 Value 作為返回值,不存在時返回null
                     *      2.設定 Key 對應的 Value 為傳入的值
                     * 這裡如果返回的 getValue != oldValue 表示已經被其它執行緒重新修改了
                     */
                    String getValue = jedis.getSet(lockKey, value);
                    return oldValue.equals(getValue);
                } else {
                    // 未超時,則直接返回失敗
                    return false;
                }
            }

            return false;

        } catch (Throwable e) {
            logger.error("acquireLock error", e);
            return false;

        } finally {
            returnResource(jedis);
        }
    }


	/**
	 * 釋放鎖
	 *
	 * @param lockKey
	 *            key
	 */
	public void releaseLock(String lockKey) {
		ShardedJedis jedis = null;
		try {
			jedis = pool.getResource();
			long current = System.currentTimeMillis();
			// 避免刪除非自己獲取到的鎖
			String value = jedis.get(lockKey);
			if (StringUtils.isNotBlank(value) && current < Long.valueOf(value)) {
				jedis.del(lockKey);
			}
		} catch (Throwable e) {
			logger.error("releaseLock error", e);
		} finally {
			returnResource(jedis);
		}
	}

這種方式沒有用到剛剛說的my_random_value,我們看下如果我們按以下程式碼獲取鎖會有什麼問題

if (!redisComponent.acquireLock(lockKey) {
    LOGGER.warn(">>分散式併發鎖獲取失敗");
    return ;
}

try {
boolean locked = redisComponent.acquireLock(lockKey);
if(locked)
      // do  business  ...
} catch (BusinessException e) {
      // exception handler  ...
} finally {
  redisComponent.releaseLock(lockKey);
}

同樣這種方式當程序A沒有獲取到鎖,之後程序B獲取到鎖,程序A會釋放程序B的鎖,這個時候我們可以藉助my_random_value來實現

	/**
	 * 釋放鎖
	 *
	 * @param lockKey ,value
	 */
	public void releaseLock(String lockKey, long oldvalue) {
		ShardedJedis jedis = null;
		try {
			jedis = pool.getResource();
			String value = jedis.get(lockKey);
			if (StringUtils.isNotBlank(value) && oldvalue == Long.valueOf(value)) {
				jedis.del(lockKey);
			}
		} catch (Throwable e) {
			logger.error("releaseLock error", e);
		} finally {
			returnResource(jedis);
		}
	}

這種方式需要儲存之前獲取鎖時候的value值,並在釋放鎖的帶上value值,不過這種實現方式,value的值為過期時間也不唯一

由於我們用了redis得超時機制來釋放鎖,那麼當程序在鎖租約到期後還沒有執行結束,那麼其他程序獲取到鎖後則會產生併發寫的情況,這種如果業務上需要精確控制,只能用樂觀鎖來控制了,每次寫入資料都帶一個鎖的版本,如果下次獲取鎖的時候版本加1,這樣上面那種情況,鎖到期釋放了新的程序獲取到鎖後會使用新的版本號,之前的程序鎖已經釋放了如果繼續使用該鎖則會發現版本已經不對了

zookeeper實現分散式鎖

可以藉助zookeeper的順序節點,在一個父節點下,所有需要爭搶鎖的資源都去這個目錄下建立一個順序節點,然後判斷這個臨時順序節點是否是兄弟節點中順序最小的,如果是最小的則獲取到鎖,如果不是則監聽這個順序最小的節點的刪除事件,然後在繼續根據這個流程獲取最小節點

 public void lock() {
        try {

            // 建立臨時子節點
            String myNode = zk.create(root + "/" + lockName , data, ZooDefs.Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);

            System.out.println(j.join(Thread.currentThread().getName() + myNode, "created"));

            // 取出所有子節點
            List<string> subNodes = zk.getChildren(root, false);
            TreeSet<string> sortedNodes = new TreeSet&lt;&gt;();
            for(String node :subNodes) {
                sortedNodes.add(root +"/" +node);
            }

            String smallNode = sortedNodes.first();
            String preNode = sortedNodes.lower(myNode);

            if (myNode.equals( smallNode)) {
                // 如果是最小的節點,則表示取得鎖
                System.out.println(j.join(Thread.currentThread().getName(), myNode, "get lock"));
                this.nodeId.set(myNode);
                return;
            }

            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(preNode, new LockWatcher(latch));// 同時註冊監聽。
            // 判斷比自己小一個數的節點是否存在,如果不存在則無需等待鎖,同時註冊監聽
            if (stat != null) {
                System.out.println(j.join(Thread.currentThread().getName(), myNode,
                        " waiting for " + root + "/" + preNode + " released lock"));

                latch.await();// 等待,這裡應該一直等待其他執行緒釋放鎖
                nodeId.set(myNode);
                latch = null;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }

    }

    public void unlock() {
        try {
            System.out.println(j.join(Thread.currentThread().getName(), nodeId.get(), "unlock "));
            if (null != nodeId) {
                zk.delete(nodeId.get(), -1);
            }
            nodeId.remove();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

當然如果我們開發環境使用的是etcs也可以用etcd來實現分散式鎖,原理和zookeeper類似</stri