1. 程式人生 > >Codis原始碼解析——處理slot操作(1)

Codis原始碼解析——處理slot操作(1)

上一篇我們講了slot在叢集中的分配方式,重點講了auto-rebalance的原理。之前我們說過,再啟動dashboard的時候,有一個goroutine專門用來處理slot的操作。這一篇我們就來看看slot的操作是如何進行的。我們這裡舉例也是用叢集中有兩個group和1024個從未分配的slot。

首先複習一下,在slot還處於未分配狀態的時候,上下文中的1024個SlotMapping如下所示。後面就是以SlotMapping為單位進行處理,所以這裡一定要對其結構掌握清楚

這裡寫圖片描述

接下來,當我們使用auto-rebalance對叢集進行處理後,每個slot都被指定了相應的遷移計劃

func
(s *Topom) ProcessSlotAction() error { for s.IsOnline() { var ( marks = make(map[int]bool) //分配slot的時候點選彈窗的confirm之後,這個plans才能取出值 plans = make(map[int]bool) ) var accept = func(m *models.SlotMapping) bool { if marks[m.GroupId] || marks[m.Action.TargetId] { return
false } if plans[m.Id] { return false } return true } //對plans和marks進行初始化 var update = func(m *models.SlotMapping) bool { //只有在槽當前的GroupId為0的時候,marks[m.GroupId]才是false if m.GroupId != 0 { marks[m.GroupId] = true
} marks[m.Action.TargetId] = true plans[m.Id] = true return true } //按照預設的配置檔案,這個值是100,並行遷移的slot數量,是一個閥值 var parallel = math2.MaxInt(1, s.config.MigrationParallelSlots) //第一次的時候plans為空,所以下面的方法一定會執行一次,這個過程中plans會初始化。後面如果plans的長度大於100,就直接對所有plans做處理; //否則如果叢集中所有Slotmapping中Action.state最小的那個Slotmapping如果處於pending,preparing或者prepared,也可以跳出迴圈對plans進行處理 for parallel > len(plans) { //對是否滿足plans的處理情況做過濾,後面會講這個方法 _, ok, err := s.SlotActionPrepareFilter(accept, update) if err != nil { return err } else if !ok { break } } //在指定slot的分配plan之前,這個一直是return nil if len(plans) == 0 { return nil } var fut sync2.Future //從plans中取出具體的每個slot的遷移計劃,前面我們已經說過,plans的鍵是每一個slot的id,值是要遷移到的groupId for sid, _ := range plans { fut.Add() go func(sid int) { log.Warnf("slot-[%d] process action", sid) //針對每個slot做處理 var err = s.processSlotAction(sid) if err != nil { status := fmt.Sprintf("[ERROR] Slot[%04d]: %s", sid, err) s.action.progress.status.Store(status) } else { s.action.progress.status.Store("") } //在Future的vmap中儲存slotId和對應的error,並呼叫WaitGroup.Done fut.Done(strconv.Itoa(sid), err) }(sid) } //當所有slot操作結束之後,遍歷Future的vmap,取出有error的並返回 for _, v := range fut.Wait() { if v != nil { return v.(error) } } time.Sleep(time.Millisecond * 10) } return nil }

一個slot共有七種狀態,分別是:
nothing(用空字串表示)、pending、preparing、prepared、migrating、finished、syncing

在看每個slot具體的操作之前,可以先看一下SlotActionPrepareFilter這個方法。


