1. 程式人生 > >利用Redisson實現分散式鎖及其底層原理解析

利用Redisson實現分散式鎖及其底層原理解析

Redis介紹

redis是一個key-value儲存系統。和Memcached類似,它支援儲存的value型別相對更多,包括string(字串)、list(連結串列)、set(集合)、zset(sorted set --有序集合)和hash(雜湊型別)。這些資料型別都支援push/pop、add/remove及取交集並集和差集及更豐富的操作,而且這些操作都是原子性的。在此基礎上,redis支援各種不同方式的排序。與memcached一樣,為了保證效率,資料都是快取在記憶體中。區別的是redis會週期性的把更新的資料寫入磁碟或者把修改操作寫入追加的記錄檔案,並且在此基礎上實現了master-slave(主從)同步。

Redis的優缺點

優點

  1. 速度快,因為資料存在記憶體中,類似於HashMap,HashMap的優勢就是查詢和操作的時間複雜度都是O(1)
  2. 支援豐富資料型別,支援string,list,set,sorted set,hash
  3. 支援事務,操作都是原子性,所謂的原子性就是對資料的更改要麼全部執行,要麼全部不執行
  4. 豐富的特性:可用於快取,訊息,按key設定過期時間,過期後將會自動刪除

缺點

在不使用框架的情況下使用起來較為麻煩

分散式Redis的搭建

搭建叢集的第一件事情我們需要一些執行在叢集模式的Redis例項. 這意味這叢集並不是由一些普通的Redis例項組成的,叢集模式需要通過配置啟用,開啟叢集模式後的Redis例項便可以使用叢集特有的命令和特性了.
下面是一個最少選項的叢集的配置檔案:

port 7000

cluster-enabled yes

cluster-config-file nodes.conf

cluster-node-timeout 5000

appendonly yes

檔案中的 cluster-enabled 選項用於開例項的叢集模式, 而 cluster-conf-file 選項則設定了儲存節點配置檔案的路徑, 預設值為 nodes.conf.節點配置檔案無須人為修改, 它由 Redis 叢集在啟動時建立, 並在有需要時自動進行更新。

要讓叢集正常運作至少需要三個主節點,不過在剛開始試用叢集功能時, 強烈建議使用六個節點: 其中三個為主節點, 而其餘三個則是各個主節點的從節點。

為了方便測試,直接在同一臺計算機內建立六個資料夾7000到7005,分別表示六個Redis例項,在資料夾 7000 至 7005 中, 各建立一個 redis.conf 檔案, 檔案的內容使用上面的示例配置檔案, 但記得將配置中的埠號從 7000 改為與資料夾名字相同的號碼。

從 Redis Github 頁面 的 unstable 分支中取出最新的 Redis 原始碼, 編譯出可執行檔案 redis-server , 並將檔案複製到 cluster-test 資料夾, 然後使用類似以下命令, 在每個標籤頁中開啟一個例項:

/redis-server ./redis.conf

現在我們已經有了六個正在執行中的 Redis 例項, 接下來我們需要使用這些例項來建立叢集, 併為每個節點編寫配置檔案。

通過使用 Redis 叢集命令列工具 redis-trib , 編寫節點配置檔案的工作可以非常容易地完成: redis-trib 位於 Redis 原始碼的 src 資料夾中, 它是一個 Ruby 程式, 這個程式通過向例項傳送特殊命令來完成建立新叢集, 檢查叢集, 或者對叢集進行重新分片(reshared)等工作。

./redis-trib.rb create --replicas 1 127.0.0.1:7000 127.0.0.1:7001
127.0.0.1:7002 127.0.0.1:7003 127.0.0.1:7004 127.0.0.1:7005

這個命令在這裡用於建立一個新的叢集, 選項–replicas 1 表示我們希望為叢集中的每個主節點建立一個從節點。

之後跟著的其他引數則是這個叢集例項的地址列表,3個master3個slave redis-trib 會打印出一份預想中的配置給你看, 如果你覺得沒問題的話, 就可以輸入 yes , redis-trib 就會將這份配置應用到叢集當中,讓各個節點開始互相通訊,最後可以得到如下資訊:

[OK] All 16384 slots covered

這表示叢集中的 16384 個槽都有至少一個主節點在處理, 叢集運作正常。

分散式Redis的原理

