1. 程式人生 > >Codis原始碼解析——dashboard的啟動(2)

Codis原始碼解析——dashboard的啟動(2)

1 重新整理redis狀態

首先認識兩個重要的struct

type Future struct {
    sync.Mutex
    wait sync.WaitGroup
    vmap map[string]interface{}
}
type RedisStats struct {
    //儲存了叢集中Redis伺服器的各種資訊和統計數值,詳見redis的info命令
    Stats map[string]string `json:"stats,omitempty"`
    Error *rpc.RemoteError  `json:"error,omitempty"`
Sentinel map[string]*redis.SentinelGroup `json:"sentinel,omitempty"` UnixTime int64 `json:"unixtime"` Timeout bool `json:"timeout,omitempty"` }

接下來看看dashboard如何重新整理redis狀態

func (s *Topom) RefreshRedisStats(timeout time.Duration) (*sync2.Future, error) {
    s.mu.Lock()
    defer
s.mu.Unlock() //從快取中讀出slots,group,proxy,sentinel等資訊封裝在context struct中 ctx, err := s.newContext() if err != nil { return nil, err } var fut sync2.Future goStats := func(addr string, do func(addr string) (*RedisStats, error)) { fut.Add() go func() { stats := s.newRedisStats(addr, timeout, do) stats.UnixTime = time.Now().Unix() //vmap中新增鍵為addr,值為RedisStats的map
fut.Done(addr, stats) }() } //遍歷ctx中的group,再遍歷每個group中的Server。如果對group和Server結構不清楚的,可以看看/pkg/models/group.go檔案 //每個Group除了id,還有一個屬性就是GroupServer。每個GroupServer有自己的地址、資料中心、action等等 for _, g := range ctx.group { for _, x := range g.Servers { goStats(x.Addr, func(addr string) (*RedisStats, error) { //前面我們已經說過,Topom中有三個redis pool,分別是action,stats,ha。pool本質上就是map[String]*list.List。 //這個是從stats的pool中根據Server的地址從pool中取redis client,如果沒有client,就建立 //然後加入到pool裡面,並通過Info命令獲取詳細資訊。整個流程和下面的sentinel類似,這裡就不放具體的方法實現了 m, err := s.stats.redisp.InfoFull(addr) if err != nil { return nil, err } return &RedisStats{Stats: m}, nil }) } } //通過sentinel維護codis叢集中每一組的主備關係 for _, server := range ctx.sentinel.Servers { goStats(server, func(addr string) (*RedisStats, error) { c, err := s.ha.redisp.GetClient(addr) if err != nil { return nil, err } //實際上就是將client加入到Pool的pool屬性裡面去,pool本質上就是map[String]*list.List //鍵是client的addr,值是client本身 //如果client不存在,就新建一個空的list defer s.ha.redisp.PutClient(c) m, err := c.Info() if err != nil { return nil, err } sentinel := redis.NewSentinel(s.config.ProductName, s.config.ProductAuth) //獲得map[string]*SentinelGroup,鍵是每一組的master的名字,SentinelGroup則是主從對 p, err := sentinel.MastersAndSlavesClient(c) if err != nil { return nil, err } return &RedisStats{Stats: m, Sentinel: p}, nil }) } //前面的所有gostats執行完之後,遍歷Future的vmap,將值賦給Topom.stats.servers go func() { stats := make(map[string]*RedisStats) for k, v := range fut.Wait() { stats[k] = v.(*RedisStats) } s.mu.Lock() defer s.mu.Unlock() s.stats.servers = stats }() return &fut, nil } func (p *Pool) GetClient(addr string) (*Client, error) { c, err := p.getClientFromCache(addr) if err != nil || c != nil { return c, err } return NewClient(addr, p.auth, p.timeout) } func (p *Pool) getClientFromCache(addr string) (*Client, error) { p.mu.Lock() defer p.mu.Unlock() if p.closed { return nil, ErrClosedPool } if list := p.pool[addr]; list != nil { for i := list.Len(); i != 0; i-- { c := list.Remove(list.Front()).(*Client) //一個client可被回收的條件是,Pool的timeout為0,或者這個client上一次使用距離現在小於Pool.timeout //ha和stats裡面的Pool的timeout為5秒,action的則根據配置檔案dashboard.toml中的migration_timeout一項來決定 if p.isRecyclable(c) { return c, nil } else { c.Close() } } } return nil, nil } type Client struct { conn redigo.Conn Addr string Auth string Database int LastUse time.Time Timeout time.Duration }

