1. 程式人生 > >【原創】redis庫存操作,分散式鎖的四種實現方式[連載二]--基於Redisson實現分散式鎖

【原創】redis庫存操作,分散式鎖的四種實現方式[連載二]--基於Redisson實現分散式鎖

一、redisson介紹

redisson實現了分散式和可擴充套件的java資料結構,支援的資料結構有:List, Set, Map, Queue, SortedSet, ConcureentMap, Lock, AtomicLong, CountDownLatch。並且是執行緒安全的,底層使用Netty 4實現網路通訊。和jedis相比,功能比較簡單,不支援排序,事務,管道,分割槽等redis特性,可以認為是jedis的補充,不能替換jedis。

二、redisson幾種鎖介紹

1、可重入鎖(Reentrant Lock)

基於Redis的Redisson分散式可重入鎖RLock

 Java物件實現了java.util.concurrent.locks.Lock介面

1 RLock lock = redisson.getLock("anyLock");
2 // 最常見的使用方法
3 lock.lock();

另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。

1 // 加鎖以後10秒鐘自動解鎖
2 // 無需呼叫unlock方法手動解鎖
3 lock.lock(10, TimeUnit.SECONDS);
4 
5 // 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
7 ... 8 lock.unlock();

Redisson同時還為分散式鎖提供了非同步執行的相關方法:

1 RLock lock = redisson.getLock("anyLock");
2 lock.lockAsync();
3 lock.lockAsync(10, TimeUnit.SECONDS);
4 Future<Boolean> res = lock.tryLockAsync(100, 10, TimeUnit.SECONDS);

2.公平鎖(Fair Lock)

基於Redis的Redisson分散式可重入公平鎖也是實現了java.util.concurrent.locks.Lock

介面的一種RLock物件。它保證了當多個Redisson客戶端執行緒同時請求加鎖時,優先分配給先發出請求的執行緒。

1 RLock fairLock = redisson.getFairLock("anyLock");
2 // 最常見的使用方法
3 fairLock.lock();

同樣的,Fair lock也提供加鎖時間

1 // 10秒鐘以後自動解鎖
2 // 無需呼叫unlock方法手動解鎖
3 fairLock.lock(10, TimeUnit.SECONDS);
4 
5 // 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
6 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
7 ...
8 fairLock.unlock();

Redisson同時還為分散式可重入公平鎖提供了非同步執行的相關方法:

1 RLock fairLock = redisson.getFairLock("anyLock");
2 fairLock.lockAsync();
3 fairLock.lockAsync(10, TimeUnit.SECONDS);
4 Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

3.聯鎖(MultiLock)

基於Redis的Redisson分散式聯鎖RedissonMultiLock物件可以將多個RLock物件關聯為一個聯鎖,每個RLock物件例項可以來自於不同的Redisson例項。

 1 RLock lock1 = redissonInstance1.getLock("lock1");
 2 RLock lock2 = redissonInstance2.getLock("lock2");
 3 RLock lock3 = redissonInstance3.getLock("lock3");
 4 
 5 RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
 6 // 同時加鎖:lock1 lock2 lock3
 7 // 所有的鎖都上鎖成功才算成功。
 8 lock.lock();
 9 ...
10 lock.unlock();

另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。

1 RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
2 // 給lock1,lock2,lock3加鎖,如果沒有手動解開的話,10秒鐘後將會自動解開
3 lock.lock(10, TimeUnit.SECONDS);
4 
5 // 為加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開
6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
7 ...
8 lock.unlock();

4.紅鎖(Red Lock)

基於Redis的Redisson紅鎖RedissonRedLock物件實現了Redlock介紹的加鎖演算法。該物件也可以用來將多個RLock物件關聯為一個紅鎖,每個RLock物件例項可以來自於不同的Redisson例項。

 1 RLock lock1 = redissonInstance1.getLock("lock1");
 2 RLock lock2 = redissonInstance2.getLock("lock2");
 3 RLock lock3 = redissonInstance3.getLock("lock3");
 4 
 5 RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
 6 // 同時加鎖:lock1 lock2 lock3
 7 // 紅鎖在大部分節點上加鎖成功就算成功。
 8 lock.lock();
 9 ...
