1. 程式人生 > >hystrix-go 原始碼分析

hystrix-go 原始碼分析

閱讀原始碼的過程,就像是在像武俠小說裡閱讀武功祕籍一樣,分析高手的一招一式,提煉出精髓,來增強自己的內力。
之前的帖子說了一下微服務的雪崩效應和常見的解決方案,太水,沒有上程式碼怎麼叫解決方案。github上有很多開源的庫來解決雪崩問題,比較出名的是Netflix的開源庫hystrix。集流量控制熔斷容錯等於一身的java語言的庫。今天分析的原始碼庫是 hystrix-go,他是hystrix的的go語言版,應該是說簡化版本,用很少的程式碼量實現了主要功能。很推薦朋友們有時間讀一讀。

使用簡單

hystrix的使用是非常簡單的,同步執行,直接呼叫Do方法。

err := hystrix.Do("my_command", func() error {
   // talk to other services
   return nil
}, func(err error) error {
   // do this when services are down
   return nil
})

非同步執行Go方法,內部實現是啟動了一個gorouting,如果想得到自定義方法的資料,需要你傳channel來處理資料,或者輸出。返回的error也是一個channel

 output := make(chan bool, 1)
errors := hystrix.Go("my_command", func() error {
    // talk to other services
    output <- true
    return nil
}, nil)

select {
case out := <-output:
    // success
case err := <-errors:
    // failure

大概的執行流程圖

其實方法DoGo方法內部都是呼叫了hystrix.GoC方法,只是Do方法處理了非同步的過程

func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
    done := make(chan struct{}, 1)
    r := func(ctx context.Context) error {
        err := run(ctx)
        if err != nil {
            return err
        }
        done <- struct{}{}
        return nil
    }
    f := func(ctx context.Context, e error) error {
        err := fallback(ctx, e)
        if err != nil {
            return err
        }
        done <- struct{}{}
        return nil
    }
    var errChan chan error
    if fallback == nil {
        errChan = GoC(ctx, name, r, nil)
    } else {
        errChan = GoC(ctx, name, r, f)
    }

    select {
    case <-done:
        return nil
    case err := <-errChan:
        return err
    }
}

自定義Command配置

在呼叫Do Go等方法之前我們可以先自定義一些配置

    hystrix.ConfigureCommand("mycommand", hystrix.CommandConfig{
        Timeout:                int(time.Second * 3),
        MaxConcurrentRequests:  100,
        SleepWindow:            int(time.Second * 5),
        RequestVolumeThreshold: 30,
        ErrorPercentThreshold: 50,
    })

    err := hystrix.DoC(context.Background(), "mycommand", func(ctx context.Context) error {
        // ...
        return nil
    }, func(i context.Context, e error) error {
        // ...
        return e
    })

我大要說了一下CommandConfig第個欄位的意義:

  • Timeout: 執行command的超時時間。預設時間是1000毫秒
  • MaxConcurrentRequests:command的最大併發量 預設值是10
  • SleepWindow:當熔斷器被開啟後,SleepWindow的時間就是控制過多久後去嘗試服務是否可用了。預設值是5000毫秒
  • RequestVolumeThreshold: 一個統計視窗10秒內請求數量。達到這個請求數量後才去判斷是否要開啟熔斷。預設值是20
  • ErrorPercentThreshold:錯誤百分比,請求數量大於等於RequestVolumeThreshold並且錯誤率到達這個百分比後就會啟動熔斷 預設值是50

當然如果不配置他們,會使用預設值

講完了怎麼用,接下來就是分析原始碼了。我是從下層到上層的順序分析程式碼和執行流程

統計控制器

每一個Command都會有一個預設統計控制器,當然也可以新增多個自定義的控制器。
預設的統計控制器DefaultMetricCollector儲存著熔斷器的所有狀態,呼叫次數失敗次數被拒絕次數等等

type DefaultMetricCollector struct {
    mutex *sync.RWMutex

    numRequests *rolling.Number
    errors      *rolling.Number

    successes               *rolling.Number
    failures                *rolling.Number
    rejects                 *rolling.Number
    shortCircuits           *rolling.Number
    timeouts                *rolling.Number
    contextCanceled         *rolling.Number
    contextDeadlineExceeded *rolling.Number

    fallbackSuccesses *rolling.Number
    fallbackFailures  *rolling.Number
    totalDuration     *rolling.Timing
    runDuration       *rolling.Timing
}

最主要的還是要看一下rolling.Numberrolling.Number才是狀態最終儲存的地方
Number儲存了10秒內的Buckets資料資訊,每一個Bucket的統計時長為1秒

type Number struct {
    Buckets map[int64]*numberBucket
    Mutex   *sync.RWMutex
}

type numberBucket struct {
    Value float64
}

