1. 程式人生 > >go微服務框架kratos學習筆記七(kratos warden 負載均衡 balancer)

go微服務框架kratos學習筆記七(kratos warden 負載均衡 balancer)

目錄

  • go微服務框架kratos學習筆記七(kratos warden 負載均衡 balancer)
    • demo
      • demo server
      • demo client
      • dao
      • service
      • p2c
      • roundrobin
    • grpc官方負載均衡工作流程
    • 原始碼

go微服務框架kratos學習筆記七(kratos warden 負載均衡 balancer)


本節看看kratos的學習負載均衡策略的使用。

kratos 的負載均衡和服務發現一樣也是基於grpc官方api實現的。

grpc官方的負載均衡自帶了一個round-robin輪詢策略、即像一個for迴圈一樣挨個服的發請求、但這顯然不能滿足我們的需求、於是kratos自帶了兩種負載均衡策略:

WRR (Weighted Round Robin)
該演算法在加權輪詢法基礎上增加了動態調節權重值,使用者可以在為每一個節點先配置一個初始的權重分,之後演算法會根據節點cpu、延遲、服務端錯誤率、客戶端錯誤率動態打分,在將打分乘使用者自定義的初始權重分得到最後的權重值。

P2C (Pick of two choices)
本演算法通過隨機選擇兩個node選擇優勝者來避免羊群效應,並通過ewma儘量獲取服務端的實時狀態。
服務端: 服務端獲取最近500ms內的CPU使用率(需要將cgroup設定的限制考慮進去,併除於CPU核心數),並將CPU使用率乘與1000後塞入每次grpc請求中的的Trailer中夾帶返回: cpu_usage uint64 encoded with string cpu_usage : 1000
客戶端: 主要引數:
server_cpu:通過每次請求中服務端塞在trailer中的cpu_usage拿到服務端最近500ms內的cpu使用率
inflight:當前客戶端正在傳送並等待response的請求數(pending request)
latency: 加權移動平均演算法計算出的介面延遲
client_success:加權移動平均演算法計算出的請求成功率(只記錄grpc內部錯誤,比如context deadline)
目前客戶端,已經預設使用p2c負載均衡演算法


// NewClient returns a new blank Client instance with a default client interceptor.
// opt can be used to add grpc dial options.
func NewClient(conf *ClientConfig, opt ...grpc.DialOption) *Client {
    c := new(Client)
    if err := c.SetConfig(conf); err != nil {
        panic(err)
    }
    c.UseOpt(grpc.WithBalancerName(p2c.Name))
    c.UseOpt(opt...)
    return c
}

demo

本節使用在筆記四kratos warden-direct方式client呼叫 使用的direct服務發現方式、和相關程式碼。

demo操作
1、分別在兩個docker中啟動一個grpc demo服務。
2、啟動一個client demo服務採用預設p2c負載均衡方式呼叫grpc SayHello()方法

demo server

1、先啟動demo服務 (其實就是一個kratos工具new出來的demo服務、程式碼可參考筆記四、或者在最後的github地址裡面獲取整個demo完整程式碼):

demo client

package dao

import (
    "context"

    "github.com/bilibili/kratos/pkg/net/rpc/warden"

    "google.golang.org/grpc"

    "fmt"
    demoapi "call-server/api"
    "google.golang.org/grpc/balancer/roundrobin"
)

// target server addrs.
const target = "direct://default/10.0.75.2:30001,10.0.75.2:30002" // NOTE: example

// NewClient new member grpc client
func NewClient(cfg *warden.ClientConfig, opts ...grpc.DialOption) (demoapi.DemoClient, error) {
    client := warden.NewClient(cfg, opts...)
    conn, err := client.Dial(context.Background(), target)
    if err != nil {
        return nil, err
    }
    // 注意替換這裡:
    // NewDemoClient方法是在"api"目錄下程式碼生成的
    // 對應proto檔案內自定義的service名字,請使用正確方法名替換
    return demoapi.NewDemoClient(conn), nil
}

// NewClient new member grpc client
func NewGrpcConn(cfg *warden.ClientConfig, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
    fmt.Println("-----tag: NewGrpcConn...")
    //opts = append(opts, grpc.WithBalancerName(roundrobin.Name))
    client := warden.NewClient(cfg, opts...)
    
    conn, err := client.Dial(context.Background(), target)
    if err != nil {
        return nil, err
    }

    return conn, nil
}

