1. 程式人生 > >Spring Cloud限流詳解

Spring Cloud限流詳解

在高併發的應用中,限流往往是一個繞不開的話題。本文詳細探討在Spring Cloud中如何實現限流。

Zuul 上實現限流是個不錯的選擇,只需要編寫一個過濾器就可以了,關鍵在於如何實現限流的演算法。常見的限流演算法有漏桶演算法以及令牌桶演算法。這個可參考 https://www.cnblogs.com/LBSer/p/4083131.html ,寫得通俗易懂,你值得擁有,我就不拽文了。

Google Guava 為我們提供了限流工具類RateLimiter ,於是乎,我們可以擼程式碼了。

程式碼示例

@Componentpublic classRateLimitZuulFilterextends
ZuulFilter
{ private final RateLimiter rateLimiter = RateLimiter.create(1000.0); @Override public String filterType(){ return FilterConstants.PRE_TYPE; } @Override publicintfilterOrder(){ return Ordered.HIGHEST_PRECEDENCE; } @Override publicbooleanshouldFilter(){ // 這裡可以考慮弄個限流開啟的開關,開啟限流返回true
,關閉限流返回false,你懂的。
return true; } @Override public Object run(){ try { RequestContext currentContext = RequestContext.getCurrentContext(); HttpServletResponse response = currentContext.getResponse(); if (!rateLimiter.tryAcquire()) { HttpStatus httpStatus = HttpStatus.TOO_MANY_REQUESTS; response
.setContentType(MediaType.TEXT_PLAIN_VALUE); response.setStatus(httpStatus.value()); response.getWriter().append(httpStatus.getReasonPhrase()); currentContext.setSendZuulResponse(false); throw new ZuulException( httpStatus.getReasonPhrase(), httpStatus.value(), httpStatus.getReasonPhrase() ); } } catch (Exception e) { ReflectionUtils.rethrowRuntimeException(e); } return null; }}

如上,我們編寫了一個pre 型別的過濾器。對Zuul過濾器有疑問的可參考我的部落格:

在過濾器中,我們使用Guava RateLimiter 實現限流,如果已經達到最大流量,就拋異常。

分散式場景下的限流

以上單節點Zuul下的限流,但在生產中,我們往往會有多個Zuul例項。對於這種場景如何限流呢?我們可以藉助Redis實現限流。

使用redis實現,儲存兩個key,一個用於計時,一個用於計數。請求每呼叫一次,計數器增加1,若在計時器時間內計數器未超過閾值,則可以處理任務