10 lock.unlock();

另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。

1 RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
2 // 給lock1,lock2,lock3加鎖,如果沒有手動解開的話,10秒鐘後將會自動解開
3 lock.lock(10, TimeUnit.SECONDS);
4 
5 // 為加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開
6 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
7 ...
8 lock.unlock();

5. 讀寫鎖(ReadWriteLock)

基於Redis的Redisson分散式可重入讀寫鎖RReadWriteLock Java物件實現了java.util.concurrent.locks.ReadWriteLock介面。同時還支援自動過期解鎖。該物件允許同時有多個讀取鎖,但是最多隻能有一個寫入鎖。

1 RReadWriteLock rwlock = redisson.getLock("anyRWLock");
2 // 最常見的使用方法
3 rwlock.readLock().lock();
4 //
5 rwlock.writeLock().lock();

另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。

// 10秒鐘以後自動解鎖
// 無需呼叫unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
//
rwlock.writeLock().lock(10, TimeUnit.SECONDS);

// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
//
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

6. 閉鎖(CountDownLatch)

基於Redisson的Redisson分散式閉鎖(CountDownLatch)Java物件RCountDownLatch採用了與java.util.concurrent.CountDownLatch相似的介面和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他執行緒或其他JVM裡
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

三、redisson分散式鎖在業務中的應用

1、引入相關pom

        <!-- redisson -->
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.5.0</version>
        </dependency>

2、將redisson交由spring管理,本文采用redisson的叢集模式裝配,另外可配置哨兵模式,單點等

/**
 * redisson客戶端引數配置類
 */
@Configuration
@Data
public class RedissonProperties {

    private int idleConnectionTimeout = 10000;

    private int pingTimeout = 1000;

    private int connectTimeout = 10000;

    private int timeout = 3000;

    private int retryAttempts = 3;

    private int retryInterval = 1500;

    private int reconnectionTimeout = 3000;

    private int failedAttempts = 3;

    private int subscriptionsPerConnection = 5;

    private String clientName = "none";

    private int subscriptionConnectionMinimumIdleSize = 64;

    private int subscriptionConnectionPoolSize = 256;

    private int slaveConnectionMinimumIdleSize = 64;

    private int slaveConnectionPoolSize = 256;

    private int masterConnectionMinimumIdleSize = 64;

    private int masterConnectionPoolSize = 256;

    private ReadMode readMode = ReadMode.MASTER;

    private SubscriptionMode subscriptionMode = SubscriptionMode.MASTER;

    private int scanInterval = 1000;

    @Value("${rediscluster.pwd}")
    private String password;

    @Value("${redis.cluster}")
    private String nodeAddress;

    @Value("${redis.cluster1}")
    private String nodeAddress1;

    @Value("${redis.cluster2}")
    private String nodeAddress2;

}
/**
 * 初始化redisson Bean
 *
 * @author LiJunJun
 * @date 2018/10/19
 */
@Configuration
public class RedissonAutoConfiguration {

    @Autowired
    private RedissonProperties redssionProperties;