target 填上兩個服務ip

其中我多加了一個NewGrpcConn() 函式 、主要用來提取grpc連線。這裡我用了kratos自帶的pool型別來做連線池。

關於這個池、它在 kratos pkg/container/pool 有兩種實現方式 SliceList方式。

package pool

import (
    "context"
    "errors"
    "io"
    "time"

    xtime "github.com/bilibili/kratos/pkg/time"
)

var (
    // ErrPoolExhausted connections are exhausted.
    ErrPoolExhausted = errors.New("container/pool exhausted")
    // ErrPoolClosed connection pool is closed.
    ErrPoolClosed = errors.New("container/pool closed")

    // nowFunc returns the current time; it's overridden in tests.
    nowFunc = time.Now
)

// Config is the pool configuration struct.
type Config struct {
    // Active number of items allocated by the pool at a given time.
    // When zero, there is no limit on the number of items in the pool.
    Active int
    // Idle number of idle items in the pool.
    Idle int
    // Close items after remaining item for this duration. If the value
    // is zero, then item items are not closed. Applications should set
    // the timeout to a value less than the server's timeout.
    IdleTimeout xtime.Duration
    // If WaitTimeout is set and the pool is at the Active limit, then Get() waits WatiTimeout
    // until a item to be returned to the pool before returning.
    WaitTimeout xtime.Duration
    // If WaitTimeout is not set, then Wait effects.
    // if Wait is set true, then wait until ctx timeout, or default flase and return directly.
    Wait bool
}

type item struct {
    createdAt time.Time
    c         io.Closer
}

func (i *item) expired(timeout time.Duration) bool {
    if timeout <= 0 {
        return false
    }
    return i.createdAt.Add(timeout).Before(nowFunc())
}

func (i *item) close() error {
    return i.c.Close()
}

// Pool interface.
type Pool interface {
    Get(ctx context.Context) (io.Closer, error)
    Put(ctx context.Context, c io.Closer, forceClose bool) error
    Close() error
}

dao

dao中新增一個連線池。

package dao

import (
    "context"
    "time"

    demoapi "call-server/api"
    "call-server/internal/model"

    "github.com/bilibili/kratos/pkg/cache/memcache"
    "github.com/bilibili/kratos/pkg/cache/redis"
    "github.com/bilibili/kratos/pkg/conf/paladin"
    "github.com/bilibili/kratos/pkg/database/sql"
    "github.com/bilibili/kratos/pkg/net/rpc/warden"
    "github.com/bilibili/kratos/pkg/sync/pipeline/fanout"
    xtime "github.com/bilibili/kratos/pkg/time"
    //grpcempty "github.com/golang/protobuf/ptypes/empty"
    //"github.com/pkg/errors"

    "github.com/google/wire"
    "github.com/bilibili/kratos/pkg/container/pool"
    "io"
    "reflect"
    "google.golang.org/grpc"

)

var Provider = wire.NewSet(New, NewDB, NewRedis, NewMC)

//go:generate kratos tool genbts
// Dao dao interface
type Dao interface {
    Close()
    Ping(ctx context.Context) (err error)
    // bts: -nullcache=&model.Article{ID:-1} -check_null_code=$!=nil&&$.ID==-1
    Article(c context.Context, id int64) (*model.Article, error)
    //SayHello(c context.Context, req *demoapi.HelloReq) (resp *grpcempty.Empty, err error)

    //get an demo grpcConn/grpcClient/ from rpc pool
    GrpcConnPut(ctx context.Context, cc *grpc.ClientConn) (err error)
    GrpcConn(ctx context.Context) (gcc *grpc.ClientConn, err error)
    GrpcClient(ctx context.Context) (cli demoapi.DemoClient, err error)
}

// dao dao.
type dao struct {
    db         *sql.DB
    redis      *redis.Redis
    mc         *memcache.Memcache
    cache      *fanout.Fanout
    demoExpire int32
    rpcPool    pool.Pool 
}

// New new a dao and return.
func New(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d Dao, cf func(), err error) {
    return newDao(r, mc, db)
}

