1. 程式人生 > >微服務-高併發下介面如何做到優雅的限流

微服務-高併發下介面如何做到優雅的限流

#### 什麼是限流?為什麼要限流 通俗的來講,一根管子往池塘注水,池塘底部有一個口子往外出水,當注水的速度過快時,池塘的水會溢位,此時,我們的做法換根小管子注水或者把注水管子的口堵住一半,這就是限流,限流的目的就是為了防止池塘的水溢位,放在軟體開發中,一臺硬體的CPU和記憶體總歸是有限的,能處理的請求量是有一個閾值的,就跟人的精力一樣是有限的,超過這個限度系統就會異常,人就會生病。 明白了什麼是限流,為什麼要限流,那麼網際網路公司在各種業務大促中,為了保證系統不被流量壓垮,會在系統流量到達設定的閾值時,拒絕後續的流量,限流會導致部分時間段(這個時間段是毫秒級的)系統不可用,不是完全不可用,一般衡量系統處理能力的指標是每秒的QPS或者TPS,假設系統每秒的閾值是1000,當這一秒內有1001個請求訪問時,那最後一個請求就會被限流(拒絕處理) #### 限流常用的幾種演算法 在具體開發中,尤其是RPC框架中,限流是RPC的標配,一般業務開發人員很少做限流演算法開發,這也導致大部分開發人員不是很瞭解限流演算法的原理,這裡分享幾種常用的限流演算法,指出他們的優缺點,並通過程式碼實現他們。 ##### 計數器限流 你要是仔細看了上面的內容,就會發現上面舉例的每秒閾值1000的那個例子就是一個計數器限流的思想,計數器限流的本質是一定時間內,訪問量到達設定的限制後,在這個時間段沒有過去之前,超過閾值的訪問量拒絕處理,舉個例,你告訴老闆我一個小時只處理10件事,這是你的處理能力,但領導半個小內就斷續斷續給你分派了10件事,這時已經到達你的極限了,在後面的半個小時內,領導再派出的活你是拒絕處理的,直到下一個小時的時間段開始。 首先我們定義一個計數限流的結構體,結構體中至少滿足3個欄位,閾值,單位時間,當前請求數,結構體如下 ```go type CountLimiter struct { count int64 //閾值 unitTime time.Duration //單位時間(每秒或者每分鐘) index *atomic.Int64 //計數累加 } ``` 我們需要一個為這個結構體提供建立物件的方法,同時初始化各個欄位,其中有些欄位是可以從外部當作此引數傳入的,完成之後同時啟動一個定時器。 ```go //建立一個計數器限流結構體 func NewCountLimiter(count int64, unitTime time.Duration) *CountLimiter { countLimiter := &CountLimiter{ count: count, unitTime: unitTime, index: atomic.NewInt64(0), } //開啟一個新的協程 go timer(countLimiter) return countLimiter } ``` 這個定時器幹嘛呢,需要在經過單位時間後把當前請求數清0,從而開啟下一個單位時間內的請求統計。 ```go //相當於一個定時器,每經過過單位時間後把index置為0,重新累加 func timer(limiter *CountLimiter) { ticker := time.NewTicker(limiter.unitTime) for { <-ticker.C limiter.index.Store(0) } } ``` 最後最重要的是這個計數器限流物件需要提高一個判斷當前請求是否限流的方法,返回值應該是一個bool值,true代表請求通過,false代表請求被限流。 ```go //判斷是否允許請求通過 func (cl *CountLimiter) IsAllow() bool { //如果index累加已經超過閾值,不允許請求通過 if cl.index.Load() >= cl.count { return false } //index加1 cl.index.Add(1) return true } ``` 這樣一個計數器限流就實現完成了,有沒有什麼問題呢?還是前面舉的例子,每秒1000的閾值,假設在前100毫秒內,計數器index就累加到1000了,那麼剩餘的900毫秒內就無法處理任何請求了,這種限流很容易造成熱點,再來分析一種情況,在一秒內最後100毫秒時間內突發請求800個,這時進入下一個單位時間內,在這個單位時間的前100毫秒內,突發請求700個,這時你會發現200毫秒處理了請求1500個,好像限流不起作用了,是的,這是一個邊界問題,是計數器限流的缺點。,如下圖,黃線是第一個單位時間內,紅線是第二個單位時間內。 ![](https://img2020.cnblogs.com/blog/706455/202003/706455-20200316140209535-1919628938.png) ##### 令牌桶限流 令牌桶限流-顧名思義,手中握有令牌才能通過,系統只處理含有令牌的請求,如果一個請求獲取不到令牌,系統拒絕處理,再通俗一點,醫院每天接待病人是有限的,只有掛了號才能看病,掛不上號,對不起,醫院不給你看病。 令牌桶,有一個固定大小的容器,每隔一定的時間往桶內放入固定數量的定牌,當請求到來時去容器內先獲取令牌,拿到了,開始處理,拿不到拒絕處理(或者短暫的等待,再此獲取還是獲取不到就放棄) 首先我們定義一個令牌桶結構體,根據令牌桶演算法我們結構體中欄位至少需要有桶容量,令牌容器,時間間隔,初始令牌數核心欄位,程式碼如下: ```go type TokenBucket struct { interval time.Duration //時間間隔 ticker *time.Ticker //定時器 cap int // 桶容量 avail int //桶內一開始令牌數 tokenArray []int //儲存令牌的陣列 intervalInToken int //時間間隔內放入令牌的個數 index int //陣列放入令牌的下標處 mutex sync.Mutex } ``` 同樣的,我們需要提供一個建立令牌桶物件的方法,並且初始化所有欄位的值,一些欄位需要根據外部傳參來決定,同時開啟一個新的協程定時放入一定數量的令牌 ```go //建立一個令牌通,入參為令牌桶的容量 func NewTokenBucket(cap int) *TokenBucket { if cap < 100{ return nil } tokenBucket := &TokenBucket{ interval: time.Second * 1, cap: cap, avail: 100, tokenArray: make([]int, cap, cap), intervalInToken: 100, index: 0, mutex: sync.Mutex{}, } //開啟一個協程往容器內定時放入令牌 go adjustTokenDaemon(tokenBucket) return tokenBucket } ``` 這個方法的核心是初始化令牌桶的初始數量,然後啟動定時器,定時呼叫放入令牌方法 ```go //調整令牌桶令牌的方法 func adjustTokenDaemon(tokenBucket *TokenBucket) { //如果桶內一開始的令牌小於初始令牌,開始放入初始令牌 for tokenBucket.index < tokenBucket.avail { tokenBucket.tokenArray[tokenBucket.index] = 1 tokenBucket.index++ } tokenBucket.ticker = time.NewTicker(tokenBucket.interval) go func(t *time.Ticker) { for { <-t.C putToken(tokenBucket) } }(tokenBucket.ticker) } ``` 往令牌容器中新增令牌,記得加鎖,因為涉及到多協程操作,一個放令牌,一個取令牌,所以可能存在併發安全情況。 ```go //放入令牌 func putToken(tokenBucket *TokenBucket) { tokenBucket.mutex.Lock() for i := 0; i < tokenBucket.intervalInToken; i++ { //容器滿了,無法放入令牌了,終止 if tokenBucket.index > tokenBucket.cap-1 { break } tokenBucket.tokenArray[tokenBucket.index] = 1 tokenBucket.index++ } defer tokenBucket.mutex.Unlock() } ``` 最後當有請求到來時,我們從令牌桶內取出一個令牌,如果取出成功,則代表請求通過,否則,請求失敗,相當於限流了。 ```go //從令牌桶彈出一個令牌,如果令牌通有令牌,返回true,否則返回false func (tokenBucket *TokenBucket) PopToken() bool { defer tokenBucket.mutex.Unlock() tokenBucket.mutex.Lock() if tokenBucket.index <= 0 { return false } tokenBucket.tokenArray[tokenBucket.index-1] = 0 tokenBucket.index-- return true } ``` 上面程式碼就是令牌桶的限流的實現程式碼了,相對與計數器限流會比較複雜一些,令牌桶限流能夠更方便的調整放入令牌的頻率和每次獲取令牌的個數,甚至可以用令牌桶思想來限制閘道器入口流量。 ![](https://img2020.cnblogs.com/blog/706455/202003/706455-20200316140156379-1147243903.png) ##### 漏斗限流 漏斗限流,意思是說在一個漏斗容器中,當請求來臨時就從漏斗頂部放入,漏斗底部會以一定的頻率流出,當放入的速度大於流出的速度時,漏斗的空間會逐漸減少為0,這時請求會被拒絕,其實就是上面開始時池塘流水的例子。流入速率是隨機的,流出速率是固定的,當漏斗滿了之後,其實到了一個平滑的階段,因為流出是固定的,所以你流入也是固定的,相當於請求是勻速通過的 ![](https://img2020.cnblogs.com/blog/706455/202003/706455-20200316140141031-1112261516.png) 首先定義漏斗限流的結構體,根據漏斗限流原理,需要欄位流出速率,漏斗容量,定時器核心欄位,這裡容量不用具化的資料結構來表示了,採用雙指標法,一個流入的指標,一個流出的指標,大家仔細看看設計。 ```go //漏斗限流 type FunnelRateLimiter struct { interval time.Duration //時間間隔 cap int //漏斗容量 rate int //漏斗流出速率 每秒流多少 head int //放入水的指標 tail int //漏水的指標 ticker *time.Ticker //定時器 } ``` 建立漏斗限流的物件,並且初始化各個欄位,同時開啟定時器,模擬漏斗流水操作。 ```go //建立漏斗限流結構體 func NewFunnelRateLimiter(cap int, rate int) *FunnelRateLimiter { limiter := &FunnelRateLimiter{ interval: time.Second * 1, cap: cap, rate: rate, head: 0, tail: 0, } go leakRate(limiter) return limiter } ``` 真實的漏斗流水,看流入的總容量減去流出的總容量是否大於流出速率,漏斗限流的核心是保證漏斗儘量空著,這樣請求才能流入進來,所以大於的話就往出流走固定速率的請求,否則就把漏斗清空。 ```go //模擬漏斗以一定的流速漏水 func leakRate(limiter *FunnelRateLimiter) { limiter.ticker = time.NewTicker(limiter.interval) for { <-limiter.ticker.C //根本沒有流量,不需要漏(就是漏斗裡沒有請求,無法流出) if limiter.tail >= limiter.head { continue } //看漏斗裡的剩餘的請求是否大於流出的請求,如果大於,就流出這麼多 //舉個例子,每秒流出100,首先得保證漏斗裡有100個 if (limiter.head - limiter.tail) > limiter.rate { limiter.tail = limiter.tail + limiter.rate } else { //否則流出所有(漏斗裡只有70個,就把70個流完) limiter.tail = limiter.head } } } ``` 最後必須有一個判斷請求是否允許通過的方法,實則就是判斷漏斗容量是否還有空位,也就判斷流入總量減去流出總量是否大於總的容量,大於的話代表漏斗已經裝不下了,必須限流,否則,請求通過 ```go //是否允許請求通過(主要看漏斗是否滿了) func (limiter *FunnelRateLimiter) IsAllow() bool { if limiter.head-limiter.tail >= limiter.cap { //說明漏斗滿了 return false } limiter.head++ return true } ``` 我們程式碼實現採用了雙變數head,tail,開始都是0,每當有流量進入時,head變數加1,每過一定時間節點tail進行自加rate,當head的值大於減去tail大於cap,就代表漏斗滿了,否則漏斗可以處理請求,通俗講就相當於一個人(head)在前面跑,另一個人(tail)在後面追,當head跑的快時,他們之間的差距有可能達到cap,但是記住,tail不能追上head,最多持平,都是0. #### RPC限流到底怎麼做的? 微服務盛行的時代,一個Application可能對付釋出多個服務(A,B兩個服務),一個服務可能存在多個方法(A1,A2,B1,B2),而且一個Application通常會部署多臺機器,我們通常的限流可能回對某個服務限流,也可能對某個服務下面的方法限流,一般情況下RPC的控制檯中支援限流的視覺化,可配置化。 ![](https://img2020.cnblogs.com/blog/706455/202003/706455-20200316140109588-1979821962.png) 從上圖來看,瀏覽器觸發配置中心的限流規則變更,配置中心通知監聽了該規則的伺服器,這個時候可能是客戶端限流,也可能是服務端限流,取決於瀏覽器上的操作,假設是服務端限流,那麼每個服務端啟動一個限流演算法(可能是上面演算法中的任意一個),這個時候是每臺機器都在限流,相當於單機限流,各不影響。 **第一個問題**:我們介紹了三種限流演算法,比如計數器限流,會開啟一個協程定時檢測重置計數變數為0,如果一個應用有很多個服務,是否意味著要開啟很多個協程,那麼有人說協程輕量級的,沒事,但要是Java中的執行緒呢,怎麼解決,思路是延遲重置,服務開始時,設定計數閾值,同時記錄當前時間,每當請求來臨時,我們只允許在當前時間段內並且計數變數沒有到達閾值的請求通過,否則拒絕,當過了當前時間段,我們重置計數變數,這樣是不是就不用開啟新的協程了,優化完的程式碼如下 ```go //計數器限流,不用新開協程, 每次判斷時, // 先看當前時間和上次時間差是否大於1秒,如果大於則計數器清零0,重新開始,如果小與1秒,則判斷計數器到達閾值,返回false,否則返回true type CountLimiterNew struct { count int64 lastTime int64 index *atomic.Int64 nano int64 } func NewCountLimiterNew(count int64, lastTime int64) *CountLimiterNew { countLimiterNew := &CountLimiterNew{ count: count, lastTime: time.Now().UnixNano(), index: atomic.NewInt64(0), nano: 1000 * 1000 * 1000, } return countLimiterNew } func (cl *CountLimiterNew) IsAllowNew() bool { //已經進入到下一秒中了 if time.Now().UnixNano()-cl.lastTime > cl.nano { cl.lastTime = time.Now().UnixNano() cl.index.Store(1) return true } //當前這一秒鐘計數器到達閾值了,進行限流 if cl.index.Load() > cl.count { return false } //計數器加1 cl.index.Add(1) return true } ``` **第二個問題**:上面我們假設是服務端限流,那麼到底該用服務端限流還是客戶端限流,我們看這樣一個示例,有一個A服務,部署了10臺機器(相當於10個服務提供者),但呼叫A服務的有100個消費者(客戶端),假設我們每臺機器的閾值是1000,你怎麼分給100個客戶端呢?你也不瞭解他們的呼叫量,就比較麻煩,所以一般情況下都是在服務端限流,因為你自己的服務你最清楚。什麼時候用客戶端限流呢?當你明確的知道某一個客戶端呼叫量非常大,影響了其它客戶端的使用,這時你可以指定該客戶端的限流規則 **第三個問題**:我們上面提到的都是單機限流,還是我們的A服務,部署了10臺,但有一臺機器是1核2G,其餘是4核8G的,這時限流就麻煩了,不能用統一標準限流了,那麼在分散式應用程式中,有沒有分散式限流的方法呢?這裡提供幾種思路: 1. Nginx 層限流,一般http服務需要經過閘道器,Nginx層相當於閘道器限流 2. Redis限流,redis是執行緒安全的,redis支援LUA指令碼 3. 開源元件Hystrix、resilience4j、Sentinel 分散式限流可以單獨寫好幾篇文章了,後面會單開幾篇文章寫分散式限流的 #### 認真思考,仔細總結 除了上面的三種限流演算法,還有很多它們的變種實現,比如滑動時間視窗演算法,思考一下該如何實現呢?可以評論區裡討論

![](https://img2018.cnblogs.com/blog/706455/201909/706455-20190911210708072-261554801.jpg)