1. 程式人生 > >Prometheus時序資料庫-資料的插入

Prometheus時序資料庫-資料的插入

# Prometheus時序資料庫-資料的插入 ## 前言 在之前的文章裡,筆者詳細的闡述了Prometheus時序資料庫在記憶體和磁碟中的儲存結構。有了前面的鋪墊,筆者就可以在本篇文章闡述下資料的插入過程。 ## 監控資料的插入 在這裡,筆者並不會去討論Promtheus向各個Endpoint抓取資料的過程。而是僅僅圍繞著資料是如何插入Prometheus的過程做下闡述。對應方法: ``` func (a *headAppender) Add(lset labels.Labels, t int64, v float64) (uint64, error) { ...... // 如果lset對應的series沒有,則建一個。同時把新建的series放入倒排Posting對映裡面 s, created := a.head.getOrCreate(lset.Hash(), lset) if created { // 如果新建立了一個,則將新建的也放到a.series裡面 a.series = append(a.series, record.RefSeries{ Ref: s.ref, Labels: lset, }) } return s.ref, a.AddFast(s.ref, t, v) } ``` 我們就以下面的add函式呼叫為例: ``` app.Add(labels.FromStrings("foo", "bar"), 0, 0) ``` 首先是getOrCreate,顧名思義,不存在則建立一個。建立的過程包含了seriesHashMap/Postings(倒排索引)/LabelIndex的維護。如下圖所示: ![](https://oscimg.oschina.net/oscnet/up-bb69c43d88d81adb175754cfe3cea43460f.png) 然後是AddFast方法 ``` func (a *headAppender) AddFast(ref uint64, t int64, v float64) error{ // 拿出對應的memSeries s := a.head.series.getByID(ref) ...... // 設定為等待提交狀態 s.pendingCommit=true ...... // 為了事務概念,放入temp儲存,等待真正commit時候再寫入memSeries a.samples = append(a.samples, record.RefSample{Ref: ref,T: t,V: v,}) // } ``` Prometheus在add資料點的時候並沒有直接add到memSeries(也就是query所用到的結構體裡),而是加入到一個臨時的samples切片裡面。同時還將這個資料點對應的memSeries同步增加到另一個sampleSeries裡面。 ![](https://oscimg.oschina.net/oscnet/up-caf4f4f54755889dfaa960d7ac9214d39e7.png) ### 事務可見性 為什麼要這麼做呢?就是為了實現commit語義,只有commit過後資料才可見(能被查詢到)。否則,無法見到這些資料。而commit的動作主要就是WAL(Write Ahead Log)以及將headerAppender.samples資料寫到其對應的memSeries中。這樣,查詢就可見這些資料了,如下圖所示: ![](https://oscimg.oschina.net/oscnet/up-ab6300a60c3907ed962089e8ec9cf48e923.png) ### WAL 由於Prometheus最近的資料是儲存在記憶體裡面的,未防止伺服器宕機丟失資料。其在commit之前先寫了日誌WAL。等服務重啟的時候,再從WAL日誌裡面獲取資訊並重放。 ![](https://oscimg.oschina.net/oscnet/up-f0ed5e3bd362b5c56e4a9af31f746c3df30.png) 為了效能,Prometheus了另一個goroutine去做檔案的sync操作,所以並不能保證WAL不丟。進而也不能保證監控資料完全不丟。這點也是監控業務的特性決定的。 寫入程式碼為: ``` commit() |=> func (a *headAppender) log() error { ...... // 往WAL寫入對應的series資訊 if len(a.series) > 0 { rec = enc.Series(a.series, buf) buf = rec[:0] if err := a.head.wal.Log(rec); err != nil { return errors.Wrap(err, "log series") } } ...... // 往WAL寫入真正的samples if len(a.samples) > 0 { rec = enc.Samples(a.samples, buf) buf = rec[:0] if err := a.head.wal.Log(rec); err != nil { return errors.Wrap(err, "log samples") } } } ``` 對應的WAL日誌格式為: #### Series records ``` ┌────────────────────────────────────────────┐ │ type = 1 <1b> │ ├────────────────────────────────────────────┤ │ ┌─────────┬──────────────────────────────┐ │ │ │ id <8b> │ n = len(labels)