1. 程式人生 > >Redis實現分散式鎖(設計模式應用實戰)

Redis實現分散式鎖(設計模式應用實戰)

筆者看過網路上各種各樣使用redis實現分散式鎖的程式碼,要麼錯誤,要麼片段化,沒有一個完整的例子,借這個週末給大家總結一下redis實現分散式鎖的兩種機制

自旋鎖和排他鎖

鑑於實現鎖的方式不同,那麼這裡使用策略模式來組織程式碼

一、自旋鎖

分散式鎖抽象策略介面

package com.srr.lock;

/**
 * @Description 分散式鎖的介面
 */
abstract  public interface DistributedLock {
    /**
     * 獲取鎖
     */
    boolean lock();
    /**
     * 解鎖
     */
    void unlock();
}

自旋鎖策略抽象類,使用模板方法模式構建

package com.srr.lock;

/**
 * 自旋鎖策略模板
 */
public abstract class SpinRedisLockStrategy implements DistributedLock {

    private static final Integer retry = 50; //預設重試5次
    private static final Long sleeptime = 100L;
    protected String lockKey;
    protected String requestId;
    protected int expireTime;

    private SpinRedisLockStrategy(){}
    public SpinRedisLockStrategy(String lockKey, String requestId, int expireTime){
        this.lockKey=lockKey;
        this.requestId=requestId;
        this.expireTime=expireTime;
    }
    /**
     * 模板方法,搭建的獲取鎖的框架,具體邏輯交於子類實現
     */
    @Override
    public boolean lock() {
        Boolean flag = false;
        try {
            for (int i=0;i<retry;i++){
                flag = tryLock();
                if(flag){
                    System.out.println(Thread.currentThread().getName()+"獲取鎖成功");
                    break;
                }
                Thread.sleep(sleeptime);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
         return flag;
    }
    /**
     * 嘗試獲取鎖,子類實現
     */
    protected abstract boolean tryLock() ;

    /**
     * 解鎖:刪除key
     */
    @Override
    public  abstract void unlock();
}

自旋鎖實現子類

package com.srr.lock;

import redis.clients.jedis.Jedis;

import java.util.Collections;

/**
 * 自旋鎖
 */
public class SpinRedisLock extends SpinRedisLockStrategy{

    private static final Long RELEASE_SUCCESS = 1L;
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    public SpinRedisLock(String lockKey, String requestId, int expireTime) {
        super(lockKey,requestId, expireTime);
    }

    @Override
    protected boolean tryLock() {
        Jedis jedis = new Jedis("localhost", 6379);  //建立客戶端,1p和埠號
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }

    @Override
    public void unlock() {
        Jedis jedis = new Jedis("localhost", 6379);  //建立客戶端,1p和埠號
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        if (RELEASE_SUCCESS.equals(result)) {
            System.out.println("lock is unlock");
        }
    }
}

至此,自旋鎖方式實現分散式鎖就完成了,下面來看排他鎖阻塞的方式實現

二、排他鎖

  在實現之前需要大家搞懂一個概念,也就是redis的事件通知:

/**
* 鍵空間通知,所有通知以 keyspace@ 為字首
* 鍵事件通知,所有通知以 keyevent@ 為字首
* 所有命令都只在鍵真的被改動了之後,才會產生通知,比如刪除foo會產生
* 鍵空間通知
* “pmessage”,"__ key*__ : * “,”__ keyspace@0__:foo",“set”
* 和鍵事件通知
* “pmessage”,"__ key*__ : *","__ keyevent@0__:set",“foo”
*/

   搞懂概念之後,需要在redis的配置檔案redis.conf中將其 notify-keyspace-events "KEA",預設為notify-keyspace-events "",這樣才能啟動redis的事件監聽機制。

   排它鎖策略抽象類

   

package com.srr.lock;

import redis.clients.jedis.Jedis;

/**
 * @Description  阻塞獲取鎖,模板類
 */
public abstract class BlockingRedisLockStrategy implements DistributedLock {

    protected String lockKey;
    protected String requestId;
    protected int expireTime;

    private BlockingRedisLockStrategy(){}
    public BlockingRedisLockStrategy(String lockKey, String requestId,int expireTime){
        this.lockKey=lockKey;
        this.requestId=requestId;
        this.expireTime=expireTime;
    }
    /**
     * 模板方法,搭建的獲取鎖的框架,具體邏輯交於子類實現
     * @throws Exception
     */
    @Override
    public final boolean lock() {
        //獲取鎖成功
        if (tryLock()){
            System.out.println(Thread.currentThread().getName()+"獲取鎖成功");
            return true;
        }else{  //獲取鎖失敗
            //阻塞一直等待
            waitLock();
            //遞迴,再次獲取鎖
            return lock();
        }
    }
    /**
     * 嘗試獲取鎖,子類實現
     */
    protected abstract boolean tryLock() ;
    /**
     * 等待獲取鎖,子類實現
     */
    protected abstract void waitLock();
    /**
     * 解鎖:刪除key
     */
    @Override
    public  abstract void unlock();
}

排他鎖實現子類

package com.srr.lock;

import redis.clients.jedis.Jedis;

import java.util.Collections;

/**
 * 排他鎖,阻塞
 */
public class BlockingRedisLock extends BlockingRedisLockStrategy {

    private static final Long RELEASE_SUCCESS = 1L;
    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    public BlockingRedisLock(String lockKey, String requestId, int expireTime) {
        super(lockKey,requestId, expireTime);
    }


    /**
     * 嘗試獲取分散式鎖
     * @return 是否獲取成功
     */
    @Override
    public boolean tryLock() {
        Jedis jedis = new Jedis("localhost", 6379);  //建立客戶端,1p和埠號
        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }

    @Override
    public void waitLock() {
        //判斷key是否存在
        Jedis jedis = new Jedis("localhost", 6379);  //建立客戶端,1p和埠號
        KeyExpiredListener keyExpiredListener = new KeyExpiredListener();
        /**
         * 鍵空間通知,所有通知以 keyspace@ 為字首
         * 鍵事件通知,所有通知以 keyevent@ 為字首
         * 所有命令都只在鍵真的被改動了之後,才會產生通知,比如刪除foo會產生
         * 鍵空間通知
         * “pmessage”,"__ key*__ : * “,”__ keyspace@0__:foo",“set”
         * 和鍵事件通知
         * “pmessage”,"__ key*__ : *","__ keyevent@0__:set",“foo”
         */
        //如果要監聽某個key的執行了什麼操作,就訂閱__ keyspace@0__,監聽某種操作動了哪些key,就訂閱__ keyevent@0__
        //這裡我們需要監聽分散式鎖的鍵被刪除了,所以要監聽刪除動作"__keyspace@0__:"+key
        jedis.psubscribe(keyExpiredListener, "__keyspace@0__:"+lockKey);
        System.out.println("over");
    }

    /**
     * 釋放分散式鎖
     * @return 是否釋放成功
     */
    @Override
    public void unlock() {
        Jedis jedis = new Jedis("localhost", 6379);  //建立客戶端,1p和埠號
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
        if (RELEASE_SUCCESS.equals(result)) {
            System.out.println("lock is unlock");
        }
    }
}

redis事件監聽類

package com.srr.lock;


import redis.clients.jedis.JedisPubSub;

/**
 * redis 事件監聽器
 */
public class KeyDelListener extends JedisPubSub {
    public KeyDelListener(){

    }
    // 初始化訂閱時候的處理
    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) {
    }

    // 取得訂閱的訊息後的處理
    @Override
    public void onPMessage(String pattern, String channel, String message) {
        System.out.println("message == "+message);
        this.punsubscribe();
        System.out.println("unsubscribe == "+message);
    }
}

到這裡排他鎖的完整程式碼就寫完了,其實對比一下,兩者的區別在於lock的實現方式不同,筆者為了確保程式碼完整性就全部貼上了。

程式碼寫完了那麼給一個場景測試一下我們的程式碼有沒有問題,請看下面的測試程式碼:

這裡我們構建一個Lock工具類:

package com.srr.lock;

/**
 * 鎖工具類
 */
public class Lock {
    /**
     * 獲取鎖
     */
    boolean lock(DistributedLock lock) {
        return lock.lock();
    };