RedisStats中的sentinel如下所示,有幾組主備,就有幾組SentinelGroup,鍵是product-name與group-id拼起來的

這裡寫圖片描述

newContext一步主要就是呼叫refillCache,過載了四個快取,分別是refillCacheSlots,refillCacheGroup,refillCacheProxy和refillCacheSentinel。這四個方法基本一致,以refillCacheSlots為例。方法傳入的是Topom.cache.slots

type context struct {
    slots []*models.SlotMapping
    group map[int]*models.Group
    proxy map[string]*models.Proxy

    sentinel *models.Sentinel

    hosts struct {
        sync.Mutex
        m map[string]net.IP
    }
    method int
}
//重新填充topom.cache中的資料,並賦給context結構
func (s *Topom) newContext() (*context, error) {
    if s.closed {
        return nil, ErrClosedTopom
    }
    if s.online {
        if err := s.refillCache(); err != nil {
            return nil, err
        } else {
            ctx := &context{}
            ctx.slots = s.cache.slots
            ctx.group = s.cache.group
            ctx.proxy = s.cache.proxy
            ctx.sentinel = s.cache.sentinel
            ctx.hosts.m = make(map[string]net.IP)
            ctx.method, _ = models.ParseForwardMethod(s.config.MigrationMethod)
            return ctx, nil
        }
    } else {
        return nil, ErrNotOnline
    }
}
func (s *Topom) refillCacheSlots(slots []*models.SlotMapping) ([]*models.SlotMapping, error) {
    //如果cache中的slots為空,就直接返回store裡面的slots
    if slots == nil {
        return s.store.SlotMappings()
    }
    for i, _ := range slots {
        //如果cache中的slots[i]不為空,直接進入下一個迴圈
        if slots[i] != nil {
            continue
        }
        //如果slots[i]為空,就從store中取出對應的SlotMapping並賦值給cache中的這個slot
        m, err := s.store.LoadSlotMapping(i, false)
        if err != nil {
            return nil, err
        }
        if m != nil {
            slots[i] = m
        } else {
            //如果store中取出的對應的SlotMapping也為空,就新建一個SlotMapping賦值給當前slot
            slots[i] = &models.SlotMapping{Id: i}
        }
    }
    return slots, nil
}
func (s *Store) LoadSlotMapping(sid int, must bool) (*SlotMapping, error) {
    //返回值b是zkClient根據路徑轉化成的byte陣列
    b, err := s.client.Read(s.SlotPath(sid), must)
    if err != nil || b == nil {
        return nil, err
    }
    m := &SlotMapping{}
    //將byte陣列封裝在實體類SlotMapping實體類中
    if err := jsonDecode(m, b); err != nil {
        return nil, err
    }
    return m, nil
}
func (s *Store) SlotPath(sid int) string {
    return SlotPath(s.product, sid)
}
//這裡的codisDir是/codis3
func SlotPath(product string, sid int) string {
    return filepath.Join(CodisDir, product, "slots", fmt.Sprintf("slot-%04d", sid))
}
type SlotMapping struct {
    Id      int `json:"id"`
    GroupId int `json:"group_id"`

    Action struct {
        Index    int    `json:"index,omitempty"`
        State    string `json:"state,omitempty"`
        TargetId int    `json:"target_id,omitempty"`
    } `json:"action"`
}

