
Codis Source Code Analysis: Sentinel Resync (2)

Topom.ha.monitor is essentially a sentinel with a god's-eye view. It is not an actual sentinel server itself, but it is responsible for collecting the monitoring information from the individual sentinels and feeding the result back to the cluster. In this article we take a look at Topom.ha.monitor. The source code here is also helpful for understanding how context is used in this concurrency model.

In the code below, the parameter servers []string is the string array made up of the ip:port of the newly added sentinels; its length equals the number of sentinels.

func (s *Topom) rewatchSentinels(servers []string) {
    if s.ha.monitor != nil {
        s.ha.monitor.Cancel()
        s.ha.monitor = nil
    }
    if len(servers) == 0 {
        s.ha.masters = nil
    } else {
        //create the ha.monitor inside Topom
        s.ha.monitor = redis.NewSentinel(s.config.ProductName, s.config.ProductAuth)
        s.ha.monitor.LogFunc = log.Warnf
        s.ha.monitor.ErrFunc = log.WarnErrorf
        go func(p *redis.Sentinel) {
            var trigger = make(chan struct{}, 1)
            //a small delay helper: it sleeps either one second or until the deadline,
            //whichever is shorter, and returns at once if the deadline has already passed
            delayUntil := func(deadline time.Time) {
                //if a value can be read from the Sentinel's Context.Done(), its context has been canceled
                for !p.IsCanceled() {
                    var d = deadline.Sub(time.Now())
                    if d <= 0 {
                        return
                    }
                    time.Sleep(math2.MinDuration(d, time.Second))
                }
            }
            go func() {
                defer close(trigger)
                callback := func() {
                    select {
                    case trigger <- struct{}{}:
                    default:
                    }
                }
                for !p.IsCanceled() {
                    timeout := time.Minute * 15
                    retryAt := time.Now().Add(time.Second * 10)
                    if !p.Subscribe(servers, timeout, callback) {
                        delayUntil(retryAt)
                    } else {
                        callback()
                    }
                }
            }()
            go func() {
                for _ = range trigger {
                    var success int
                    for i := 0; i != 10 && !p.IsCanceled() && success != 2; i++ {
                        timeout := time.Second * 5
                        masters, err := p.Masters(servers, timeout)
                        if err != nil {
                            log.WarnErrorf(err, "fetch group masters failed")
                        } else {
                            if !p.IsCanceled() {
                                s.SwitchMasters(masters)
                            }
                            success += 1
                        }
                        delayUntil(time.Now().Add(time.Second * 5))
                    }
                }
            }()
        }(s.ha.monitor)
    }
    log.Warnf("rewatch sentinels = %v", servers)
}

//a context counts as canceled precisely when a value can be read from Context.Done()
func (s *Sentinel) IsCanceled() bool {
    select {
    case <-s.Context.Done():
        return true
    default:
        return false
    }
}
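The IsCanceled helper above is also a good illustration of how context is used throughout this concurrency model: cancellation is never signalled directly, it is observed by polling Context.Done() in a non-blocking select. Below is a minimal, self-contained sketch of that idiom (my own example, not Codis code; the names are made up for illustration).

package main

import (
    "context"
    "fmt"
    "time"
)

//same non-blocking check as Sentinel.IsCanceled, but on a plain context
func isCanceled(ctx context.Context) bool {
    select {
    case <-ctx.Done():
        return true
    default:
        return false
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go func() {
        //the worker keeps going until its context is canceled,
        //just like the goroutines started in rewatchSentinels
        for !isCanceled(ctx) {
            fmt.Println("working...")
            time.Sleep(100 * time.Millisecond)
        }
        fmt.Println("stopped:", ctx.Err())
    }()

    time.Sleep(300 * time.Millisecond)
    cancel() //plays the role of s.ha.monitor.Cancel()
    time.Sleep(100 * time.Millisecond)
}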

Subscribe makes the sentinels subscribe to the channel named "+switch-master" and read master-failover information from it. Whether each subscription succeeded is written into results := make(chan bool, len(sentinels)), and the results channel is then read in a loop.

//timeout is 15 minutes
func (s *Sentinel) Subscribe(sentinels []string, timeout time.Duration, onMajoritySubscribed func()) bool {
    cntx, cancel := context.WithTimeout(s.Context, timeout)
    defer cancel()

    timeout += time.Second * 5
    results := make(chan bool, len(sentinels))

    //more than half of the sentinels in the cluster
    var majority = 1 + len(sentinels)/2

    var subscribed atomic2.Int64
    for i := range sentinels {
        go func(sentinel string) {
            notified, err := s.subscribeDispatch(cntx, sentinel, timeout, func() {
                if subscribed.Incr() == int64(majority) {
                    onMajoritySubscribed()
                }
            })
            if err != nil {
                s.errorf(err, "sentinel-[%s] subscribe failed", sentinel)
            }
            results <- notified
        }(sentinels[i])
    }

    for alive := len(sentinels); ; alive-- {
        //if more than half of the sentinels have failed to subscribe, give up
        if alive < majority {
            if cntx.Err() == nil {
                s.printf("sentinel subscribe lost majority (%d/%d)", alive, len(sentinels))
            }
            return false
        }
        select {
        case <-cntx.Done():
            if cntx.Err() != context.DeadlineExceeded {
                s.printf("sentinel subscribe canceled (%v)", cntx.Err())
            }
            return false
        case notified := <-results:
            if notified {
                s.printf("sentinel subscribe notified +switch-master")
                return true
            }
        }
    }
}
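Subscribe uses a fan-out pattern that reappears (with a different aggregation step) in Masters below: start one goroutine per sentinel, collect answers through a buffered channel, and stop early once a majority can no longer be reached. The following is a stripped-down, hypothetical sketch of that pattern, not the actual Codis code; queryOne stands in for subscribeDispatch.

package sketch

import "context"

//queryMajority fans out one goroutine per sentinel and returns true as soon as
//any of them answers positively, or false once more than half of them have
//already come back negative (or the context is done)
func queryMajority(ctx context.Context, sentinels []string,
    queryOne func(ctx context.Context, sentinel string) bool) bool {

    results := make(chan bool, len(sentinels)) //buffered, so the goroutines never block
    for i := range sentinels {
        go func(sentinel string) {
            results <- queryOne(ctx, sentinel)
        }(sentinels[i])
    }

    majority := 1 + len(sentinels)/2
    //alive counts the sentinels whose answers have not been consumed yet;
    //it drops by one on every iteration of the loop
    for alive := len(sentinels); ; alive-- {
        if alive < majority {
            return false //too many negative answers already, give up
        }
        select {
        case <-ctx.Done():
            return false
        case ok := <-results:
            if ok {
                return true
            }
        }
    }
}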
//returns true if subscribing to "+switch-master" succeeds
func (s *Sentinel) subscribeDispatch(ctx context.Context, sentinel string, timeout time.Duration,
    onSubscribed func()) (bool, error) {
    var err = s.dispatch(ctx, sentinel, timeout, func(c *Client) error {
        return s.subscribeCommand(c, sentinel, onSubscribed)
    })
    if err != nil {
        switch errors.Cause(err) {
        case context.Canceled, context.DeadlineExceeded:
            return false, nil
        default:
            return false, err
        }
    }
    return true, nil
}
func (s *Sentinel) subscribeCommand(client *Client, sentinel string,
    onSubscribed func()) error {
    var channels = []interface{}{"+switch-master"}
    if err := client.Flush("SUBSCRIBE", channels...); err != nil {
        return errors.Trace(err)
    }
    for _, sub := range channels {
        values, err := redigo.Values(client.Receive())
        if err != nil {
            return errors.Trace(err)
        } else if len(values) != 3 {
            return errors.Errorf("invalid response = %v", values)
        }
        s, err := redigo.Strings(values[:2], nil)
        if err != nil || s[0] != "subscribe" || s[1] != sub.(string) {
            return errors.Errorf("invalid response = %v", values)
        }
    }
    onSubscribed()
    for {
        values, err := redigo.Values(client.Receive())
        if err != nil {
            return errors.Trace(err)
        } else if len(values) < 2 {
            return errors.Errorf("invalid response = %v", values)
        }
        message, err := redigo.Strings(values, nil)
        if err != nil || message[0] != "message" {
            return errors.Errorf("invalid response = %v", values)
        }
        s.printf("sentinel-[%s] subscribe event %v", sentinel, message)

        //read a message from the subscribed channel
        switch message[1] {
        case "+switch-master":
            if len(message) != 3 {
                return errors.Errorf("invalid response = %v", values)
            }
            var params = strings.SplitN(message[2], " ", 2)
            if len(params) != 2 {
                return errors.Errorf("invalid response = %v", values)
            }
            _, yes := s.isSameProduct(params[0])
            if yes {
                return nil
            }
        }
    }
}
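For reference, the body of a "+switch-master" event published by Redis Sentinel has the form "<master-name> <old-ip> <old-port> <new-ip> <new-port>", and in Codis the master name combines the product name and the group id, which is what isSameProduct checks. Below is a hypothetical example of parsing such a payload the way subscribeCommand does; the product name, group id, and addresses are made up.

package main

import (
    "fmt"
    "strings"
)

func main() {
    //hypothetical "+switch-master" payload: "<master-name> <old-ip> <old-port> <new-ip> <new-port>"
    payload := "codis-demo-1 10.0.0.1 6379 10.0.0.2 6380"

    params := strings.SplitN(payload, " ", 2)
    masterName, addrs := params[0], params[1]
    fmt.Println(masterName) //"codis-demo-1": assumed to mean product "codis-demo", group 1
    fmt.Println(addrs)      //the old and the new address of the switched master
}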

Note that everything up to this point only makes the sentinels in the cluster subscribe to the master-failover events of the Redis servers; only the sentinels know which server is the master. The Codis cluster itself still has no idea which slave has been promoted. What we do next is make the new master that the sentinels are aware of visible to the Codis cluster as well, that is, move it to the first server position of its group.

In the last step, the current masters are obtained via the SENTINEL INFO command and the master information in each group is updated accordingly. For example, if more than half of the sentinels consider the server at index 1 of a group to be the real master, that server is swapped with the server at index 0.

func (s *Sentinel) Masters(sentinels []string, timeout time.Duration) (map[int]string, error) {
    cntx, cancel := context.WithTimeout(s.Context, timeout)
    defer cancel()

    timeout += time.Second * 5
    results := make(chan map[int]*SentinelMaster, len(sentinels))

    var majority = 1 + len(sentinels)/2

    for i := range sentinels {
        go func(sentinel string) {
            //obtain the masters known to this sentinel via the SENTINEL INFO command
            masters, err := s.mastersDispatch(cntx, sentinel, timeout)
            if err != nil {
                s.errorf(err, "sentinel-[%s] masters failed", sentinel)
            }
            results <- masters
        }(sentinels[i])
    }

    masters := make(map[int]string)
    current := make(map[int]*SentinelMaster)

    var voted int
    for alive := len(sentinels); ; alive-- {
        if alive == 0 {
            switch {
            case cntx.Err() != context.DeadlineExceeded && cntx.Err() != nil:
                s.printf("sentinel masters canceled (%v)", cntx.Err())
                return nil, errors.Trace(cntx.Err())
            case voted != len(sentinels):
                s.printf("sentinel masters voted = (%d/%d) masters = %d (%v)", voted, len(sentinels), len(masters), cntx.Err())
            }
            if voted < majority {
                return nil, errors.Errorf("lost majority (%d/%d)", voted, len(sentinels))
            }
            return masters, nil
        }
        select {
        case <-cntx.Done():
            switch {
            case cntx.Err() != context.DeadlineExceeded:
                s.printf("sentinel masters canceled (%v)", cntx.Err())
                return nil, errors.Trace(cntx.Err())
            default:
                s.printf("sentinel masters voted = (%d/%d) masters = %d (%v)", voted, len(sentinels), len(masters), cntx.Err())
            }
            //the final result must be approved by more than half of the sentinels
            if voted < majority {
                return nil, errors.Errorf("lost majority (%d/%d)", voted, len(sentinels))
            }
            return masters, nil
        case m := <-results:
            if m == nil {
                continue
            }
            //build the set of masters elected by the sentinels
            for gid, master := range m {
                if current[gid] == nil || current[gid].Epoch < master.Epoch {
                    current[gid] = master
                    masters[gid] = master.Addr
                }
            }
            voted += 1
        }
    }
}
func (s *Topom) SwitchMasters(masters map[int]string) error {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.closed {
        return ErrClosedTopom
    }
    s.ha.masters = masters

    if len(masters) != 0 {
        cache := &redis.InfoCache{
            Auth: s.config.ProductAuth, Timeout: time.Millisecond * 100,
        }
        for gid, master := range masters {
            if err := s.trySwitchGroupMaster(gid, master, cache); err != nil {
                log.WarnErrorf(err, "sentinel switch group master failed")
            }
        }
    }
    return nil
}
//perform the master-slave switch so that the codis cluster is aware of it
func (s *Topom) trySwitchGroupMaster(gid int, master string, cache *redis.InfoCache) error {
    ctx, err := s.newContext()
    if err != nil {
        return err
    }
    g, err := ctx.getGroup(gid)
    if err != nil {
        return err
    }

    var index = func() int {
        for i, x := range g.Servers {
            if x.Addr == master {
                return i
            }
        }
        for i, x := range g.Servers {
            rid1 := cache.GetRunId(master)
            rid2 := cache.GetRunId(x.Addr)
            if rid1 != "" && rid1 == rid2 {
                return i
            }
        }
        return -1
    }()
    if index == -1 {
        return errors.Errorf("group-[%d] doesn't have server %s with runid = '%s'", g.Id, master, cache.GetRunId(master))
    }
    if index == 0 {
        return nil
    }
    defer s.dirtyGroupCache(g.Id)

    log.Warnf("group-[%d] will switch master to server[%d] = %s", g.Id, index, g.Servers[index].Addr)

    //perform the switch; as mentioned earlier, by convention the first server of each group in a codis cluster is the master
    g.Servers[0], g.Servers[index] = g.Servers[index], g.Servers[0]
    g.OutOfSync = true
    return s.storeUpdateGroup(g)
}
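As a small hypothetical illustration of the switch: if the sentinels report that a group's master is the server Codis stores at index 1, the two entries simply trade places, so the promoted server becomes the group's first (and therefore master) entry. The addresses and the simplified GroupServer type below are made up.

package main

import "fmt"

//GroupServer is a simplified stand-in for models.GroupServer
type GroupServer struct{ Addr string }

func main() {
    servers := []*GroupServer{
        {Addr: "10.0.0.1:6379"}, //index 0: the master as Codis currently records it
        {Addr: "10.0.0.2:6379"}, //index 1: the server the sentinels promoted
    }
    index := 1 //position of the address reported by the sentinels

    servers[0], servers[index] = servers[index], servers[0]
    fmt.Println(servers[0].Addr, servers[1].Addr) //10.0.0.2:6379 10.0.0.1:6379
    //trySwitchGroupMaster then sets g.OutOfSync = true and persists the group
}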

Next, each Proxy has its ha.servers set to the sentinels in the current ctx, and the rewatchSentinels method above is executed once more.

    var fut sync2.Future
    for _, p := range ctx.proxy {
        fut.Add()
        go func(p *models.Proxy) {
            err := s.newProxyClient(p).SetSentinels(ctx.sentinel)
            if err != nil {
                log.ErrorErrorf(err, "proxy-[%s] resync sentinel failed", p.Token)
            }
            fut.Done(p.Token, err)
        }(p)
    }
    for t, v := range fut.Wait() {
        switch err := v.(type) {
        case error:
            if err != nil {
                return errors.Errorf("proxy-[%s] sentinel failed", t)
            }
        }
    }
    p.OutOfSync = false
    //update the information stored in zk
    return s.storeUpdateSentinel(p)

To sum up: when a sentinel is added to a Codis cluster for the first time, or rejoins after having left the cluster, a resync has to be performed so that the cluster is monitored afresh. First, every sentinel server is iterated over and told to drop whatever it was monitoring before. After this reset, the sentinels monitor all groups in the cluster again, with the monitoring settings taken from dashboard.toml. Then the god's-eye-view sentinel Topom.ha.monitor is created and all sentinels in the cluster subscribe to "+switch-master"; whenever a master-slave switch happens (i.e. a value can be read from the channel), the current master addresses are read from the sentinels and, in each Codis group, the corresponding server is moved to the first position of the group. Each Proxy's ha.servers is set to the sentinels in the current ctx and the rewatchSentinels method above is executed once more; finally the sentinel's OutOfSync flag is set back to false and the information stored under zk is updated.
Note
If this article is reproduced, please credit the source:
http://blog.csdn.net/antony9118/article/details/78141271