Principle

The lock relies on findAndModify, which is atomic on a single document and therefore safe under concurrent access.

Implementation

Define the lock storage document:

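import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

/**
 * MongoDB distributed lock document.
 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Document(collection = "distributed-lock-doc")
public class LockDocument {
    /** Lock key, stored as the document _id. */
    @Id
    private String id;
    /** Expiry time in epoch milliseconds. */
    private long expireAt;
    /** Token identifying the current holder. */
    private String token;
}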

Define the lock API:

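public interface LockService {

    /** Returns a token if the lock was acquired, or null otherwise. */
    String acquire(String key, long expiration);

    /** Releases the lock if the token matches the current holder. */
    boolean release(String key, String token);

    /** Extends the expiry if the token matches the current holder. */
    boolean refresh(String key, String token, long expiration);
}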

Acquire the lock:

@Override
public String acquire(String key, long expiration) {
    Query query = Query.query(Criteria.where("_id").is(key));
    String token = this.generateToken();
    // setOnInsert only takes effect when the upsert creates the document
    Update update = new Update()
            .setOnInsert("_id", key)
            .setOnInsert("expireAt", System.currentTimeMillis() + expiration)
            .setOnInsert("token", token);
    FindAndModifyOptions options = new FindAndModifyOptions()
            .upsert(true)
            .returnNew(true);
    LockDocument doc = mongoTemplate.findAndModify(query, update, options,
            LockDocument.class);
    boolean locked = doc.getToken() != null && doc.getToken().equals(token);

    // If the existing lock has expired
    if (!locked && doc.getExpireAt() < System.currentTimeMillis()) {
        DeleteResult deleted = this.mongoTemplate.remove(
                Query.query(Criteria.where("_id").is(key)
                        .and("token").is(doc.getToken())
                        .and("expireAt").is(doc.getExpireAt())),
                LockDocument.class);
        if (deleted.getDeletedCount() >= 1) {
            // The stale lock was removed; try to acquire the lock again
            return this.acquire(key, expiration);
        }
    }

    log.debug("Tried to acquire lock for key {} with token {} . Locked: {}",
            key, token, locked);
    return locked ? token : null;
}

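How it works:

  • First try to upsert the lock document. If the returned document carries our token, we hold the lock.
  • Otherwise, acquiring the lock has failed.
  • If the lock was not acquired but has already expired, try to delete it:
    • If the delete succeeds, try to acquire the lock again.
    • If the delete fails, the lock has probably been refreshed in the meantime.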
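
The steps above can be tried out without a database. The following is a minimal in-memory sketch, not the MongoDB implementation: a ConcurrentMap stands in for the collection, putIfAbsent plays the role of the upsert with setOnInsert, and the conditional remove(key, value) plays the role of the delete that matches on token and expireAt. The class name InMemoryLockModel and the use of a random UUID as the token are assumptions made for illustration; the article does not show generateToken().

```java
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** In-memory model of the lock protocol; the map stands in for the MongoDB collection. */
class InMemoryLockModel {

    /** Mirrors LockDocument: the map key plays the role of _id. */
    static final class Entry {
        final String token;
        final long expireAt;

        Entry(String token, long expireAt) {
            this.token = token;
            this.expireAt = expireAt;
        }

        @Override
        public boolean equals(Object o) {
            return o instanceof Entry
                    && ((Entry) o).token.equals(token)
                    && ((Entry) o).expireAt == expireAt;
        }

        @Override
        public int hashCode() {
            return Objects.hash(token, expireAt);
        }
    }

    private final ConcurrentMap<String, Entry> store = new ConcurrentHashMap<>();

    /** Returns a token if the lock was acquired, or null otherwise. */
    String acquire(String key, long expirationMillis) {
        while (true) {
            String token = UUID.randomUUID().toString(); // assumed token format
            long now = System.currentTimeMillis();
            // Atomic insert-if-absent: returns the existing entry when the key is taken.
            Entry existing = store.putIfAbsent(key, new Entry(token, now + expirationMillis));
            if (existing == null) {
                return token; // we inserted the entry, so we hold the lock
            }
            if (existing.expireAt >= now) {
                return null; // held by someone else and not yet expired
            }
            // Expired: remove only if the entry is unchanged (same token and expireAt),
            // so a lock that was refreshed in the meantime is left alone.
            if (!store.remove(key, existing)) {
                return null;
            }
            // The stale entry is gone; loop and try to insert again.
        }
    }

    /** Removes the entry only when the caller's token matches. */
    boolean release(String key, String token) {
        boolean[] released = {false};
        store.computeIfPresent(key, (k, e) -> {
            if (e.token.equals(token)) {
                released[0] = true;
                return null; // returning null removes the mapping
            }
            return e;
        });
        return released[0];
    }

    /** Pushes expireAt forward only when the caller's token matches. */
    boolean refresh(String key, String token, long expirationMillis) {
        boolean[] refreshed = {false};
        store.computeIfPresent(key, (k, e) -> {
            if (!e.token.equals(token)) {
                return e;
            }
            refreshed[0] = true;
            return new Entry(token, System.currentTimeMillis() + expirationMillis);
        });
        return refreshed[0];
    }
}
```

The model makes one property easy to see: a stale holder cannot release or refresh a lock that someone else has taken over, because every mutation is conditional on the token.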

Release and refresh the lock:


@Override
public boolean release(String key, String token) {
    // Match on both key and token so that only the holder can release
    Query query = Query.query(Criteria.where("_id").is(key)
            .and("token").is(token));
    DeleteResult deleted = mongoTemplate.remove(query, LockDocument.class);
    boolean released = deleted.getDeletedCount() == 1;
    if (released) {
        log.debug("Remove query successfully affected 1 record for key {} with token {}",
                key, token);
    } else if (deleted.getDeletedCount() > 0) {
        log.error("Unexpected result from release for key {} with token {}, released {}",
                key, token, deleted);
    } else {
        log.error("Remove query did not affect any records for key {} with token {}",
                key, token);
    }
    return released;
}

@Override
public boolean refresh(String key, String token, long expiration) {
    // Match on both key and token so that only the holder can refresh
    Query query = Query.query(Criteria.where("_id").is(key)
            .and("token").is(token));
    Update update = Update.update("expireAt",
            System.currentTimeMillis() + expiration);
    UpdateResult updated =
            mongoTemplate.updateFirst(query, update, LockDocument.class);
    final boolean refreshed = updated.getModifiedCount() == 1;
    if (refreshed) {
        log.debug("Refresh query successfully affected 1 record for key {} " +
                "with token {}", key, token);
    } else if (updated.getModifiedCount() > 0) {
        log.error("Unexpected result from refresh for key {} with token {}, " +
                "updated {}", key, token, updated);
    } else {
        log.warn("Refresh query did not affect any records for key {} with token {}. " +
                "This is possible when refresh interval fires for the final time " +
                "after the lock has been released",
                key, token);
    }
    return refreshed;
}

Usage


private LockService lockService;

private void tryAcquireLockAndSchedule() throws InterruptedException {
    while (!this.stopSchedule) {
        // Try to acquire the lock
        this.token = this.lockService.acquire(SCHEDULER_LOCK, 20000);
        if (this.token != null) {
            // Lock acquired
        } else {
            // Wait for LOCK_EXPIRATION, then try again
            Thread.sleep(LOCK_EXPIRATION);
        }
    }
}

  • First try to acquire the lock. A non-null token means the lock was acquired.
  • Otherwise, sleep for a while and then try again.
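
Once the lock is held, the holder usually needs to keep it alive while working and release it afterwards. The sketch below shows one possible way to wire that up; it is an illustration under stated assumptions, not the code from the linked repository. The class LockedTaskRunner, the method runWithLock, and the choice to refresh at one third of the expiration are hypothetical. The nested LockService interface repeats the API defined above so that the block compiles on its own.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class LockedTaskRunner {

    /** Same shape as the LockService interface defined earlier in this article. */
    interface LockService {
        String acquire(String key, long expiration);
        boolean release(String key, String token);
        boolean refresh(String key, String token, long expiration);
    }

    /**
     * Runs the task only if the lock can be acquired.
     * Returns true if the task ran, false if the lock was busy.
     */
    static boolean runWithLock(LockService locks, String key,
                               long expirationMillis, Runnable task) {
        String token = locks.acquire(key, expirationMillis);
        if (token == null) {
            return false; // someone else holds the lock
        }
        // Refresh well before the lock expires (assumed ratio: one third).
        ScheduledExecutorService refresher = Executors.newSingleThreadScheduledExecutor();
        long interval = Math.max(1, expirationMillis / 3);
        refresher.scheduleAtFixedRate(
                () -> locks.refresh(key, token, expirationMillis),
                interval, interval, TimeUnit.MILLISECONDS);
        try {
            task.run();
            return true;
        } finally {
            // Stop refreshing first, then release, even if the task threw.
            refresher.shutdownNow();
            locks.release(key, token);
        }
    }
}
```

A caller that gets false back can sleep and retry, as in the loop above. Releasing in a finally block means a failed task does not leave the lock held until it expires.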

The complete code is available on GitHub: https://github.com/jadepeng/docker-pipeline/blob/main/pipeline-master/src/main/java/com/github/jadepeng/pipeline/service/impl/MongoLockService.java

