時序資料基礎

時序資料特點

 時序資料TimeSeries是一連串隨時間推移而發生變化的相關事件。

 以下圖的 CPU 監控資料為例,同個 IP 的相關監控資料組成了一條時序資料,不相關資料則分佈在不同的時間序列上。

 常見時序資料有:

  • 監控日誌:機器的 CPU 負載變化
  • 使用者行為:使用者在電商網站上的訪問記錄
  • 金融行情:股票的日內成交記錄

 這類資料具有以下特點:

  • 必然帶有時間戳,可能存在時效性
  • 資料量巨大,並且生成速度極快
  • 更關注資料變化的趨勢,而非資料本身

關係型資料庫的不足

 當面對時序資料時,傳統的關係型資料庫就顯得有些力不從心。

關係型資料庫模型 時序資料庫需求
資料按主鍵索引組織、儲存 資料按時間戳進行組織、儲存,便於按時間維度查詢
資料持久化後永久存在 資料具有的生命週期,定期清理過期資料
支援複雜的 OLTP 功能(點查、改、刪) 支援的 OLAP 操作(基於時間視窗)
併發修改加鎖,提供事務保證 Last Write Win 解決寫衝突,無需事務

 兩者之間的存在的衝突:

  • 如果主鍵設計的不好,時序資料的順序插入可能變為隨機寫入,影響寫入效能
  • 傳統關係型資料為提高資料生命週期管理功能,需要定時執行清理任務,避免磁碟空間耗盡
  • 關係型資料庫對事務的支援,對於時序資料來說略顯多餘,還會影響寫入能力

 除此之外,關係型資料庫還存在以下天然短板:

  • 需要通過分表分庫sharding實現橫向擴充套件
分庫分表引入額外複雜度,需要維護對映關係。
此時 SQL 語言的查詢優勢不復存在,多數查詢都會退化為 KV 查詢
  • 寫時模式schema on write靈活度不足
關係資料庫新增欄位屬於 DDL 操作,會導致鎖表鎖庫。
頻繁的庫表變更影響系統穩定,無法適應快速的業務變更。

Redis 的不足

 儲存時序資料庫的另一個挑戰就是其誇張的資料生成速度。以使用者行為資料為例,如果一個介面的QPS是1萬。就意味著一秒鐘內會生成1萬條使用者行為記錄。假設這樣的介面有100個,那麼每秒鐘生成的記錄數可達100萬。

 一種解決方式是將訊息積壓至 Kafka 這類中介軟體,然後進行非同步處理。但對於某些業務場景來說,這一處理方式則顯得不合時宜。以股票成交資料為例,為了保障行情的時效性,無法採用非同步批處理的方式實現。為了實現極高的寫入吞吐量,通常會考慮使用 Redis 實現這一功能。

 然而這一方案也存在以下問題:

  • redis 不適合儲存大 Key,刪除 Key 的記憶體釋放操作可能導致長時間的阻塞
  • 假設資料以 list 的形式儲存,執行 lrange 命令的時間複雜度為O(S+N),訪問效能較差
  • 記憶體空間有限,無法儲存大量的資料

時序資料庫

 時序資料庫是一類專門用於儲存時序資料的資料管理系統,這類資料庫的設計思想大致可以總結為下面幾條:

  • 使用特殊設計的外存索引來組織資料
  • 強制使用 timestamp 作為唯一的主鍵
  • 不檢查寫衝突,避免加鎖,提高寫入效能
  • 對按時間順序寫入進行優化,提高寫入效能
  • 不支援細粒度的資料刪除功能,提高查詢寫入效能
  • 犧牲強一致性來提高系統的查詢吞吐量,提高查詢效能
  • 提供基於時間視窗的 OLAP 操作,放棄關聯查詢等高階功能
  • 通過無模式schemaless設計使系統更易於水平擴充套件

時序資料模型

 類似於關係型資料庫,時序資料庫也有自己的資料模型,並且兩者直接存在不少相似之處:

關係模型 時序模型 含義
table metric / measurement 表 → 指標(時間序列集合)
column value / field 列 → 值(無索引列)
index tag 索引 → 標籤(索引列)
row point 記錄行 → 資料點(時間序列中某個時刻的資料)
primary key timestamp 行主鍵 → 點時間戳(時間序列內唯一標識)

 其中 tag 的概念較為重要:

  • tag 是一個字串型別的鍵值對
  • tag 並不是時序資料,而是元資料
  • tag 的唯一組合確定一個時間序列
  • tag 可以方便實現粗粒度的聚合
  • tag 決定了索引的組織形式
  • tag 組合的數量數量不宜過多

常見的時序資料庫

 時序資料庫排行榜中介紹了不少的時序資料庫,其中比較具有代表性的有以下兩款。

OpenTSDB

 OpenTSDB 是一種基於 HBase 來構建的分散式、可擴充套件的時間序列資料庫。OpenTSDB 被廣泛應用於儲存、索引和服務從大規模計算機系統(網路裝置、作業系統、應用程式)採集來的監控指標資料,並且使這些資料易於訪問和視覺化。

 OpenTSDB 由時間序列守護程式 (TSD) 以及一組命令列實用程式組成。每個 TSD 都是獨立的。 沒有主節點,沒有共享狀態。

  • 優點:

    TSD 是無狀態的,所有狀態資料儲存在 HBase 中,天然支援水平擴充套件

  • 缺點:

    • Hadoop 全家桶運維難度較大,需要專人負責

      • 新版本的 OpenTSDB 底層支援 Cassandra
    • 儲存模型過於簡化

      • 單值模型且只能儲存數值型別資料
      • 單個 metric 最多支援 8 個 tag key
    • 雖然利用了 HBase 作為底層儲存,但是沒有提供對 MapReduce 的支援。

InfluxDB

 時序資料庫 InfluxDB 是一款專門處理高寫入和查詢負載的時序資料庫,基於 InfluxDB 能夠快速構建具有海量時序資料處理能力的分析和監控軟體。

 該專案的發起者是 influxdata 公司,該公司提供了一套用於處理時序資料的完整解決方案,InfluxDB 是該解決方案中的核心產品。

  • 優點:

    • 開箱即用,運維簡單
    • 多值儲存模型、豐富的資料型別
    • 提供了類 SQL 的查詢語言
    • 獨創 TSM 索引
  • 缺點:

    • 開源版本不支援叢集部署,對於大規模應用來說,使用前需要慎重考慮

深入InfluxDB

資料模型

 InfluxDB 的資料模型已經很接近傳統的關係模型:

database 名稱空間,相互隔離
retention policy 儲存策略,定義資料生命週期
bucket database + retention policy
measurement 相關時間序列集合
tag 標籤索引,索引列
field 時序資料,無索引列
point 資料點,時間序列中某個時刻的資料
time 時間戳,時間序列內唯一標識

 保留策略retention policy 用於管理資料生命週期,其中包含了:

  • 持續時間duration:指定了資料保留時間,過期的資料將自動從資料庫中刪除

  • 副本個數replication factor:指定了叢集模式下,需要維護資料副本的個數(僅在叢集模式下有效)

  • 分片粒度hard duration):指定了 shard group 的時間跨度(影響資料分片大小)

 保留策略與 database 存在以下關係:

  • 一個 database 可以有多個 RP,每個 RP 只屬於一個 database 1:N
  • 建立 point 時可以指定 RP,一個 measurement 可以有不同的 RP N:N

 這意味著:

同個 measurement 可能存在兩個有著完全相同的 time 的 point。為了解決資料重複的問題,InfluxDB 2 引入了一個 bucket 的概念,用於避免這一情況。

Series

時間序列 Series 在 InfluxDB 中也是個核心概念:

series key measurement + tag + field key
series 時間序列,serial key 相同的資料集合
series cardinality 序列基數,series key 的數量

 為了唯一標識一個時間序列,InfluxDB 引入了 Serieskey 的概念:

每個資料點都有 SerieskeySerieskey 相同的資料點屬於同個時間序列,會儲存在同一個檔案中,方便後續查詢

 Serieskey 的數量稱為序列基數 series cardinality

序列基數是一個重要的效能指標,InfluxDB 會為每個 Serieskey 在記憶體中維護一個索引記錄,因此序列基數能夠直觀反映當前資料的記憶體壓力

 上圖的 series cardinality 為 4,其中包含以下 series key:

measurement + tags + field
census, location=1, scientist=langstroth, butterflier

census, location=1, scientist=langstroth, honeybees

census, location=1, scientist=perpetual, butterflier

census, location=1, scientist=perpetual, honeybees

 注意:即便兩條記錄的 measurement、time、tag、field 完全一致,但只要使用的是不同的 RP,那麼它們就屬於不同的 series,會被儲存在不同的 bucket 中。

查詢語言

InfluxDB 提供了兩種查詢語言:

  • InfluxQL:類 SQL 的宣告式查詢語言,同時具有 DDL 與 DML 功能
  • Flux:函式式查詢語言,僅支援 DML 功能,能支援複雜的查詢條件,但不支援增刪改操作

下面通過一些實際操作來熟悉一下 InfluxQL:

# 建立資料庫
CREATE DATABASE "sample_data"
USE sample_data # 插入樣例資料,格式參考:https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial
INSERT census,location=1,scientist=langstroth butterflies=12i,honeybees=23i 1439827200000000000
INSERT census,location=1,scientist=perpetua butterflies=1i,honeybees=30i 1439827200000000000
INSERT census,location=1,scientist=langstroth butterflies=11i,honeybees=28i 1439827560000000000
INSERT census,location=1,scientist=perpetua butterflies=3i,honeybees=28i 1439827560000000000
INSERT census,location=2,scientist=langstroth butterflies=2i,honeybees=11i 1439848440000000000
INSERT census,location=2,scientist=langstroth butterflies=1i,honeybees=10i 1439848800000000000
INSERT census,location=2,scientist=perpetua butterflies=8i,honeybees=23i 1439849160000000000
INSERT census,location=2,scientist=perpetua butterflies=7i,honeybees=22i 1439849520000000000 # 顯示資料庫中的表
# measurement 無需預先定義,由 InfluxDB 動態建立
SHOW MEASUREMENTS # 顯示資料庫中的 field key
SHOW FIELD KEYS # 顯示資料庫中的 tag key
SHOW TAG KEYS # 顯示資料庫中的 tag value
SHOW TAG VALUES WITH KEY = scientist # 查詢所有資料
SELECT * FROM census; # 對 location = 1 的資料求和
SELECT SUM(butterflies) AS butterflies, SUM(honeybees) AS honeybees FROM census WHERE location = '1'; # 刪除 location = 1 的資料
DELETE FROM census WHERE location = '1';
SELECT * FROM census; # 更新特定資料
SELECT * FROM census;
INSERT census,location=2,scientist=perpetua butterflies=10i,honeybees=50i 1439849520000000000
SELECT * FROM census; # 更新資料時要保證資料型別一致,否則會報錯
INSERT census,location=2,scientist=perpetua butterflies=abc,honeybees=efg 1439849520000000000 # 刪除資料庫
DROP DATABASE sample_data;

Flux 無命令列支援,只能通過 http 介面請求。有興趣可以參考下面的指令碼,動手嘗試一下:

curl -XPOST 127.0.0.1:8086/api/v2/query -sS \
-H 'Accept:application/csv' \
-H 'Content-type:application/vnd.flux' \
-H 'Authorization: Token root:123456' \
-d '
from(bucket:"sample_data")
|> range(start:time(v: 1439827200000), stop:time(v: 143984952000))
|> filter(fn:(r) => r._measurement == "census" and r.location == "1" and (r._field == "honeybees" or r._field == "butterflies"))
|> limit(n: 100)'

整體架構

在瞭解完查詢語言之後,接下來看看 InfluxDB 的整體架構:

 上圖將 InfluxDB 分為了 4 層,上面 database 與 RP 這兩層之前已經介紹過,我們重點關注下面兩層:

shard 儲存的時序資料的磁碟檔案
shard group shard 容器,責管理資料的生命週期,清除過期的資料
shard duration shard duration

 由於時序資料的資料量通常十分巨大,因此 InfluxDB 的設計中引入了分片的策略。並且同時採用了兩種分片策略:

  • shard group 層採用了基於時間的分片策略,方便實現按照時間條件範圍查詢
  • shard 層則是基於 hashmod 進行分片,避免出現寫熱點產生效能瓶頸

 每個 shard 由 WAL、Cache、TSM檔案 3 部分組成:

 整個資料的寫入流程簡化為 3 個步驟:

  1. 先寫入 WAL
  2. 然後寫入 Cache
  3. 最終持久化為 TSM File

WAL

 預寫日誌Write-Ahead-Log是一種常見的提高資料庫優化手段,能夠在保證資料安全的同時,提升系統的寫入效能。

 InfluxDB WAL 由一組定長的 segement 檔案構成,每個檔案大小約為 10MB。這些 segment 檔案只允許追加,不允許修改。

Cache

 Cache 是 WAL 的一個記憶體快照,保證 WAL 中的資料對使用者實時可見。

 當 Cache 空閒或者過滿時,對應的 WAL 將被壓縮並轉換為 TSM,最終釋放記憶體空間。

 每次重啟時會根據 WAL 重新構造 Cache。

TSM File

 TSM 是一組儲存在磁碟上的外存索引檔案,細節將在後續進行介紹。

 它們之間的關係可以簡單描述為:

  • Cache = WAL
  • Cache + TSM = 完整的資料

儲存引擎發展史

 在討論 TSM 之前,線回顧一下 InfluxDB 儲存引擎的發展歷程:

LSM tree 時代 (0.8.x)

  • 引擎:LevelDB、RocksDB、HyperLevelDB、LMDB
  • 優點:極高的寫入吞吐量,且支援資料壓縮
  • 缺點:
    • 懶刪除機制導致刪除操作耗時,且過期資料無法及時清理
    • 按照時間維度分庫可以規避上述問題,但是又會導致新問題:
      • 單個程序開啟過多的檔案導致控制代碼耗盡
      • 過多的 WAL 可能會令順序追加退化為隨機寫入

B+Tree 時代 (0.9.x)

  • 引擎:BoltDB
  • 優點:單檔案模型,穩定性更好,Go 實現更易嵌入
  • 缺點:
    • 檔案較大時,寫放大效應會導致 IOPS 會極劇上升
    • 通過在 Bolt 前嵌入自研的 WAL 模組緩解了這一問題:
      • 合併多個相鄰的寫入操作,減少 fsync
      • 將隨機寫入變為順序追加,減少寫放大

TSM-Tree 時代 (0.9.5)

  • 引擎:Time Structured Merge Tree
  • 特點:
    • 整體實現借鑑 LSM tree 架構,能有效規避寫放大
    • 更少的資料庫檔案,避免順序寫退化為隨機寫,不再出現檔案控制代碼耗盡的情況
    • 針對時序資料特性,採用了更具針對性的資料壓縮演算法

 關於 LSM-Tree 與 B-Tree 的寫放大分析,可以參考這篇文章:https://www.cnblogs.com/buttercup/p/12991585.html

TSM 解析

資料組織

 TSM 是一個列存引擎columnar storage,內部按照 SeriesKey 對時序資料進行組織:

  • 每個 SeriesKey 對應一個數組,裡面儲存著 time,value 構成的時間點資料
  • 同個 SeriesKey 的資料儲存在一起,不同的 SeriesKey 的資料分開儲存

 列存引擎的優勢

  • 高效處理動態 schema 與稀疏資料

    新增列時,對現有的資料無影響。並且由於不同列相互分離,可以直接忽略 null 值,不需要耗費空間儲存標記

  • 同類型的資料能夠進行高效的壓縮

    同個列的資料必然具有相同的資料型別,可以採取不同的壓縮手段進行優化。

  • 查詢時能減少不必要的 I/O

    查詢時能夠指定要返回的資料列,可以按需遍歷使用者指定的列,對於 OLAP 操作更友好。

 列存引擎的劣勢

  • 儲存稠密資料需要付出額外代價

    當多個列具有相同的時間戳時,timestamp 會被重複儲存。

  • 資料變更操作需要更多的 I/O

    列分開儲存後,查、改、刪操作可能要同時修改多個檔案。

  • 無法提供原子性操作,事務實現困難

    無法實現高效的悲觀鎖,如果使用場景中需要用到事務,建議使用行存引擎。