總結一下,重新整理redis的過程中,首先建立上下文,從cache中讀取slots,group,proxy,sentinel等資訊,如果讀不到就通過store從zk上獲取,如果zk中也為空就建立。遍歷叢集中的redis伺服器以及主從關係,建立RedisStats並與addr關聯形成map,儲存在future的vmap中。全部儲存完後,再把vmap寫入Topom.stats.servers

我們可以在控制檯上打印出Topom.stats.redisp的相關資訊。因為goroutine中設定了每個一秒休眠,所以叢集的redisp實際上是每秒重新整理一次

stats redisp: &{{0 0}  map[*.*.*.*:6379:0xc4206933e0 *.*.*.*:6380:0xc420693890 127.0.0.1:6379:0xc4206be540] 5000000000 {0xc420320000} false}

2 重新整理proxy狀態

重新整理proxy狀態的程式碼和重新整理redis的類似,就不贅述了。可以參照Codis原始碼解析——proxy新增到叢集
最後的步驟

3 處理同步操作

首先要明白,同步操作,指的就是一個group中的主從codis-server伺服器之間進行資料的同步,GroupServer是Group的一個屬性,標明瞭當前group中的所有codis-server的地址和action等等資訊

type Group struct {
    Id      int            `json:"id"`
    Servers []*GroupServer `json:"servers"`

    Promoting struct {
        Index int    `json:"index,omitempty"`
        State string `json:"state,omitempty"`
    } `json:"promoting"`

    OutOfSync bool `json:"out_of_sync"`
}

type GroupServer struct {
    Addr       string `json:"server"`
    DataCenter string `json:"datacenter"`

    Action struct {
        Index int    `json:"index,omitempty"`
        State string `json:"state,omitempty"`
    } `json:"action"`

    ReplicaGroup bool `json:"replica_group"`
}

我們直接看ProcessSyncAction,在/pkg/topom/topom_action.go檔案中

func (s *Topom) ProcessSyncAction() error {
    //同步操作之前的準備工作
    addr, err := s.SyncActionPrepare()
    if err != nil || addr == "" {
        return err
    }
    log.Warnf("sync-[%s] process action", addr)

    //執行同步操作
    exec, err := s.newSyncActionExecutor(addr)
    if err != nil || exec == nil {
        return err
    }
    return s.SyncActionComplete(addr, exec() != nil)
}

同步操作之前的準備工作是,使用s.newContext()獲取上下文,從上下文中,遍歷每個group中的每個codis-server,從Action.State為pending的codis-server中,選出Action.Index最小的那臺伺服器,並獲取其所在的group,如果這個group的Promoting.State為nothing,這臺伺服器就可以從主伺服器同步資料。將這個codis-server的Action.Index設為0,Action.State設為syncing,更新zk中儲存的資訊,並將cache中關於這臺伺服器的資訊設為nil,這樣下次就會從store中重新載入資料到cache。

下一步,檢查當前server在group中的index,如果index不為0,就表示這臺server不是group中的主伺服器(codis是將group中index為0的那臺server作為主的),下一步就是當前server從主服務同步資料,通過redigo傳送同步命令

return func() error {
    c, err := redis.NewClient(addr, s.config.ProductAuth, time.Minute*30)
    if err != nil {
        log.WarnErrorf(err, "create redis client to %s failed", addr)
        return err
    }
    defer c.Close()
    if err := c.SetMaster(master); err != nil {
        log.WarnErrorf(err, "redis %s set master to %s failed", addr, master)
        return err
    }
    return nil
}, nil
func NewClient(addr string, auth string, timeout time.Duration) (*Client, error) {
    c, err := redigo.Dial("tcp", addr, []redigo.DialOption{
        redigo.DialConnectTimeout(math2.MinDuration(time.Second, timeout)),
        redigo.DialPassword(auth),
        redigo.DialReadTimeout(timeout), redigo.DialWriteTimeout(timeout),
    }...)
    if err != nil {
        return nil, errors.Trace(err)
    }
    return &Client{
        conn: c, Addr: addr, Auth: auth,
        LastUse: time.Now(), Timeout: timeout,
    }, nil
}
func (c *Client) SetMaster(master string) error {
    host, port, err := net.SplitHostPort(master)
    if err != nil {
        return errors.Trace(err)
    }
    c.conn.Send("MULTI")
    c.conn.Send("CONFIG", "SET", "masterauth", c.Auth)
    c.conn.Send("SLAVEOF", host, port)
    c.conn.Send("CONFIG", "REWRITE")
    c.conn.Send("CLIENT", "KILL", "TYPE", "normal")
    values, err := redigo.Values(c.Do("EXEC"))
    if err != nil {
        return errors.Trace(err)
    }
    for _, r := range values {
        if err, ok := r.(redigo.Error); ok {
            return errors.Trace(err)
        }
    }
    return nil
}

