1. 程式人生 > >分散式鎖, 註解形式, 搞定SpringBoot定時任務@Scheduled 在叢集下的優化

分散式鎖, 註解形式, 搞定SpringBoot定時任務@Scheduled 在叢集下的優化

SpringBoot提供了 Schedule模組完美支援定時任務的執行

在實際開發中由於專案部署在分散式或叢集伺服器上 會導致定時任務多次觸發

因此,使用redis分佈鎖機制可以有效避免多次執行定時任務

  核心方法是org.springframework.data.redis.core包下的

 setIfAbsent() 方法 返回值為布林型別

  方法類似redis的SETNX命令 即”SET if Not Exists”

  伺服器在執行郵件定時傳送任務之前會向redis快取中寫入lock_key即任務鎖 表明此伺服器正在執行定時任務

  另一臺伺服器在寫入鎖時 由於鎖已經存在就不做任何操作

  執行定時任務的伺服器在執行完成後需釋放任務鎖

 

具體程式碼實現如下:

定義註解:

/**
 * redis鎖註解
 * @author zhouzhou
 */
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
@Documented
public @interface RedisLock {

    String lockPrefix() default "";
    String lockKey() default "";
    long timeOut() default 5;
    TimeUnit timeUnit() default TimeUnit.SECONDS;

}

定義切面@Aspect, pointCut就是 RedisLock註解



/**
 * Description: redis鎖攔截器實現
 * User: zhouzhou
 * Date: 2018-09-05
 * Time: 15:30
 */
@Aspect
@Component
public class RedisLockAspect {

    private static final Integer MAX_RETRY_COUNT = 3;
    private static final String LOCK_PRE_FIX = "lockPreFix";
    private static final String LOCK_KEY = "lockKey";
    private static final String TIME_OUT = "timeOut";
    private static final int PROTECT_TIME = 2 << 11;//4096

    private static final Logger log = LoggerFactory.getLogger(RedisLockAspect.class);

    @Autowired
    private CommonRedisHelper commonRedisHelper;


    @Pointcut("@annotation(com.shuige.components.cache.annotation.RedisLock)")
    public void redisLockAspect() {
    }

    @Around("redisLockAspect()")
    public void lockAroundAction(ProceedingJoinPoint proceeding) throws Exception {
        // 由於本專案未實現Redisson,所以通過隨機形式
        Thread.sleep((int) (Math.random() * PROTECT_TIME));
        //獲取redis鎖
        Boolean flag = this.getLock(proceeding, 0, System.currentTimeMillis());
        if (flag) {
            try {
                proceeding.proceed();
                Thread.sleep(PROTECT_TIME);
            } catch (Throwable throwable) {
                throw new RuntimeException("分散式鎖執行發生異常" + throwable.getMessage(), throwable);
            } finally {
                // 刪除鎖
                this.delLock(proceeding);
            }
        } else {
            log.info("其他系統正在執行此項任務");
        }

    }

    /**
     * 獲取鎖
     *
     * @param proceeding
     * @return
     */
    private boolean getLock(ProceedingJoinPoint proceeding, int count, long currentTime) {
        //獲取註解中的引數
        Map<String, Object> annotationArgs = this.getAnnotationArgs(proceeding);
        String lockPrefix = (String) annotationArgs.get(LOCK_PRE_FIX);
        String key = (String) annotationArgs.get(LOCK_KEY);
        long expire = (long) annotationArgs.get(TIME_OUT);
        //String key = this.getFirstArg(proceeding);
        if (StringUtils.isEmpty(lockPrefix) || StringUtils.isEmpty(key)) {
            // 此條執行不到
            throw new RuntimeException("RedisLock,鎖字首,鎖名未設定");
        }
        if (commonRedisHelper.setNx(lockPrefix, key, expire)) {
            return true;
        } else {
            // 如果當前時間與鎖的時間差, 大於保護時間,則強制刪除鎖(防止鎖死)
            long createTime = commonRedisHelper.getLockValue(lockPrefix, key);
            if ((currentTime - createTime) > (expire + PROTECT_TIME)) {
                count ++;
                if (count > MAX_RETRY_COUNT){
                    return false;
                }
                commonRedisHelper.delete(lockPrefix, key);
                getLock(proceeding,count,currentTime);
            }
            return false;
        }
    }

    /**
     * 刪除鎖
     *
     * @param proceeding
     */
    private void delLock(ProceedingJoinPoint proceeding) {
        Map<String, Object> annotationArgs = this.getAnnotationArgs(proceeding);
        String lockPrefix = (String) annotationArgs.get(LOCK_PRE_FIX);
        String key = (String) annotationArgs.get(LOCK_KEY);
        commonRedisHelper.delete(lockPrefix, key);
    }

    /**
     * 獲取鎖引數
     *
     * @param proceeding
     * @return
     */
    private Map<String, Object> getAnnotationArgs(ProceedingJoinPoint proceeding) {
        Class target = proceeding.getTarget().getClass();
        Method[] methods = target.getMethods();
        String methodName = proceeding.getSignature().getName();
        for (Method method : methods) {
            if (method.getName().equals(methodName)) {
                Map<String, Object> result = new HashMap<String, Object>();
                RedisLock redisLock = method.getAnnotation(RedisLock.class);
                result.put(LOCK_PRE_FIX, redisLock.lockPrefix());
                result.put(LOCK_KEY, redisLock.lockKey());
                result.put(TIME_OUT, redisLock.timeUnit().toSeconds(redisLock.timeOut()));
                return result;
            }
        }
        return null;
    }

    /**
     * 獲取第一個String型別的引數為鎖的業務引數
     *
     * @param proceeding
     * @return
     */
    @Deprecated
    public String getFirstArg(ProceedingJoinPoint proceeding) {
        Object[] args = proceeding.getArgs();
        if (args != null && args.length > 0) {
            for (Object object : args) {
                String type = object.getClass().getName();
                if ("java.lang.String".equals(type)) {
                    return (String) object;
                }
            }
        }
        return null;
    }

}

CommonRedisHelper



/**
 * Description:
 * User: zhouzhou
 * Date: 2018-09-05
 * Time: 15:39
 */
@Component
public class CommonRedisHelper {

    @Autowired
    RedisTemplate redisTemplate;

    /**
     * 加分散式鎖
     *
     * @param track
     * @param sector
     * @param timeout
     * @return
     */
    public boolean setNx(String track, String sector, long timeout) {
        ValueOperations valueOperations = redisTemplate.opsForValue();

        Boolean flag = valueOperations.setIfAbsent(track + sector, System.currentTimeMillis());
        // 如果成功設定超時時間, 防止超時
        if (flag) {
            valueOperations.set(track + sector, getLockValue(track, sector), timeout, TimeUnit.SECONDS);
        }
        return flag;
    }

    /**
     * 刪除鎖
     *
     * @param track
     * @param sector
     */
    public void delete(String track, String sector) {
        redisTemplate.delete(track + sector);
    }

    /**
     * 查詢鎖
     * @return 寫鎖時間
     */
    public long getLockValue(String track, String sector) {
        ValueOperations valueOperations = redisTemplate.opsForValue();
        long createTime = (long) valueOperations.get(track + sector);
        return createTime;
    }

}