    /**
     * 叢集模式自動裝配
     *
     * @return
     */
    @Bean
    public RedissonClient redissonClient() {

        Config config = new Config();
        String passWord = redssionProperties.getPassword();
        ClusterServersConfig serverConfig = config.useClusterServers();
        serverConfig.addNodeAddress(redssionProperties.getNodeAddress(), redssionProperties.getNodeAddress1(), redssionProperties.getNodeAddress2());
        serverConfig.setPingTimeout(redssionProperties.getPingTimeout());
        serverConfig.setConnectTimeout(redssionProperties.getConnectTimeout());
        serverConfig.setTimeout(redssionProperties.getTimeout());
        serverConfig.setRetryAttempts(redssionProperties.getRetryAttempts());
        serverConfig.setRetryInterval(redssionProperties.getRetryInterval());
        serverConfig.setReconnectionTimeout(redssionProperties.getReconnectionTimeout());
        serverConfig.setFailedAttempts(redssionProperties.getFailedAttempts());
        serverConfig.setSubscriptionsPerConnection(redssionProperties.getSubscriptionsPerConnection());
        serverConfig.setClientName(redssionProperties.getClientName());
        serverConfig.setSubscriptionConnectionMinimumIdleSize(redssionProperties.getSubscriptionConnectionMinimumIdleSize());
        serverConfig.setSubscriptionConnectionPoolSize(redssionProperties.getSubscriptionConnectionPoolSize());
        serverConfig.setSlaveConnectionMinimumIdleSize(redssionProperties.getSlaveConnectionMinimumIdleSize());
        serverConfig.setSlaveConnectionPoolSize(redssionProperties.getSlaveConnectionPoolSize());
        serverConfig.setMasterConnectionMinimumIdleSize(redssionProperties.getMasterConnectionMinimumIdleSize());
        serverConfig.setMasterConnectionPoolSize(redssionProperties.getMasterConnectionPoolSize());
        serverConfig.setReadMode(redssionProperties.getReadMode());
        serverConfig.setSubscriptionMode(redssionProperties.getSubscriptionMode());
        serverConfig.setScanInterval(redssionProperties.getScanInterval());
        serverConfig.setPassword(StringUtils.isNotBlank(passWord) && !"null".equals(passWord) ? passWord : null);
        return Redisson.create(config);
    }
}

3、業務程式碼中的應用。此處使用的是悲觀鎖,即必須拿到鎖之後才能繼續往下執行,也可使用樂觀鎖,tryLock,利用重試去獲取鎖

    /**
     * redissonClient
     */
    @Resource
    private RedissonClient redissonClient;

    /**
     * 減庫存
     *
     * @param trace 請求流水
     * @param stockManageReq(stockId、decrNum)
     * @return -1為失敗,大於-1的正整數為減後的庫存量,-2為庫存不足無法減庫存
     */
    @Override
    @ApiOperation(value = "減庫存", notes = "減庫存")
    @RequestMapping(value = "/decrByStock", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_UTF8_VALUE, produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public int decrByStock(@RequestHeader(name = "Trace") String trace, @RequestBody StockManageReq stockManageReq) {

        long startTime = System.currentTimeMillis();

        LOGGER.reqPrint(Log.CACHE_SIGN, Log.CACHE_REQUEST, trace, "decrByStock", JSON.toJSONString(stockManageReq));

        int res = 0;
        String stockId = stockManageReq.getStockId();
        Integer decrNum = stockManageReq.getDecrNum();

        // 新增分散式鎖
        RLock stockLock = null;

        try {
            if (null != stockId && null != decrNum) {

                stockId = PREFIX + stockId;

                // 新增分散式鎖
                stockLock = redissonClient.getFairLock(stockId);

                stockLock.lock();

                // redis 減庫存邏輯
                String vStock = redisStockPool.get(stockId);
                long realV = 0L;
                if (StringUtils.isNotEmpty(vStock)) {
                    realV = Long.parseLong(vStock);
                }
                //庫存數  大於等於 要減的數目,則執行減庫存
                if (realV >= decrNum) {
                    Long v = redisStockPool.decrBy(stockId, decrNum);
                    res = v.intValue();
                } else {
                    res = -2;
                }

                stockLock.unlock();
            }
        } catch (Exception e) {
            LOGGER.error(trace, "decr sku stock failure.", e);
            res = -1;
        } finally {
            if (stockLock != null && stockLock.isLocked() && stockLock.isHeldByCurrentThread()) {
                stockLock.unlock();
            }
            LOGGER.respPrint(Log.CACHE_SIGN, Log.CACHE_RESPONSE, trace, "decrByStock", System.currentTimeMillis() - startTime, String.valueOf(res));
        }
        return res;
    }

四、ab壓測及結果分析
同樣的,我們以5000的請求量100的併發量來壓、tps在330左右,相對於zk做分散式鎖來看,提升了10倍的效能,但仍然不能滿足我們的要求

 五、總結

redisson提供了豐富的分散式鎖實現機制,並且使用起來相對比較簡單方便,具體選用哪種鎖,可以根據業務來選擇,但在高併發的情況下,效能還是有些差強人意,下一篇,我們使用redis的watch來實現分散式鎖。