1. 程式人生 > >分散式鎖(一):基於redis的分散式鎖實現

分散式鎖(一):基於redis的分散式鎖實現

隨著業務越來越複雜,應用服務都會朝著分散式、叢集方向部署,而分散式CAP原則告訴我們,Consistency(一致性)、 Availability(可用性)、Partition tolerance(分割槽容錯性),三者不可得兼。

很多場景中,需要使用分散式事務、分散式鎖等技術來保證資料最終一致性。有的時候,我們需要保證某一方法同一時刻只能被一個執行緒執行。在單機(單程序)環境中,JAVA提供了很多併發相關API,但在多機(多程序)環境中就無能為力了。

對於分散式鎖,最好能夠滿足以下幾點:

1.可以保證在分散式部署的應用叢集中,同一個方法在同一時間只能被一臺機器上的一個執行緒執行
2.這把鎖要是一把可重入鎖(避免死鎖)
3.這把鎖最好是一把阻塞鎖
4.有高可用的獲取鎖和釋放鎖功能
5.獲取鎖和釋放鎖的效能要好
1.基於資料庫實現分散式鎖 
2.基於redis快取實現分散式鎖 
3.基於zookeeper實現分散式鎖

本篇文章介紹如何基於redis實現分散式鎖。對於第一種(基於資料庫)及第三種(基於zookeeper)的實現方式可以參考博文

首先奉上原始碼:原始碼

分散式同步鎖實現

實現思路

鎖的實現主要基於redis的SETNX命令(SETNX詳細解釋參考這裡),我們來看SETNX的解釋

SETNX key value
將 key 的值設為 value ,當且僅當 key 不存在。
若給定的 key 已經存在,則 SETNX 不做任何動作。
SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
返回值:
設定成功,返回 1 。
設定失敗,返回 0 。

使用SETNX完成同步鎖的流程及事項如下:

  1. 使用SETNX命令獲取鎖,若返回0(key已存在,鎖已存在)則獲取失敗,反之獲取成功
  2. 為了防止獲取鎖後程序出現異常,導致其他執行緒/程序呼叫SETNX命令總是返回0而進入死鎖狀態,需要為該key設定一個“合理”的過期時間
  3. 釋放鎖,使用DEL命令將鎖資料刪除

實現過程

建立同步鎖實現類

/**
 * 同步鎖
 *
 * @property key Redis key
 * @property stringRedisTemplate RedisTemplate
 * @property expire Redis TTL/秒
 * @property safetyTime 安全時間/秒
 */
class SyncLock(
        private val key: String,
        private val stringRedisTemplate: StringRedisTemplate,
        private val expire: Long,
        private val safetyTime: Long
)
key reids中的key,對應java api synchronized的物件
expire reids中key的過期時間

safetyTime 下文介紹其作用

實現鎖的獲取功能

private val value: String get() = Thread.currentThread().name

/**
 * 嘗試獲取鎖(立即返回)
 *
 * @return 是否獲取成功
 */
fun tryLock(): Boolean {
    val locked = stringRedisTemplate.opsForValue().setIfAbsent(key, value) ?: false
    if (locked) {
        stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
    }
    return locked
}

這裡使用setIfAbsent函式(對應SETNX命令)嘗試設定key的值為value(當前執行緒id+執行緒名),若成功則同時設定key的過期時間並返回true,否則返回false。

實現帶超時時間的鎖獲取功能


private val waitMillisPer: Long = 10

/**
 * 嘗試獲取鎖,並至多等待timeout時長
 *
 * @param timeout 超時時長
 * @param unit 時間單位
 *
 * @return 是否獲取成功
 */
fun tryLock(timeout: Long, unit: TimeUnit): Boolean {
    val waitMax = unit.toMillis(timeout)
    var waitAlready: Long = 0

    while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) {
        Thread.sleep(waitMillisPer)
        waitAlready += waitMillisPer
    }

    if (waitAlready < waitMax) {
        stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
        return true
    }
    return false
}

這裡使用while迴圈不斷嘗試鎖的獲取,並至多嘗試timeout時長,在timeout時間內若成功則同時設定key的過期時間並返回true,否則返回false。其實以上兩種tryLock函式還是有一種可能便是,在呼叫setIfAbsent後、呼叫expire之前若服務出現異常,也將導致該鎖(key)無法釋放(過期或刪除),使得其他執行緒/程序再無法獲取鎖而進入死迴圈,為了避免此問題的產生,我們引入了safetyTime,該引數的作用為,從獲取鎖開始直到safetyTime時長,若仍未獲取成功則認為某一執行緒/程序出現異常導致資料不正確,此時強制獲取,其實現如下