由上文可知,Redis是以雜湊槽的形式對叢集進行劃分的,整個叢集的雜湊槽一共有16384個,在有3個Redis例項的情況下,節點A包含從0到5500的雜湊槽,節點B包含從5501到11000 的雜湊槽,節點C包含從11001到16384的雜湊槽。當有新的節點新增進來的時候,會從當前的各個節點中選取一定的槽分配給新新增的節點,當有節點從叢集中被刪除時,則會將當前節點的槽分配給叢集中其他正在執行的節點。每當有新的key新增到Redis中時,會根據演算法算出相應的雜湊槽來找到對應的叢集節點。

Redisson介紹

什麼是Redisson

Redisson是一個在Redis的基礎上實現的Java駐記憶體資料網格(In-Memory Data Grid)。它不僅提供了一系列的分散式的Java常用物件,還提供了許多分散式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。

Redisson的使用

Redisson配置(以spring XML為例)

Maven配置

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>2.2.12</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-annotations</artifactId>
    <version>2.6.0</version>
</dependency>

Spring XML配置

<redisson:client
    id="redisson"
    name="redisson1,redisson2"
    threads="0"
    netty-threads="0"
    codec-ref="myCodec" 
    transport-mode="NIO"
    redisson-reference-enabled="true"
    codec-provider-ref="myCodecProvider"
    resolver-provider-ref="myResolverProvider"
    executor-ref="myExecutor"
    event-loop-group-ref="myEventLoopGroup"
>
    <!--
    這裡的name屬性和qualifier子元素不能同時使用。

    id和name的屬性都可以被用來作為qualifier的備選值。
    -->
    <!--<qualifier value="redisson3"/>-->
    <redisson:cluster-servers
        idle-connection-timeout="10000"
        ping-timeout="1000"
        connect-timeout="10000"
        timeout="3000"
        retry-attempts="3"
        retry-interval="1500"
        reconnection-timeout="3000"
        failed-attempts="3"
        password="do_not_use_if_it_is_not_set"
        subscriptions-per-connection="5"
        client-name="none"
        load-balancer-ref="myLoadBalancer"
        subscription-connection-minimum-idle-size="1"
        subscription-connection-pool-size="50"
        slave-connection-minimum-idle-size="10"
        slave-connection-pool-size="64"
        master-connection-minimum-idle-size="10"
        master-connection-pool-size="64"
        read-mode="SLAVE"
        subscription-mode="SLAVE"
        scan-interval="1000"
    >
        <redisson:node-address value="redis://127.0.0.1:6379" />
        <redisson:node-address value="redis://127.0.0.1:6380" />
        <redisson:node-address value="redis://127.0.0.1:6381" />
    </redisson:cluster-servers>
</redisson:client>
<!-- 最基本配置 -->
<redisson:client>
    <redisson:cluster-servers>
        <redisson:node-address value="redis://127.0.0.1:6379" />
        <redisson:node-address value="redis://127.0.0.1:6380" />
        <redisson:node-address value="redis://127.0.0.1:6381" />
        ...
    </redisson:cluster-servers>
</redisson:client>

更多配置方法

Redisson實現分散式鎖

鎖的種類

可重入鎖

RLock lock = redisson.getLock("anyLock");
// 最常見的使用方法
lock.lock();
...
lock.unlock()
// 加鎖以後10秒鐘自動解鎖
// 無需呼叫unlock方法手動解鎖
lock.lock(10, TimeUnit.SECONDS);

// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

公平鎖

RLock fairLock = redisson.getFairLock("anyLock");
// 最常見的使用方法
fairLock.lock();
// 10秒鐘以後自動解鎖
// 無需呼叫unlock方法手動解鎖
fairLock.lock(10, TimeUnit.SECONDS);

// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();

Redisson同時還為分散式可重入公平鎖提供了非同步執行的相關方法:

RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

聯鎖

基於Redis的Redisson分散式聯鎖RedissonMultiLock物件可以將多個RLock物件關聯為一個聯鎖,每個RLock物件例項可以來自於不同的Redisson例項。



RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 所有的鎖都上鎖成功才算成功。
lock.lock();
...
lock.unlock();

紅鎖

基於Redis的Redisson紅鎖RedissonRedLock物件實現了Redlock介紹的加鎖演算法。該物件也可以用來將多個RLock物件關聯為一個紅鎖,每個RLock物件例項可以來自於不同的Redisson例項。

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同時加鎖:lock1 lock2 lock3
// 紅鎖在大部分節點上加鎖成功就算成功。
lock.lock();
...
lock.unlock();