func (s *Topom) SlotActionPrepareFilter(accept, update func(m *models.SlotMapping) bool) (int, bool, error) {
    s.mu.Lock()
    defer s.mu.Unlock()
    //載入上下文
    ctx, err := s.newContext()
    if err != nil {
        return 0, false, err
    }

    //找到所有Action.State既不為空也不是pending的SlotMapping中Action.Index最小的SlotMapping
    var minActionIndex = func(filter func(m *models.SlotMapping) bool) (picked *models.SlotMapping) {
        for _, m := range ctx.slots {
            if m.Action.State == models.ActionNothing {
                continue
            }
            if filter(m) {
                if picked != nil && picked.Action.Index < m.Action.Index {
                    continue
                }
                //只有一個slot沒有執行過update方法,accept才會返回true;也就是說,一個slot只會被處理一次
                if accept == nil || accept(m) {
                    picked = m
                }
            }
        }
        return picked
    }

    var m = func() *models.SlotMapping {
        var picked = minActionIndex(func(m *models.SlotMapping) bool {
            return m.Action.State != models.ActionPending
        })
        if picked != nil {
            return picked
        }
        if s.action.disabled.IsTrue() {
            return nil
        }
        //如果前面找不到Action.State既不為空也不是pending的SlotMapping中Action.Index最小的SlotMapping
        //就去找Action.State為pending的SlotMapping中Action.Index最小的SlotMapping
        return minActionIndex(func(m *models.SlotMapping) bool {
            return m.Action.State == models.ActionPending
        })
    }()

    if m == nil {
        return 0, false, nil
    }

    if update != nil && !update(m) {
        return 0, false, nil
    }

    log.Warnf("slot-[%d] action prepare:\n%s", m.Id, m.Encode())

    //變更每個SlotMapping的action.state,並與zk互動
    //另外,Action.state符合preparing或者prepared的時候,要根據SlotMapping的引數同步到Slot
    switch m.Action.State {

    case models.ActionPending:

        defer s.dirtySlotsCache(m.Id)

        //Action.State指向下一階段
        m.Action.State = models.ActionPreparing
        //只是更新zk
        if err := s.storeUpdateSlotMapping(m); err != nil {
            return 0, false, err
        }

        fallthrough

    case models.ActionPreparing:

        defer s.dirtySlotsCache(m.Id)

        log.Warnf("slot-[%d] resync to prepared", m.Id)

        m.Action.State = models.ActionPrepared
        //同步SlotMapping操作,後面會有介紹
        if err := s.resyncSlotMappings(ctx, m); err != nil {
            log.Warnf("slot-[%d] resync-rollback to preparing", m.Id)
            m.Action.State = models.ActionPreparing
            s.resyncSlotMappings(ctx, m)
            log.Warnf("slot-[%d] resync-rollback to preparing, done", m.Id)
            return 0, false, err
        }
        if err := s.storeUpdateSlotMapping(m); err != nil {
            return 0, false, err
        }

        fallthrough

    case models.ActionPrepared:

        defer s.dirtySlotsCache(m.Id)

        log.Warnf("slot-[%d] resync to migrating", m.Id)

        m.Action.State = models.ActionMigrating
        if err := s.resyncSlotMappings(ctx, m); err != nil {
            log.Warnf("slot-[%d] resync to migrating failed", m.Id)
            return 0, false, err
        }
        if err := s.storeUpdateSlotMapping(m); err != nil {
            return 0, false, err
        }

        fallthrough

    case models.ActionMigrating:

        return m.Id, true, nil

    case models.ActionFinished:

        return m.Id, true, nil

    //如果不屬於以上任何一種情況,直接返回invalid
    default:

        return 0, false, errors.Errorf("slot-[%d] action state is invalid", m.Id)

    }
}

很顯然,上面的方法取出的最小的Action.State的Slotmapping是

這裡寫圖片描述

當一個SlotMapping處於preparing和prepared轉檯的時候,會將其狀態推進到下一階段,並同步SlotMapping,根據[]*models.SlotMapping建立1024個models.Slot,再填充1024個pkg/proxy/slots.go中的Slot,此過程中Router為每個Slot都分配了對應的backendConn。下面就來看看這個同步方法。

func (s *Topom) resyncSlotMappings(ctx *context, slots ...*models.SlotMapping) error {
    if len(slots) == 0 {
        return nil
    }
    var fut sync2.Future
    for _, p := range ctx.proxy {
        fut.Add()
        go func(p *models.Proxy) {
            //ApiClient中儲存了proxy的address以及xauth資訊。其中xauth是根據ProductName,ProductAuth以及proxy的token生成的
            err := s.newProxyClient(p).FillSlots(ctx.toSlotSlice(slots, p)...)
            if err != nil {
                log.ErrorErrorf(err, "proxy-[%s] resync slots failed", p.Token)
            }
            fut.Done(p.Token, err)
        }(p)
    }
    for t, v := range fut.Wait() {
        switch err := v.(type) {
        case error:
            if err != nil {
                return errors.Errorf("proxy-[%s] resync slots failed", t)
            }
        }
    }
    return nil
}

