為什麼要用熔斷

前面我們講過限流保證服務的可用性,不被突如其來的流量打爆。但是兩種情況是限流解決不了的。

  1. 如果我們服務只能處理1000QPS,但是有10wQPS打過來,服務還是會炸。因為拒絕請求也需要成本。
  2. 服務但是io型的,會把mysql,redis,mq等中介軟體打掛。

所以,我們遵循一個思路,可不可以client端在失敗的多的時候就不呼叫了,直接返回錯誤呢?

什麼是熔斷

熔斷器是為了當依賴的服務已經出現故障時,主動阻止對依賴服務的請求。保證自身服務的正常執行不受依賴服務影響,防止雪崩效應。

原始碼分析

原始碼地址

CircuitBreaker 介面

type CircuitBreaker interface {
Allow() error
MarkSuccess()
MarkFailed()
}
  1. Allow()

    • 判斷熔斷器是否允許通過
  2. MarkSuccess()
    • 熔斷器成功的回撥
  3. MarkFailed()
    • 熔斷器失敗的回撥

Group 結構體

type Group struct {
mutex sync.Mutex
val atomic.Value New func() CircuitBreaker
}
  1. mutex

    • 互斥鎖,使val這個map不產生資料競爭
  2. val
    • map,儲存name -> CircuitBreaker
  3. New
    • 生成一個CircuitBreaker

Get方法

// Get .
func (g *Group) Get(name string) CircuitBreaker {
m, ok := g.val.Load().(map[string]CircuitBreaker)
if ok {
breaker, ok := m[name]
if ok {
return breaker // 很具name從val拿出 breaker 如果存在返回
}
}
// slowpath for group don`t have specified name breaker.
g.mutex.Lock()
nm := make(map[string]CircuitBreaker, len(m)+1)
for k, v := range m {
nm[k] = v
}
breaker := g.New()
nm[name] = breaker // 如果不存在 生成一個 並放入map 並返回
g.val.Store(nm)
g.mutex.Unlock()
return breaker
}

Breaker 結構體

// Breaker is a sre CircuitBreaker pattern.
type Breaker struct {
stat window.RollingCounter
r *rand.Rand
// rand.New(...) returns a non thread safe object
randLock sync.Mutex // Reducing the k will make adaptive throttling behave more aggressively,
// Increasing the k will make adaptive throttling behave less aggressively.
k float64
request int64 state int32
}
  1. stat

    • 滑動視窗,記錄成功失敗
  2. r
    • 隨機數
  3. randLock
    • 讀寫鎖
  4. k 成功係數
    • total(總數) = success * k
  5. request 請求數
    • 當總數 < request時,不判斷是否熔斷
  6. state
    • 熔斷器狀態 開啟或者關閉

Allow()方法

// Allow request if error returns nil.
func (b *Breaker) Allow() error {
success, total := b.summary() // 從活動視窗獲取成功數和總數
k := b.k * float64(success) // 根據k成功係數 獲取 // check overflow requests = K * success
if total < b.request || float64(total) < k { // 如果總數<request 或者 總數 < k
if atomic.LoadInt32(&b.state) == StateOpen {
atomic.CompareAndSwapInt32(&b.state, StateOpen, StateClosed) // 如果state是開啟 關閉
}
return nil
}
if atomic.LoadInt32(&b.state) == StateClosed {
atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) // 如果state是關閉 開啟
}
dr := math.Max(0, (float64(total)-k)/float64(total+1)) // 獲取係數,當k越大 dr越小
drop := b.trueOnProba(dr)
// trueOnProba 獲取水機數
// 返回是否<dr if drop { // 如果是 拒絕請求
return circuitbreaker.ErrNotAllowed
}
return nil
} func (b *Breaker) trueOnProba(proba float64) (truth bool) {
b.randLock.Lock()
truth = b.r.Float64() < proba
b.randLock.Unlock()
return
}

使用trueOnProba的原因是,當熔斷器關閉時,隨機讓一部分請求通過,當success越大,請求的通過的數量就越多。用這些資料成功與否,放入視窗統計,當成功數達到要求時,就可以關閉熔斷器了。

MarkSuccess()以及MarkFailed()方法

// MarkSuccess mark requeest is success.
func (b *Breaker) MarkSuccess() {
b.stat.Add(1) // 成功數+1
} // MarkFailed mark request is failed.
func (b *Breaker) MarkFailed() {
// NOTE: when client reject requets locally, continue add counter let the
// drop ratio higher.
b.stat.Add(0) // 失敗數+1
}

流程圖