分散式鎖解決方案
在多執行緒的軟體世界裡,對共享資源的爭搶過程(Data Race)就是併發,而對共享資源資料進行訪問保護的最直接辦法就是引入鎖。
POSIX threads(簡稱Pthreads)是在多核平臺上進行並行程式設計的一套常用的API。執行緒同步(Thread Synchronization)是並行程式設計中非常重要的通訊手段,其中最典型的應用就是用Pthreads提供的鎖機制(lock)來對多個執行緒之間共 享的臨界區(Critical Section)進行保護(另一種常用的同步機制是barrier)。
無鎖程式設計也是一種辦法,但它不在本文的討論範圍,併發多執行緒轉為單執行緒(Disruptor),函數語言程式設計,鎖粒度控制(ConcurrentHashMap桶),訊號量(Semaphore)等手段都可以實現無鎖或鎖優化。
技術上來說,鎖也可以理解成將大量併發請求序列化,但請注意序列化不能簡單等同為** 排隊 ,因為這裡和現實世界沒什麼不同,排隊意味著大家是公平Fair的領到資源,先到先得,然而很多情況下為了效能考量多執行緒之間還是會不公平Unfair**的去搶。Java中ReentrantLock可重入鎖,提供了公平鎖和非公平鎖兩種實現。
再注意一點,序列也不是意味著只有一個排隊的隊伍,每次只能進一個。當然可以好多個隊伍,每次進入多個。比如餐館一共10個餐桌,服務員可能一次放行最多10個人進去,有人出來再放行同數量的人進去。Java中Semaphore訊號量,相當於同時管理一批鎖。
鎖的型別
自旋鎖(Spin Lock)
自旋鎖是一種非阻塞鎖,也就是說,如果某執行緒需要獲取自旋鎖,但該鎖已經被其他執行緒佔用時,該執行緒不會被掛起,而是在不斷的消耗CPU的時間,不停的試圖獲取自旋鎖。
互斥鎖 (Mutex Lock)
互斥鎖是阻塞鎖,當某執行緒無法獲取互斥鎖時,該執行緒會被直接掛起,不再消耗CPU時間,當其他執行緒釋放互斥鎖後,作業系統會喚醒那個被掛起的執行緒。
可重入鎖 (Reentrant Lock)
可重入鎖是一種特殊的互斥鎖,它可以被同一個執行緒多次獲取,而不會產生死鎖。
鎖舉例
本地鎖
java環境下可以通過synchronized和lock開實現本地鎖。
//synchronized public synchronized void demoMethod(){} public void demoMethod(){ synchronized (this) { //other thread safe code } } private final Object lock = new Object(); public void demoMethod(){ synchronized (lock) { //other thread safe code } } public synchronized static void demoMethod(){} //lock private final Lock queueLock = new ReentrantLock(); public void printJob(Object document) { queueLock.lock(); try { Long duration = (long) (Math.random() * 10000); System.out.println(Thread.currentThread().getName() + ": PrintQueue: Printing a Job during " + (duration / 1000) + " seconds :: Time - " + new Date()); Thread.sleep(duration); } catch (InterruptedException e) { e.printStackTrace(); } finally { System.out.printf("%s: The document has been printed\n", Thread.currentThread().getName()); queueLock.unlock(); } } 複製程式碼
鎖非靜態是鎖了物件的例項;鎖靜態是鎖了物件的型別。
一些特性
- 可重入。如下可以直接進入testWrite方法不用重新申請鎖。synchronized和lock都是可重入鎖。
synchronized void testRead(){ this.testWrite(); } synchronized void testWrite(){} 複製程式碼
- 可中斷鎖。例如A正在執行鎖中的程式碼,另一執行緒B正在等待獲取該鎖如果B可以中斷則該鎖為可中斷鎖。synchronized就不是可中斷鎖,而Lock是可中斷鎖。
- 公平鎖和非公平鎖。以請求鎖的順序來獲取鎖是公平鎖。synchronized是非公平鎖,lock預設是非公平鎖,但是可以設定為公平鎖。
對比
名稱 | 優點 | 缺點 |
---|---|---|
synchronized | 實現簡單,語義清晰,便於JVM堆疊跟蹤,加鎖解鎖過程由JVM自動控制,提供了多種優化方案,使用更廣泛 | 悲觀的排他鎖,不能進行高階功能 |
lock | 可定時的、可輪詢的與可中斷的鎖獲取操作,提供了讀寫鎖、公平鎖和非公平鎖 | 需手動釋放鎖unlock,不適合JVM進行堆疊跟蹤 |
分散式鎖
使用分散式鎖的目的有兩個,一個是避免多次執行冪等操作提升效率;一個是避免多個節點同時執行非冪等操作導致資料不一致。 接下來我們來看如何實現分散式鎖,在java環境下有三種也即通過資料庫,通過redis及通過Zk來實現。
通過資料庫實現
通過主鍵及其他約束使用拋異常來實現分散式鎖不在本文討論範圍。一下為基於資料庫排他鎖來實現分散式鎖
/** * 超時獲取鎖 * @param lockID * @param timeOuts * @return * @throws InterruptedException */ public boolean acquireByUpdate(String lockID, long timeOuts) throws InterruptedException, SQLException { String sql = "SELECT id from test_lock where id = ? for UPDATE "; long futureTime = System.currentTimeMillis() + timeOuts; long ranmain = timeOuts; long timerange = 500; connection.setAutoCommit(false); while (true) { CountDownLatch latch = new CountDownLatch(1); try { PreparedStatement statement = connection.prepareStatement(sql); statement.setString(1, lockID); statement.setInt(2, 1); statement.setLong(1, System.currentTimeMillis()); boolean ifsucess = statement.execute();//如果成功,那麼就是獲取到了鎖 if (ifsucess) return true; } catch (SQLException e) { e.printStackTrace(); } latch.await(timerange, TimeUnit.MILLISECONDS); ranmain = futureTime - System.currentTimeMillis(); if (ranmain <= 0) break; if (ranmain < timerange) { timerange = ranmain; } continue; } return false; } /** * 釋放鎖 * @param lockID * @return * @throws SQLException */ public void unlockforUpdtate(String lockID) throws SQLException { connection.commit(); } 複製程式碼
通過快取系統實現
加鎖
public class RedisTool { private static final String LOCK_SUCCESS = "OK"; private static final String SET_IF_NOT_EXIST = "NX"; private static final String SET_WITH_EXPIRE_TIME = "PX"; /** * 嘗試獲取分散式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @param expireTime 超期時間 * @return 是否獲取成功 */ public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) { String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime); if (LOCK_SUCCESS.equals(result)) { return true; } return false; } } 複製程式碼
第一個為key,我們使用key來當鎖,因為key是唯一的。
第二個為value,我們傳的是requestId,很多童鞋可能不明白,有key作為鎖不就夠了嗎,為什麼還要用到value?原因就是我們在上面講到可靠性時,分散式鎖要滿足第四個條件解鈴還須繫鈴人,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據。requestId可以使用UUID.randomUUID().toString()方法生成。
第三個為nxxx,這個引數我們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;
第四個為expx,這個引數我們傳的是PX,意思是我們要給這個key加一個過期的設定,具體時間由第五個引數決定。
第五個為time,與第四個引數相呼應,代表key的過期時間。
解鎖
public class RedisTool { private static final Long RELEASE_SUCCESS = 1L; /** * 釋放分散式鎖 * @param jedis Redis客戶端 * @param lockKey 鎖 * @param requestId 請求標識 * @return 是否釋放成功 */ public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); if (RELEASE_SUCCESS.equals(result)) { return true; } return false; } } 複製程式碼
第一行程式碼,我們寫了一個簡單的Lua指令碼程式碼
第二行程式碼,我們將Lua程式碼傳到jedis.eval()方法裡,並使引數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。eval()方法是將Lua程式碼交給Redis服務端執行。
基於Redlock實現分散式鎖的爭論見
ofollow,noindex">Redlock
通過ZK實現
使用curator來實現分散式鎖。
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException { try { return interProcessMutex.acquire(timeout, unit); } catch (Exception e) { e.printStackTrace(); } return true; } public boolean unlock() { try { interProcessMutex.release(); } catch (Throwable e) { log.error(e.getMessage(), e); } finally { executorService.schedule(new Cleaner(client, path), delayTimeForClean, TimeUnit.MILLISECONDS); } return true; } 複製程式碼