字典欄位Buckets map[int64]*numberBucket 中的Key儲存的是當前時間
可能你會好奇Number是如何保證只儲存10秒內的資料的。每一次對熔斷器的狀態進行修改時,Number都要先得到當前的時間(秒級)的Bucket不存在則建立。

func (r *Number) getCurrentBucket() *numberBucket {
    now := time.Now().Unix()
    var bucket *numberBucket
    var ok bool

    if bucket, ok = r.Buckets[now]; !ok {
        bucket = &numberBucket{}
        r.Buckets[now] = bucket
    }

    return bucket
}

修改完後去掉10秒外的資料

func (r *Number) removeOldBuckets() {
    now := time.Now().Unix() - 10

    for timestamp := range r.Buckets {
        // TODO: configurable rolling window
        if timestamp <= now {
            delete(r.Buckets, timestamp)
        }
    }
}

比如Increment方法,先得到Bucket再刪除舊的資料

func (r *Number) Increment(i float64) {
    if i == 0 {
        return
    }

    r.Mutex.Lock()
    defer r.Mutex.Unlock()

    b := r.getCurrentBucket()
    b.Value += i
    r.removeOldBuckets()
}

統計控制器是最基層和最重要的一個實現,上層所有的執行判斷都是基於他的資料進行邏輯處理的

上報執行狀態資訊

斷路器-->執行-->上報執行狀態資訊-->儲存到相應的Buckets

每一次斷路器邏輯的執行都會上報執行過程中的狀態,

// ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard.
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
    // ...
    circuit.mutex.RLock()
    o := circuit.open
    circuit.mutex.RUnlock()
    if eventTypes[0] == "success" && o {
        circuit.setClose()
    }
    var concurrencyInUse float64
    if circuit.executorPool.Max > 0 {
        concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
    }
    select {
    case circuit.metrics.Updates <- &commandExecution{
        Types:            eventTypes,
        Start:            start,
        RunDuration:      runDuration,
        ConcurrencyInUse: concurrencyInUse,
    }:
    default:
        return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
    }

    return nil
}

circuit.metrics.Updates 這個通道就是處理上報資訊的,上報執行狀態自信的結構是metricExchange,結構體很簡單隻有4個欄位。要的就是

  • channel欄位Updates 他是一個有bufferchannel預設的數量是2000個,所有的狀態資訊都在他裡面
  • metricCollectors欄位,就是儲存的具體的這個command執行過程中的各種資訊
type metricExchange struct {
    Name    string
    Updates chan *commandExecution
    Mutex   *sync.RWMutex

    metricCollectors []metricCollector.MetricCollector
}

type commandExecution struct {
    Types            []string      `json:"types"`
    Start            time.Time     `json:"start_time"`
    RunDuration      time.Duration `json:"run_duration"`
    ConcurrencyInUse float64       `json:"concurrency_inuse"`
}

func newMetricExchange(name string) *metricExchange {
    m := &metricExchange{}
    m.Name = name

    m.Updates = make(chan *commandExecution, 2000)
    m.Mutex = &sync.RWMutex{}
    m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name)
    m.Reset()

    go m.Monitor()

    return m
}

在執行newMetricExchange的時候會啟動一個協程 go m.Monitor()去監控Updates的資料,然後上報給metricCollectors 儲存執行的資訊資料比如前面提到的呼叫次數失敗次數被拒絕次數等等

func (m *metricExchange) Monitor() {
    for update := range m.Updates {
        // we only grab a read lock to make sure Reset() isn't changing the numbers.
        m.Mutex.RLock()

        totalDuration := time.Since(update.Start)
        wg := &sync.WaitGroup{}
        for _, collector := range m.metricCollectors {
            wg.Add(1)
            go m.IncrementMetrics(wg, collector, update, totalDuration)
        }
        wg.Wait()

        m.Mutex.RUnlock()
    }
}

更新呼叫的是go m.IncrementMetrics(wg, collector, update, totalDuration),裡面判斷了他的狀態

func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
    // granular metrics
    r := metricCollector.MetricResult{
        Attempts:         1,
        TotalDuration:    totalDuration,
        RunDuration:      update.RunDuration,
        ConcurrencyInUse: update.ConcurrencyInUse,
    }
    switch update.Types[0] {
    case "success":
        r.Successes = 1
    case "failure":
        r.Failures = 1
        r.Errors = 1
    case "rejected":
        r.Rejects = 1
        r.Errors = 1
    // ...
    }
    // ...
    collector.Update(r)
    wg.Done()
}

流量控制

hystrix-go對流量控制的程式碼是很簡單的。用了一個簡單的令牌演算法,能得到令牌的就可以執行後繼的工作,執行完後要返還令牌。得不到令牌就拒絕,拒絕後呼叫使用者設定的callback方法,如果沒有設定就不執行。
結構體executorPool就是hystrix-go 流量控制的具體實現。欄位Max就是每秒最大的併發值。