    /**
     * 釋放鎖
     */
    void unlock(DistributedLock lock) {
        lock.unlock();
    };
}

測試類:

package com.srr.lock;

import redis.clients.jedis.Jedis;

/**
 *  測試場景
 *  count從1加到101
 *  使用redis分散式鎖在分散式環境下保證結果正確
 */
public class T {

    volatile int  count = 1;

    public void inc(){
        for(int i = 0;i<100;i++){
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            count++;
            System.out.println("count == "+count);
        }
    }

    public int getCount(){
       return count;
    }

    public static void main(String[] args) {
        final T t = new T();
        final Lock lock = new Lock();
        //final RedisLock redisLock = new BlockingRedisLock("","1",100000,jedis);
        final DistributedLock distributedLock = new SpinRedisLock("test","1",100000);
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                if(lock.lock(distributedLock)){
                    t.inc();
                    System.out.println("t1 running");
                    System.out.println("t1 == count == "+ t.getCount());
                    lock.unlock(distributedLock);
                }

            }
        });

        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                if(lock.lock(distributedLock)) {
                    t.inc();
                    System.out.println("t2 running");
                    System.out.println("t2 == count == " + t.getCount());
                    lock.unlock(distributedLock);
                }

            }
        });

        t1.start();
        t2.start();
    }
}

測試結果:

 到這裡,全部程式碼就完成了,如果想使用zookeeper實現分散式鎖只需要抽象出一個策略類實現DistributedLock介面即可。是不是很方便呢。

 原創不易,多多關注!

&n