實現帶保護功能的鎖獲取功能


/**
 * 獲取鎖
 */
fun lock() {
    val waitMax = TimeUnit.SECONDS.toMillis(safetyTime)
    var waitAlready: Long = 0

    while (stringRedisTemplate.opsForValue().setIfAbsent(key, value) != true && waitAlready < waitMax) {
        Thread.sleep(waitMillisPer)
        waitAlready += waitMillisPer
    }

    // stringRedisTemplate.expire(key, expire, TimeUnit.SECONDS)
    stringRedisTemplate.opsForValue().set(key, value, expire, TimeUnit.SECONDS)
}

這裡同樣使用while迴圈不斷嘗試鎖的獲取,但至多等待safetyTime時長,最終不論是否成功,均使用SETEX 命令將key設定為當前先執行緒對應的value,並同時設定該key的過期時間。

/**
 * 釋放鎖
 */
fun unLock() {
    stringRedisTemplate.opsForValue()[key]?.let {
        if (it == value) {
            stringRedisTemplate.delete(key)
        }
    }
}

實現鎖的釋放功能

/**
 * 釋放鎖
 */
fun unLock() {
    stringRedisTemplate.opsForValue()[key]?.let {
        if (it == value) {
            stringRedisTemplate.delete(key)
        }
    }
}
鎖的釋放使用DEL命令刪除key,但需要注意的是,釋放鎖時只能釋放本執行緒持有的鎖

若expire設定不合理,如expire設定為10秒,結果在獲取鎖後執行緒運行了20秒,該鎖有可能已經被其他執行緒強制獲取,即該key代表的鎖已經不是當前執行緒所持有的鎖,此時便不能冒然刪除該key,而只能釋放本執行緒持有的鎖。

整合Spring Boot

為了更好的與spring整合,我們建立一個工廠類來輔助建立同步鎖例項:

/**
 * SyncLock同步鎖工廠類
 */
@Component
class SyncLockFactory {
    @Autowired
    private lateinit var stringRedisTemplate: StringRedisTemplate

    private val syncLockMap = mutableMapOf<String, SyncLock>()

    /**
     * 建立SyncLock
     *
     * @param key Redis key
     * @param expire Redis TTL/秒,預設10秒
     * @param safetyTime 安全時間/秒,為了防止程式異常導致死鎖,在此時間後強制拿鎖,預設 expire * 5 秒
     */
    @Synchronized
    fun build(key: String, expire: Long = 10 /* seconds */, safetyTime: Long = expire * 5/* seconds */): SyncLock {
        if (!syncLockMap.containsKey(key)) {
            syncLockMap[key] = SyncLock(key, stringRedisTemplate, expire, safetyTime)
        }
        return syncLockMap[key]!!
    }
}

在spring框架下可以更方便的使用

@Component
class SomeLogic: InitializingBean {
  @Autowired
  lateinit var syncLockFactory: SyncLockFactory
  
  lateinit var syncLock

  override fun afterPropertiesSet() {
    syncLock = syncLockFactory.build("lock:some:name", 10)
  }

  fun someFun() {
    syncLock.lock()
    try {
      // some logic
    } finally {
      syncLock.unlock()
    }
  }
}

註解的實現

藉助spring aop框架,我們可以將SyncLock的使用進一步簡化。

建立註解類

/**
 * 同步鎖註解
 *
 * @property key Redis key
 * @property expire Redis TTL/秒,預設10秒
 */
@Target(AnnotationTarget.FUNCTION)
@Retention(AnnotationRetention.RUNTIME)
annotation class SyncLockable(
        val key: String,
        val expire: Long = 10
)

實現AOP

/**
 * 同步鎖註解處理
 */
@Aspect
@Component
class SyncLockHandle {
    @Autowired
    private lateinit var syncLockFactory: SyncLockFactory

    /**
     * 在方法上執行同步鎖
     */
    @Around("@annotation(syncLockable)")
    fun syncLock(jp: ProceedingJoinPoint, syncLockable: SyncLockable): Any? {
        val lock = syncLockFactory.build(syncLockable.key, syncLockable.expire)

        try {
            lock.lock()
            return jp.proceed()
        } finally {
            lock.unLock()
        }
    }
}

如此一來,我們便可以按照如下方式使用SyncLock。

@Component
class SomeLogic {
    @SyncLockable("lock:some:name", 10)
    fun someFun() {
        // some logic
    }
}
完結。。。