func newDao(r *redis.Redis, mc *memcache.Memcache, db *sql.DB) (d *dao, cf func(), err error) {
    var cfg struct {
        DemoExpire xtime.Duration
    }
    if err = paladin.Get("application.toml").UnmarshalTOML(&cfg); err != nil {
        return
    }

    // new pool
    pool_config := &pool.Config{
        Active:      0,
        Idle:        0,
        IdleTimeout: xtime.Duration(0 * time.Second),
        WaitTimeout: xtime.Duration(30 * time.Millisecond),
    }

    rpcPool := pool.NewSlice(pool_config)
    rpcPool.New = func(ctx context.Context) (cli io.Closer, err error) {
        wcfg := &warden.ClientConfig{}
        paladin.Get("grpc.toml").UnmarshalTOML(wcfg)
        if cli, err = NewGrpcConn(wcfg); err != nil {
            return
        }

        return
    }

    d = &dao{
        db:         db,
        redis:      r,
        mc:         mc,
        cache:      fanout.New("cache"),
        demoExpire: int32(time.Duration(cfg.DemoExpire) / time.Second),
        rpcPool:    rpcPool,
    }
    cf = d.Close
    return
}

// Close close the resource.
func (d *dao) Close() {
    d.cache.Close()
}

// Ping ping the resource.
func (d *dao) Ping(ctx context.Context) (err error) {
    return nil
}

func (d *dao) GrpcClient(ctx context.Context) (cli demoapi.DemoClient, err error) {
    var cc io.Closer
    if cc, err = d.rpcPool.Get(ctx); err != nil {
        return
    }

    cli = demoapi.NewDemoClient(reflect.ValueOf(cc).Interface().(*grpc.ClientConn))
    return
}

func (d *dao) GrpcConnPut(ctx context.Context, cc *grpc.ClientConn) (err error) {
    err = d.rpcPool.Put(ctx, cc, false)
    return
}

func (d *dao) GrpcConn(ctx context.Context) (gcc *grpc.ClientConn, err error) {
    var cc io.Closer
    if cc, err = d.rpcPool.Get(ctx); err != nil {
        return
    }

    gcc = reflect.ValueOf(cc).Interface().(*grpc.ClientConn)
    return
}

service

// SayHello grpc demo func.
func (s *Service) SayHello(ctx context.Context, req *pb.HelloReq) (reply *empty.Empty, err error) {
    reply = new(empty.Empty)
    var cc demoapi.DemoClient
    var gcc *grpc.ClientConn
    if gcc, err = s.dao.GrpcConn(ctx); err != nil {
        return
    }
    defer s.dao.GrpcConnPut(ctx, gcc)
    cc = demoapi.NewDemoClient(gcc)
    //if cc, err = s.dao.GrpcClient(ctx); err != nil {
    //  return
    //}
    cc.SayHello(ctx, req)
    fmt.Printf("hello %s", req.Name)
    return
}

好了現在測試 、 佈局如下 :

p2c

roundrobin

輪詢方式只需要在NewGrpcConn()裡面加語一句配置項即可,它會覆蓋掉p2c的配置項。

opts = append(opts, grpc.WithBalancerName(roundrobin.Name))

grpc官方負載均衡工作流程

我們目前也只是使用了Api、最後來瞧瞧官方grpc的工作流程 :

gRPC開源元件官方並未直接提供服務註冊與發現的功能實現,但其設計文件已提供實現的思路,並在不同語言的gRPC程式碼API中已提供了命名解析和負載均衡介面供擴充套件。

  1. 服務啟動後,gPRC客戶端通過resolve發起一個名稱解析請求。名稱會被解析為一個或更多的IP地址,每個地址指明它是一個伺服器地址還是一個負載均衡器地址,並且包含一個Opt指明哪一個客戶端的負載均衡策略應該被使用(例如: 輪詢排程或grpclb)。

  2. 客戶端實現一個負載均衡策略。
    注意:如果任何一個被解析器返回的地址是均衡器地址,那麼這個客戶端會使用grpclb策略,而不管請求的Opt配置的是哪種負載均衡策略。否則,客戶端會使用一個Opt項配置負載均衡策略。如果沒有負載均衡策略,那麼客戶端會使用預設的取第一個可用伺服器地址的策略。

  3. 負載均衡策略對每一個伺服器地址建立一個子通道。

  4. 當呼叫rpc請求時,負載均衡策略會決定應該傳送到哪個子通道(例如: 哪個伺服器)。
    grpclb策略下,客戶端按負載均衡器返回的順序傳送請求到伺服器。如果伺服器列表為空,呼叫將會阻塞直到收到一個非空的列表。

原始碼

本節測試程式碼 : https://github.com/ailumiyana/kratos-note/tree/master/warden/bala