1. 程式人生 > >Codis原始碼解析——proxy的啟動

Codis原始碼解析——proxy的啟動

proxy啟動的時候,首先檢查輸入的命令列,一般情況下,啟動proxy的命令如下:

nohup ./bin/codis-proxy --ncpu=2 --config=./conf/proxy.conf --log=./logs/proxy.log --log-level=WARN &

程式會解析這行命令引數,下面舉個例子(例項程式碼是cmd/proxy/main.go),有關於go的並行,這裡要特別說明一下,如果不顯示的指明GOMAXPROCS的話,goroutine都是執行在同一個CPU核心上,一個goroutine得到時間片的時候,其他的goroutine都在等待。所以還是要根據輸入手動指定一下的。

d, err := docopt.Parse(usage, nil, true, "", false)
var ncpu int
if n, ok := utils.ArgumentInteger(d, "--ncpu"); ok {
    ncpu = n
} else {
    ncpu = 4
}
runtime.GOMAXPROCS(ncpu)

config制定了配置檔案的目錄,程式會讀取配置檔案中的配置項,填充到pkg/proxy/config.go中的config struct中。

當所有config屬性填充完畢之後,呼叫pkg/proxy/proxy.go中的構造方法,得到一個Proxy,首先我們看看proxy這個struct的結構