檔案格式

 TSM File 是一組列存格式的只讀檔案,每個檔案對應一組特定的 SeriesKey。

 每個檔案由 4 部分構成:

  • Header:幻數 + 版本號
  • DataBlock:時序資料塊
  • IndexBlock:時序資料索引
  • Footer:索引塊指標

 Block 與 Series 的對應關係:

  • 每個 IndexBlock 只屬於一個 Series
  • 每個 DataBlock 只儲存一個 Field 的資料

 DataBlock 的結構較為簡單,其中儲存了壓縮過的時序資料:

DataBlock
Type 資料型別
Length 時間戳資料長度
Timestamps 時間戳列表(壓縮後)
Values 的時序資料列表(壓縮後)

 IndexBlock 則較為複雜,每個 IndexBlock 由一個 Meta 和多個 Entry 構成:

IndexBlock
Index Meta 索引資訊
Index Entry 索引記錄列表
IndexMeta
Series Key 索引所屬的 Series
Index Entry Count 索引記錄數量
IndexEntry
Min Time 資料開始時間
Max Time 資料結束時間
Offset 資料塊的起始位置
Size 資料塊的長度

 Meta 中儲存了 IndexBlock 對應的 SeriesKey 以及對應的 Entry 數量。

 每個 Entry 對應一個 DataBlock,描述了這個 DataBlock 對應的時間區間,以及實際的儲存地址。

 當需要查詢 TSM 中的資料時,只需要將 IndexBlock 載入到記憶體中。就可以定位到相應的資料,提高查詢效率。

壓縮演算法

 InfluxDB 中的資料型別可以分為五種 timestampfloat, int, bool, string。為了達到最優的壓縮效果,InfluxDB 針對不同型別的資料,使用了不同的壓縮演算法。不過這些壓縮演算法的原理都大同小異:使用變長編碼來儲存資料有效位,避免儲存無效的 0 bit。

timestamp

 時序資料都是按照時間順序進行排序的,因此首先會使用 delta-delta 編碼精簡資料:

相鄰的兩個時間戳相減,減少了資料的有效位長度,有利於後續的壓縮

 若時間按固定區間分佈,優先使用遊程編碼run-length encoding進行壓縮:

如果時間戳間隔固定,則使用兩個 64bit 資料可以編碼264個時間戳

 若編碼後所有值均小於260,則使用simple8b編碼,將多個值打包進單個 64bit 整數中:

simple8b 將 64 位整數分為兩部分:

  • selector(4bit) 用於指定剩餘 60bit 中儲存的整數的個數與有效位長度
  • payload(60bit) 則是用於儲存多個定長的整數

根據一個查詢表,將資料模式匹配到最優的 selector,然後將多個數據編碼至 payload

 如果無法不滿足以上壓縮條件,則直接儲存原始資料。

float

 Facebook 工程師通過觀察時序資料,發現相鄰時序資料進行異或操作後,僅有中間一小部分發生了變化。

 根據這個規律,發明了一個簡單高效的浮點數壓縮演算法:先異或求值,然後儲存中間的有效資料。

 通過這一演算法,他們將浮點資料的平均儲存空間壓縮至 1.37 位元組。

 

 演算法過程可以參考這篇論文,或者直接參考下面的實現:

@Data
@Accessors(fluent = true)
@ToString
static class Block {
int leadingZero;
int tailingZero;
int blockSize;
long value; boolean valueOf(int i) {
Validate.isTrue(i < blockSize);
return ((value >>> (blockSize-1-i)) & 0x1) > 0;
} boolean fallInSameBlock(Block block) {
return block != null && block.leadingZero == leadingZero && block.tailingZero == tailingZero;
}
} static Block calcBlock(double x, double y) {
long a = Double.doubleToRawLongBits(x);
long b = Double.doubleToRawLongBits(y);
long xor = a ^ b; Block block = new Block().
leadingZero(Long.numberOfLeadingZeros(xor)).
tailingZero(Long.numberOfTrailingZeros(xor)); return block.value(xor >>> block.tailingZero()).
blockSize(block.value() == 0 ? 0 : 64 - block.leadingZero() - block.tailingZero());
} static Pair<Long, Pair<Integer, byte[]>> encode(double[] values) {
int offset = 0;
BitSet buffer = new BitSet(); boolean ctrlBit;
double previous = values[0];
Block prevBlock = null;
for (int n=1; n<values.length; n++) {
Block block = calcBlock(previous, values[n]);
if (block.value() == 0) {
buffer.clear(offset++);
} else {
buffer.set(offset++);
buffer.set(offset++, ctrlBit = ! block.fallInSameBlock(prevBlock));
if (ctrlBit) {
int leadingZero = block.leadingZero();
int blockSize = block.blockSize();
Validate.isTrue(leadingZero < (1 << 6));
Validate.isTrue(blockSize < (1 << 7));
for (int i = 5; i > 0; i--) {
buffer.set(offset++, ((leadingZero >> (i - 1)) & 0x1) > 0);
}
for (int i = 6; i > 0; i--) {
buffer.set(offset++, ((blockSize >> (i - 1)) & 0x1) > 0);
}
}
for (int i = 0; i < block.blockSize(); i++) {
buffer.set(offset++, block.valueOf(i));
}
}
previous = values[n];
prevBlock = block;
} return Pair.of(Double.doubleToLongBits(values[0]), Pair.of(offset, buffer.toByteArray()));
} static List<Double> decode(Pair<Long, Pair<Integer, byte[]>> data) { long previous = data.getLeft();
int dataLen = data.getRight().getKey();
BitSet buffer = BitSet.valueOf(data.getRight().getValue()); List<Double> values = new ArrayList<>();
values.add(Double.longBitsToDouble(previous)); int offset = 0;
Block blockMeta = null;
while (offset < dataLen) {
if (! buffer.get(offset++)) {
values.add(0d);
} else {
boolean ctrlBit = buffer.get(offset++);
if (ctrlBit) {
int leadingZero = 0;
int blockSize = 0;
for (int i = 0; i < 5; i++) {
leadingZero = (leadingZero << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
}
for (int i = 0; i < 6; i++) {
blockSize = (blockSize << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
}
blockMeta = new Block().leadingZero(leadingZero).blockSize(blockSize).
tailingZero(64 - leadingZero - blockSize);
}
Validate.notNull(blockMeta);
long value = 0;
for (int i = 0; i < blockMeta.blockSize(); i++) {
value = (value << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
}
previous ^= (value << blockMeta.tailingZero());
values.add(Double.longBitsToDouble(previous));
}
} Validate.isTrue(offset == dataLen);
return values;
} public static void main(String[] args) {
double[] values = new double[]{15.5, 14.0625, 3.25, 8.625, 13.1, 0, 25.5};
Pair<Long, Pair<Integer, byte[]>> data = encode(values);
System.out.println(data.getRight().getKey()); // 編碼後的資料長度,單位 bits
System.out.println(decode(data)); // 解碼後的資料
}

int

 對於整形資料,首先會使用 ZigZag 編碼精簡資料。

 然後嘗試使用 RLE 或 simple8b 對精簡後的資料進行壓縮。

 如果無法不滿足壓縮條件,則儲存原始資料。

bool

 直接使用 Bitmap 對資料進行編碼。

string

 將多個字串拼接在一起,然後使用 Snappy 進行壓縮

參考資料