1. 程式人生 > >redisson分散式鎖redLock原始碼解析【未完】

redisson分散式鎖redLock原始碼解析【未完】

一、準備階段

1、原理

一個客戶端需要做如下操作來獲取鎖:

1.獲取當前時間(單位是毫秒)

2.輪流用相同的key和隨機值在N個節點上請求鎖,在這一步裡,客戶端在每個master上請求鎖時會有一個和總的鎖釋放時間相比小的多的超時時間。比如如果鎖自動釋放時間是10秒鐘,那每個節點鎖請求的超時時間可能是5-50毫秒的範圍,這個可以防止一個客戶端在某個宕掉的master節點上阻塞過長時間,如果一個master節點不可用了,我們應該儘快嘗試下一個master節點

3.客戶端計算第二步中獲取鎖所花的時間,只有當客戶端在大多數master節點上成功獲取了鎖(在這裡是3個),而且總共消耗的時間不超過鎖釋放時間,這個鎖就認為是獲取成功了

4.如果鎖獲取成功了,那現在鎖自動釋放時間就是最初的鎖釋放時間減去之前獲取鎖所消耗的時間

5.如果鎖獲取失敗了,不管是因為獲取成功的鎖不超過一半(N/2+1)還是因為總消耗時間超過了鎖釋放時間,客戶端都會到每個master節點上釋放鎖,即便是那些他認為沒有獲取成功的鎖。

In order to acquire the lock, the client performs the following operations:

1、It gets the current time in milliseconds.

2、It tries to acquire the lock in all the N instances sequentially, using the same key name and random value in all the instances. During step 2, when setting the lock in each instance, the client uses a timeout which is small compared to the total lock auto-release time in order to acquire it. For example if the auto-release time is 10 seconds, the timeout could be in the ~ 5-50 milliseconds range. This prevents the client from remaining blocked for a long time trying to talk with a Redis node which is down: if an instance is not available, we should try to talk with the next instance ASAP.

3、The client computes how much time elapsed in order to acquire the lock, by subtracting from the current time the timestamp obtained in step 1. If and only if the client was able to acquire the lock in the majority of the instances (at least 3), and the total time elapsed to acquire the lock is less than lock validity time, the lock is considered to be acquired.

4、If the lock was acquired, its validity time is considered to be the initial validity time minus the time elapsed, as computed in step 3.

5、If the client failed to acquire the lock for some reason (either it was not able to lock N/2+1 instances or the validity time is negative), it will try to unlock all the instances (even the instances it believed it was not able to lock).

2、依賴包和版本

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.3.2</version>
</dependency>

3、原始碼中使用到的 Redis 命令

SETNX key value (SET if Not eXists):當且僅當 key 不存在,將 key 的值設為 value ,並返回1;若給定的 key 已經存在,則 SETNX 不做任何動作,並返回0。詳見:SETNX commond

GETSET key value:將給定 key 的值設為 value ,並返回 key 的舊值 (old value),當 key 存在但不是字串型別時,返回一個錯誤,當key不存在時,返回nil。詳見:GETSET commond

GET key:返回 key 所關聯的字串值,如果 key 不存在那麼返回 nil 。詳見:GET Commond

DEL key [KEY …]:刪除給定的一個或多個 key ,不存在的 key 會被忽略,返回實際刪除的key的個數(integer)。詳見:DEL Commond

HSET key field value:給一個key 設定一個{field=value}的組合值,如果key沒有就直接賦值並返回1,如果field已有,那麼就更新value的值,並返回0.詳見:HSET Commond

HEXISTS key field:當key 中儲存著field的時候返回1,如果key或者field至少有一個不存在返回0。詳見HEXISTS Commond

HINCRBY key field increment:將儲存在 key 中的雜湊(Hash)物件中的指定欄位 field 的值加上增量 increment。如果鍵
key 不存在,一個儲存了雜湊物件的新建將被建立。如果欄位 field 不存在,在進行當前操作前,其將被建立,且對應的值被置為 0。返回值是增量之後的值。詳見:HINCRBY Commond

PEXPIRE key milliseconds:設定存活時間,單位是毫秒。expire操作單位是秒。詳見:PEXPIRE Commond

PUBLISH channel message:向channel post一個message內容的訊息,返回接收訊息的客戶端數。詳見PUBLISH Commond

4、入口類

