OpenFalcon 原始碼分析(Nodata元件)
Nodata版本
VERSION = "0.0.11"
Nodata元件功能
nodata用於檢測監控資料的上報異常。nodata和實時報警judge模組協同工作,過程為: 配置了nodata的採集項超時未上報資料,nodata生成一條預設的模擬資料;使用者配置相應的報警策略,收到mock資料就產生報警。採集項上報異常檢測,作為judge模組的一個必要補充,能夠使judge的實時報警功能更加可靠、完善。【官方描述】
Nodata元件邏輯圖
系統流圖
官方系統流圖
模組結構
官方模組結構圖
main入口分析
func main() { //命令引數解析 cfg := flag.String("c", "cfg.json", "configuration file") version := flag.Bool("v", false, "show version") versionGit := flag.Bool("vg", false, "show version") flag.Parse() //版本輸出 if *version { fmt.Println(g.VERSION) os.Exit(0) } //gitcommit序列號輸出 if *versionGit { fmt.Println(g.VERSION, g.COMMIT) os.Exit(0) } // 全域性配置格式化 g.ParseConfig(*cfg) // 統計 g.StartProc() // 快取Nodata配置 config.Start()// 【參考詳細分析】 // 快取Nodata配置主機的採集資料點 collector.Start()// 【參考詳細分析】 // judge策略判斷 judge.Start()// 【參考詳細分析】 // http API服務 http.Start()// 【參考詳細分析】 select {} }
config.Start() 從DB載入Nodata配置(在dashboard上配置的nodata策略會寫入MySQL的mockcfg表)
# 載入nodata配置主函式 func Start() { if !g.Config().Config.Enabled { log.Println("config.Start warning, not enabled") return } service.InitDB()//初始化DB StartNdConfigCron()//載入nodata配置快取至記憶體 log.Println("config.Start ok") } ## 初始化DB連線 func InitDB() { _, err := GetDbConn(dbBaseConnName)//"db.base"連線conn初始化並儲存至記憶體Map if err != nil { log.Fatalln("config.InitDB error", err) return // never go here } log.Println("config.InitDB ok") } ### GetDbConn實現函式 ### makeDbConn函式實現sql客戶端連線dbconn ### 記憶體map dbConnMap[connName]儲存dbconn func GetDbConn(connName string) (c *sql.DB, e error) { dbLock.Lock() defer dbLock.Unlock() var err error var dbConn *sql.DB dbConn = dbConnMap[connName] if dbConn == nil { dbConn, err = makeDbConn()//建立sql客戶端連線 if err != nil { closeDbConn(dbConn) return nil, err } dbConnMap[connName] = dbConn } err = dbConn.Ping()//dbconn檢測,conn.Ping() if err != nil { closeDbConn(dbConn)//dbconn關閉,conn.Close() delete(dbConnMap, connName) return nil, err } return dbConn, err } func makeDbConn() (conn *sql.DB, err error) { conn, err = sql.Open("mysql", g.Config().Config.Dsn) if err != nil { return nil, err } conn.SetMaxIdleConns(int(g.Config().Config.MaxIdle)) err = conn.Ping() return conn, err } ## func StartNdConfigCron() { ndconfigCron.AddFuncCC(ndconfigCronSpec, func() { start := time.Now().Unix() cnt, _ := syncNdConfig()// end := time.Now().Unix() if g.Config().Debug { log.Printf("config cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start)) } // 統計 g.ConfigCronCnt.Incr() g.ConfigLastTs.SetCnt(end - start) g.ConfigLastCnt.SetCnt(int64(cnt)) }, 1) ndconfigCron.Start()// } #### 獲取nodata配置、重新格式化及快取配置全域性公開Map(NdConfigMap) func syncNdConfig() (cnt int, errt error) { // 獲取nodata配置函式呼叫 configs := service.GetMockCfgFromDB() // 重新格式化配置NodateConfig結構 nm := nmap.NewSafeMap() for _, ndc := range configs { endpoint := ndc.Endpoint metric := ndc.Metric tags := ndc.Tags if endpoint == "" { log.Printf("bad config: %+v\n", ndc) continue } pk := cutils.PK(endpoint, 
metric, tags) nm.Put(pk, ndc) } // 快取map SetNdConfigMap(nm) return nm.Size(), nil//返回map長度 } ##### 底層獲取nodata配置實現函式 func GetMockCfgFromDB() map[string]*cmodel.NodataConfig { ret := make(map[string]*cmodel.NodataConfig) dbConn, err := GetDbConn("nodata.mockcfg") //獲取dbConn連線 if err != nil { log.Println("db.get_conn error, mockcfg", err) return ret } q := fmt.Sprintf("SELECT id,name,obj,obj_type,metric,tags,dstype,step,mock FROM mockcfg") rows, err := dbConn.Query(q) //執行mockcfg表查詢語句 if err != nil { log.Println("db.query error, mockcfg", err) return ret } defer rows.Close() for rows.Next() {//迭代查詢結果集 t := MockCfg{} tags := "" err := rows.Scan(&t.Id, &t.Name, &t.Obj, &t.ObjType, &t.Metric, &tags, &t.Type, &t.Step, &t.Mock) if err != nil { log.Println("db.scan error, mockcfg", err) continue } t.Tags = cutils.DictedTagstring(tags) //"tagskey=value"格式化map[Key]Value err = checkMockCfg(&t) //檢測配置是否有效 if err != nil { log.Println("check mockcfg, error:", err) continue } endpoints := getEndpoint(t.ObjType, t.Obj) //獲取endpoint列表(hosts slice),後面有objtype為"host/group/other"處理函式分析。 if len(endpoints) < 1 { continue } for _, ep := range endpoints { uuid := cutils.PK(ep, t.Metric, t.Tags)//UUID format 'endpoint/metric/k=v,k=v(tags)' ncfg := cmodel.NewNodataConfig(t.Id, t.Name, t.ObjType, ep, t.Metric, t.Tags, t.Type, t.Step, t.Mock) val, found := ret[uuid] if !found {//如果不存在則新建 ret[uuid] = ncfg continue } //如果存在,判斷配置型別 if isSpuerNodataCfg(val, ncfg) { // val is spuer than ncfg, so drop ncfg log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, val.Name, ncfg.Name) } else { ret[uuid] = ncfg //如果原val配置型別為group,而新ncfg配置型別為host則覆蓋原有配置 log.Printf("nodata.mockcfg conflict, %s, used %s, drop %s", uuid, ncfg.Name, val.Name) } } } return ret//返回map[UUID]*cmodel.NodataConfig } ###### 根據objType獲取Hosts slice func getEndpoint(objType string, obj string) []string { switch objType { case "host"://型別Host與處理函式 return getEndpointFromHosts(obj) case "group"://型別group與處理函式 return 
getEndpointFromGroups(obj) case "other"://型別other與處理函式 return getEndpointFromOther(obj) default: return make([]string, 0) } } //型別Host與處理函式 func getEndpointFromHosts(hosts string) []string { ret := make([]string, 0) hlist := strings.Split(hosts, "\n")//分隔處理 for _, host := range hlist { nh := strings.TrimSpace(host) if nh != "" { ret = append(ret, nh) } } return ret } //型別group與處理函式 func getEndpointFromGroups(grps string) []string { grplist := strings.Split(grps, "\n") // get host map, avoid duplicating hosts := make(map[string]string) for _, grp := range grplist { ngrp := strings.TrimSpace(grp) if len(ngrp) < 1 { continue } hostmap := GetHostsFromGroup(grp) //根據Group名稱獲取主機MAP for hostname := range hostmap { if hostname != "" { hosts[hostname] = hostname } } } // get host slice ret := make([]string, 0) for key := range hosts { ret = append(ret, key) } return ret } //型別other與處理函式,同類型Host與處理函式 func getEndpointFromOther(other string) []string { return getEndpointFromHosts(other) }
collector.Start() 快取Nodata配置主機的採集資料點
# 執行收集nodata資料主函式 func Start() { if !g.Config().Collector.Enabled { log.Println("collector.Start warning, not enabled") return } StartCollectorCron()//週期任務,收集nodata資料 log.Println("collector.Start ok") } ## 定時任務執行,收集函式呼叫與任務執行 func StartCollectorCron() { collectorCron.AddFuncCC("*/20 * * * * ?", func() { start := time.Now().Unix() cnt := collectDataOnce()//收集函式呼叫 end := time.Now().Unix() if g.Config().Debug { log.Printf("collect cron, cnt %d, time %ds, start %s\n", cnt, end-start, ttime.FormatTs(start)) } //統計 g.CollectorCronCnt.Incr() g.CollectorLastTs.SetCnt(end - start) g.CollectorLastCnt.SetCnt(int64(cnt)) g.CollectorCnt.IncrBy(int64(cnt)) }, 1) collectorCron.Start() } ### 收集功能實現函式 func collectDataOnce() int { keys := config.Keys() keysLen := len(keys) // 併發+同步控制 cfg := g.Config().Collector//collector全域性配置 concurrent := int(cfg.Concurrent) //併發數 if concurrent < 1 || concurrent > 50 { concurrent = 10 } sema := tsema.NewSemaphore(concurrent)//建立併發同步 batch := int(cfg.Batch)//全域性批量處理數 if batch < 100 || batch > 1000 { batch = 200 //batch不能太小, 否則channel將會很大 } //根據nodata配置數長度和批量處理數建立channel長度 batchCnt := (keysLen + batch - 1) / batch rch := make(chan int, batchCnt+1) i := 0 for i < keysLen { leftLen := keysLen - i fetchSize := batch // 每次處理batch個配置 if leftLen < fetchSize { fetchSize = leftLen } fetchKeys := keys[i : i+fetchSize] // 併發collect資料 sema.Acquire() //執行緒批量處理(fetchKeys, fetchSize) go func(keys []string, keySize int) { defer sema.Release() size, err := fetchItemsAndStore(keys, keySize)//批查獲取函式呼叫 if err != nil { log.Printf("fetchItemAndStore fail, size:%v, error:%v", size, err) } if g.Config().Debug { log.Printf("fetchItemAndStore keys:%v, key_size:%v, ret_size:%v", keys, keySize, size) } rch <- size }(fetchKeys, fetchSize) i += fetchSize } collectCnt := 0 //計數 for i := 0; i < batchCnt; i++ { select { case cnt := <-rch: collectCnt += cnt } } return collectCnt } #### 獲取資料實現函式 func fetchItemsAndStore(fetchKeys []string, fetchSize int) (size int, errt error) { if 
fetchSize < 1 { return } // form request args args := make([]*cmodel.GraphLastParam, 0) for _, key := range fetchKeys { ndcfg, found := config.GetNdConfig(key) //根據hostname返回nodata配置 if !found { continue } endpoint := ndcfg.Endpoint //endpoint主機物件 counter := cutils.Counter(ndcfg.Metric, ndcfg.Tags) // 格式metric/tags(k=v,k=v) arg := &cmodel.GraphLastParam{endpoint, counter} //請求引數 args = append(args, arg) } if len(args) < 1 { return } resp, err := queryLastPoints(args)//API呼叫查詢endpoint最近一次採集資料。(POST請求API元件api呼叫,檢視後面函式分析) if err != nil { return 0, err } // store items fts := time.Now().Unix()//儲存Items時間float time,Judge邏輯用到 for _, glr := range resp { //log.Printf("collect:%v\n", glr) if glr == nil || glr.Value == nil { continue } AddItem(cutils.PK2(glr.Endpoint, glr.Counter), NewDataItem(glr.Value.Timestamp, float64(glr.Value.Value), "OK", fts))//快取收集到的監控資料(ItemMap)。Value.Timestamp資料項時間戳,Value.Value資料項值,"OK"資料項狀態,fts資料項儲存時間。 } return len(resp), nil } ##### config.GetNdConfig 根據hostname返回NodataConfig配置 func GetNdConfig(key string) (*cmodel.NodataConfig, bool) { rwlock.RLock() defer rwlock.RUnlock() val, found := NdConfigMap.Get(key)//map操作 if found && val != nil { return val.(*cmodel.NodataConfig), true } return &cmodel.NodataConfig{}, false } ##### API元件POST請求實現函式 func queryLastPoints(param []*cmodel.GraphLastParam) (resp []*cmodel.GraphLastResp, err error) { cfg := g.Config() uri := fmt.Sprintf("%s/api/v1/graph/lastpoint", cfg.PlusApi.Addr) //介面定義 var req *httplib.BeegoHttpRequest headers := map[string]string{"Content-type": "application/json"} req, err = requests.CurlPlus(uri, "POST", "nodata", cfg.PlusApi.Token, headers, map[string]string{}) //Curl請求頭與方法 if err != nil { return } req.SetTimeout(time.Duration(cfg.PlusApi.ConnectTimeout)*time.Millisecond, time.Duration(cfg.PlusApi.RequestTimeout)*time.Millisecond) b, err := json.Marshal(param) if err != nil { return } req.Body(b)//請求體 err = req.ToJson(&resp)//請求執行與response json if err != nil { return } return resp, nil 
} ##### 快取資料點 func AddItem(key string, val *DataItem) { listv, found := ItemMap.Get(key) if !found { ll := tlist.NewSafeListLimited(3) //每個採集指標,快取最新的3個數據點 ll.PushFrontViolently(val) ItemMap.Put(key, ll) return } listv.(*tlist.SafeListLimited).PushFrontViolently(val) } func NewDataItem(ts int64, val float64, fstatus string, fts int64) *DataItem { return &DataItem{Ts: ts, Value: val, FStatus: fstatus, FTs: fts} } type GraphLastResp struct { Endpointstring`json:"endpoint"` Counterstring`json:"counter"` Value*RRDData`json:"value"` }
judge.Start()
# Judge執行入口函式 func Start() { StartJudgeCron()//執行呼叫 log.Println("judge.Start ok") } # 定時任務Judge執行函式 func StartJudgeCron() { judgeCron.AddFuncCC(judgeCronSpec, func() { start := time.Now().Unix() judge()//執行judge函式呼叫 end := time.Now().Unix() if g.Config().Debug { log.Printf("judge cron, time %ds, start %s\n", end-start, ttime.FormatTs(start)) } // 統計 g.JudgeCronCnt.Incr() g.JudgeLastTs.SetCnt(end - start) // 觸發Mock列表資料傳送 sender.SendMockOnceAsync() }, 1) judgeCron.Start()//執行Cron } ## judge實現函式 func judge() { now := time.Now().Unix() keys := config.Keys() for _, key := range keys { ndcfg, found := config.GetNdConfig(key) //根據hostname返回nodata配置 if !found { //策略不存在,不處理 continue } step := ndcfg.Step mock := ndcfg.Mock item, found := collector.GetFirstItem(key) //根據hostname返回collector最近一次配置 if !found { //沒有資料,未開始採集,不處理 continue } //3*step(or 180)超時時間 lastTs := now - getTimeout(step) if item.FStatus != "OK" || item.FTs < lastTs { //資料採集失敗,不處理 continue } //採集到的資料為mock資料,則認為上報超時了 if fCompare(mock, item.Value) == 0 { //判斷採集到的最後一次資料項值timestamp+step(上報週期) //如果小於當前時間則認為已經上報超時了,過了應該上報的週期 //如果大於當前時間則表示還在有效的上報週期內 if LastTs(key)+step <= now { TurnNodata(key, now)//設定nodata狀態 genMock(genTs(now, step), key, ndcfg) //新增到Mock列表 } continue } //判斷採集到的最後一次資料項值timestamp,如果小於 //3*step(or 180)超時時間則認為資料過期,則認為上報超時 if item.Ts < lastTs { if LastTs(key)+step <= now { TurnNodata(key, now) genMock(genTs(now, step), key, ndcfg) } continue } TurnOk(key, now) } } ### 返回最後一次採集資料的timestamp func LastTs(key string) int64 { statusLock.RLock() var ts int64 = 0 v, found := StatusMap.Get(key) if !found { statusLock.RUnlock() return ts } ts = v.(*NodataStatus).Ts statusLock.RUnlock() return ts } // NoData Data Item Struct type DataItem struct { Tsint64//timestamp Valuefloat64 FStatus string// OK|ERR FTsint64//儲存時間float } // Nodata Status Struct type NodataStatus struct { Keystring Status string // OK|NODATA Cntint Tsint64 } ### 設定為Nodata狀態 func TurnNodata(key string, ts int64) { statusLock.Lock() v, found := 
StatusMap.Get(key) if !found { // create new status ns := NewNodataStatus(key, "NODATA", 1, ts) StatusMap.Put(key, ns) statusLock.Unlock() return } // update status ns := v.(*NodataStatus) ns.Status = "NODATA" ns.Cnt += 1 ns.Ts = ts statusLock.Unlock() return } ### 新增Item至Mock列表 func genMock(ts int64, key string, ndcfg *cmodel.NodataConfig) { sender.AddMock(key, ndcfg.Endpoint, ndcfg.Metric, cutils.SortedTags(ndcfg.Tags), ts, ndcfg.Type, ndcfg.Step, ndcfg.Mock) } func AddMock(key string, endpoint string, metric string, tags string, ts int64, dstype string, step int64, value interface{}) { item := &cmodel.JsonMetaData{metric, endpoint, ts, step, value, dstype, tags} MockMap.Put(key, item)//put into map }
sender.SendMockOnceAsync() 傳送模擬資料
# 傳送Mock資料入口函式 func SendMockOnceAsync() { go SendMockOnce() } ## 併發傳送和統計 func SendMockOnce() int { if !sema.TryAcquire() { return -1 } defer sema.Release() // not enabled if !g.Config().Sender.Enabled { return 0 } start := time.Now().Unix() cnt, _ := sendMock()//呼叫傳送功能模組 end := time.Now().Unix() if g.Config().Debug { log.Printf("sender cron, cnt %d, time %ds, start %s", cnt, end-start, ttime.FormatTs(start)) } // 統計 g.SenderCronCnt.Incr() g.SenderLastTs.SetCnt(end - start) g.SenderCnt.IncrBy(int64(cnt)) return cnt } ### 傳送模擬資料 func sendMock() (cnt int, errt error) { cfg := g.Config().Sender//全域性配置載入 batch := int(cfg.Batch)//批量值 connTimeout := cfg.ConnectTimeout //連線超時 requTimeout := cfg.RequestTimeout//API請求超時 // 傳送至transfer元件 mocks := MockMap.Slice()//獲取Mock列表 MockMap.Clear()//清空列表空間 mockSize := len(mocks) i := 0 for i < mockSize { leftLen := mockSize - i sendSize := batch if leftLen < sendSize { sendSize = leftLen } fetchMocks := mocks[i : i+sendSize]//取一批量 i += sendSize items := make([]*cmodel.JsonMetaData, 0) //整理為slice for _, val := range fetchMocks { if val == nil { continue } items = append(items, val.(*cmodel.JsonMetaData)) } cntonce, err := sendItemsToTransfer(items, len(items), "nodata.mock", time.Millisecond*time.Duration(connTimeout), time.Millisecond*time.Duration(requTimeout)) //API POT呼叫,傳送Mock至Transfer if err == nil { if g.Config().Debug { log.Println("send items:", items) } cnt += cntonce } } return cnt, nil } // API介面呼叫傳送Mock至Transfer func sendItemsToTransfer(items []*cmodel.JsonMetaData, size int, httpcliname string, connT, reqT time.Duration) (cnt int, errt error) { if size < 1 { return } cfg := g.Config() transUlr := fmt.Sprintf("http://%s/api/push", cfg.Sender.TransferAddr) //請求介面API hcli := thttpclient.GetHttpClient(httpcliname, connT, reqT) // 請求體 itemsBody, err := json.Marshal(items) if err != nil { log.Println(transUlr+", format body error,", err) errt = err return } // 構造與執行API req, err := http.NewRequest("POST", transUlr, 
bytes.NewBuffer(itemsBody)) req.Header.Set("Content-Type", "application/json; charset=UTF-8")//請求內容型別 req.Header.Set("Connection", "close") postResp, err := hcli.Do(req)//執行POST if err != nil { log.Println(transUlr+", post to dest error,", err) errt = err return } defer postResp.Body.Close() //響應狀態200判斷 if postResp.StatusCode/100 != 2 { log.Println(transUlr+", post to dest, bad response,", postResp.Body) errt = fmt.Errorf("request failed, %s", postResp.Body) return } return size, nil }
http.Start() http API服務
func Start() { go startHttpServer() } func configRoutes() { configCommonRoutes()//公共API路由,可參考Agent模組 configProcHttpRoutes()// configDebugHttpRoutes()//Debug API路由 } func startHttpServer() { if !g.Config().Http.Enabled { return } addr := g.Config().Http.Listen if addr == "" { return } configRoutes()//配置路由 s := &http.Server{//httpServer例項 Addr:addr, MaxHeaderBytes: 1 << 30, } log.Println("http.startHttpServer ok, listening", addr) log.Fatalln(s.ListenAndServe())//監聽與服務 } ## Proc統計API模組 func configProcHttpRoutes() { // counters http.HandleFunc("/proc/counters", func(w http.ResponseWriter, r *http.Request) { }) http.HandleFunc("/statistics/all", func(w http.ResponseWriter, r *http.Request) { }) // judge.status, /proc/status/$endpoint/$metric/$tags-pairs http.HandleFunc("/proc/status/", func(w http.ResponseWriter, r *http.Request) { }) // collector.last.item, /proc/collect/$endpoint/$metric/$tags-pairs http.HandleFunc("/proc/collect/", func(w http.ResponseWriter, r *http.Request) { }) // config.mockcfg http.HandleFunc("/proc/config", func(w http.ResponseWriter, r *http.Request) { }) // config.mockcfg /proc/config/$endpoint/$metric/$tags-pairs http.HandleFunc("/proc/config/", func(w http.ResponseWriter, r *http.Request) { }) // config.hostgroup, /group/$grpname http.HandleFunc("/proc/group/", func(w http.ResponseWriter, r *http.Request) { urlParam := r.URL.Path[len("/proc/group/"):] RenderDataJson(w, service.GetHostsFromGroup(urlParam)) }) } ## Debug API func configDebugHttpRoutes() { http.HandleFunc("/debug/collector/collect", func(w http.ResponseWriter, r *http.Request) { }) http.HandleFunc("/debug/config/sync", func(w http.ResponseWriter, r *http.Request) { }) http.HandleFunc("/debug/sender/send", func(w http.ResponseWriter, r *http.Request) { }) }
思考與查證
- nodata的config、collector、judge各模組的執行週期分別是多少?
- Judge主要判斷Nodata主機的兩點要素是?
擴充套件學習
- github.com/toolkits/cron Go語言定時任務(Cron)庫
- github.com/go-sql-driver/mysql Go database/sql的MySQL驅動