同步之後,會將這臺codis-server的Action.State設定為”synced”或者”synced_failed”,並在zk中更新相關資訊,抹除cache。

注意,儘管整個過程中,都用了鎖,每次還是會檢查group的Promoting.State是否nothing,codis-server的Action.Index是否為0,Action.State是否為syncing,只有全部符合才進行同步

4 處理slot操作

到這裡,dashboard的啟動工作已經完成,可以看到,dashboard啟動過程中,實際上啟動了很多goroutine來對後續操作進行處理,這些我們都會在後面的文章的具體章節中做分析,這一節只需要關注到dashboard啟動過程中做了什麼即可。

相關推薦

myBatis原始碼解析-快取篇2

上一章分析了mybatis的原始碼的日誌模組,像我們經常說的mybatis一級快取,二級快取,快取究竟在底層是怎樣實現的。此次開始分析快取模組 1. 原始碼位置,mybatis原始碼包位於org.apache.ibatis.cache下,如圖 2. 先從org.apache.ibatis.cache下的cac

Codis原始碼解析——dashboard啟動2

1 重新整理redis狀態 首先認識兩個重要的struct type Future struct { sync.Mutex wait sync.WaitGroup vmap map[string]interface{} } typ

Codis原始碼解析——dashboard啟動1

dashboard是codis的叢集管理工具,支援proxy和server的新增、刪除、資料遷移,所有對叢集的操作必須通過dashboard。dashboard的啟動過程和proxy類似。dashboard的啟動只是初始化一些必要的資料結構,複雜的在於對叢集的操

TiKV 原始碼解析系列文章gRPC Server 的初始化和啟動流程

作者:屈鵬 本篇 TiKV 原始碼解析將為大家介紹 TiKV 的另一週邊元件—— grpc-rs。grpc-rs 是 PingCA

解析MySQL binlog --2FORMAT_DESCRIPTION_EVENT

mysql binlog 該格式描述事件時binlog version 4中為了取代之前版本的START_EVENT_3事件而引入的。是binlog文件的第一個事件,並在一個binlog文件中僅出現一次。具體定義:binlog-version:binlog版本mysql-server version:

Dubbo——快速啟動2

快速啟動 Dubbo 採用全 Spring 配置方式,透明化接入應用,對應用沒有任何 API 侵入,只需用 Spring 載入 Dubbo 的配置即可,Dubbo 基於 Spring 的 Schema 擴充套件 進行載入。 mvn: <!-- dubbo 依賴-->

mybatis原始碼-解析配置檔案之配置檔案Configuration解析(超詳細, 值得收藏)

1. 簡介 1.1 系列內容 本系列文章講解的是mybatis解析配置檔案內部的邏輯, 即 Reader reader = Resources.getResourceAsReader("mybatis-config.xml"); SqlSessionFact

mybatis原始碼-解析配置檔案解析的流程

1. 簡介 在之前的文章《mybatis 初步使用(IDEA的Maven專案, 超詳細)》中, 講解了mybatis的初步使用, 並總結了以下mybatis的執行流程: 通過 Resources 工具類讀取 mybatis-config.xml,