type executorPool struct {
    Name    string
    Metrics *poolMetrics
    Max     int
    Tickets chan *struct{}
}

在建立executorPool的時候,會根據Max值來建立令牌。Max值如果沒有設定會使用預設值10

func newExecutorPool(name string) *executorPool {
    p := &executorPool{}
    p.Name = name
    p.Metrics = newPoolMetrics(name)
    p.Max = getSettings(name).MaxConcurrentRequests

    p.Tickets = make(chan *struct{}, p.Max)
    for i := 0; i < p.Max; i++ {
        p.Tickets <- &struct{}{}
    }

    return p
}

流量控制上報狀態

注意一下欄位 Metrics 他用於統計執行數量,比如:執行的總數量,最大的併發數 具體的程式碼就不貼上來了。這個數量也可以顯露出,供視覺化程式直觀的表現出來。

令牌使用完後是需要返還的,返回的時候才會做上面所說的統計工作。

func (p *executorPool) Return(ticket *struct{}) {
    if ticket == nil {
        return
    }

    p.Metrics.Updates <- poolMetricsUpdate{
        activeCount: p.ActiveCount(),
    }
    p.Tickets <- ticket
}

func (p *executorPool) ActiveCount() int {
    return p.Max - len(p.Tickets)
}

一次Command的執行的流程

上面把 統計控制器流量控制上報執行狀態講完了,主要的實現也就講的差不多了。最後就是串一次command的執行都經歷了啥:

 err := hystrix.Do("my_command", func() error {
    // talk to other services
    return nil
}, func(err error) error {
    // do this when services are down
    return nil
})

hystrix在執行一次command的前面也有提到過會呼叫GoC方法,下面我把程式碼貼出來來,篇幅問題去掉了一些程式碼,主要邏輯都在。就是在判斷斷路器是否已開啟得到Ticket得不到就限流,執行我們自己的的方法判斷context是否Done或者執行是否超時
當然,每次執行結果都要上報執行狀態,最後要返還Ticket

func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
    cmd := &command{
        run:      run,
        fallback: fallback,
        start:    time.Now(),
        errChan:  make(chan error, 1),
        finished: make(chan bool, 1),
    }
    //得到斷路器,不存在則建立
    circuit, _, err := GetCircuit(name)
    if err != nil {
        cmd.errChan <- err
        return cmd.errChan
    }
    //...
    // 返還ticket
    returnTicket := func() {
        // ...
        cmd.circuit.executorPool.Return(cmd.ticket)
    }
    // 上報執行狀態
    reportAllEvent := func() {
        err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
        // ...
    }
    go func() {
        defer func() { cmd.finished <- true }()
        // 檢視斷路器是否已開啟
        if !cmd.circuit.AllowRequest() {
            // ...
            returnOnce.Do(func() {
                returnTicket()
                cmd.errorWithFallback(ctx, ErrCircuitOpen)
                reportAllEvent()
            })
            return
        }
        // ...
        // 獲取ticket 如果得不到就限流
        select {
        case cmd.ticket = <-circuit.executorPool.Tickets:
            ticketChecked = true
            ticketCond.Signal()
            cmd.Unlock()
        default:
            // ...
            returnOnce.Do(func() {
                returnTicket()
                cmd.errorWithFallback(ctx, ErrMaxConcurrency)
                reportAllEvent()
            })
            return
        }
        // 執行我們自已的方法,並上報執行資訊
        returnOnce.Do(func() {
            defer reportAllEvent()
            cmd.runDuration = time.Since(runStart)
            returnTicket()
            if runErr != nil {
                cmd.errorWithFallback(ctx, runErr)
                return
            }
            cmd.reportEvent("success")
        })
    }()
    // 等待context是否被結束,或執行者超時,並上報
    go func() {
        timer := time.NewTimer(getSettings(name).Timeout)
        defer timer.Stop()

        select {
        case <-cmd.finished:
            // returnOnce has been executed in another goroutine
        case <-ctx.Done():
            // ...
            return
        case <-timer.C:
            // ...
        }
    }()

    return cmd.errChan
}

dashboard 視覺化hystrix的上報資訊

程式碼中StreamHandler就是把所有斷路器的狀態以流的方式不斷的推送到dashboard. 這部分程式碼我就不用說了,很簡單。
需要在你的服務端加3行程式碼,啟動我們的流服務

    hystrixStreamHandler := hystrix.NewStreamHandler()
    hystrixStreamHandler.Start()
    go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)

dashboard我使用的是docker版。

docker run -d -p 8888:9002 --name hystrix-dashboard mlabouardy/hystrix-dashboard:latest

在下面輸入你服務的地址,我是
http://192.168.1.67:81/hystrix.stream

如果是叢集可以使用Turbine進行監控,有時間大家自己來看吧