【原創】redis庫存操作,分散式鎖的四種實現方式[連載二]--基於Redisson實現分散式鎖
一、redisson介紹
redisson實現了分散式和可擴充套件的java資料結構,支援的資料結構有:List, Set, Map, Queue, SortedSet, ConcureentMap, Lock, AtomicLong, CountDownLatch。並且是執行緒安全的,底層使用Netty 4實現網路通訊。和jedis相比,功能比較簡單,不支援排序,事務,管道,分割槽等redis特性,可以認為是jedis的補充,不能替換jedis。
二、redisson幾種鎖介紹
1、可重入鎖(Reentrant Lock)
基於Redis的Redisson分散式可重入鎖RLock
java.util.concurrent.locks.Lock
介面
1 RLock lock = redisson.getLock("anyLock"); 2 // 最常見的使用方法 3 lock.lock();
另外Redisson還通過加鎖的方法提供了leaseTime
的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。
1 // 加鎖以後10秒鐘自動解鎖 2 // 無需呼叫unlock方法手動解鎖 3 lock.lock(10, TimeUnit.SECONDS); 4 5 // 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖 6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);7 ... 8 lock.unlock();
Redisson同時還為分散式鎖提供了非同步執行的相關方法:
1 RLock lock = redisson.getLock("anyLock"); 2 lock.lockAsync(); 3 lock.lockAsync(10, TimeUnit.SECONDS); 4 Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);
2.公平鎖(Fair Lock)
基於Redis的Redisson分散式可重入公平鎖也是實現了java.util.concurrent.locks.Lock
RLock
物件。它保證了當多個Redisson客戶端執行緒同時請求加鎖時,優先分配給先發出請求的執行緒。
1 RLock fairLock = redisson.getFairLock("anyLock"); 2 // 最常見的使用方法 3 fairLock.lock();
同樣的,Fair lock也提供加鎖時間
1 // 10秒鐘以後自動解鎖 2 // 無需呼叫unlock方法手動解鎖 3 fairLock.lock(10, TimeUnit.SECONDS); 4 5 // 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖 6 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); 7 ... 8 fairLock.unlock();
Redisson同時還為分散式可重入公平鎖提供了非同步執行的相關方法:
1 RLock fairLock = redisson.getFairLock("anyLock"); 2 fairLock.lockAsync(); 3 fairLock.lockAsync(10, TimeUnit.SECONDS); 4 Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
3.聯鎖(MultiLock)
基於Redis的Redisson分散式聯鎖RedissonMultiLock
物件可以將多個RLock
物件關聯為一個聯鎖,每個RLock
物件例項可以來自於不同的Redisson例項。
1 RLock lock1 = redissonInstance1.getLock("lock1"); 2 RLock lock2 = redissonInstance2.getLock("lock2"); 3 RLock lock3 = redissonInstance3.getLock("lock3"); 4 5 RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); 6 // 同時加鎖:lock1 lock2 lock3 7 // 所有的鎖都上鎖成功才算成功。 8 lock.lock(); 9 ... 10 lock.unlock();
另外Redisson還通過加鎖的方法提供了leaseTime
的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。
1 RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); 2 // 給lock1,lock2,lock3加鎖,如果沒有手動解開的話,10秒鐘後將會自動解開 3 lock.lock(10, TimeUnit.SECONDS); 4 5 // 為加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開 6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); 7 ... 8 lock.unlock();
4.紅鎖(Red Lock)
基於Redis的Redisson紅鎖RedissonRedLock
物件實現了Redlock介紹的加鎖演算法。該物件也可以用來將多個RLock
物件關聯為一個紅鎖,每個RLock
物件例項可以來自於不同的Redisson例項。
1 RLock lock1 = redissonInstance1.getLock("lock1"); 2 RLock lock2 = redissonInstance2.getLock("lock2"); 3 RLock lock3 = redissonInstance3.getLock("lock3"); 4 5 RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); 6 // 同時加鎖:lock1 lock2 lock3 7 // 紅鎖在大部分節點上加鎖成功就算成功。 8 lock.lock(); 9 ... 10 lock.unlock();
另外Redisson還通過加鎖的方法提供了leaseTime
的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。
1 RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); 2 // 給lock1,lock2,lock3加鎖,如果沒有手動解開的話,10秒鐘後將會自動解開 3 lock.lock(10, TimeUnit.SECONDS); 4 5 // 為加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開 6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); 7 ... 8 lock.unlock();
5. 讀寫鎖(ReadWriteLock)
基於Redis的Redisson分散式可重入讀寫鎖RReadWriteLock
Java物件實現了java.util.concurrent.locks.ReadWriteLock
介面。同時還支援自動過期解鎖。該物件允許同時有多個讀取鎖,但是最多隻能有一個寫入鎖。
1 RReadWriteLock rwlock = redisson.getLock("anyRWLock"); 2 // 最常見的使用方法 3 rwlock.readLock().lock(); 4 // 或 5 rwlock.writeLock().lock();
另外Redisson還通過加鎖的方法提供了leaseTime
的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。
// 10秒鐘以後自動解鎖 // 無需呼叫unlock方法手動解鎖 rwlock.readLock().lock(10, TimeUnit.SECONDS); // 或 rwlock.writeLock().lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖 boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS); // 或 boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();
6. 閉鎖(CountDownLatch)
基於Redisson的Redisson分散式閉鎖(CountDownLatch)Java物件RCountDownLatch
採用了與java.util.concurrent.CountDownLatch
相似的介面和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.trySetCount(1); latch.await(); // 在其他執行緒或其他JVM裡 RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch"); latch.countDown();
三、redisson分散式鎖在業務中的應用
1、引入相關pom
<!-- redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.5.0</version> </dependency>
2、將redisson交由spring管理,本文采用redisson的叢集模式裝配,另外可配置哨兵模式,單點等
/** * redisson客戶端引數配置類 */ @Configuration @Data public class RedissonProperties { private int idleConnectionTimeout = 10000; private int pingTimeout = 1000; private int connectTimeout = 10000; private int timeout = 3000; private int retryAttempts = 3; private int retryInterval = 1500; private int reconnectionTimeout = 3000; private int failedAttempts = 3; private int subscriptionsPerConnection = 5; private String clientName = "none"; private int subscriptionConnectionMinimumIdleSize = 64; private int subscriptionConnectionPoolSize = 256; private int slaveConnectionMinimumIdleSize = 64; private int slaveConnectionPoolSize = 256; private int masterConnectionMinimumIdleSize = 64; private int masterConnectionPoolSize = 256; private ReadMode readMode = ReadMode.MASTER; private SubscriptionMode subscriptionMode = SubscriptionMode.MASTER; private int scanInterval = 1000; @Value("${rediscluster.pwd}") private String password; @Value("${redis.cluster}") private String nodeAddress; @Value("${redis.cluster1}") private String nodeAddress1; @Value("${redis.cluster2}") private String nodeAddress2; }
/** * 初始化redisson Bean * * @author LiJunJun * @date 2018/10/19 */ @Configuration public class RedissonAutoConfiguration { @Autowired private RedissonProperties redssionProperties; /** * 叢集模式自動裝配 * * @return */ @Bean public RedissonClient redissonClient() { Config config = new Config(); String passWord = redssionProperties.getPassword(); ClusterServersConfig serverConfig = config.useClusterServers(); serverConfig.addNodeAddress(redssionProperties.getNodeAddress(), redssionProperties.getNodeAddress1(), redssionProperties.getNodeAddress2()); serverConfig.setPingTimeout(redssionProperties.getPingTimeout()); serverConfig.setConnectTimeout(redssionProperties.getConnectTimeout()); serverConfig.setTimeout(redssionProperties.getTimeout()); serverConfig.setRetryAttempts(redssionProperties.getRetryAttempts()); serverConfig.setRetryInterval(redssionProperties.getRetryInterval()); serverConfig.setReconnectionTimeout(redssionProperties.getReconnectionTimeout()); serverConfig.setFailedAttempts(redssionProperties.getFailedAttempts()); serverConfig.setSubscriptionsPerConnection(redssionProperties.getSubscriptionsPerConnection()); serverConfig.setClientName(redssionProperties.getClientName()); serverConfig.setSubscriptionConnectionMinimumIdleSize(redssionProperties.getSubscriptionConnectionMinimumIdleSize()); serverConfig.setSubscriptionConnectionPoolSize(redssionProperties.getSubscriptionConnectionPoolSize()); serverConfig.setSlaveConnectionMinimumIdleSize(redssionProperties.getSlaveConnectionMinimumIdleSize()); serverConfig.setSlaveConnectionPoolSize(redssionProperties.getSlaveConnectionPoolSize()); serverConfig.setMasterConnectionMinimumIdleSize(redssionProperties.getMasterConnectionMinimumIdleSize()); serverConfig.setMasterConnectionPoolSize(redssionProperties.getMasterConnectionPoolSize()); serverConfig.setReadMode(redssionProperties.getReadMode()); serverConfig.setSubscriptionMode(redssionProperties.getSubscriptionMode()); serverConfig.setScanInterval(redssionProperties.getScanInterval()); serverConfig.setPassword(StringUtils.isNotBlank(passWord) && !"null".equals(passWord) ? passWord : null); return Redisson.create(config); } }
3、業務程式碼中的應用。此處使用的是悲觀鎖,即必須拿到鎖之後才能繼續往下執行,也可使用樂觀鎖,tryLock,利用重試去獲取鎖
/** * redissonClient */ @Resource private RedissonClient redissonClient; /** * 減庫存 * * @param trace 請求流水 * @param stockManageReq(stockId、decrNum) * @return -1為失敗,大於-1的正整數為減後的庫存量,-2為庫存不足無法減庫存 */ @Override @ApiOperation(value = "減庫存", notes = "減庫存") @RequestMapping(value = "/decrByStock", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE) public int decrByStock(@RequestHeader(name = "Trace") String trace, @RequestBody StockManageReq stockManageReq) { long startTime = System.currentTimeMillis(); LOGGER.reqPrint(Log.CACHE_SIGN, Log.CACHE_REQUEST, trace, "decrByStock", JSON.toJSONString(stockManageReq)); int res = 0; String stockId = stockManageReq.getStockId(); Integer decrNum = stockManageReq.getDecrNum(); // 新增分散式鎖 RLock stockLock = null; try { if (null != stockId && null != decrNum) { stockId = PREFIX + stockId; // 新增分散式鎖 stockLock = redissonClient.getFairLock(stockId); stockLock.lock(); // redis 減庫存邏輯 String vStock = redisStockPool.get(stockId); long realV = 0L; if (StringUtils.isNotEmpty(vStock)) { realV = Long.parseLong(vStock); } //庫存數 大於等於 要減的數目,則執行減庫存 if (realV >= decrNum) { Long v = redisStockPool.decrBy(stockId, decrNum); res = v.intValue(); } else { res = -2; } stockLock.unlock(); } } catch (Exception e) { LOGGER.error(trace, "decr sku stock failure.", e); res = -1; } finally { if (stockLock != null && stockLock.isLocked() && stockLock.isHeldByCurrentThread()) { stockLock.unlock(); } LOGGER.respPrint(Log.CACHE_SIGN, Log.CACHE_RESPONSE, trace, "decrByStock", System.currentTimeMillis() - startTime, String.valueOf(res)); } return res; }
四、ab壓測及結果分析
同樣的,我們以5000的請求量100的併發量來壓、tps在330左右,相對於zk做分散式鎖來看,提升了10倍的效能,但仍然不能滿足我們的要求
五、總結
redisson提供了豐富的分散式鎖實現機制,並且使用起來相對比較簡單方便,具體選用哪種鎖,可以根據業務來選擇,但在高併發的情況下,效能還是有些差強人意,下一篇,我們使用redis的watch來實現分散式鎖。