public class RedisLockTest {

    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.useSentinelServers().addSentinelAddress("127.0.0.1:6479", "127.0.0.1:6489").setMasterName("master")
                .setPassword("password").setDatabase(0);
        RedissonClient redissonClient = Redisson.create(config);

        RLock lock = redissonClient.getLock("LOCKER_PREFIX" + "test_lock");
        try {
            boolean isLock = lock.tryLock();
            //            isLock = lock.tryLock(100, 1000, TimeUnit.SECONDS);
            if (isLock) {
                //doBusiness();
            }
        } catch (Exception e) {
        } finally {
            lock.unlock();
        }

    }
}

4、大體分兩種,一種是無參,另一種是帶過期時間的

lock.tryLock() -> tryAcquireOnceAsync - tryLockInnerAsync
lock.tryLock(100, 1000, TimeUnit.SECONDS) ->tryLock(long waitTime, long leaseTime, TimeUnit unit)

二、 原始碼分析-無參

1、RedissonLock.tryAcquireOnceAsync

private RFuture<Boolean> tryAcquireOnceAsync(long leaseTime, TimeUnit unit, final long threadId) {
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        }
        RFuture<Boolean> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_NULL_BOOLEAN);
        ttlRemainingFuture.addListener(new FutureListener<Boolean>() {
            @Override
            public void operationComplete(Future<Boolean> future) throws Exception {
                if (!future.isSuccess()) {
                    return;
                }

                Boolean ttlRemaining = future.getNow();
                // lock acquired
                if (ttlRemaining) {
                    scheduleExpirationRenewal(threadId);
                }
            }
        });
        return ttlRemainingFuture;
    }

1.1 、RedissonLock.tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
        internalLockLeaseTime = unit.toMillis(leaseTime);

        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " + //如果鎖名稱不存在
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +//則向redis中新增一個key為test_lock的set,並且向set中新增一個field為執行緒id,值=1的鍵值對,表示此執行緒的重入次數為1
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +//設定set的過期時間,防止當前伺服器出問題後導致死鎖,return nil; end;返回nil 結束
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +//如果鎖是存在的,檢測是否是當前執行緒持有鎖,如果是當前執行緒持有鎖
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +//則將該執行緒重入的次數++
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +//並且重新設定該鎖的有效時間
                      "return nil; " + //返回nil,結束
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);", //鎖存在, 但不是當前執行緒加的鎖,則返回鎖的過期時間
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }
````

其中
KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock
ARGV[1] 表示的是 internalLockLeaseTime 預設值是30s
ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用鎖物件id+執行緒id, 表示當前訪問執行緒,用於區分不同伺服器上的執行緒.





<div class="se-preview-section-delimiter"></div>

#### 1.2、RedissonLock.scheduleExpirationRenewal




<div class="se-preview-section-delimiter"></div>

```java
private void scheduleExpirationRenewal(final long threadId) {
        if (expirationRenewalMap.containsKey(getEntryName())) {
            return;
        }

        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {

                RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return 1; " +
                        "end; " +
                        "return 0;",
                          Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

                future.addListener(new FutureListener<Boolean>() {
                    @Override
                    public void operationComplete(Future<Boolean> future) throws Exception {
                        expirationRenewalMap.remove(getEntryName());
                        if (!future.isSuccess()) {
                            log.error("Can't update lock " + getName() + " expiration", future.cause());
                            return;
                        }

                        if (future.getNow()) {
                            // reschedule itself
                            scheduleExpirationRenewal(threadId);
                        }
                    }
                });
            }
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

        if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
            task.cancel();
        }
    }

三、 原始碼分析-過期時間

 1、RedissonLock.tryLock

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        final long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }

        time -= (System.currentTimeMillis() - current);
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        current = System.currentTimeMillis();
        final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                    @Override
                    public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                        if (subscribeFuture.isSuccess()) {
                            unsubscribe(subscribeFuture, threadId);
                        }
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= (System.currentTimeMillis() - current);
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }

            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= (System.currentTimeMillis() - currentTime);
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

1.1 tryAcquire

tryLock -> tryAcquire -> tryAcquireAsync -> tryAcquireAsync -> tryLockInnerAsync

1.2 subscribe

tryLock -> subscribe(threadId) -> PUBSUB.subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager)

PublishSubscribe.subscribe

public RFuture<E> subscribe(final String entryName, final String channelName, final ConnectionManager connectionManager) {
        final AtomicReference<Runnable> listenerHolder = new AtomicReference<Runnable>();
        final AsyncSemaphore semaphore = connectionManager.getSemaphore(channelName);
        final RPromise<E> newPromise = new PromiseDelegator<E>(connectionManager.<E>newPromise()) {
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return semaphore.remove(listenerHolder.get());
            }
        };

        Runnable listener = new Runnable() {

            @Override
            public void run() {
                E entry = entries.get(entryName);
                if (entry != null) {
                    entry.aquire();
                    semaphore.release();
                    entry.getPromise().addListener(new TransferListener<E>(newPromise));
                    return;
                }

                E value = createEntry(newPromise);
                value.aquire();

                E oldValue = entries.putIfAbsent(entryName, value);
                if (oldValue != null) {
                    oldValue.aquire();
                    semaphore.release();
                    oldValue.getPromise().addListener(new TransferListener<E>(newPromise));
                    return;
                }
                // 1.2.1
                RedisPubSubListener<Object> listener = createListener(channelName, value);
                // 1.2.2
                connectionManager.subscribe(LongCodec.INSTANCE, channelName, listener, semaphore);
            }
        };
        semaphore.acquire(listener);
        listenerHolder.set(listener);

        return newPromise;
    }
1.2.1 PublishSubscribe.createListener

tryLock -> subscribe(threadId) -> PUBSUB.subscribe -> PublishSubscribe.createListener

PublishSubscribe.createListener

private RedisPubSubListener<Object> createListener(final String channelName, final E value) {
        RedisPubSubListener<Object> listener = new BaseRedisPubSubListener() {

            @Override
            public void onMessage(String channel, Object message) {
                if (!channelName.equals(channel)) {
                    return;
                }

                PublishSubscribe.this.onMessage(value, (Long)message);
            }

            @Override
            public boolean onStatus(PubSubType type, String channel) {
                if (!channelName.equals(channel)) {
                    return false;
                }

                if (type == PubSubType.SUBSCRIBE) {
                    value.getPromise().trySuccess(value);
                    return true;
                }
                return false;
            }

        };
        return listener;
    }
1.2.2 PublishSubscribe.createListener

tryLock -> subscribe(threadId) -> PUBSUB.subscribe -> PublishSubscribe.createListener -> LockPubSub.onMessage

public class LockPubSub extends PublishSubscribe<RedissonLockEntry> {

    public static final Long unlockMessage = 0L;

    @Override
    protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) {
        return new RedissonLockEntry(newPromise);
    }

    @Override
    protected void onMessage(RedissonLockEntry value, Long message) {
        if (message.equals(unlockMessage)) {
            // 釋放一個許可,喚醒等待的entry.getLatch().tryAcquire去再次嘗試獲取鎖。
            value.getLatch().release();
            // 如果entry還有其他Listeners回撥,也喚醒執行。
            while (true) {
                Runnable runnableToExecute = null;
                synchronized (value) {
                    Runnable runnable = value.getListeners().poll();
                    if (runnable != null) {
                        if (value.getLatch().tryAcquire()) {
                            runnableToExecute = runnable;
                        } else {
                            value.addListener(runnable);
                        }
                    }
                }

                if (runnableToExecute != null) {
                    runnableToExecute.run();
                } else {
                    return;
                }
            }
        }
    }
}
1.2.2 MasterSlaveConnectionManager.subscribe

tryLock -> subscribe(threadId) -> PUBSUB.subscribe -> MasterSlaveConnectionManager.subscribe

public RFuture<PubSubConnectionEntry> subscribe(Codec codec, String channelName, RedisPubSubListener<?> listener, AsyncSemaphore semaphore) {
        RPromise<PubSubConnectionEntry> promise = newPromise();
        subscribe(codec, channelName, listener, promise, PubSubType.SUBSCRIBE, semaphore);
        return promise;
    }
1.2.2.1 MasterSlaveConnectionManager.subscribe

tryLock -> subscribe(threadId) -> PUBSUB.subscribe -> MasterSlaveConnectionManager.subscribe -> MasterSlaveConnectionManager.subscribe(final Codec codec, final String channelName, final RedisPubSubListener

private void subscribe(final Codec codec, final String channelName, final RedisPubSubListener<?> listener,
            final RPromise<PubSubConnectionEntry> promise, final PubSubType type, final AsyncSemaphore lock) {
        final PubSubConnectionEntry connEntry = name2PubSubConnection.get(channelName);
        if (connEntry != null) {
            connEntry.addListener(channelName, listener);
            connEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    lock.release();
                    promise.trySuccess(connEntry);
                }
            });
            return;
        }

        freePubSubLock.acquire(new Runnable() {

            @Override
            public void run() {
                if (promise.isDone()) {
                    return;
                }

                final PubSubConnectionEntry freeEntry = freePubSubConnections.peek();
                if (freeEntry == null) {
                    connect(codec, channelName, listener, promise, type, lock);
                    return;
                }

                int remainFreeAmount = freeEntry.tryAcquire();
                if (remainFreeAmount == -1) {
                    throw new IllegalStateException();
                }

                final PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, freeEntry);
                if (oldEntry != null) {
                    freeEntry.release();
                    freePubSubLock.release();

                    oldEntry.addListener(channelName, listener);
                    oldEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                        @Override
                        public void operationComplete(Future<Void> future) throws Exception {
                            lock.release();
                            promise.trySuccess(oldEntry);
                        }
                    });
                    return;
                }

                if (remainFreeAmount == 0) {
                    freePubSubConnections.poll();
                }
                freePubSubLock.release();

                freeEntry.addListener(channelName, listener);
                freeEntry.getSubscribeFuture(channelName, type).addListener(new FutureListener<Void>() {
                    @Override
                    public void operationComplete(Future<Void> future) throws Exception {
                        lock.release();
                        promise.trySuccess(freeEntry);
                    }
                });

                if (PubSubType.PSUBSCRIBE == type) {
                    freeEntry.psubscribe(codec, channelName);
                } else {
                    freeEntry.subscribe(codec, channelName);
                }
            }

        });
    }

三、RedissonLock解鎖 unlock原始碼

@Override
    public void unlock() {
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                        "if (redis.call('exists', KEYS[1]) == 0) then " +//如果鎖已經不存在(可能是因為過期導致不存在,也可能是因為已經解鎖)
                            "redis.call('publish', KEYS[2], ARGV[1]); " +//則釋出鎖解除的訊息
                            "return 1; " + //返回1結束
                        "end;" +
                        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + //如果鎖存在,但是若果當前執行緒不是加鎖的線
                            "return nil;" + //則直接返回nil 結束
                        "end; " +
                        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + //如果是鎖是當前執行緒所新增,定義變數counter,表示當前執行緒的重入次數-1,即直接將重入次數-1
                        "if (counter > 0) then " + //如果重入次數大於0,表示該執行緒還有其他任務需要執行
                            "redis.call('pexpire', KEYS[1], ARGV[2]); " + //則重新設定該鎖的有效時間
                            "return 0; " + //返回0結束
                        "else " +
                            "redis.call('del', KEYS[1]); " + //否則表示該執行緒執行結束,刪除該鎖
                            "redis.call('publish', KEYS[2], ARGV[1]); " + //並且釋出該鎖解除的訊息
                            "return 1; "+ //返回1結束
                        "end; " +
                        "return nil;", //其他情況返回nil並結束
                        Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
        if (opStatus == null) {
            throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + Thread.currentThread().getId());
        }
        if (opStatus) {
            cancelExpirationRenewal();
        }
    }

其中
KEYS[1] 表是的是getName() 代表鎖名test_lock
KEYS[2] 表示getChanelName() 表示的是釋出訂閱過程中使用的Chanel
ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖訊息,實際代表的是數字 0,代表解鎖訊息
ARGV[2] 表示的是internalLockLeaseTime 預設的有效時間 30s
ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當前鎖id+執行緒id

RedissonLock強制解鎖原始碼

@Override
    public void forceUnlock() {
        get(forceUnlockAsync());
    }

    @Override
    public RFuture<Boolean> forceUnlockAsync() {
        cancelExpirationRenewal();
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('del', KEYS[1]) == 1) then "
                + "redis.call('publish', KEYS[2], ARGV[1]); "
                + "return 1 "
                + "else "
                + "return 0 "
                + "end",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage);
    }

以上是強制解鎖的原始碼,在原始碼中並沒有找到forceUnlock()被呼叫的痕跡(也有可能是我沒有找對),但是forceUnlockAsync()方法被呼叫的地方很多,大多都是在清理資源時刪除鎖。此部分比較簡單粗暴,刪除鎖成功則併發布鎖被刪除的訊息,返回1結束,否則返回0結束。