另外Redisson還通過加鎖的方法提供了leaseTime的引數來指定加鎖的時間。超過這個時間後鎖便自動解開了。

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 給lock1,lock2,lock3加鎖,如果沒有手動解開的話,10秒鐘後將會自動解開
lock.lock(10, TimeUnit.SECONDS);

// 為加鎖等待100秒時間,並在加鎖成功10秒鐘後自動解開
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

讀寫鎖

RReadWriteLock rwlock = redisson.getLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();

訊號量

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();

可過期性訊號量

基於Redis的Redisson可過期性訊號量(PermitExpirableSemaphore)是在RSemaphore物件的基礎上,為每個訊號增加了一個過期時間。每個訊號可以通過獨立的ID來辨識,釋放時只能通過提交這個ID才能釋放。

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 獲取一個訊號,有效期只有2秒鐘。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);

閉鎖

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();

// 在其他執行緒或其他JVM裡
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

鎖的原理

在Redisson中,使用key來作為是否上鎖的標誌,當通過getLock(String key)方法獲得相應的鎖之後,這個key即作為一個鎖儲存到Redis叢集中,在接下來如果有其他的執行緒嘗試獲取名為key的鎖時,便會向叢集中進行查詢,如果能夠查到這個鎖並發現相應的value的值不為0,則表示已經有其他執行緒申請了這個鎖同時還沒有釋放,則當前執行緒進入阻塞,否則由當前執行緒獲取這個鎖並將value值加一,如果是可重入鎖的話,則當前執行緒每獲得一個自身執行緒的鎖,就將value的值加一,而每釋放一個鎖則將value值減一,直到減至0,完全釋放這個鎖。因為底層是基於分散式的Redis叢集,所以Redisson實現了分散式的鎖機制。

加鎖

在Redisson中,加鎖需要以下三個引數:

KEYS[1] :需要加鎖的key,這裡需要是字串型別。

ARGV[1] :鎖的超時時間,防止死鎖

ARGV[2] :鎖的唯一標識,id(UUID.randomUUID()) + “:” + threadId

Future tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId) {
        internalLockLeaseTime = unit.toMillis(leaseTime);
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG,
            // 檢查是否key已經被佔用,如果沒有則設定超時時間和唯一標識,初始化value=1
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // 如果鎖重入,需要判斷鎖的key field 都一直情況下 value 加一
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                //鎖重入重新設定超時時間
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
            "end; " +
            // 返回剩餘的過期時間
            "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

解鎖

在Redisson中解鎖需要以下五個引數:

KEYS[1] :需要加鎖的key,這裡需要是字串型別。

KEYS[2] :redis訊息的ChannelName,一個分散式鎖對應唯一的一個channelName:“redisson_lock__channel__{” + getName() + “}”

ARGV[1] :reids訊息體,這裡只需要一個位元組的標記就可以,主要標記redis的key已經解鎖,再結合redis的Subscribe,能喚醒其他訂閱解鎖訊息的客戶端執行緒申請鎖。

ARGV[2] :鎖的超時時間,防止死鎖

ARGV[3] :鎖的唯一標識,也就是剛才介紹的 id(UUID.randomUUID()) + “:” + threadId

public void unlock() {
        Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
        // 如果key已經不存在,說明已經被解鎖,直接釋出(publihs)redis訊息
        "if (redis.call('exists', KEYS[1]) == 0) then " +
        "redis.call('publish', KEYS[2], ARGV[1]); " +
        "return 1; " +
        "end;" +
        // key和field不匹配,說明當前客戶端執行緒沒有持有鎖,不能主動解鎖。
        "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
        "return nil;" +
        "end; " +
        "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
        // 如果counter>0說明鎖在重入,不能刪除key
        "if (counter > 0) then " +
        "redis.call('pexpire', KEYS[1], ARGV[2]); " +
        "return 0; " +
        "else " +
        // 刪除key並且publish 解鎖訊息
        "redis.call('del', KEYS[1]); " +
        "redis.call('publish', KEYS[2], ARGV[1]); " +
        "return 1; "+
        "end; " +
        "return nil;",
        Arrays.asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId()));
        if (opStatus == null) {
                throw new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                        + id + " thread-id: " + Thread.currentThread().getId());
        }
        // 解鎖成功之後取消更新鎖expire的時間任務
        if (opStatus) {
                cancelExpirationRenewal();
        }
    }

注意點

Redisson 預設的 CommandExecutor 實現是通過 eval 命令來執行 Lua 指令碼,所以要求 Redis 的版本必須為 2.6 或以上,否則可能要自己來實現