同步的過程中有兩個方法比較複雜,分別是FillSlots和toSlotSlice。這一節我們先來看toSlotSlice。這個方法實際上就是將SlotMapping切片轉化為Slot切片,在Slot結構體重記錄了這個Slot在遷移的不同階段,接到的請求由哪個BackendAddr進行處理。

type Slot struct {
    Id     int  `json:"id"`
    Locked bool `json:"locked,omitempty"`

    BackendAddr        string `json:"backend_addr,omitempty"`
    BackendAddrGroupId int    `json:"backend_addr_group_id,omitempty"`
    MigrateFrom        string `json:"migrate_from,omitempty"`
    MigrateFromGroupId int    `json:"migrate_from_group_id,omitempty"`

    ForwardMethod int `json:"forward_method,omitempty"`

    ReplicaGroups [][]string `json:"replica_groups,omitempty"`
}
func (ctx *context) toSlotSlice(slots []*models.SlotMapping, p *models.Proxy) []*models.Slot {
    var slice = make([]*models.Slot, len(slots))
    for i, m := range slots {
        slice[i] = ctx.toSlot(m, p)
    }
    return slice
}
func (ctx *context) toSlot(m *models.SlotMapping, p *models.Proxy) *models.Slot {
    slot := &models.Slot{
        Id:     m.Id,
        Locked: ctx.isSlotLocked(m),

        ForwardMethod: ctx.method,
    }
    switch m.Action.State {
    case models.ActionNothing, models.ActionPending:
        //這個getGroupMaster實際上就是從每個Group中取出第一臺,因為codis中認定group中新增的第一臺是主伺服器
        slot.BackendAddr = ctx.getGroupMaster(m.GroupId)
        slot.BackendAddrGroupId = m.GroupId
        slot.ReplicaGroups = ctx.toReplicaGroups(m.GroupId, p)
    case models.ActionPreparing:
        slot.BackendAddr = ctx.getGroupMaster(m.GroupId)
        slot.BackendAddrGroupId = m.GroupId
    case models.ActionPrepared:
        fallthrough
    case models.ActionMigrating:
        slot.BackendAddr = ctx.getGroupMaster(m.Action.TargetId)
        slot.BackendAddrGroupId = m.Action.TargetId
        slot.MigrateFrom = ctx.getGroupMaster(m.GroupId)
        slot.MigrateFromGroupId = m.GroupId
    case models.ActionFinished:
        slot.BackendAddr = ctx.getGroupMaster(m.Action.TargetId)
        slot.BackendAddrGroupId = m.Action.TargetId
    default:
        log.Panicf("slot-[%d] action state is invalid:\n%s", m.Id, m.Encode())
    }
    return slot
}

其中如果slot處於migrating狀態,migrate.bc就不為空,如果恰好有請求發到這個slot,proxy就會執行一次SLOTSMGRTTAGONE讓這個slot遷移完成,再由其backend.bc來執行請求

下面的ReplicaGroups是專門為了主從讀寫分離設定的。從優先順序我們可以看到,讀請求會優先轉發到和proxy在一臺伺服器上的codis-server,也就是優先順序最高的0

func (ctx *context) toReplicaGroups(gid int, p *models.Proxy) [][]string {
    g := ctx.group[gid]
    switch {
    case g == nil:
        return nil
    case g.Promoting.State != models.ActionNothing:
        return nil
    case len(g.Servers) <= 1:
        return nil
    }
    var dc string
    var ip net.IP
    if p != nil {
        dc = p.DataCenter
        ip = ctx.lookupIPAddr(p.AdminAddr)
    }
    //replica的訪問優先順序
    getPriority := func(s *models.GroupServer) int {
        if ip == nil || dc != s.DataCenter {
            return 2
        }
        if ip.Equal(ctx.lookupIPAddr(s.Addr)) {
            return 0
        } else {
            return 1
        }
    }
    var groups [3][]string
    for _, s := range g.Servers {
        if s.ReplicaGroup {
            p := getPriority(s)
            groups[p] = append(groups[p], s.Addr)
        }
    }
    var replicas [][]string
    for _, l := range groups {
        if len(l) != 0 {
            replicas = append(replicas, l)
        }
    }
    return replicas
}

有關於FillSlot和每個槽的處理方法processSlotAction,我們會在下一篇講