1. 程式人生 > >分散式鎖原始碼剖析(2) Redisson實現公平分散式鎖

分散式鎖原始碼剖析(2) Redisson實現公平分散式鎖

Redisson分散式鎖原始碼剖析(公平鎖)

maven配置檔案:

 <dependency>
         <groupId>org.redisson</groupId>
         <artifactId>redisson</artifactId>
         <version>3.8.1</version>
</dependency>

程式碼示例:

Config config = new Config();
config.useClusterServers()
    .addNodeAddress("redis://192.168.31.114:7001")
    .addNodeAddress("redis://192.168.31.184:7002");

RedissonClient redisson = Redisson.create(config);

RLock lock = redisson.getFairLock("anyLock");
lock.lock();
lock.unlock();

lock()方法原始碼剖析

基本邏輯和可重入鎖類似,最大區別是加鎖的邏輯。

核心原始碼:

lock()->RedisssonLock.lock()->lockInterruptibly()->tryAcquire()->tryAcquireAsync()->tryLockInnerAsync()(RedissonFairLock類中的方法)

 if (command == RedisCommands.EVAL_LONG) {
            return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                    // remove stale threads
                    "while true do "
                    + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                    + "if firstThreadId2 == false then "
                        + "break;"
                    + "end; "
                    + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                    + "if timeout <= tonumber(ARGV[4]) then "
                        + "redis.call('zrem', KEYS[3], firstThreadId2); "
                        + "redis.call('lpop', KEYS[2]); "
                    + "else "
                        + "break;"
                    + "end; "
                  + "end;"
                    
                      + "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
                            + "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
                            "redis.call('lpop', KEYS[2]); " +
                            "redis.call('zrem', KEYS[3], ARGV[2]); " +
                            "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                        "end; " +
                        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                            "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                            "return nil; " +
                        "end; " +
                            
                        "local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
                        "local ttl; " + 
                        "if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + 
                            "ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" + 
                        "else "
                          + "ttl = redis.call('pttl', KEYS[1]);" + 
                        "end; " + 
                            
                        "local timeout = ttl + tonumber(ARGV[3]);" + 
                        "if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " +
                            "redis.call('rpush', KEYS[2], ARGV[2]);" +
                        "end; " +
                        "return ttl;", 
                        Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName), 
                                    internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
        }

第一段邏輯:死迴圈,彈出佇列中的第一個元素,如果佇列是空,直接跳出迴圈。timeout = set集合裡面執行緒對應的過期時間。如果timeout <= 當前時間,清空集合,清空佇列。

第二段邏輯:如果anyLock鎖不存在並且佇列為空或者佇列的第一個元素是當前執行緒,彈出佇列的第一個元素,從有序set集合中刪除當前執行緒對應的元素,然後給當前執行緒加鎖,設定這個key的有效時間是30秒。判斷anyLock中,是否有當前執行緒:1,如果有,那累加1,即當前執行緒:2,。(重入鎖)

第三段邏輯:得到佇列中的第一個元素,如果佇列中的第一個元素存在並且執行緒id不等於當前執行緒,ttl = 佇列第一個元素過期時間 - 當前時間。如果佇列是空,ttl = anyLock的存活時間。

第四段邏輯:timeout= ttl + 當前時間 + 50秒(等待時間),往set集合插入一個元素,過期時間為timeout,並把它放入佇列中。最終返回ttl。

unlock()方法原始碼剖析

unlock()->RedisssonLock.unlock()->unlockAsync()->unlockInnerAsync()(RedissonFairLock類中的方法)

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                // remove stale threads
                "while true do "
                + "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
                + "if firstThreadId2 == false then "
                    + "break;"
                + "end; "
                + "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
                + "if timeout <= tonumber(ARGV[4]) then "
                    + "redis.call('zrem', KEYS[3], firstThreadId2); "
                    + "redis.call('lpop', KEYS[2]); "
                + "else "
                    + "break;"
                + "end; "
              + "end;"
                
              + "if (redis.call('exists', KEYS[1]) == 0) then " + 
                    "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
                    "if nextThreadId ~= false then " +
                        "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                    "end; " +
                    "return 1; " +
                "end;" +
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "end; " +
                    
                "redis.call('del', KEYS[1]); " +
                "local nextThreadId = redis.call('lindex', KEYS[2], 0); " + 
                "if nextThreadId ~= false then " +
                    "redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
                "end; " +
                "return 1; ",
                Arrays.<Object>asList(getName(), threadsQueueName, timeoutSetName, getChannelName()), 
                LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
    }

第一段邏輯:獲取佇列中的第一個元素,如果佇列為空,跳出迴圈。timeout = set集合中執行緒對應的元素存活時間,如果timeout <=當前時間,清空set集合和佇列。

第二段邏輯:如果鎖不存在,取出佇列中的第一個元素,釋出一個解鎖訊息事件,如果anylock的map結構中沒有任何執行緒,則直接返回null。遞減anyLock的threadId:1。(因為鎖的可重入性)

第三段邏輯:刪除anyLock這個key,佇列中的第一個元素通過訊息事件,嘗試去獲取鎖。

Redisson 公平鎖的原理圖

Redisson分散式鎖原理(公平鎖)