type Proxy struct {
    mu sync.Mutex

    xauth string
    //不要搞混了,這個是/pkg/models/proxy.go下面的Proxy struct,也就是proxy成功建立之後,
    //在zookeeper的/codis3/codis-wujiang/proxy/proxy-token中展示的資訊
    model *models.Proxy

    //接收退出資訊的channel
    exit struct {
        C chan struct{}
    }
    online bool
    closed bool
config *Config //Router中儲存了叢集中所有sharedBackendConnPool和slot,用於將redis請求轉發給相應的slot進行處理 router *Router ignore []byte //監聽proxy的19000埠的Listener,也就是proxy實際工作的埠 lproxy net.Listener //監聽proxy_admin的11080埠的Listener,也就是codis叢集和proxy進行互動的埠 ladmin net.Listener ha struct { //上帝視角sentinel,並不是真正的物理伺服器 //s := &Sentinel{Product: product, Auth: auth} //s.Context, s.Cancel = context.WithCancel(context.Background()) monitor *redis.Sentinel //int為groupId,標示每個group的主伺服器 masters map[int]string //當前叢集中的所有物理sentinel servers []string } //java客戶端Jodis與codis叢集互動,就是通過下面的struct,裡面儲存了zkClient以及"/jodis/codis-wujiang/proxy-token"這個路徑 jodis *Jodis }

當然,重點還是得到proxy的這個方法。這個方法是proxy啟動過程中最重要的一步,內容也很多,初次看會比較頭疼

func New(config *Config) (*Proxy, error) {
    //config的引數校驗
    if err := config.Validate(); err != nil {
        return nil, errors.Trace(err)
    }
    if err := models.ValidateProduct(config.ProductName); err != nil {
        return nil, errors.Trace(err)
    }

    //新建一個Proxy,後面就是它填充屬性
    s := &Proxy{}
    s.config = config
    s.exit.C = make(chan struct{})

    //通過config new一個Router,這一步只是初始化了Router中的兩個sharedBackendConnPool的結構,
    //也就是map[string]*sharedBackendConn
    s.router = NewRouter(config)
    s.ignore = make([]byte, config.ProxyHeapPlaceholder.Int64())

    s.model = &models.Proxy{
        StartTime: time.Now().String(),
    }
    s.model.ProductName = config.ProductName
    s.model.DataCenter = config.ProxyDataCenter
    //獲得當前程序的程序號
    s.model.Pid = os.Getpid()
    //返回路徑的字串
    s.model.Pwd, _ = os.Getwd()
    //獲取當前作業系統資訊和主機名
    if b, err := exec.Command("uname", "-a").Output(); err != nil {
        log.WarnErrorf(err, "run command uname failed")
    } else {
        s.model.Sys = strings.TrimSpace(string(b))
    }
    s.model.Hostname = utils.Hostname

    //將config中的引數設定到models.proxy裡,主要設定的引數是admin_addr,proxy_addr,設定之後呼叫net.Listen進行監聽,成功的話將Listener存入proxy,生成Token和xauth,以及proxy.Jodis。這裡也設定了zk最終的樹形路徑

    //如果配置檔案中的jodis_compatible設定的是false,就採用codis3的zk路徑,jodis/codis-demo/proxy-token。如果相容這裡設定的true,就還採用codis2時期的/zk/codis/db_productName
    if err := s.setup(config); err != nil {
        s.Close()
        return nil, err
    }

    //到這一步,其實proxy已經新建完畢,控制檯打印出新建的proxy資訊
    log.Warnf("[%p] create new proxy:\n%s", s, s.model.Encode())

    unsafe2.SetMaxOffheapBytes(config.ProxyMaxOffheapBytes.Int64())

    //新建一個路由表,對傳送到11080埠的請求做處理
    go s.serveAdmin()
    //啟動goroutine來監聽傳送到19000埠的redis請求
    go s.serveProxy()

    s.startMetricsJson()
    s.startMetricsInfluxdb()
    s.startMetricsStatsd()

    return s, nil
}

models.Proxy的詳細資訊會掛載在zk目錄”/codis3/codis-wujiang/proxy/proxy-token”下

{
    "id": 1,
    "token": "0317b8f67921f8c7a2d19d372cc9511b",
    "start_time": "2017-07-28 14:44:36.462306337 +0800 CST",
    "admin_addr": "*.*.*.*:11080",
    "proto_type": "tcp4",
    "proxy_addr": "*.*.*.*:19000",
    "jodis_path": "/jodis/codis-wujiang/proxy-0317b8f67921f8c7a2d19d372cc9511b",
    "product_name": "codis-wujiang",
    "pid": 45092,
    "pwd": "/app/codis",
    "sys": "Linux cnsz22vla888.novalocal 2.6.32-504.el6.x86_64 #1 SMP Wed Oct 15 04:27:16 UTC 2014 x86_64 x86_64 x86_64 GNU/Linux",
    "hostname": "cnsz22vla888.novalocal",
    "datacenter": ""
}

路由表那裡,著重介紹一下。路由表本質上其實就是一個map,key是路徑 ==> “/”,value是該路徑所對應的處理函式 ==> 在我們這裡就是newApiServer(s)出來的處理函式。codis-proxy啟動之後,這一步是啟動http服務,當叢集配置命令請求向prxoy_admin的11080埠發過來之後,轉發做相應處理。newApiServer方法使用了這裡使用了martini框架。有關於martini框架的詳細資訊,可以參見http://www.oschina.net/p/martini/

func (s *Proxy) serveAdmin() {
    if s.IsClosed() {
        return
    }
    defer s.Close()

    log.Warnf("[%p] admin start service on %s", s, s.ladmin.Addr())

    eh := make(chan error, 1)
    go func(l net.Listener) {
        //新建路由表
        h := http.NewServeMux()
        //這裡表示newApiServer用來處理所有"/"路徑的請求
        h.Handle("/", newApiServer(s))
        hs := &http.Server{Handler: h}
        //對每個net.Listener的連線,新建goroutine讀請求,並呼叫srv.Handler進行處理
        eh <- hs.Serve(l)
    }(s.ladmin)

    select {
    case <-s.exit.C:
        log.Warnf("[%p] admin shutdown", s)
    case err := <-eh:
        log.ErrorErrorf(err, "[%p] admin exit on error", s)
    }
}

//在/pkg/proxy/proxy_api.go中,請求轉發到不同的路徑,r是路由規則,最終的返回結果是由路由規則得到的handler
func newApiServer(p *Proxy) http.Handler {
    m := martini.New()
    m.Use(martini.Recovery())
    m.Use(render.Renderer())
    m.Use(func(w http.ResponseWriter, req *http.Request, c martini.Context) {
        path := req.URL.Path
        if req.Method != "GET" && strings.HasPrefix(path, "/api/") {
            var remoteAddr = req.RemoteAddr
            var headerAddr string
            for _, key := range []string{"X-Real-IP", "X-Forwarded-For"} {
                if val := req.Header.Get(key); val != "" {
                    headerAddr = val
                    break
                }
            }
            log.Warnf("[%p] API call %s from %s [%s]", p, path, remoteAddr, headerAddr)
        }
        c.Next()
    })
    m.Use(gzip.All())
    m.Use(func(c martini.Context, w http.ResponseWriter) {
        w.Header().Set("Content-Type", "application/json; charset=utf-8")
    })

    api := &apiServer{proxy: p}

    r := martini.NewRouter()
    r.Get("/", func(r render.Render) {
        r.Redirect("/proxy")
    })
    r.Any("/debug/**", func(w http.ResponseWriter, req *http.Request) {
        http.DefaultServeMux.ServeHTTP(w, req)
    })

    r.Group("/proxy", func(r martini.Router) {
        r.Get("", api.Overview)
        r.Get("/model", api.Model)
        r.Get("/stats", api.StatsNoXAuth)
        r.Get("/slots", api.SlotsNoXAuth)
    })
    r.Group("/api/proxy", func(r martini.Router) {
        r.Get("/model", api.Model)
        r.Get("/xping/:xauth", api.XPing)
        r.Get("/stats/:xauth", api.Stats)
        r.Get("/stats/:xauth/:flags", api.Stats)
        r.Get("/slots/:xauth", api.Slots)
        r.Put("/start/:xauth", api.Start)
        r.Put("/stats/reset/:xauth", api.ResetStats)
        r.Put("/forcegc/:xauth", api.ForceGC)
        r.Put("/shutdown/:xauth", api.Shutdown)
        r.Put("/loglevel/:xauth/:value", api.LogLevel)
        r.Put("/fillslots/:xauth", binding.Json([]*models.Slot{}), api.FillSlots)
        r.Put("/sentinels/:xauth", binding.Json(models.Sentinel{}), api.SetSentinels)
        r.Put("/sentinels/:xauth/rewatch", api.RewatchSentinels)
    })

    m.MapTo(r, (*martini.Routes)(nil))
    m.Action(r.Handle)
    return m
}

後面的serveProxy,s.acceptConn(l)是啟動goroutine來監聽redis請求,啟動的時候肯定是沒有請求過來的,所以我們看到這裡為止, 下一節Codis原始碼解析——proxy監聽redis請求會詳細紹NewSession(c, s.config).Start(s.router)這裡具體做了什麼。

func (s *Proxy) serveProxy() {
    if s.IsClosed() {
        return
    }
    defer s.Close()

    log.Warnf("[%p] proxy start service on %s", s, s.lproxy.Addr())

    eh := make(chan error, 1)
    go func(l net.Listener) (err error) {
        defer func() {
            eh <- err
        }()
        for {
            //啟動goroutine監聽19000埠請求,有請求到來的時候,返回net.Conn
            c, err := s.acceptConn(l)
            if err != nil {
                return err
            }
            NewSession(c, s.config).Start(s.router)
        }
    }(s.lproxy)

    if d := s.config.BackendPingPeriod.Duration(); d != 0 {
        go s.keepAlive(d)
    }

    select {
    case <-s.exit.C:
        log.Warnf("[%p] proxy shutdown", s)
    case err := <-eh:
        log.ErrorErrorf(err, "[%p] proxy exit on error", s)
    }
}

最後面的三個方法,由於通常啟動的時候,配置檔案中的MetircsReporterServer,MetircsInfluxdbServer和MetircsStatsdPeriod三個欄位都為空字串,實際上什麼都沒做。

再回到主main函式,看看最後的一段程式碼

//啟動時dashboard、coordinator.name和slots這三個引數都為空,所以此時這三個goroutine都沒有執行
switch {
case dashboard != "":
    go AutoOnlineWithDashboard(s, dashboard)
case coordinator.name != "":
    go AutoOnlineWithCoordinator(s, coordinator.name, coordinator.addr)
case slots != nil:
    go AutoOnlineWithFillSlots(s, slots)
}
//未關閉,但也不線上的時候,控制檯每秒輸出日誌
for !s.IsClosed() && !s.IsOnline() {
    log.Warnf("[%p] proxy waiting online ...", s)
    time.Sleep(time.Second)
}

到這裡,整個proxy啟動過程結束。啟動之後,Proxy會預設處於 waiting 狀態,以一秒一次的頻率重新整理狀態,監聽proxy_addr 地址(預設配置檔案中的19000埠),但是不會 accept 連線,通過fe或者命令列新增到叢集並完成叢集狀態的同步,才能改變狀態為online。新增proxy到叢集的過程,詳見個人另一篇部落格Codis原始碼解析——proxy新增到叢集

這裡寫圖片描述

到這裡,我們總結一下,proxy啟動過程中的流程:
讀取配置檔案,獲取Config物件。根據Config新建Proxy,填充Proxy的各個屬性,這裡面比較重要的是填充models.Proxy(詳細資訊可以在zk中檢視),並且與zk連線、註冊相關路徑。啟動goroutine監聽11080埠的codis叢集發過來的請求並進行轉發,以及監聽發到19000埠的redis請求並進行相關處理。