Redis實現的分散式鎖和分散式限流
隨著現在分散式越來越普遍,分散式鎖也十分常用,我的上一篇文章解釋了使用zookeeper實現分散式鎖(傳送門),本次咱們說一下如何用Redis實現分散式鎖和分佈限流。
Redis有個事務鎖,就是如下的命令,這個命令的含義是將一個value設定到一個key中,如果不存在將會賦值並且設定超時時間為30秒,如何這個key已經存在了,則不進行設定。
SET key value NX PX 30000
這個事務鎖很好的解決了兩個單獨的命令,一個設定set key value nx,即該key不存在的話將對其進行設定,另一個是expire key seconds,設定該key的超時時間。我們可以想一下,如果這兩個命令用程式單獨使用會存在什麼問題:
1. 如果一個set key的命令設定了key,然後程式異常了,expire時間沒有設定,那麼這個key會一直鎖住。
2. 如果一個set key時出現了異常,但是直接執行了expire,過了一會兒之後另一個進行set key,還沒怎麼執行程式碼,結果key過期了,別的執行緒也進入了鎖。
還有很多出問題的可能點,這裡我們就不討論了,下面咱們來看看如何實現吧。本文使用的Spring Boot 2.x + Spring data redis + Swagger +lombok + AOP + lua指令碼。在實現的過程中遇到了很多問題,都一一解決實現了。依賴的POM檔案如下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.hqs</groupId> <artifactId>distributedlock</artifactId> <version>0.0.1-SNAPSHOT</version> <name>distributedlock</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> <scope>compile</scope> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
使用了兩個lua指令碼,一個用於執行lock,另一個執行unlock。咱們簡單看一下,lock指令碼就是採用Redis事務執行的set nx px命令,其實還有set nx ex命令,這個ex命令是採用秒的方式進行設定過期時間,這個px是採用毫秒的方式設定過期時間。value需要使用一個唯一的值,這個值在解鎖的時候需要判斷是否一致,如果一致的話就進行解鎖。這個也是官方推薦的方法。另外在lock的地方我設定了一個result,用於輸出測試時的結果,這樣就可以結合程式去進行debug了。
local expire = tonumber(ARGV[2]) local ret = redis.call('set', KEYS[1], ARGV[1], 'NX', 'PX', expire) local strret = tostring(ret) --用於檢視結果,我本機獲取鎖成功後程序返回隨機結果"table: 0x7fb4b3700fe0",否則返回"false" redis.call('set', 'result', strret) if strret == 'false' then return false else return true end
redis.call('del', 'result') if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end
來看下程式碼,主要寫了兩個方法,一個是用與鎖另外一個是用於結解鎖。這塊需要注意的是使用RedisTemplate<String, String>,這塊意味著key和value一定都是String的,我在使用的過程中就出現了一些錯誤。首先初始化兩個指令碼到程式中,然後呼叫執行指令碼。
package com.hqs.distributedlock.lock; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component; import java.util.Collections; @Slf4j @Component public class DistributedLock { //注意RedisTemplate用的String,String,後續所有用到的key和value都是String的 @Autowired private RedisTemplate<String, String> redisTemplate; @Autowired RedisScript<Boolean> lockScript; @Autowired RedisScript<Long> unlockScript; public Boolean distributedLock(String key, String uuid, String secondsToLock) { Boolean locked = false; try { String millSeconds = String.valueOf(Integer.parseInt(secondsToLock) * 1000); locked =redisTemplate.execute(lockScript, Collections.singletonList(key), uuid, millSeconds); log.info("distributedLock.key{}: - uuid:{}: - timeToLock:{} - locked:{} - millSeconds:{}", key, uuid, secondsToLock, locked, millSeconds); } catch (Exception e) { log.error("error", e); } return locked; } public void distributedUnlock(String key, String uuid) { Long unlocked = redisTemplate.execute(unlockScript, Collections.singletonList(key), uuid); log.info("distributedLock.key{}: - uuid:{}: - unlocked:{}", key, uuid, unlocked); } }
還有一個就是指令碼定義的地方需要注意,返回的結果集一定是Long, Boolean,List, 一個反序列化的值。這塊要注意。
package com.hqs.distributedlock.config; import com.sun.org.apache.xpath.internal.operations.Bool; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.io.ClassPathResource; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.scripting.ScriptSource; import org.springframework.scripting.support.ResourceScriptSource; @Configuration @Slf4j public class BeanConfiguration { /** * The script resultType should be one of * Long, Boolean, List, or a deserialized value type. It can also be null if the script returns * a throw-away status (specifically, OK). * @return */ @Bean public RedisScript<Long> limitScript() { RedisScript redisScript = null; try { ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua")); //log.info("script:{}", scriptSource.getScriptAsString()); redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class); } catch (Exception e) { log.error("error", e); } return redisScript; } @Bean public RedisScript<Boolean> lockScript() { RedisScript<Boolean> redisScript = null; try { ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/lock.lua")); redisScript = RedisScript.of(scriptSource.getScriptAsString(), Boolean.class); } catch (Exception e) { log.error("error" , e); } return redisScript; } @Bean public RedisScript<Long> unlockScript() { RedisScript<Long> redisScript = null; try { ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("/scripts/unlock.lua")); redisScript = RedisScript.of(scriptSource.getScriptAsString(), Long.class); } catch (Exception e) { log.error("error" , e); } return redisScript; } @Bean public RedisScript<Long> limitAnother() { DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(); redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("/scripts/limit.lua"))); redisScript.setResultType(Long.class); return redisScript; } }
好了,這塊就寫好了,然後寫好controller類準備測試。
@PostMapping("/distributedLock") @ResponseBody public String distributedLock(String key, String uuid, String secondsToLock, String userId) throws Exception{ //String uuid = UUID.randomUUID().toString(); Boolean locked = false; try { locked = lock.distributedLock(key, uuid, secondsToLock); if(locked) { log.info("userId:{} is locked - uuid:{}", userId, uuid); log.info("do business logic"); TimeUnit.MICROSECONDS.sleep(3000); } else { log.info("userId:{} is not locked - uuid:{}", userId, uuid); } } catch (Exception e) { log.error("error", e); } finally { if(locked) { lock.distributedUnlock(key, uuid); } } return "ok"; }
我也寫了一個測試類,用於測試和輸出結果, 使用100個執行緒,然後鎖的時間設定10秒,controller裡邊需要休眠3秒模擬業務執行。
@Test public void distrubtedLock() { String url = "http://localhost:8080/distributedLock"; String uuid = "abcdefg"; //log.info("uuid:{}", uuid); String key = "redisLock"; String secondsToLive = "10"; for(int i = 0; i < 100; i++) { final int userId = i; new Thread(() -> { MultiValueMap<String, String> params = new LinkedMultiValueMap<>(); params.add("uuid", uuid); params.add("key", key); params.add("secondsToLock", secondsToLive); params.add("userId", String.valueOf(userId)); String result = testRestTemplate.postForObject(url, params, String.class); System.out.println("-------------" + result); } ).start(); } }
獲取鎖的地方就會執行do business logic, 然後會有部分執行緒獲取到鎖並執行業務,執行完業務的就會釋放鎖。
分散式鎖就實現好了,接下來實現分散式限流。先看一下limit的lua指令碼,需要給指令碼傳兩個值,一個值是限流的key,一個值是限流的數量。獲取當前key,然後判斷其值是否為nil,如果為nil的話需要賦值為0,然後進行加1並且和limit進行比對,如果大於limt即返回0,說明限流了,如果小於limit則需要使用Redis的INCRBY key 1,就是將key進行加1命令。並且設定超時時間,超時時間是秒,並且如果有需要的話這個秒也是可以用引數進行設定。
--lua 下標從 1 開始 -- 限流 key local key = KEYS[1] -- 限流大小 local limit = tonumber(ARGV[1]) -- 獲取當前流量大小 local curentLimit = tonumber(redis.call('get', key) or "0") if curentLimit + 1 > limit then -- 達到限流大小 返回 return 0; else -- 沒有達到閾值 value + 1 redis.call("INCRBY", key, 1) -- EXPIRE後邊的單位是秒 redis.call("EXPIRE", key, 10) return curentLimit + 1 end
執行limit的指令碼和執行lock的指令碼類似。
package com.hqs.distributedlock.limit; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component; import java.util.Collections; /** * @author huangqingshi * @Date 2019-01-17 */ @Slf4j @Component public class DistributedLimit { //注意RedisTemplate用的String,String,後續所有用到的key和value都是String的 @Autowired private RedisTemplate<String, String> redisTemplate; @Autowired RedisScript<Long> limitScript; public Boolean distributedLimit(String key, String limit) { Long id = 0L; try { id = redisTemplate.execute(limitScript, Collections.singletonList(key), limit); log.info("id:{}", id); } catch (Exception e) { log.error("error", e); } if(id == 0L) { return false; } else { return true; } } }
接下來咱們寫一個限流注解,並且設定註解的key和限流的大小:
package com.hqs.distributedlock.annotation; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; /** * 自定義limit註解 * @author huangqingshi * @Date 2019-01-17 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) public @interface DistriLimitAnno { public String limitKey() default "limit"; public int limit() default 1; }
然後對註解進行切面,在切面中判斷是否超過limit,如果超過limit的時候就需要丟擲異常exceeded limit,否則正常執行。
package com.hqs.distributedlock.aspect; import com.hqs.distributedlock.annotation.DistriLimitAnno; import com.hqs.distributedlock.limit.DistributedLimit; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.JoinPoint; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Before; import org.aspectj.lang.annotation.Pointcut; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.stereotype.Component; import java.lang.reflect.Method; /** * @author huangqingshi * @Date 2019-01-17 */ @Slf4j @Aspect @Component @EnableAspectJAutoProxy(proxyTargetClass = true) public class LimitAspect { @Autowired DistributedLimit distributedLimit; @Pointcut("@annotation(com.hqs.distributedlock.annotation.DistriLimitAnno)") public void limit() {}; @Before("limit()") public void beforeLimit(JoinPoint joinPoint) throws Exception { MethodSignature signature = (MethodSignature) joinPoint.getSignature(); Method method = signature.getMethod(); DistriLimitAnno distriLimitAnno = method.getAnnotation(DistriLimitAnno.class); String key = distriLimitAnno.limitKey(); int limit = distriLimitAnno.limit(); Boolean exceededLimit = distributedLimit.distributedLimit(key, String.valueOf(limit)); if(!exceededLimit) { throw new RuntimeException("exceeded limit"); } } }
因為有丟擲異常,這裡我弄了一個統一的controller錯誤處理,如果controller出現Exception的時候都需要走這塊異常。如果是正常的RunTimeException的時候獲取一下,否則將異常獲取一下並且輸出。
package com.hqs.distributedlock.util; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.context.request.NativeWebRequest; import javax.servlet.http.HttpServletRequest; import java.util.HashMap; import java.util.Map; /** * @author huangqingshi * @Date 2019-01-17 * 統一的controller錯誤處理 */ @Slf4j @ControllerAdvice public class UnifiedErrorHandler { private static Map<String, String> res = new HashMap<>(2); @ExceptionHandler(value = Exception.class) @ResponseStatus(HttpStatus.OK) @ResponseBody public Object processException(HttpServletRequest req, Exception e) { res.put("url", req.getRequestURL().toString()); if(e instanceof RuntimeException) { res.put("mess", e.getMessage()); } else { res.put("mess", "sorry error happens"); } return res; } }
好了,接下來將註解寫到自定義的controller上,limit的大小為10,也就是10秒鐘內限制10次訪問。
@PostMapping("/distributedLimit") @ResponseBody @DistriLimitAnno(limitKey="limit", limit = 10) public String distributedLimit(String userId) { log.info(userId); return "ok"; }
也是來一段Test方法來跑,老方式100個執行緒開始跑,只有10次,其他的都是limit。沒有問題。
總結一下,這次實現採用了使用lua指令碼和Redis實現了鎖和限流,但是真實使用的時候還需要多測試,另外如果此次Redis也是採用的單機實現方法,使用叢集的時候可能需要改造一下。關於鎖這塊其實Reids他們自己也實現了 RedLock , java實現的版本Redission。也有很多公司使用了,功能非常強大。各種場景下都用到了。
如有問題,歡迎拍磚~