if(!cacheDao.hasKey(TIME_KEY)) { cacheDao.putToValue(TIME_KEY, 0, 1, TimeUnit.SECONDS);} if(cacheDao.hasKey(TIME_KEY) && cacheDao.incrBy(COUNTER_KEY, 1) > 400) { // 拋個異常什麼的}

實現微服務級別的限流

一些場景下,我們可能還需要實現微服務粒度的限流。此時可以有兩種方案:

方式一:在微服務本身實現限流。

和在Zuul上實現限流類似,只需編寫一個過濾器或者攔截器即可,比較簡單,不作贅述。個人不太喜歡這種方式,因為每個微服務都得編碼,感覺成本很高啊。

加班那麼多,作為程式猿的我們,應該學會偷懶,這樣才可能有時間孝順父母、抱老婆、逗兒子、遛狗養鳥、聊天打屁、追求人生信仰。好了不扯淡了,看方法二吧。

方法二:在Zuul上實現微服務粒度的限流。

在講解之前,我們不妨模擬兩個路由規則,兩種路由規則分別代表Zuul的兩種路由方式。

zuul: routes: microservice-provider-user: /user/** user2: url: http://localhost:8000/ path: /user2/**

如配置所示,在這裡,我們定義了兩個路由規則,microservice-provider-user 以及user2 ,其中microservice-provider-user 這個路由規則使用到Ribbon + Hystrix,走的是RibbonRoutingFilter ;而user2 這個路由用不上Ribbon也用不上Hystrix,走的是SipleRoutingFilter 。如果你搞不清楚這點,請參閱我的部落格:

搞清楚這點之後,我們就可以擼程式碼了:

@Componentpublic classRateLimitZuulFilterextendsZuulFilter{ private Map<String, RateLimiter> map = Maps.newConcurrentMap(); @Override public String filterType(){ return FilterConstants.PRE_TYPE; } @Override publicintfilterOrder(){ // 這邊的order一定要大於org.springframework.cloud.netflix.zuul.filters.pre.PreDecorationFilter的order // 也就是要大於5 // 否則,RequestContext.getCurrentContext()裡拿不到serviceId等資料。 return Ordered.LOWEST_PRECEDENCE; } @Override publicbooleanshouldFilter(){ // 這裡可以考慮弄個限流開啟的開關,開啟限流返回true,關閉限流返回false,你懂的。 return true; } @Override public Object run(){ try { RequestContext context = RequestContext.getCurrentContext(); HttpServletResponse response = context.getResponse(); String key = null; // 對於service格式的路由,走RibbonRoutingFilter String serviceId = (String) context.get(SERVICE_ID_KEY); if (serviceId != null) { key = serviceId; map.putIfAbsent(serviceId, RateLimiter.create(1000.0)); } // 如果壓根不走RibbonRoutingFilter,則認為是URL格式的路由 else { // 對於URL格式的路由,走SimpleHostRoutingFilter URL routeHost = context.getRouteHost(); if (routeHost != null) { String url = routeHost.toString(); key = url; map.putIfAbsent(url, RateLimiter.create(2000.0)); } } RateLimiter rateLimiter = map.get(key); if (!rateLimiter.tryAcquire()) { HttpStatus httpStatus = HttpStatus.TOO_MANY_REQUESTS; response.setContentType(MediaType.TEXT_PLAIN_VALUE); response.setStatus(httpStatus.value()); response.getWriter().append(httpStatus.getReasonPhrase()); context.setSendZuulResponse(false); throw new ZuulException( httpStatus.getReasonPhrase(), httpStatus.value(), httpStatus.getReasonPhrase() ); } } catch (Exception e) { ReflectionUtils.rethrowRuntimeException(e); } return null; }}

簡單講解一下這段程式碼:

對於microservice-provider-user 這個路由,我們可以用context.get(SERVICE_ID_KEY); 獲取到serviceId,獲取出來就是microservice-provider-user ;

而對於user2 這個路由,我們使用context.get(SERVICE_ID_KEY); 獲得是null,但是呢,可以用context.getRouteHost() 獲得路由到的地址,獲取出來就是http://localhost:8000/ 。接下來的事情,你們懂的。

改進與提升

實際專案中,除以上實現的限流方式,還可能會:

一、在上文的基礎上,增加配置項,控制每個路由的限流指標,並實現動態重新整理,從而實現更加靈活的管理

二、基於CPU、記憶體、資料庫等壓力限流(感謝平安常浩智)提出。

下面,筆者藉助Spring Boot Actuator提供的Metrics 能力進行實現基於記憶體壓力的限流——當可用記憶體低於某個閾值就開啟限流,否則不開啟限流。

@Componentpublic classRateLimitZuulFilterextendsZuulFilter{ @Autowired private SystemPublicMetrics systemPublicMetrics; @Override publicbooleanshouldFilter(){ // 這裡可以考慮弄個限流開啟的開關,開啟限流返回true,關閉限流返回false,你懂的。 Collection<Metric<?>> metrics = systemPublicMetrics.metrics(); Optional<Metric<?>> freeMemoryMetric = metrics.stream() .filter(t -> "mem.free".equals(t.getName())) .findFirst(); // 如果不存在這個指標,穩妥起見,返回true,開啟限流 if (!freeMemoryMetric.isPresent()) { return true; } long freeMemory = freeMemoryMetric.get() .getValue() .longValue(); // 如果可用記憶體小於1000000KB,開啟流控 return freeMemory < 1000000L; } // 省略其他方法}

三、實現不同維度的限流,例如:

  • 對請求的目標URL進行限流(例如:某個URL每分鐘只允許呼叫多少次)
  • 對客戶端的訪問IP進行限流(例如:某個IP每分鐘只允許請求多少次)
  • 對某些特定使用者或者使用者組進行限流(例如:非VIP使用者限制每分鐘只允許呼叫100次某個API等)
  • 多維度混合的限流。此時,就需要實現一些限流規則的編排機制。與、或、非等關係。

參考文件