使用Redisson通過自定義註解實現分散式鎖,使用Spring AOP簡化分散式鎖
Redisson概述
Redisson是一個在Redis的基礎上實現的Java駐記憶體資料網格(In-Memory Data Grid)。它不僅提供了一系列的分散式的Java常用物件,還提供了許多分散式服務。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。
Redisson底層採用的是Netty 框架。支援Redis 2.8以上版本,支援Java1.6+以上版本。
關於Redisson更多詳細介紹,可參考Redssion概述
Redisson提供的分散式鎖
可重入鎖
Redisson的分散式可重入鎖RLock Java物件實現了java.util.concurrent.locks.Lock介面,同時還支援自動過期解鎖。下面是RLock的基本使用方法:
RLock lock = redisson.getLock("anyLock");
// 最常見的使用方法
lock.lock();
// 支援過期解鎖功能
// 10秒鐘以後自動解鎖
// 無需呼叫unlock方法手動解鎖
lock .lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
Redisson同時還為分散式鎖提供了非同步執行的相關方法:
RLock lock = redisson.getLock("anyLock");
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(100, 10 , TimeUnit.SECONDS);
公平鎖
Redisson分散式可重入公平鎖也是實現了java.util.concurrent.locks.Lock介面的一種RLock物件。在提供了自動過期解鎖功能的同時,保證了當多個Redisson客戶端執行緒同時請求加鎖時,優先分配給先發出請求的執行緒。
RLock fairLock = redisson.getFairLock("anyLock");
// 最常見的使用方法
fairLock.lock();
// 支援過期解鎖功能
// 10秒鐘以後自動解鎖
// 無需呼叫unlock方法手動解鎖
fairLock.lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖以後10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
...
fairLock.unlock();
其他鎖
Redisson還提供了其他機制的鎖,如聯鎖(MultiLock)、紅鎖(RedLock)等。詳細可參考:分散式鎖和同步器
使用Redisson實現分散式鎖
- 定義回撥介面
/**
* 分散式鎖回撥介面
*/
public interface DistributedLockCallback<T> {
/**
* 呼叫者必須在此方法中實現需要加分散式鎖的業務邏輯
*
* @return
*/
public T process();
/**
* 得到分散式鎖名稱
*
* @return
*/
public String getLockName();
}
- 定義分散式鎖模板
/**
* 分散式鎖操作模板
*/
public interface DistributedLockTemplate {
long DEFAULT_WAIT_TIME = 30;
long DEFAULT_TIMEOUT = 5;
TimeUnit DEFAULT_TIME_UNIT = TimeUnit.SECONDS;
/**
* 使用分散式鎖,使用鎖預設超時時間。
* @param callback
* @param fairLock 是否使用公平鎖
* @return
*/
<T> T lock(DistributedLockCallback<T> callback, boolean fairLock);
/**
* 使用分散式鎖。自定義鎖的超時時間
*
* @param callback
* @param leaseTime 鎖超時時間。超時後自動釋放鎖。
* @param timeUnit
* @param fairLock 是否使用公平鎖
* @param <T>
* @return
*/
<T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock);
/**
* 嘗試分散式鎖,使用鎖預設等待時間、超時時間。
* @param callback
* @param fairLock 是否使用公平鎖
* @param <T>
* @return
*/
<T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock);
/**
* 嘗試分散式鎖,自定義等待時間、超時時間。
* @param callback
* @param waitTime 獲取鎖最長等待時間
* @param leaseTime 鎖超時時間。超時後自動釋放鎖。
* @param timeUnit
* @param fairLock 是否使用公平鎖
* @param <T>
* @return
*/
<T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock);
}
- 實現分散式鎖模板
public class SingleDistributedLockTemplate implements DistributedLockTemplate {
private RedissonClient redisson;
public SingleDistributedLockTemplate() {
}
public SingleDistributedLockTemplate(RedissonClient redisson) {
this.redisson = redisson;
}
@Override
public <T> T lock(DistributedLockCallback<T> callback, boolean fairLock) {
return lock(callback, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock);
}
@Override
public <T> T lock(DistributedLockCallback<T> callback, long leaseTime, TimeUnit timeUnit, boolean fairLock) {
RLock lock = getLock(callback.getLockName(), fairLock);
try {
lock.lock(leaseTime, timeUnit);
return callback.process();
} finally {
if (lock != null && lock.isLocked()) {
lock.unlock();
}
}
}
@Override
public <T> T tryLock(DistributedLockCallback<T> callback, boolean fairLock) {
return tryLock(callback, DEFAULT_WAIT_TIME, DEFAULT_TIMEOUT, DEFAULT_TIME_UNIT, fairLock);
}
@Override
public <T> T tryLock(DistributedLockCallback<T> callback, long waitTime, long leaseTime, TimeUnit timeUnit, boolean fairLock) {
RLock lock = getLock(callback.getLockName(), fairLock);
try {
if (lock.tryLock(waitTime, leaseTime, timeUnit)) {
return callback.process();
}
} catch (InterruptedException e) {
} finally {
if (lock != null && lock.isLocked()) {
lock.unlock();
}
}
return null;
}
private RLock getLock(String lockName, boolean fairLock) {
RLock lock;
if (fairLock) {
lock = redisson.getFairLock(lockName);
} else {
lock = redisson.getLock(lockName);
}
return lock;
}
public void setRedisson(RedissonClient redisson) {
this.redisson = redisson;
}
}
- 使用SingleDistributedLockTemplate
DistributedLockTemplate lockTemplate = ...;
final String lockName = ...;
lockTemplate.lock(new DistributedLockCallback<Object>() {
@Override
public Object process() {
//do some business
return null;
}
@Override
public String getLockName() {
return lockName;
}
}, false);
但是每次使用分散式鎖都要寫類似上面的重複程式碼,有沒有什麼方法可以只關注核心業務邏輯程式碼的編寫,即上面的"do some business"。下面介紹如何使用Spring AOP來實現這一目標。
使用Spring AOP簡化分散式鎖
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DistributedLock {
/**
* 鎖的名稱。
* 如果lockName可以確定,直接設定該屬性。
*/
String lockName() default "";
/**
* lockName字尾
*/
String lockNamePre() default "";
/**
* lockName字尾
*/
String lockNamePost() default "lock";
/**
* 獲得鎖名時拼接前後綴用到的分隔符
* @return
*/
String separator() default ".";
/**
* <pre>
* 獲取註解的方法引數列表的某個引數物件的某個屬性值來作為lockName。因為有時候lockName是不固定的。
* 當param不為空時,可以通過argNum引數來設定具體是引數列表的第幾個引數,不設定則預設取第一個。
* </pre>
*/
String param() default "";
/**
* 將方法第argNum個引數作為鎖
*/
int argNum() default 0;
/**
* 是否使用公平鎖。
* 公平鎖即先來先得。
*/
boolean fairLock() default false;
/**
* 是否使用嘗試鎖。
*/
boolean tryLock() default false;
/**
* 最長等待時間。
* 該欄位只有當tryLock()返回true才有效。
*/
long waitTime() default 30L;
/**
* 鎖超時時間。
* 超時時間過後,鎖自動釋放。
* 建議:
* 儘量縮簡需要加鎖的邏輯。
*/
long leaseTime() default 5L;
/**
* 時間單位。預設為秒。
*/
TimeUnit timeUnit() default TimeUnit.SECONDS;
}
- 定義切面織入的程式碼
@Aspect
@Component
public class DistributedLockAspect {
@Autowired
private DistributedLockTemplate lockTemplate;
@Pointcut("@annotation(cn.sprinkle.study.distributedlock.common.annotation.DistributedLock)")
public void DistributedLockAspect() {}
@Around(value = "DistributedLockAspect()")
public Object doAround(ProceedingJoinPoint pjp) throws Throwable {
//切點所在的類
Class targetClass = pjp.getTarget().getClass();
//使用了註解的方法
String methodName = pjp.getSignature().getName();
Class[] parameterTypes = ((MethodSignature)pjp.getSignature()).getMethod().getParameterTypes();
Method method = targetClass.getMethod(methodName, parameterTypes);
Object[] arguments = pjp.getArgs();
final String lockName = getLockName(method, arguments);
return lock(pjp, method, lockName);
}
@AfterThrowing(value = "DistributedLockAspect()", throwing="ex")
public void afterThrowing(Throwable ex) {
throw new RuntimeException(ex);
}
public String getLockName(Method method, Object[] args) {
Objects.requireNonNull(method);
DistributedLock annotation = method.getAnnotation(DistributedLock.class);
String lockName = annotation.lockName(),
param = annotation.param();
if (isEmpty(lockName)) {
if (args.length > 0) {
if (isNotEmpty(param)) {
Object arg;
if (annotation.argNum() > 0) {
arg = args[annotation.argNum() - 1];
} else {
arg = args[0];
}
lockName = String.valueOf(getParam(arg, param));
} else if (annotation.argNum() > 0) {
lockName = args[annotation.argNum() - 1].toString();
}
}
}
if (isNotEmpty(lockName)) {
String preLockName = annotation.lockNamePre(),
postLockName = annotation.lockNamePost(),
separator = annotation.separator();
StringBuilder lName = new StringBuilder();
if (isNotEmpty(preLockName)) {
lName.append(preLockName).append(separator);
}
lName.append(lockName);
if (isNotEmpty(postLockName)) {
lName.append(separator).append(postLockName);
}
lockName = lName.toString();
return lockName;
}
throw new IllegalArgumentException("Can't get or generate lockName accurately!");
}
/**
* 從方法引數獲取資料
*
* @param param
* @param arg 方法的引數陣列
* @return
*/
public Object getParam(Object arg, String param) {
if (isNotEmpty(param) && arg != null) {
try {
Object result = PropertyUtils.getProperty(arg, param);
return result;
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException(arg + "沒有屬性" + param + "或未實現get方法。", e);
} catch (Exception e) {
throw new RuntimeException("", e);
}
}
return null;
}
public Object lock(ProceedingJoinPoint pjp, Method method, final String lockName) {
DistributedLock annotation = method.getAnnotation(DistributedLock.class);
boolean fairLock = annotation.fairLock();
boolean tryLock = annotation.tryLock();
if (tryLock) {
return tryLock(pjp, annotation, lockName, fairLock);
} else {
return lock(pjp,lockName, fairLock);
}
}
public Object lock(ProceedingJoinPoint pjp, final String lockName, boolean fairLock) {
return lockTemplate.lock(new DistributedLockCallback<Object>() {
@Override
public Object process() {
return proceed(pjp);
}
@Override
public String getLockName() {
return lockName;
}
}, fairLock);
}
public Object tryLock(ProceedingJoinPoint pjp, DistributedLock annotation, final String lockName, boolean fairLock) {
long waitTime = annotation.waitTime(),
leaseTime = annotation.leaseTime();
TimeUnit timeUnit = annotation.timeUnit();
return lockTemplate.tryLock(new DistributedLockCallback<Object>() {
@Override
public Object process() {
return proceed(pjp);
}
@Override
public String getLockName() {
return lockName;
}
}, waitTime, leaseTime, timeUnit, fairLock);
}
public Object proceed(ProceedingJoinPoint pjp) {
try {
return pjp.proceed();
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}
private boolean isEmpty(Object str) {
return str == null || "".equals(str);
}
private boolean isNotEmpty(Object str) {
return !isEmpty(str);
}
}
有了上面兩段程式碼,以後需要用到分散式鎖,只需在核心業務邏輯方法添加註解@DistributedLock,並設定LockName、fairLock等即可。下面的DistributionService演示了多種使用情景。
@Service
public class DistributionService {
@Autowired
private RedissonClient redissonClient;
@DistributedLock(param = "id", lockNamePost = ".lock")
public Integer aspect(Person person) {
RMap<String, Integer> map = redissonClient.getMap("distributionTest");
Integer count = map.get("count");
if (count > 0) {
count = count - 1;
map.put("count", count);
}
return count;
}
@DistributedLock(argNum = 1, lockNamePost = ".lock")
public Integer aspect(String i) {
RMap<String, Integer> map = redissonClient.getMap("distributionTest");
Integer count = map.get("count");
if (count > 0) {
count = count - 1;
map.put("count", count);
}
return count;
}
@DistributedLock(lockName = "lock", lockNamePost = ".lock")
public int aspect(Action<Integer> action) {
return action.action();
}
}
- 測試
定義一個Worker類:
public class Worker implements Runnable {
private final CountDownLatch startSignal;
private final CountDownLatch doneSignal;
private final DistributionService service;
private RedissonClient redissonClient;
public Worker(CountDownLatch startSignal, CountDownLatch doneSignal, DistributionService service, RedissonClient redissonClient) {
this.startSignal = startSignal;
this.doneSignal = doneSignal;
this.service = service;
this.redissonClient = redissonClient;
}
@Override
public void run