時序資料基礎
時序資料特點
時序資料TimeSeries
是一連串隨時間推移而發生變化的相關事件。
以下圖的 CPU 監控資料為例,同個 IP 的相關監控資料組成了一條時序資料,不相關資料則分佈在不同的時間序列上。
常見時序資料有:
- 監控日誌:機器的 CPU 負載變化
- 使用者行為:使用者在電商網站上的訪問記錄
- 金融行情:股票的日內成交記錄
這類資料具有以下特點:
- 必然帶有時間戳,可能存在時效性
- 資料量巨大,並且生成速度極快
- 更關注資料變化的趨勢,而非資料本身
關係型資料庫的不足
當面對時序資料時,傳統的關係型資料庫就顯得有些力不從心。
關係型資料庫模型 | 時序資料庫需求 |
---|---|
資料按主鍵索引組織、儲存 | 資料按時間戳進行組織、儲存,便於按時間維度查詢 |
資料持久化後永久存在 | 資料具有的生命週期,定期清理過期資料 |
支援複雜的 OLTP 功能(點查、改、刪) | 支援的 OLAP 操作(基於時間視窗) |
併發修改加鎖,提供事務保證 | Last Write Win 解決寫衝突,無需事務 |
兩者之間的存在的衝突:
- 如果主鍵設計的不好,時序資料的順序插入可能變為隨機寫入,影響寫入效能
- 傳統關係型資料為提高資料生命週期管理功能,需要定時執行清理任務,避免磁碟空間耗盡
- 關係型資料庫對事務的支援,對於時序資料來說略顯多餘,還會影響寫入能力
除此之外,關係型資料庫還存在以下天然短板:
- 需要通過分表分庫
sharding
實現橫向擴充套件
此時 SQL 語言的查詢優勢不復存在,多數查詢都會退化為 KV 查詢
- 寫時模式
schema on write
靈活度不足
頻繁的庫表變更影響系統穩定,無法適應快速的業務變更。
Redis 的不足
儲存時序資料庫的另一個挑戰就是其誇張的資料生成速度。以使用者行為資料為例,如果一個介面的QPS是1萬。就意味著一秒鐘內會生成1萬條使用者行為記錄。假設這樣的介面有100個,那麼每秒鐘生成的記錄數可達100萬。
一種解決方式是將訊息積壓至 Kafka 這類中介軟體,然後進行非同步處理。但對於某些業務場景來說,這一處理方式則顯得不合時宜。以股票成交資料為例,為了保障行情的時效性,無法採用非同步批處理的方式實現。為了實現極高的寫入吞吐量,通常會考慮使用 Redis 實現這一功能。
然而這一方案也存在以下問題:
- redis 不適合儲存大 Key,刪除 Key 的記憶體釋放操作可能導致長時間的阻塞
- 假設資料以 list 的形式儲存,執行 lrange 命令的時間複雜度為O(S+N),訪問效能較差
- 記憶體空間有限,無法儲存大量的資料
時序資料庫
時序資料庫是一類專門用於儲存時序資料的資料管理系統,這類資料庫的設計思想大致可以總結為下面幾條:
- 使用特殊設計的外存索引來組織資料
- 強制使用 timestamp 作為唯一的主鍵
- 不檢查寫衝突,避免加鎖,提高寫入效能
- 對按時間順序寫入進行優化,提高寫入效能
- 不支援細粒度的資料刪除功能,提高查詢寫入效能
- 犧牲強一致性來提高系統的查詢吞吐量,提高查詢效能
- 提供基於時間視窗的 OLAP 操作,放棄關聯查詢等高階功能
- 通過無模式
schemaless
設計使系統更易於水平擴充套件
時序資料模型
類似於關係型資料庫,時序資料庫也有自己的資料模型,並且兩者直接存在不少相似之處:
關係模型 | 時序模型 | 含義 |
---|---|---|
table | metric / measurement | 表 → 指標(時間序列集合) |
column | value / field | 列 → 值(無索引列) |
index | tag | 索引 → 標籤(索引列) |
row | point | 記錄行 → 資料點(時間序列中某個時刻的資料) |
primary key | timestamp | 行主鍵 → 點時間戳(時間序列內唯一標識) |
其中 tag 的概念較為重要:
- tag 是一個字串型別的鍵值對
- tag 並不是時序資料,而是元資料
- tag 的唯一組合確定一個時間序列
- tag 可以方便實現粗粒度的聚合
- tag 決定了索引的組織形式
- tag 組合的數量數量不宜過多
常見的時序資料庫
時序資料庫排行榜中介紹了不少的時序資料庫,其中比較具有代表性的有以下兩款。
OpenTSDB
OpenTSDB 是一種基於 HBase 來構建的分散式、可擴充套件的時間序列資料庫。OpenTSDB 被廣泛應用於儲存、索引和服務從大規模計算機系統(網路裝置、作業系統、應用程式)採集來的監控指標資料,並且使這些資料易於訪問和視覺化。
OpenTSDB 由時間序列守護程式 (TSD) 以及一組命令列實用程式組成。每個 TSD 都是獨立的。 沒有主節點,沒有共享狀態。
優點:
TSD 是無狀態的,所有狀態資料儲存在 HBase 中,天然支援水平擴充套件缺點:
Hadoop 全家桶運維難度較大,需要專人負責
- 新版本的 OpenTSDB 底層支援 Cassandra
儲存模型過於簡化
- 單值模型且只能儲存數值型別資料
- 單個 metric 最多支援 8 個 tag key
雖然利用了 HBase 作為底層儲存,但是沒有提供對 MapReduce 的支援。
InfluxDB
時序資料庫 InfluxDB 是一款專門處理高寫入和查詢負載的時序資料庫,基於 InfluxDB 能夠快速構建具有海量時序資料處理能力的分析和監控軟體。
該專案的發起者是 influxdata 公司,該公司提供了一套用於處理時序資料的完整解決方案,InfluxDB 是該解決方案中的核心產品。
優點:
- 開箱即用,運維簡單
- 多值儲存模型、豐富的資料型別
- 提供了類 SQL 的查詢語言
- 獨創 TSM 索引
缺點:
- 開源版本不支援叢集部署,對於大規模應用來說,使用前需要慎重考慮
深入InfluxDB
資料模型
InfluxDB 的資料模型已經很接近傳統的關係模型:
|
![]() |
保留策略retention policy
用於管理資料生命週期,其中包含了:
持續時間
duration
:指定了資料保留時間,過期的資料將自動從資料庫中刪除副本個數
replication factor
:指定了叢集模式下,需要維護資料副本的個數(僅在叢集模式下有效)分片粒度
hard duration)
:指定了 shard group 的時間跨度(影響資料分片大小)
保留策略與 database 存在以下關係:
- 一個 database 可以有多個 RP,每個 RP 只屬於一個 database
1:N
- 建立 point 時可以指定 RP,一個 measurement 可以有不同的 RP
N:N
這意味著:
Series
時間序列 Series 在 InfluxDB 中也是個核心概念:
series key | measurement + tag + field key |
series | 時間序列,serial key 相同的資料集合 |
series cardinality | 序列基數,series key 的數量 |
為了唯一標識一個時間序列,InfluxDB 引入了 Serieskey 的概念:
Serieskey 的數量稱為序列基數 series cardinality
:
上圖的 series cardinality 為 4,其中包含以下 series key:
measurement + tags + field |
---|
census, location=1, scientist=langstroth, butterflier census, location=1, scientist=langstroth, honeybees census, location=1, scientist=perpetual, butterflier census, location=1, scientist=perpetual, honeybees |
注意:即便兩條記錄的 measurement、time、tag、field 完全一致,但只要使用的是不同的 RP,那麼它們就屬於不同的 series,會被儲存在不同的 bucket 中。
查詢語言
InfluxDB 提供了兩種查詢語言:
- InfluxQL:類 SQL 的宣告式查詢語言,同時具有 DDL 與 DML 功能
- Flux:函式式查詢語言,僅支援 DML 功能,能支援複雜的查詢條件,但不支援增刪改操作
下面通過一些實際操作來熟悉一下 InfluxQL:
# 建立資料庫
CREATE DATABASE "sample_data"
USE sample_data
# 插入樣例資料,格式參考:https://docs.influxdata.com/influxdb/v1.8/write_protocols/line_protocol_tutorial
INSERT census,location=1,scientist=langstroth butterflies=12i,honeybees=23i 1439827200000000000
INSERT census,location=1,scientist=perpetua butterflies=1i,honeybees=30i 1439827200000000000
INSERT census,location=1,scientist=langstroth butterflies=11i,honeybees=28i 1439827560000000000
INSERT census,location=1,scientist=perpetua butterflies=3i,honeybees=28i 1439827560000000000
INSERT census,location=2,scientist=langstroth butterflies=2i,honeybees=11i 1439848440000000000
INSERT census,location=2,scientist=langstroth butterflies=1i,honeybees=10i 1439848800000000000
INSERT census,location=2,scientist=perpetua butterflies=8i,honeybees=23i 1439849160000000000
INSERT census,location=2,scientist=perpetua butterflies=7i,honeybees=22i 1439849520000000000
# 顯示資料庫中的表
# measurement 無需預先定義,由 InfluxDB 動態建立
SHOW MEASUREMENTS
# 顯示資料庫中的 field key
SHOW FIELD KEYS
# 顯示資料庫中的 tag key
SHOW TAG KEYS
# 顯示資料庫中的 tag value
SHOW TAG VALUES WITH KEY = scientist
# 查詢所有資料
SELECT * FROM census;
# 對 location = 1 的資料求和
SELECT SUM(butterflies) AS butterflies, SUM(honeybees) AS honeybees FROM census WHERE location = '1';
# 刪除 location = 1 的資料
DELETE FROM census WHERE location = '1';
SELECT * FROM census;
# 更新特定資料
SELECT * FROM census;
INSERT census,location=2,scientist=perpetua butterflies=10i,honeybees=50i 1439849520000000000
SELECT * FROM census;
# 更新資料時要保證資料型別一致,否則會報錯
INSERT census,location=2,scientist=perpetua butterflies=abc,honeybees=efg 1439849520000000000
# 刪除資料庫
DROP DATABASE sample_data;
Flux 無命令列支援,只能通過 http 介面請求。有興趣可以參考下面的指令碼,動手嘗試一下:
curl -XPOST 127.0.0.1:8086/api/v2/query -sS \
-H 'Accept:application/csv' \
-H 'Content-type:application/vnd.flux' \
-H 'Authorization: Token root:123456' \
-d '
from(bucket:"sample_data")
|> range(start:time(v: 1439827200000), stop:time(v: 143984952000))
|> filter(fn:(r) => r._measurement == "census" and r.location == "1" and (r._field == "honeybees" or r._field == "butterflies"))
|> limit(n: 100)'
整體架構
在瞭解完查詢語言之後,接下來看看 InfluxDB 的整體架構:
上圖將 InfluxDB 分為了 4 層,上面 database 與 RP 這兩層之前已經介紹過,我們重點關注下面兩層:
shard | 儲存的時序資料的磁碟檔案 |
shard group | shard 容器,責管理資料的生命週期,清除過期的資料 |
shard duration | shard duration |
由於時序資料的資料量通常十分巨大,因此 InfluxDB 的設計中引入了分片的策略。並且同時採用了兩種分片策略:
- shard group 層採用了基於時間的分片策略,方便實現按照時間條件範圍查詢
- shard 層則是基於 hashmod 進行分片,避免出現寫熱點產生效能瓶頸
每個 shard 由 WAL、Cache、TSM檔案 3 部分組成:
整個資料的寫入流程簡化為 3 個步驟:
- 先寫入 WAL
- 然後寫入 Cache
- 最終持久化為 TSM File
WAL
預寫日誌Write-Ahead-Log
是一種常見的提高資料庫優化手段,能夠在保證資料安全的同時,提升系統的寫入效能。
InfluxDB WAL 由一組定長的 segement 檔案構成,每個檔案大小約為 10MB。這些 segment 檔案只允許追加,不允許修改。
Cache
Cache 是 WAL 的一個記憶體快照,保證 WAL 中的資料對使用者實時可見。
當 Cache 空閒或者過滿時,對應的 WAL 將被壓縮並轉換為 TSM,最終釋放記憶體空間。
每次重啟時會根據 WAL 重新構造 Cache。
TSM File
TSM 是一組儲存在磁碟上的外存索引檔案,細節將在後續進行介紹。
它們之間的關係可以簡單描述為:
- Cache = WAL
- Cache + TSM = 完整的資料
儲存引擎發展史
在討論 TSM 之前,線回顧一下 InfluxDB 儲存引擎的發展歷程:
LSM tree 時代 (0.8.x)
- 引擎:LevelDB、RocksDB、HyperLevelDB、LMDB
- 優點:極高的寫入吞吐量,且支援資料壓縮
- 缺點:
- 懶刪除機制導致刪除操作耗時,且過期資料無法及時清理
- 按照時間維度分庫可以規避上述問題,但是又會導致新問題:
- 單個程序開啟過多的檔案導致控制代碼耗盡
- 過多的 WAL 可能會令順序追加退化為隨機寫入
B+Tree 時代 (0.9.x)
- 引擎:BoltDB
- 優點:單檔案模型,穩定性更好,Go 實現更易嵌入
- 缺點:
- 檔案較大時,寫放大效應會導致 IOPS 會極劇上升
- 通過在 Bolt 前嵌入自研的 WAL 模組緩解了這一問題:
- 合併多個相鄰的寫入操作,減少 fsync
- 將隨機寫入變為順序追加,減少寫放大
TSM-Tree 時代 (0.9.5)
- 引擎:Time Structured Merge Tree
- 特點:
- 整體實現借鑑 LSM tree 架構,能有效規避寫放大
- 更少的資料庫檔案,避免順序寫退化為隨機寫,不再出現檔案控制代碼耗盡的情況
- 針對時序資料特性,採用了更具針對性的資料壓縮演算法
關於 LSM-Tree 與 B-Tree 的寫放大分析,可以參考這篇文章:https://www.cnblogs.com/buttercup/p/12991585.html
TSM 解析
資料組織
TSM 是一個列存引擎columnar storage
,內部按照 SeriesKey 對時序資料進行組織:
- 每個 SeriesKey 對應一個數組,裡面儲存著 time,value 構成的時間點資料
- 同個 SeriesKey 的資料儲存在一起,不同的 SeriesKey 的資料分開儲存
列存引擎的優勢
高效處理動態 schema 與稀疏資料
新增列時,對現有的資料無影響。並且由於不同列相互分離,可以直接忽略 null 值,不需要耗費空間儲存標記
同類型的資料能夠進行高效的壓縮
同個列的資料必然具有相同的資料型別,可以採取不同的壓縮手段進行優化。
查詢時能減少不必要的 I/O
查詢時能夠指定要返回的資料列,可以按需遍歷使用者指定的列,對於 OLAP 操作更友好。
列存引擎的劣勢
儲存稠密資料需要付出額外代價
當多個列具有相同的時間戳時,timestamp 會被重複儲存。
資料變更操作需要更多的 I/O
列分開儲存後,查、改、刪操作可能要同時修改多個檔案。
無法提供原子性操作,事務實現困難
無法實現高效的悲觀鎖,如果使用場景中需要用到事務,建議使用行存引擎。
檔案格式
TSM File 是一組列存格式的只讀檔案,每個檔案對應一組特定的 SeriesKey。
每個檔案由 4 部分構成:
- Header:幻數 + 版本號
- DataBlock:時序資料塊
- IndexBlock:時序資料索引
- Footer:索引塊指標
Block 與 Series 的對應關係:
- 每個 IndexBlock 只屬於一個 Series
- 每個 DataBlock 只儲存一個 Field 的資料
DataBlock 的結構較為簡單,其中儲存了壓縮過的時序資料:
DataBlock | |
---|---|
Type | 資料型別 |
Length | 時間戳資料長度 |
Timestamps | 時間戳列表(壓縮後) |
Values | 的時序資料列表(壓縮後) |
IndexBlock 則較為複雜,每個 IndexBlock 由一個 Meta 和多個 Entry 構成:
|
|
|
Meta 中儲存了 IndexBlock 對應的 SeriesKey 以及對應的 Entry 數量。
每個 Entry 對應一個 DataBlock,描述了這個 DataBlock 對應的時間區間,以及實際的儲存地址。
當需要查詢 TSM 中的資料時,只需要將 IndexBlock 載入到記憶體中。就可以定位到相應的資料,提高查詢效率。
壓縮演算法
InfluxDB 中的資料型別可以分為五種 timestamp
,float
, int
, bool
, string
。為了達到最優的壓縮效果,InfluxDB 針對不同型別的資料,使用了不同的壓縮演算法。不過這些壓縮演算法的原理都大同小異:使用變長編碼來儲存資料有效位,避免儲存無效的 0 bit。
timestamp
時序資料都是按照時間順序進行排序的,因此首先會使用 delta-delta
編碼精簡資料:
若時間按固定區間分佈,優先使用遊程編碼run-length encoding
進行壓縮:
264
個時間戳 若編碼後所有值均小於260
,則使用simple8b
編碼,將多個值打包進單個 64bit 整數中:
- selector(4bit) 用於指定剩餘 60bit 中儲存的整數的個數與有效位長度
- payload(60bit) 則是用於儲存多個定長的整數
根據一個查詢表,將資料模式匹配到最優的 selector,然後將多個數據編碼至 payload
如果無法不滿足以上壓縮條件,則直接儲存原始資料。
float
Facebook 工程師通過觀察時序資料,發現相鄰時序資料進行異或操作後,僅有中間一小部分發生了變化。
根據這個規律,發明了一個簡單高效的浮點數壓縮演算法:先異或求值,然後儲存中間的有效資料。
通過這一演算法,他們將浮點資料的平均儲存空間壓縮至 1.37 位元組。
演算法過程可以參考這篇論文,或者直接參考下面的實現:
@Data
@Accessors(fluent = true)
@ToString
static class Block {
int leadingZero;
int tailingZero;
int blockSize;
long value;
boolean valueOf(int i) {
Validate.isTrue(i < blockSize);
return ((value >>> (blockSize-1-i)) & 0x1) > 0;
}
boolean fallInSameBlock(Block block) {
return block != null && block.leadingZero == leadingZero && block.tailingZero == tailingZero;
}
}
static Block calcBlock(double x, double y) {
long a = Double.doubleToRawLongBits(x);
long b = Double.doubleToRawLongBits(y);
long xor = a ^ b;
Block block = new Block().
leadingZero(Long.numberOfLeadingZeros(xor)).
tailingZero(Long.numberOfTrailingZeros(xor));
return block.value(xor >>> block.tailingZero()).
blockSize(block.value() == 0 ? 0 : 64 - block.leadingZero() - block.tailingZero());
}
static Pair<Long, Pair<Integer, byte[]>> encode(double[] values) {
int offset = 0;
BitSet buffer = new BitSet();
boolean ctrlBit;
double previous = values[0];
Block prevBlock = null;
for (int n=1; n<values.length; n++) {
Block block = calcBlock(previous, values[n]);
if (block.value() == 0) {
buffer.clear(offset++);
} else {
buffer.set(offset++);
buffer.set(offset++, ctrlBit = ! block.fallInSameBlock(prevBlock));
if (ctrlBit) {
int leadingZero = block.leadingZero();
int blockSize = block.blockSize();
Validate.isTrue(leadingZero < (1 << 6));
Validate.isTrue(blockSize < (1 << 7));
for (int i = 5; i > 0; i--) {
buffer.set(offset++, ((leadingZero >> (i - 1)) & 0x1) > 0);
}
for (int i = 6; i > 0; i--) {
buffer.set(offset++, ((blockSize >> (i - 1)) & 0x1) > 0);
}
}
for (int i = 0; i < block.blockSize(); i++) {
buffer.set(offset++, block.valueOf(i));
}
}
previous = values[n];
prevBlock = block;
}
return Pair.of(Double.doubleToLongBits(values[0]), Pair.of(offset, buffer.toByteArray()));
}
static List<Double> decode(Pair<Long, Pair<Integer, byte[]>> data) {
long previous = data.getLeft();
int dataLen = data.getRight().getKey();
BitSet buffer = BitSet.valueOf(data.getRight().getValue());
List<Double> values = new ArrayList<>();
values.add(Double.longBitsToDouble(previous));
int offset = 0;
Block blockMeta = null;
while (offset < dataLen) {
if (! buffer.get(offset++)) {
values.add(0d);
} else {
boolean ctrlBit = buffer.get(offset++);
if (ctrlBit) {
int leadingZero = 0;
int blockSize = 0;
for (int i = 0; i < 5; i++) {
leadingZero = (leadingZero << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
}
for (int i = 0; i < 6; i++) {
blockSize = (blockSize << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
}
blockMeta = new Block().leadingZero(leadingZero).blockSize(blockSize).
tailingZero(64 - leadingZero - blockSize);
}
Validate.notNull(blockMeta);
long value = 0;
for (int i = 0; i < blockMeta.blockSize(); i++) {
value = (value << 1) | (buffer.get(offset++) ? 0x1 : 0x0);
}
previous ^= (value << blockMeta.tailingZero());
values.add(Double.longBitsToDouble(previous));
}
}
Validate.isTrue(offset == dataLen);
return values;
}
public static void main(String[] args) {
double[] values = new double[]{15.5, 14.0625, 3.25, 8.625, 13.1, 0, 25.5};
Pair<Long, Pair<Integer, byte[]>> data = encode(values);
System.out.println(data.getRight().getKey()); // 編碼後的資料長度,單位 bits
System.out.println(decode(data)); // 解碼後的資料
}
int
對於整形資料,首先會使用 ZigZag 編碼精簡資料。
然後嘗試使用 RLE 或 simple8b 對精簡後的資料進行壓縮。
如果無法不滿足壓縮條件,則儲存原始資料。
bool
直接使用 Bitmap 對資料進行編碼。
string
將多個字串拼接在一起,然後使用 Snappy 進行壓縮
參考資料
InfluxDB
壓縮編碼