1. 程式人生 > >LevelDB 原始碼解析之 Varint 編碼

LevelDB 原始碼解析之 Varint 編碼

> GitHub: https://github.com/storagezhang > > Emai: [email protected] > > 華為雲社群: https://bbs.huaweicloud.com/blogs/253047 > > LevelDB: https://github.com/google/leveldb # Varint 編碼 LevelDB 內部採用變長編碼,對資料進行壓縮,減少儲存空間,再採用 CRC 校驗資料。 整型資料是以 32(64) 位來表示的,以 32 位為例,儲存需要 4 個位元組。 如果一個整數的大小在 256 以內,那麼只需要一個位元組就可以儲存這個整數,可以節省 3 個位元組。 Varint 就是根據這種思想來序列化整數的,它是一種使用一個或多個位元組序列化整數的方法,會把整型資料編碼為變長位元組。 Varint 中的每個位元組都設定為最高有效位: - 如果該位為 0,表示結束,當前位元組的剩餘 7 位就是該資料的表示。 - 表示整數 1,需要一個位元組:0000 0001 - 如果該位為 1,表示後續的位元組也是該整型資料的一部分; - 表示整數 300,需要兩個位元組:1010 1100 0000 0010 這也表示 Varint 編碼後是按小端排序的。 > 位元組順序,又稱端序或尾序(英語:Endianness),在電腦科學領域中,指電腦記憶體中或在數字通訊鏈路中,組成多位元組的字的位元組的排列順序。 > > 位元組的排列方式有兩個通用規則。例如,將一個多位數的低位放在較小的地址處,高位放在較大的地址處,則稱**小端序**;反之則稱**大端序**。在網路應用中,位元組序是一個必須被考慮的因素,因為不同機器型別可能採用不同標準的位元組序,所以均按照網路標準轉化。 因此,32 位整型資料經過 Varint 編碼後佔用 1~5 個位元組(5 * 8 - 5 > 32),64 位整型資料編碼後佔用 1~10 個位元組(10 * 8 - 10 > 64)。 在實際場景中,由於小數字的使用率遠遠高於大數字,所以在大部分場景中,通過 Varint 編碼的資料都可以起到很好的壓縮效果。 # 編碼實現 `EncodeVarint64` 將 `uint64_t` 編碼為 Varint 型別的位元組流: ```c++ char* EncodeVarint64(char* dst, uint64_t v) { static const int B = 128; uint8_t* ptr = reinterpret_cast(dst); while (v >= B) { // B=128=0x80, v|B 表示在最高位上加 1 // *ptr 是 uint8_t 型別的,即每次取下 7 位資料 *(ptr++) = v | B; // 右移 7 位, 繼續處理後面的資料 v >>= 7; } // 處理最後一個位元組的小於 128 的資料 *(ptr++) = static_cast(v); return reinterpret_cast(ptr); } ``` `EncodeVarint32` 將 `uint32_t` 編碼為 Varint 型別的位元組流,其實現與 `EncodeVarint64` 類似,但是可能因為最多 5 個位元組,所以是硬編碼的: ```c++ char* EncodeVarint32(char* dst, uint32_t v) { uint8_t* ptr = reinterpret_cast(dst); static const int B = 128; if (v < (1 << 7)) { // v < 0x80,可以用 7 位表示,佔一個位元組 *(ptr++) = v; } else if (v < (1 << 14)) { // 0x80 <= v < 0x4000,可以用 14 位表示,佔兩個位元組 *(ptr++) = v | B; *(ptr++) = v >> 7; } else if (v < (1 << 21)) { // 0x4000 <= v < 0x200000,可以用 21 位表示,佔三個位元組 *(ptr++) = v | B; *(ptr++) = (v >> 7) | B; *(ptr++) = v >> 14; } else if (v < (1 << 28)) { // 0x200000 <= v < 0x10000000,可以用 28 位表示,佔四個位元組 *(ptr++) = v | B; *(ptr++) = (v >> 7) | B; *(ptr++) = (v >> 14) | B; *(ptr++) = v >> 21; } else { // 0x10000000 <= v < 0x100000000,可以用 35 位表示,佔五個位元組 *(ptr++) = v | B; *(ptr++) = (v >> 7) | B; *(ptr++) = (v >> 14) | B; *(ptr++) = (v >> 21) | B; *(ptr++) = v >> 28; } return reinterpret_cast(ptr); } ``` # 解碼實現 解碼就是編碼的逆過程,同樣是利用位運算進行。 `GetVarint64Ptr` 將輸入的 Varint 型別位元組流轉換成 `uint64_t` 整型資料: ```c++ const char* GetVarint64Ptr(const char* p, const char* limit, uint64_t* value) { uint64_t result = 0; for (uint32_t shift = 0; shift <= 63 && p < limit; shift += 7) { uint64_t byte = *(reinterpret_cast(p)); p++; if (byte & 128) { // byte & 0x80 判斷最高有效位為 1 // byte & 0x7f:獲取 7 位有效資料 // (b & 0x7F) << shift:Varint 編碼是小端排序,每處理一個數據,都需要向高位移動 7 位 // result | ((byte & 127) << shift):連線高位資料和低位資料 result |= ((byte & 127) << shift); } else { // byte & 0x80 判斷最高有效位為 0,最後 7 位資料 result |= (byte << shift); *value = result; return reinterpret_cast(p); } } return nullptr; } ``` `GetVarint32Ptr` 與`GetVarint64Ptr` 演算法相同,唯一的區別在於對小於 128 的資料進行特判,如果小於則直接返回結果,這樣設計的原因是大部分數字都比 128 小,可以通過行內函數提高計算效率。 ```c++ inline const char* GetVarint32Ptr(const char* p, const char* limit, uint32_t* value) { if (p < limit) { uint32_t result = *(reinterpret_cast(p)); if ((result & 128) == 0) { *value = result; return p + 1; } } return GetVarint32PtrFallback(p, limit, value); } const char* GetVarint32PtrFallback(const char* p, const char* limit, uint32_t* value) { uint32_t result = 0; for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) { uint32_t byte = *(reinterpret_cast(p)); p++; if (byte & 128) { result |= ((byte & 127) << shift); } else { result |= (byte << shift); *value = result; return reinterpret_cast(p); } } return nullptr