1. 程式人生 > >分散式鎖(redis\ip\database)的一些實現方式及在實際中的應用

分散式鎖(redis\ip\database)的一些實現方式及在實際中的應用

分散式鎖,在分散式架構中的鎖的實現。接觸到的使用情形主要分為兩種情況:

一:該資源只允許一次訪問,請注意,此種情況更像是一種獨佔,即併發情況下,只有一個執行緒能獲取該資源,其他執行緒則放棄該資源。

二:該資源每次只允許一個執行緒訪問,其他執行緒會不斷嘗試獲取這個鎖,直到所有執行緒都訪問了這個資源。

1。redis實現分散式鎖

在redis中設定一個鎖名,和value值,再為這個鎖名設定超時時間。

每次執行緒來訪問先判斷,這個鎖名是否在redis中已經存在。如果存在則直接返回false表示該執行緒並未獲取到鎖。

設定超時時間,是為了防止死鎖(某一執行緒獲取鎖後未釋放該鎖)。

程式碼:

/**
 * 加鎖
 *
 * @param lockName 鎖名
 * @param timeout 超時時間 秒
 * @return
 */
@Override
public boolean lock(String lockName, long timeout) {
    String  value = UUIDUtil.getUUID();
    String key = "key"+lockName;
    System.out.println(Thread.currentThread().getName()+"正在初步獲取鎖!");
   if(redisUtil.setIfAbsent(key,value)) {
       redisUtil.expire(key,timeout);
       return true;
   }
    return false;
}
/**
 * 釋放鎖
 *
 * @param lockName
 */
@Override
public void unLock(String lockName) {
redisUtil.del(lockName);
}

redisUtils中的方法:

/**
 * 當且僅當key不存在時,set一個key為val的字串,返回true;若key存在,則什麼都不做返回false
 *
 * @param key
 * @param value
 * @return
 */
public boolean setIfAbsent(String key,String value){
   return redisTemplate.opsForValue().setIfAbsent(key,value);
}
/**
 * 指定快取失效時間
 * @param key 鍵
 * @param time 時間(秒)
 * @return
 */
public boolean expire(String key,long time){
    try {
        if(time>0){
            redisTemplate.expire(key, time, TimeUnit.SECONDS);
        }
        return true;
    } catch (Exception e) {
        e.printStackTrace();
        return false;
    }
}
/**
 * 刪除快取
 * @param key 可以傳一個值 或多個
 */
@SuppressWarnings("unchecked")
public void del(String ... key){
    if(key!=null&&key.length>0){
        if(key.length==1){
            redisTemplate.delete(key[0]);
        }else{
            redisTemplate.delete(CollectionUtils.arrayToList(key));
        }
    }
}

2.ip鎖,通過獲取伺服器的ip和配置的可執行的ip是否一致,一致即獲取了鎖。及指定每次獲取資源的伺服器(這種其實就是把分散式變成了單機版)

public class IpLock implements ConcurrentLock {

 private  String hostAddress = IPUtil.getHostAddress();
    @Override
    public boolean lock(String lockName, long timeout) {
        String server_ip = PropertiesUtils.getApplicationProperties().getProperty("server_Ip");
        if(StringUtils.isNotBlank(hostAddress)&&StringUtils.isNotBlank(server_ip)){
            if(server_ip.equals(hostAddress)||server_ip.equals("localhost")){
                return true;
            }
        }
        return false;
    }

    @Override
    public void unLock(String lockName) {

    }
}

3.資料庫鎖,依賴資料庫的行級鎖,每次只能有一個執行緒獲取到update的鎖。如果更新成功表示獲取到了鎖。未更新成功即沒有拿到鎖。

public class dbLock implements ConcurrentLock {

    private static Logger logger = LoggerFactory.getLogger(dbLock.class);


    private DataSource dataSource;

    private ThreadLocal<Connection> connectionCatch = new ThreadLocal<>();

    @Override
    public boolean lock(String lockName, long timeout) {
        Connection connection = null;
        PreparedStatement ps = null;
        String sql = "update tb_dataBase_lock t set t.lock_version = t.lock_version + 1, t.update_date = sysdate where t.lock_name = ? ";
        try {
            connection = dataSource.getConnection();
            if(connection!=null&& !connection.isClosed()){
                connectionCatch.set(connection);
                connection.setAutoCommit(false);
            PreparedStatement statement = connection.prepareStatement(sql);
            statement.setString(1,lockName);
            int i = statement.executeUpdate();
            connection.close();
            return i==1;
            }
            return false;
        } catch (SQLException e) {
            connectionCatch.remove();
            logger.error(e.getMessage(),e);
            try {
                if(connection!=null&&!connection.isClosed()){
                    connection.close();
                }
                if(ps !=null && ps.isClosed()){
                    ps.close();
                }
            } catch (SQLException e1) {
                logger.error(e.getMessage(),e);
            }
        }
        return false;
    }

    @Override
    public void unLock(String lockName) {
        Connection connection = connectionCatch.get();
        try {
            if(connection == null|| connection.isClosed())return;
            connection.commit();
            connection.close();
        }catch (Exception e){
            logger.error(e.getMessage());
        }
        connectionCatch.remove();
    }

    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

4.zookeeperLock,利用zookeeper來實現分散式鎖。

未完待續........

實際應用:通過AOP來統一寫鎖的邏輯。

自定義註解作為切入點(pointCut)。

定義鎖的執行邏輯類作為通知(advice),具體可以實現methodInterceptor.

public class ConcurrentLockInterceptor implements MethodInterceptor {

    private Logger logger = LoggerFactory.getLogger(ConcurrentLockInterceptor.class);

    private Map<String,ConcurrentLock> locks;

    public void setLocks(Map<String, ConcurrentLock> locks) {
        this.locks = locks;
    }

    @Override
    public Object invoke(MethodInvocation invocation) throws Throwable {
        LockRequired lockRequired = invocation.getMethod().getAnnotation(LockRequired.class);
        if(lockRequired == null){
            return invocation.proceed();
        }
        String lockName = lockRequired.lockName();
        if(StringUtils.isBlank(lockName)){
            lockName = invocation.getThis().getClass().getSimpleName()+"."+invocation.getMethod().getName();
        }
        ConcurrentLock lock = locks.get("redis");
        if(lockRequired.strategy().equals(LockRequired.LockStrategy.IP)){
            lock= locks.get("ip");
        }else if(lockRequired.strategy().equals(LockRequired.LockStrategy.DB)){
            lock = locks.get("db");
        }

        boolean locked =false;
        try {
            locked = lock.lock(lockName, lockRequired.timeout());
            if (locked) {
                logger.info("伺服器【" + IPUtil.getHostAddress() + "】獲取了鎖【" + lockName + "】");
                invocation.proceed();
                lock.unLock(lockName);
            } else {
                logger.info("伺服器【" + IPUtil.getHostAddress() + "】沒獲取到鎖【" + lockName + "】跳過執行方法!");
            }
        }catch (Exception e){
            logger.error(e.getMessage());
            if(locked){
                lock.unLock(lockName);
                logger.debug("伺服器["+IPUtil.getHostAddress()+"]成功釋放鎖["+lockName+"]");
            }

        }
        return null;
    }
}