原始碼剖析】Launcher 8.0 原始碼 25---使用者操作2模式切換

 模式就是介面,除普通模式外,Launcher還有兩個特殊模式,分別是overView模式和Springloader模式。此處採用狀態模式這種設計模式,共有三個狀態。 overView模式是長按桌面空白處,出現特殊功能,比如設定桌布,新增widget,特殊設定(橫屏開關

caffe原始碼解析:層layer的註冊與管理

caffe中所有的layer都是類的結構,它們的構造相關的函式都註冊在一個全域性變數g_registry_ 中。 首先這個變數的型別 CreatorRegistry是一個map定義, public: typedef shared_ptr<Layer<Dt

mybatis原始碼-解析配置檔案之配置檔案Mapper解析

其中, mappers作為configuration節點的一部分配置, 在本文章中, 我們講解解析mappers節點, 即 xxxMapper.xml 檔案的解析。 1 解析入口 在解析 mybatis-config.xml 時, 會進行解析 xxxMapper.xml 的檔案。 在圖示流程的 XMLCo

java集合類原始碼詳解-ArrayList2

上次關於ArrayList的結構沒有做總結。這次還是補充在自己部落格裡面吧。 ArrayList繼承自一個抽象類。實現了四個介面。 AbstractList繼承自AbstractCollection。AbstractCollection繼承自Object。 ArrayL

CentOS 7安裝Oracle 11gR2以及設定自啟動2

6、建立表空間和使用者授權 (1)、連線資料庫 $ sqlplus / as sysdba (2)、建立資料庫表空間 語法: create tablespace 表空間名 datafile ‘實體地址(相當於檔案路徑)’ size初始大小(單位M) autoextend on next每次

python原始碼分析----記憶體分配2

早就應該寫部分的內容了。。。。最近比較負能量。。。傷不起啊。。 上一篇說到了,在python的記憶體分配中兩個非常重要的方法:PyObject_Malloc和PyObject_Free 在具體的來這兩個方法之前,先要看看別的一些東西 //這裡用usedpool構成了一個雙

Selenium2Library原始碼解析與擴充套件

一直覺得Selenium2Library對selenium的封裝很贊,最近模擬它的結構封裝給一個同事寫了個C# selenium的demo,過程中看了細看了一部分原始碼。加上之前封裝的內容,分享一波。 注1:以下涉及到RF的指令碼全未加延時sleep,如需除錯

Hystrix 原始碼解析 —— 請求執行之失敗回退邏輯

本文主要基於 Hystrix 1.5.X 版本 1. 概述本文主要分享 Hystrix 命令執行(四)之失敗回退邏輯。 建議 :對 RxJava 已經有一定的瞭解的基礎上閱讀本文。 Hystrix 執行命令整體流程如下圖: 紅圈 :Hy

TiKV 原始碼解析系列文章Prometheus

開發十年,就只剩下這套架構體系了! >>>   

Dubbo原始碼解析之SPI:擴充套件類的載入過程

Dubbo是一款開源的、高效能且輕量級的Java RPC框架,它提供了三大核心能力:面向介面的遠端方法呼叫、智慧容錯和負載均衡,以及服務自動註冊和發現。 Dubbo最早是阿里公司內部的RPC框架,於 2011 年開源,之後迅速成為國內該類開源專案的佼佼者,2018年2月,通過投票正式成為 Apache基金會孵

myBatis原始碼解析-日誌篇1

上半年在進行知識儲備,下半年爭取寫一點好的部落格來記錄自己原始碼之路。在學習原始碼的路上也掌握了一些設計模式,可所謂一舉兩得。本次打算寫Mybatis的原始碼解讀。 準備工作 1. 下載mybatis原始碼 下載地址:https://github.com/mybatis/mybatis-3  2.

myBatis原始碼解析-資料來源篇3

前言:我們使用mybatis時,關於資料來源的配置多使用如c3p0,druid等第三方的資料來源。其實mybatis內建了資料來源的實現,提供了連線資料庫,池的功能。在分析了快取和日誌包的原始碼後,接下來分析mybatis中的資料來源實現。 類圖:mybatis中關於資料來源的原始碼包路徑如下: