1. 程式人生 > >influxdb原始碼閱讀之tsdb核心資料結構梳理

influxdb原始碼閱讀之tsdb核心資料結構梳理

go語言版本:1.9.2 linux/amd64

InfluxDB版本:1.7

influxdb儲存引擎tsdb程式碼目錄:github.com\influxdata\influxdb\tsdb

可以先閱讀以下對於tsdb的官方文件。 其採用的儲存模型是LSM-Tree模型,對其進行了一定的改造。將其稱之為Time-Structured Merge Tree (TSM)

當一個point寫入時,influxdb根據其所屬的database、measurements和timestamp選取一個對應的shard。每個 shard對應一個TSM儲存引擎。每個shard對應一段時間範圍的儲存。

一個TSM儲存引擎包含:

  • In-Memory Index 在shard之間共享,提供measurements,tags,和series的索引
  • WAL 同其他database的binlog一樣,當WAL的大小達到一定大小後,會重啟開啟一個WAL檔案。
  • Cache 記憶體中快取的WAL,加速查詢
  • TSM Files 壓縮後的series資料
  • FileStore TSM Files的封裝
  • Compactor 儲存資料的比較器
  • Compaction Planner 用來確定哪些TSM檔案需要compaction,同時避免併發compaction之間的相互干擾
  • Compression 用於壓縮持久化檔案
  • Writers/Readers 用於訪問檔案

shard通過CreateShard 來建立。可以看出其依次建立了所需的檔案目錄,然後建立Index 和Shard 資料結構。

Store:

github.com\influxdata\influxdb\tsdb\store.go

// Store manages shards and indexes for databases.
type Store struct {
	mu                sync.RWMutex
	shards            map[uint64]*Shard  // 所有 shards 的索引,key 為其 shard ID
	databases         map[string]struct{}
	sfiles            map[string]*SeriesFile
	SeriesFileMaxSize int64 // Determines size of series file mmap. Can be altered in tests.
	path              string    // 資料庫檔案在磁碟上的儲存路徑

	// shared per-database indexes, only if using "inmem".
	indexes map[string]interface{}

	// Maintains a set of shards that are in the process of deletion.
	// This prevents new shards from being created while old ones are being deleted.
	pendingShardDeletes map[uint64]struct{}

	EngineOptions EngineOptions

	baseLogger *zap.Logger
	Logger     *zap.Logger

	closing chan struct{}
	wg      sync.WaitGroup
	opened  bool
}

Store 是儲存結構中最頂層的抽象結構體,主要包含了 InfluxDB 中所有資料庫的 索引 和 實際儲存資料的 Shard 物件。InfluxDB 中的其他服務都需要通過 Store 物件對底層資料進行操作。store是influxdb的儲存模組,全域性只有一個該例項。主要負責將資料按一定格式寫入磁碟,並且維護influxdb相關的 儲存概念。例如:建立/刪除Shard、建立/刪除retention policy、排程shard的compaction、以及最重要的WriteToShard 等等。在store內部又包含index和engine2個抽象概念,index是對應shard的索引,engine是對應shard的儲存實現, 不同的engine採用不同的儲存格式和策略。後面要講的tsdb其實就是一個engine的實現。在influxdb啟動時,會建立 一個store例項,然後Open它,初始化時,它會載入已經存在的Shard ,並啟動一個Shard監控任務, 監控任務負責排程每個Shard的Compaction和對使用inmem索引的Shard計算每種Tag擁有的數值的基數(與配置中max-values-per-tag有關)。

Shard:

github.com\influxdata\influxdb\tsdb\shard.go

// Shard represents a self-contained time series database. An inverted index of
// the measurement and tag data is kept along with the raw time series data.
// Data can be split across many shards. The query engine in TSDB is responsible
// for combining the output of many shards into a single query result.
type Shard struct {
	path    string   // shard 在磁碟上的路徑
	walPath string   // 對應的 wal 檔案所在目錄
	id      uint64   // shard ID,就是在磁碟上的檔名

	database        string  // 所在資料庫名
	retentionPolicy string  // 對應儲存策略名

	sfile   *SeriesFile
	options EngineOptions

	mu      sync.RWMutex
	_engine Engine     // 儲存引擎,抽象介面,可插拔設計,目前是 tsm1 儲存引擎
	index   Index
	enabled bool

	// expvar-based stats.
	stats       *ShardStatistics
	defaultTags models.StatisticTags

	baseLogger *zap.Logger
	logger     *zap.Logger

	EnableOnOpen bool

	// CompactionDisabled specifies the shard should not schedule compactions.
	// This option is intended for offline tooling.
	CompactionDisabled bool
}

每一個 Shard 物件都有一個單獨的底層資料儲存引擎,engine 負責和底層的檔案資料打交道。Shard 還儲存了一個指向所在資料庫索引的指標,便於快速檢索到該 Shard 中的元資料資訊。儲存引擎,抽象介面,可插拔設計,目前是 tsm1 儲存引擎。Index也是一個抽象介面,可插拔設計,目前有inmem和tsi1。

Engine:

github.com\influxdata\influxdb\tsdb\engine.go

Engine 是一個抽象介面,可插拔設計,對於 InfluxDB 來說,可以很方便地替換掉底層的儲存引擎,目前是 tsm1 儲存引擎。

// Engine represents a swappable storage engine for the shard.
type Engine interface {
	Open() error
	Close() error
	SetEnabled(enabled bool)
	SetCompactionsEnabled(enabled bool)
	ScheduleFullCompaction() error

	WithLogger(*zap.Logger)

	LoadMetadataIndex(shardID uint64, index Index) error

	CreateSnapshot() (string, error)
	Backup(w io.Writer, basePath string, since time.Time) error
	Export(w io.Writer, basePath string, start time.Time, end time.Time) error
	Restore(r io.Reader, basePath string) error
	Import(r io.Reader, basePath string) error
	Digest() (io.ReadCloser, int64, error)

	CreateIterator(ctx context.Context, measurement string, opt query.IteratorOptions) (query.Iterator, error)
	CreateCursorIterator(ctx context.Context) (CursorIterator, error)
	IteratorCost(measurement string, opt query.IteratorOptions) (query.IteratorCost, error)
	WritePoints(points []models.Point) error

	CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
	CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
	DeleteSeriesRange(itr SeriesIterator, min, max int64) error
	DeleteSeriesRangeWithPredicate(itr SeriesIterator, predicate func(name []byte, tags models.Tags) (int64, int64, bool)) error

	MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesN() int64

	MeasurementExists(name []byte) (bool, error)

	MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
	MeasurementFieldSet() *MeasurementFieldSet
	MeasurementFields(measurement []byte) *MeasurementFields
	ForEachMeasurementName(fn func(name []byte) error) error
	DeleteMeasurement(name []byte) error

	HasTagKey(name, key []byte) (bool, error)
	MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)
	TagKeyCardinality(name, key []byte) int

	// Statistics will return statistics relevant to this engine.
	Statistics(tags map[string]string) []models.Statistic
	LastModified() time.Time
	DiskSize() int64
	IsIdle() bool
	Free() error

	io.WriterTo
}

Index:

github.com\influxdata\influxdb\tsdb\engine.go

index是一個抽象介面

type Index interface {
	Open() error
	Close() error
	WithLogger(*zap.Logger)

	Database() string
	MeasurementExists(name []byte) (bool, error)
	MeasurementNamesByRegex(re *regexp.Regexp) ([][]byte, error)
	DropMeasurement(name []byte) error
	ForEachMeasurementName(fn func(name []byte) error) error

	InitializeSeries(keys, names [][]byte, tags []models.Tags) error
	CreateSeriesIfNotExists(key, name []byte, tags models.Tags) error
	CreateSeriesListIfNotExists(keys, names [][]byte, tags []models.Tags) error
	DropSeries(seriesID uint64, key []byte, cascade bool) error
	DropMeasurementIfSeriesNotExist(name []byte) error

	MeasurementsSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesN() int64
	SeriesSketches() (estimator.Sketch, estimator.Sketch, error)
	SeriesIDSet() *SeriesIDSet

	HasTagKey(name, key []byte) (bool, error)
	HasTagValue(name, key, value []byte) (bool, error)

	MeasurementTagKeysByExpr(name []byte, expr influxql.Expr) (map[string]struct{}, error)

	TagKeyCardinality(name, key []byte) int

	// InfluxQL system iterators
	MeasurementIterator() (MeasurementIterator, error)
	TagKeyIterator(name []byte) (TagKeyIterator, error)
	TagValueIterator(name, key []byte) (TagValueIterator, error)
	MeasurementSeriesIDIterator(name []byte) (SeriesIDIterator, error)
	TagKeySeriesIDIterator(name, key []byte) (SeriesIDIterator, error)
	TagValueSeriesIDIterator(name, key, value []byte) (SeriesIDIterator, error)

	// Sets a shared fieldset from the engine.
	FieldSet() *MeasurementFieldSet
	SetFieldSet(fs *MeasurementFieldSet)

	// Size of the index on disk, if applicable.
	DiskSizeBytes() int64

	// Bytes estimates the memory footprint of this Index, in bytes.
	Bytes() int

	// To be removed w/ tsi1.
	SetFieldName(measurement []byte, name string)

	Type() string
	// Returns a unique reference ID to the index instance.
	// For inmem, returns a reference to the backing Index, not ShardIndex.
	UniqueReferenceID() uintptr

	Rebuild()
}

看一下tsm1 engine結構體:

github.com\influxdata\influxdb\tsdb\engine\tsm1\engine.go

// Engine represents a storage engine with compressed blocks.
type Engine struct {
	mu sync.RWMutex

	index tsdb.Index  // 資料庫索引資訊,目前沒和儲存引擎放在一起,看起來後續會更改設計作為儲存引擎的一部分

	// The following group of fields is used to track the state of level compactions within the 以下欄位組用於跟蹤其中的級別壓縮狀態

	// Engine. The WaitGroup is used to monitor the compaction goroutines, the 'done' channel is used to signal those goroutines to shutdown. Every request to disable level compactions will call 'Wait' on 'wg', with the first goroutine to arrive (levelWorkers == 0 while holding the lock) will close the done channel and re-assign 'nil' to the variable. Re-enabling will decrease 'levelWorkers', and when it decreases to zero, level compactions will be started back up again.
	// WaitGroup用於監視壓縮goroutines,'done'通道用於通知那些goroutine要關閉。 每個禁用級別壓縮的請求都會在'wg'上呼叫'Wait',第一個goroutine到達(levelWorkers == 0,同時保持鎖定)將關閉done通道併為變數重新分配'nil'。 重新啟用會減少“levelWorkers”,當它減少到零時,級別壓縮將再次啟動。

	wg           *sync.WaitGroup // waitgroup for active level compaction goroutines
	done         chan struct{}   // channel to signal level compactions to stop
	levelWorkers int             // Number of "workers" that expect compactions to be in a disabled state  期望壓縮處於禁用狀態的work數量

	snapDone chan struct{}   // channel to signal snapshot compactions to stop
	snapWG   *sync.WaitGroup // waitgroup for running snapshot compactions

	id           uint64
	path         string
	sfile        *tsdb.SeriesFile
	logger       *zap.Logger // Logger to be used for important messages
	traceLogger  *zap.Logger // Logger to be used when trace-logging is on.
	traceLogging bool

	fieldset *tsdb.MeasurementFieldSet // 所有 measurement 對應的 fields 物件

	WAL            *WAL              // WAL 檔案物件
	Cache          *Cache            // WAL 檔案在記憶體中的快取
	Compactor      *Compactor        // 壓縮合並管理物件
	CompactionPlan CompactionPlanner
	FileStore      *FileStore         // 資料檔案物件

	MaxPointsPerBlock int

	// CacheFlushMemorySizeThreshold specifies the minimum size threshold for
	// the cache when the engine should write a snapshot to a TSM file
	CacheFlushMemorySizeThreshold uint64  // Cache 超過指定大小後內容會被寫入一個新的 TSM 檔案

	// CacheFlushWriteColdDuration specifies the length of time after which if
	// no writes have been committed to the WAL, the engine will write
	// a snapshot of the cache to a TSM file
	CacheFlushWriteColdDuration time.Duration   // Cache 超過多長時間後還沒有資料寫入,會將內容寫入新的 TSM 檔案

	// WALEnabled determines whether writes to the WAL are enabled.  If this is false,
	// writes will only exist in the cache and can be lost if a snapshot has not occurred.
	WALEnabled bool

	// Invoked when creating a backup file "as new".建立備份檔案“as new”時呼叫。
	formatFileName FormatFileNameFunc

	// Controls whether to enabled compactions when the engine is open 控制引擎開啟時是否啟用壓縮
	enableCompactionsOnOpen bool

	stats *EngineStatistics

	// Limiter for concurrent compactions.併發壓縮的限制器。
	compactionLimiter limiter.Fixed

	scheduler *scheduler

	// provides access to the total set of series IDs
	seriesIDSets tsdb.SeriesIDSets

	// seriesTypeMap maps a series key to field type
	seriesTypeMap *radix.Tree
}

tms1 儲存引擎設計,參考github.com\influxdata\influxdb\tsdb\engine\tsm1\DESIGN.md

A TSM file由四部分組成:header, blocks, index and the footer

Header由一個magic number和一個版本號組成

Blocks data組成結構:

Blocks data是由CRC32和資料的序列。塊資料對檔案是不透明的。 CRC32用於恢復,以確保塊由於我們控制之外的錯誤而未被破壞。塊的長度儲存在索引中。

Index組成結構:

索引由一系列索引條目組成,這些索引條目按鍵按字典順序排序,然後按時間排序。每個索引條目如上圖,每個塊條目由塊的最小和最大時間,塊所在檔案的偏移量以及塊的大小組成。

索引結構可以提供對所有塊的有效訪問,以及減少獲取塊資料的時間成本。給定key和時間戳,我們確切地知道哪個檔案包含該時間戳的塊以及該塊所在的位置以及要檢索塊的資料量。如果我們知道我們需要讀取檔案中的所有或多個塊,我們可以使用該大小來確定給定IO中的讀取量。

*儲存在塊資料中的塊長度可能會被刪除,因為我們將它儲存在索引中

Footer組成結構:

footer儲存索引開頭偏移量.