1. 程式人生 > >基於redis實現的分布式鎖

基於redis實現的分布式鎖

支持 過時 break 幫助 進程 bject cep object tex


RedisLockHelper.java


/**
 * Created by BingZhong on 2017/7/29.
 *
 * 基於Redis實現的分布式鎖
 */
public final class RedisLockHelper {

    private static Logger logger = LoggerFactory.getLogger(RedisLockHelper.class);

    /**
     * redis操作幫助類,可以是其他封裝了redis操作的類
     */
    private RedisHelper redisHelper;

    public static final long DEFAULT_TIMEOUT = 30 * 1000;

    public static final long DEFAULT_SLEEP_TIME = 100;

    private RedisLockHelper(RedisHelper redisHelper) {
        this.redisHelper = redisHelper;
    }

    public static RedisLockHelper getInstance(RedisHelper redisHelper) {
        return new RedisLockHelper(redisHelper);
    }

    /**
     * 創建鎖
     *
     * @param mutex     互斥量
     * @param timeout   鎖的超時時間
     * @param sleepTime 線程自旋嘗試獲取鎖時的休眠時間
     * @param timeUnit  時間單位
     */
    public RedisLock newLock(String mutex, long timeout, long sleepTime, TimeUnit timeUnit) {
        logger.info("創建分布式鎖,互斥量為{}", mutex);
        return new RedisLock(mutex, timeout, sleepTime, timeUnit);
    }

    public RedisLock newLock(String mutex, long timeout, TimeUnit timeUnit) {
        return newLock(mutex, timeout, DEFAULT_SLEEP_TIME, timeUnit);
    }

    public RedisLock newLock(String mutex) {
        return newLock(mutex, DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    public class RedisLock {
        /**
         * 用於創建redis健值對的鍵,相當於互斥量
         */
        private final String mutex;

        /**
         * 鎖過期的絕對時間
         */
        private volatile long lockExpiresTime = 0;

        /**
         * 鎖的超時時間
         */
        private final long timeout;

        /**
         * 每次循環獲取鎖的休眠時間
         */
        private final long sleepTime;

        /**
         * 鎖的線程持有者
         */
        private volatile Thread lockHolder = null;

        private final ReentrantLock threadLock = new ReentrantLock();

        public RedisLock(String mutex, long timeout, long sleepTime, TimeUnit timeUnit) {
            this.mutex = mutex;
            this.timeout = timeUnit.toMillis(timeout);
            this.sleepTime = timeUnit.toMillis(sleepTime);
        }

        /**
         * 加鎖,將會一直嘗試獲取鎖,直到超時
         */
        public boolean lock(long acquireTimeout, TimeUnit timeUnit) throws InterruptedException {
            acquireTimeout = timeUnit.toMillis(acquireTimeout);
            long acquireTime = acquireTimeout + System.currentTimeMillis();
            threadLock.tryLock(acquireTimeout, timeUnit);
            try {
                while (true) {
                    boolean hasLock = tryLock();
                    if (hasLock) {
                        //獲取鎖成功
                        return true;
                    } else if (acquireTime < System.currentTimeMillis()) {
                        break;
                    }
                    Thread.sleep(sleepTime);
                }
            } finally {
                if (threadLock.isHeldByCurrentThread()) {
                    threadLock.unlock();
                }
            }

            return false;
        }

        /**
         * 嘗試獲取鎖,無論是否獲取到鎖都將直接返回而不會阻塞
         * 不支持重入鎖
         */
        public boolean tryLock() {
            if (lockHolder == Thread.currentThread()) {
                throw new IllegalMonitorStateException("不支持重入鎖");
            }
            long currentTime = System.currentTimeMillis();
            String expires = String.valueOf(timeout + currentTime);
            //嘗試設置互斥量
            if (redisHelper.setNx(mutex, expires) > 0) {
                setLockStatus(expires);
                return true;
            } else {
                String currentLockTime = redisHelper.get(mutex);
                //檢查鎖是否超時
                if (Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) < currentTime) {
                    //獲取舊的鎖時間並設置互斥量
                    String oldLockTime = redisHelper.getSet(mutex, expires);
                    //判斷獲取到的舊值是否一致,不一致證明已經有另外的進程(線程)成功獲取到了鎖
                    if (Objects.nonNull(oldLockTime) && Objects.equals(oldLockTime, currentLockTime)) {
                        setLockStatus(expires);
                        return true;
                    }
                }

                return false;
            }
        }

        /**
         * 該鎖是否被鎖住
         */
        public boolean isLock() {
            String currentLockTime = redisHelper.get(mutex);
            //存在互斥量且鎖還為過時即鎖住
            return Objects.nonNull(currentLockTime) && Long.parseLong(currentLockTime) > System.currentTimeMillis();
        }

        public String getMutex() {
            return mutex;
        }

        /**
         * 解鎖
         */
        public boolean unlock() {
            //只有鎖的持有線程才能解鎖
            if (lockHolder == Thread.currentThread()) {
                //判斷鎖是否超時,沒有超時才將互斥量刪除
                if (lockExpiresTime > System.currentTimeMillis()) {
                    redisHelper.del(mutex);
                    logger.info("刪除互斥量[{}]", mutex);
                }
                lockHolder = null;
                logger.info("釋放[{}]鎖成功", mutex);

                return true;
            } else {
                throw new IllegalMonitorStateException("沒有獲取到鎖的線程無法執行解鎖操作");
            }
        }

        private void setLockStatus(String expires) {
            lockExpiresTime = Long.parseLong(expires);
            lockHolder = Thread.currentThread();
            logger.info("獲取[{}]鎖成功", mutex);
        }
    }
}

基於redis實現的分布式鎖