轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com/archives/531
文中程式碼位置: https://github.com/devYun/mycache
我在看一些優秀的開源庫的時候看到一個有意思的快取庫 fastcache,在它的介紹主要有以下幾點特點:
- 讀寫資料要快,即使在併發下;
- 即使在數 GB 的快取中,也要保持很好的效能,以及儘可能減少 GC 次數;
- 設計儘可能簡單;
本文會通過模仿它寫一個簡單的快取庫,從而研究其核心是如何實現這樣的目標的。希望各位能有所收穫。
設計思想
在專案中,我們經常會用到 Go 快取庫比如說 patrickmn/go-cache
庫。但很多快取庫其實都是用一個簡單的 Map 來存放資料,這些庫在使用的時候,當併發低,資料量少的時候是沒有問題的,但是在資料量比較大併發比較高的時候會延長 GC 時間,增加記憶體分配次數。
比如,我們使用一個簡單的例子:
func main() {
a := make(map[string]string, 1e9)
for i := 0; i < 10; i++ {
runtime.GC()
}
runtime.KeepAlive(a)
}
在這個例子中,預分配了大小是10億(1e9)
的 map,然後我們通過 gctrace
輸出一下 GC 情況:
做實驗的環境是 Linux,機器配置是 16C 8G ,想要更深入理解 GC,可以看這篇:《 Go語言GC實現原理及原始碼分析 https://www.luozhiyun.com/archives/475 》
[root@localhost gotest]# GODEBUG=gctrace=1 go run main.go
...
gc 6 @13.736s 17%: 0.010+1815+0.004 ms clock, 0.17+0/7254/21744+0.067 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
gc 7 @15.551s 18%: 0.012+1796+0.005 ms clock, 0.20+0/7184/21537+0.082 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
gc 8 @17.348s 19%: 0.008+1794+0.004 ms clock, 0.14+0/7176/21512+0.070 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
gc 9 @19.143s 19%: 0.010+1819+0.005 ms clock, 0.16+0/7275/21745+0.085 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
gc 10 @20.963s 19%: 0.011+1844+0.004 ms clock, 0.18+0/7373/22057+0.076 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
上面展示了最後 5 次 GC 的情況,下面我們看看具體的含義是什麼:
gc 1 @0.004s 4%: 0.22+1.4+0.021 ms clock, 1.7+0.009/0.40/0.073+0.16 ms cpu, 4->5->1 MB, 5 MB goal, 8 P
gc 10 @20.963s 19%: 0.011+1844+0.004 ms clock, 0.18+0/7373/22057+0.076 ms cpu, 73984->73984->73984 MB, 147968 MB goal, 16 P (forced)
gc 10 :程式啟動以來第10次GC
@20.963s:距離程式啟動到現在的時間
19%:當目前為止,GC 的標記工作所用的CPU時間佔總CPU的百分比
垃圾回收的時間
0.011 ms:標記開始 STW 時間
1844 ms:併發標記時間
0.004 ms:標記終止 STW 時間
垃圾回收佔用cpu時間
0.18 ms:標記開始 STW 時間
0 ms:mutator assists佔用的時間
7373 ms:標記執行緒佔用的時間
22057 ms:idle mark workers佔用的時間
0.076 ms:標記終止 STW 時間
記憶體
73984 MB:標記開始前堆佔用大小
73984 MB:標記結束後堆佔用大小
73984 MB:標記完成後存活堆的大小
147968 MB goal:標記完成後正在使用的堆記憶體的目標大小
16 P:使用了多少處理器
可以從上面的輸出看到每次 GC 處理的時間非常的長,佔用的 CPU 資源也非常多。那麼造成這樣的原因是什麼呢?
string
實際上底層資料結構是由兩部分組成,其中包含指向位元組陣列的指標和陣列的大小:
type StringHeader struct {
Data uintptr
Len int
}
由於 StringHeader
中包含指標,所以每次 GC 的時候都會掃描每個指標,那麼在這個巨大的 map
中是包含了非常多的指標的,所以造成了巨大的資源消耗。
在上面的例子 map a
中資料大概是這樣儲存:
一個 map 中裡面有多個 bucket
,bucket
裡面有一個 bmap
陣列用來存放資料,但是由於 key
和 value
都是 string
型別的,所以在 GC 的時候還需要根據 StringHeader
中的 Data
指標掃描 string
資料。
對於這種情況,如果所有的string位元組都在一個單一記憶體片段中,我們就可以通過偏移來追蹤某個字串在這段記憶體中的開始和結束位置。通過追蹤偏移,我們不在需要在我們大陣列中儲存指標,GC也不在會被困擾。如下:
如同上面所示,如果我們將字串中的位元組資料拷貝到一個連續的位元組陣列 chunks
中,併為這個位元組陣列提前分配好記憶體,並且僅儲存字串在陣列中的偏移而不是指標。
除了上面所說的優化內容以外,還有其他的方法嗎?
其實我們還可以直接從系統 OS 中呼叫 mmap syscall
進行記憶體分配,這樣 GC 就永遠不會對這塊記憶體進行記憶體管理,因此也就不會掃描到它。如下:
func main() {
test := "hello syscall"
data, _ := syscall.Mmap(-1, 0, 13, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
p := (*[13]byte)(unsafe.Pointer(&data[0]))
for i := 0; i < 13; i++ {
p[i] = test[i]
}
fmt.Println(string(p[:]))
}
通過系統呼叫直接向 OS 申請了 13bytes 的記憶體,然後將一個字串寫入到申請的記憶體陣列中。
所以我們也可以通過提前向 OS 申請一塊記憶體,而不是用的時候才申請記憶體,減少頻繁的記憶體分配從而達到提高效能的目的。
原始碼實戰
API
我們在開發前先把這個庫的 API 定義一下:
func New
func New(maxBytes int) *Cache
建立一個 Cache 結構體,傳入預設的快取大小,單位是位元組。
func (*Cache) Get
func (c *Cache) Get(k []byte) []byte
獲取 Cache 中的值,傳入的引數是 byte 陣列。
func (*Cache) Set
func (c *Cache) Set(k, v []byte)
設定鍵值對到快取中,k 是鍵,v 是值,引數都是 byte 陣列。
結構體
const bucketsCount = 512
type Cache struct {
buckets [bucketsCount]bucket
}
type bucket struct {
// 讀寫鎖
mu sync.RWMutex
// 二維陣列,存放資料的地方,是一個環形連結串列
chunks [][]byte
// 索引字典
m map[uint64]uint64
// 索引值
idx uint64
// chunks 被重寫的次數,用來校驗環形連結串列中資料有效性
gen uint64
}
通過我們上面的分析,可以看到,實際上真正存放資料的地方是 chunks 二維陣列,在實現上是通過 m 欄位來對映索引路徑,根據 chunks 和 gen 兩個欄位來構建一個環形連結串列,環形連結串列每轉一圈 gen 就會加一。
初始化
func New(maxBytes int) *Cache {
if maxBytes <= 0 {
panic(fmt.Errorf("maxBytes must be greater than 0; got %d", maxBytes))
}
var c Cache
// 算出每個桶的大小
maxBucketBytes := uint64((maxBytes + bucketsCount - 1) / bucketsCount)
for i := range c.buckets[:] {
// 對桶進行初始化
c.buckets[i].Init(maxBucketBytes)
}
return &c
}
我們會設定一個 New 函式來初始化我們 Cache 結構體,在 Cache 結構體中會將快取的資料大小平均分配到每個桶中,然後對每個桶進行初始化。
const bucketSizeBits = 40
const maxBucketSize uint64 = 1 << bucketSizeBits
const chunkSize = 64 * 1024
func (b *bucket) Init(maxBytes uint64) {
if maxBytes == 0 {
panic(fmt.Errorf("maxBytes cannot be zero"))
}
// 我們這裡限制每個桶最大的大小是 1024 GB
if maxBytes >= maxBucketSize {
panic(fmt.Errorf("too big maxBytes=%d; should be smaller than %d", maxBytes, maxBucketSize))
}
// 初始化 Chunks 中每個 Chunk 大小為 64 KB,計算 chunk 數量
maxChunks := (maxBytes + chunkSize - 1) / chunkSize
b.chunks = make([][]byte, maxChunks)
b.m = make(map[uint64]uint64)
// 初始化 bucket 結構體
b.Reset()
}
在這裡會將桶裡面的記憶體按 chunk 進行分配,每個 chunk 佔用記憶體約為 64 KB。在最後會呼叫 bucket 的 Reset 方法對 bucket 結構體進行初始化。
func (b *bucket) Reset() {
b.mu.Lock()
chunks := b.chunks
// 遍歷 chunks
for i := range chunks {
// 將 chunk 中的記憶體歸還到快取中
putChunk(chunks[i])
chunks[i] = nil
}
// 刪除索引字典中所有的資料
bm := b.m
for k := range bm {
delete(bm, k)
}
b.idx = 0
b.gen = 1
b.mu.Unlock()
}
Reset 方法十分簡單,主要就是清空 chunks 陣列、刪除索引字典中所有的資料以及重置索引 idx 和 gen 的值。
在上面這個方法中有一個 putChunk ,其實這個就是直接操作我們提前向 OS 申請好的記憶體,相應的還有一個 getChunk 方法。下面我們具體看看 Chunk 的操作。
Chunk 操作
getChunk
const chunksPerAlloc = 1024
const chunkSize = 64 * 1024
var (
freeChunks []*[chunkSize]byte
freeChunksLock sync.Mutex
)
func getChunk() []byte {
freeChunksLock.Lock()
if len(freeChunks) == 0 {
// 分配 64 * 1024 * 1024 = 64 MB 記憶體
data, err := syscall.Mmap(-1, 0, chunkSize*chunksPerAlloc, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_ANON|syscall.MAP_PRIVATE)
if err != nil {
panic(fmt.Errorf("cannot allocate %d bytes via mmap: %s", chunkSize*chunksPerAlloc, err))
}
// 迴圈遍歷 data 資料
for len(data) > 0 {
//將從系統分配的記憶體分為 64 * 1024 = 64 KB 大小,存放到 freeChunks中
p := (*[chunkSize]byte)(unsafe.Pointer(&data[0]))
freeChunks = append(freeChunks, p)
data = data[chunkSize:]
}
}
//從 freeChunks 獲取最後一個元素
n := len(freeChunks) - 1
p := freeChunks[n]
freeChunks[n] = nil
freeChunks = freeChunks[:n]
freeChunksLock.Unlock()
return p[:]
}
初次呼叫 getChunk 函式時會使用系統呼叫分配 64MB 的記憶體,然後迴圈將記憶體切成 1024 份,每份 64KB 放入到 freeChunks 空閒列表中。然後獲取每次都獲取 freeChunks 空閒列表最後一個元素 64KB 記憶體返回。需要注意的是 getChunk 會下下面將要介紹到的 Cache 的 set 方法中使用到,所以需要考慮到併發問題,所以在這裡加了鎖。
putChunk
func putChunk(chunk []byte) {
if chunk == nil {
return
}
chunk = chunk[:chunkSize]
p := (*[chunkSize]byte)(unsafe.Pointer(&chunk[0]))
freeChunksLock.Lock()
freeChunks = append(freeChunks, p)
freeChunksLock.Unlock()
}
putChunk 函式就是將記憶體資料還回到 freeChunks 空閒列表中,會在 bucket 的 Reset 方法中被呼叫。
Set
const bucketsCount = 512
func (c *Cache) Set(k, v []byte) {
h := xxhash.Sum64(k)
idx := h % bucketsCount
c.buckets[idx].Set(k, v, h)
}
Set 方法裡面會根據 k 的值做一個 hash,然後取模對映到 buckets 桶中,這裡用的 hash 庫是 cespare/xxhash
。
最主要的還是 buckets 裡面的 Set 方法:
func (b *bucket) Set(k, v []byte, h uint64) {
// 限定 k v 大小不能超過 2bytes
if len(k) >= (1<<16) || len(v) >= (1<<16) {
return
}
// 4個byte 設定每條資料的資料頭
var kvLenBuf [4]byte
kvLenBuf[0] = byte(uint16(len(k)) >> 8)
kvLenBuf[1] = byte(len(k))
kvLenBuf[2] = byte(uint16(len(v)) >> 8)
kvLenBuf[3] = byte(len(v))
kvLen := uint64(len(kvLenBuf) + len(k) + len(v))
// 校驗一下大小
if kvLen >= chunkSize {
return
}
b.mu.Lock()
// 當前索引位置
idx := b.idx
// 存放完資料後索引的位置
idxNew := idx + kvLen
// 根據索引找到在 chunks 的位置
chunkIdx := idx / chunkSize
chunkIdxNew := idxNew / chunkSize
// 新的索引是否超過當前索引
// 因為還有chunkIdx等於chunkIdxNew情況,所以需要先判斷一下
if chunkIdxNew > chunkIdx {
// 校驗是否新索引已到chunks陣列的邊界
// 已到邊界,那麼迴圈連結串列從頭開始
if chunkIdxNew >= uint64(len(b.chunks)) {
idx = 0
idxNew = kvLen
chunkIdx = 0
b.gen++
// 當 gen 等於 1<<genSizeBits時,才會等於0
// 也就是用來限定 gen 的邊界為1<<genSizeBits
if b.gen&((1<<genSizeBits)-1) == 0 {
b.gen++
}
} else {
// 未到 chunks陣列的邊界,從下一個chunk開始
idx = chunkIdxNew * chunkSize
idxNew = idx + kvLen
chunkIdx = chunkIdxNew
}
// 重置 chunks[chunkIdx]
b.chunks[chunkIdx] = b.chunks[chunkIdx][:0]
}
chunk := b.chunks[chunkIdx]
if chunk == nil {
chunk = getChunk()
// 清空切片
chunk = chunk[:0]
}
// 將資料 append 到 chunk 中
chunk = append(chunk, kvLenBuf[:]...)
chunk = append(chunk, k...)
chunk = append(chunk, v...)
b.chunks[chunkIdx] = chunk
// 因為 idx 不能超過bucketSizeBits,所以用一個 uint64 同時表示gen和idx
// 所以高於bucketSizeBits位置表示gen
// 低於bucketSizeBits位置表示idx
b.m[h] = idx | (b.gen << bucketSizeBits)
b.idx = idxNew
b.mu.Unlock()
}
- 在這段程式碼開頭實際上我會限制鍵值的大小不能超過 2bytes;
- 然後將 2bytes 大小長度的鍵值封裝到 4bytes 的 kvLenBuf 作為資料頭,資料頭和鍵值的總長度是不能超過一個 chunk 長度,也就是
64 * 1024
; - 然後計算出原索引 chunkIdx 和新索引 chunkIdxNew,用來判斷這次新增的資料加上原來的資料有沒有超過一個 chunk 長度;
- 根據新的索引找到對應的 chunks 中的位置,然後將鍵值以及 kvLenBuf 追加到 chunk 後面;
- 設定新的 idx 以及 m 字典對應的值,m 字典中存放的是 gen 和 idx 通過取與的放置存放。
在 Set 一個鍵值對會有 4bytes 的 kvLenBuf 作為資料頭,後面的資料會接著 key 和 value ,在 kvLenBuf 中,前兩個 byte 分別代表了 key 長度的低位和高位;後兩個 byte 分別代表了 value 長度的低位和高位,資料圖大致如下:
下面舉個例子來看看是是如何利用 chunks 這個二維陣列來實現環形連結串列的。
我們在 bucket 的 Init 方法中會根據傳入 maxBytes 桶位元組數來設定 chunks 的長度大小,由於每個 chunk 大小都是 64 * 1024
bytes,那麼我們設定 3 * 64 * 1024
bytes 大小的桶,那麼 chunks 陣列長度就為 3。
如果當前算出 chunkIdx 在 chunks 陣列為 1 的位置,並且在 chunks[1] 的位置中,還剩下 6bytes 未被使用,那麼有如下幾種情況:
- 現在假設放入的鍵值長度都是 1byte,那麼在 chunks[1] 的位置中剩下的 6bytes 剛好可以放下;
- 現在假設放入的鍵值長度超過了 1byte,那麼在 chunks[1] 的位置中剩下的位置就放不下,只能放入到 chunks[2] 的位置中。
如果當前算出 chunkIdx 在 chunks 陣列為 2 的位置,並且現在 Set 一個鍵值,經過計算 chunkIdxNew 為 3,已經超過了 chunks 陣列長度,那麼會將索引重置,重新將資料從 chunks[0] 開始放置,並將 gen 加一,表示已經跑完一圈了。
Get
func (c *Cache) Get(dst, k []byte) []byte {
h := xxhash.Sum64(k)
idx := h % bucketsCount
dst, _ = c.buckets[idx].Get(dst, k, h, true)
return dst
}
這裡和 Set 方法是一樣的,首先是要找到對應的桶的位置,然後才去桶裡面拿資料。需要注意的是,這裡的 dst 可以從外部傳入一個切片,以達到減少重複分配返回值。
func (b *bucket) Get(dst, k []byte, h uint64,returnDst bool) ([]byte, bool) {
found := false
b.mu.RLock()
v := b.m[h]
bGen := b.gen & ((1 << genSizeBits) - 1)
if v > 0 {
// 高於bucketSizeBits位置表示gen
gen := v >> bucketSizeBits
// 低於bucketSizeBits位置表示idx
idx := v & ((1 << bucketSizeBits) - 1)
// 這裡說明chunks還沒被寫滿
if gen == bGen && idx < b.idx ||
// 這裡說明chunks已被寫滿,並且當前資料沒有被覆蓋
gen+1 == bGen && idx >= b.idx ||
// 這裡是邊界條件gen已是最大,並且chunks已被寫滿bGen從1開始,,並且當前資料沒有被覆蓋
gen == maxGen && bGen == 1 && idx >= b.idx {
chunkIdx := idx / chunkSize
// chunk 索引位置不能超過 chunks 陣列長度
if chunkIdx >= uint64(len(b.chunks)) {
goto end
}
// 找到資料所在的 chunk
chunk := b.chunks[chunkIdx]
// 通過取模找到該key 對應的資料在 chunk 中的位置
idx %= chunkSize
if idx+4 >= chunkSize {
goto end
}
// 前 4bytes 是資料頭
kvLenBuf := chunk[idx : idx+4]
// 通過資料頭算出鍵值的長度
keyLen := (uint64(kvLenBuf[0]) << 8) | uint64(kvLenBuf[1])
valLen := (uint64(kvLenBuf[2]) << 8) | uint64(kvLenBuf[3])
idx += 4
if idx+keyLen+valLen >= chunkSize {
goto end
}
// 如果鍵值是一致的,表示找到該資料
if string(k) == string(chunk[idx:idx+keyLen]) {
idx += keyLen
// 返回該鍵對應的值
if returnDst {
dst = append(dst, chunk[idx:idx+valLen]...)
}
found = true
}
}
}
end:
b.mu.RUnlock()
return dst, found
}
Get 方法主要是考慮環形連結串列的邊界問題。我們在 Set 方法中會將每一個 key 對應的 gen 和 idx 索引存放到 m 字典中,所以我們通過 hash 獲取 m 字典的值之後通過位運算就可以獲取到 gen 和 idx 索引。
找到 gen 和 idx 索引之後就是邊界條件的判斷了,用一個 if 條件來進行判斷:
gen == bGen && idx < b.idx
這裡是判斷如果是在環形連結串列的同一次迴圈中,那麼 key 對應的索引應該小於當前桶的索引;
gen+1 == bGen && idx >= b.idx
這裡表示當前桶已經進入到下一個迴圈中,所以需要判斷 key 對應的索引是不是大於當前索引,以表示當前 key 對應的值沒有被覆蓋;
gen == maxGen && bGen == 1 && idx >= b.idx
因為 gen 和 idx 索引要塞到 uint64 型別的欄位中,所以留給 gen 的最大值只有 maxGen = 1<< 24 -1
,超過了 maxGen 會讓 gen 從1開始。所以這裡如果 key 對應 gen 等於 maxGen ,那麼當前的 bGen 應該等於1,並且 key 對應的索引還應該大於當前 idx,這樣才這個鍵值對才不會被覆蓋。
判斷完邊界條件之後就會找到對應的 chunk ,然後取模後找到資料位置,通過偏移量找到並取出值。
Benchmark
下面我上一下過後的 Benchmark:
程式碼位置: https://github.com/devYun/mycache/blob/main/cache_timing_test.go
GOMAXPROCS=4 go test -bench='Set|Get' -benchtime=10s
goos: linux
goarch: amd64
pkg: gotest
// GoCache
BenchmarkGoCacheSet-4 836 14595822 ns/op 4.49 MB/s 2167340 B/op 65576 allocs/op
BenchmarkGoCacheGet-4 3093 3619730 ns/op 18.11 MB/s 5194 B/op 23 allocs/op
BenchmarkGoCacheSetGet-4 236 54379268 ns/op 2.41 MB/s 2345868 B/op 65679 allocs/op
// BigCache
BenchmarkBigCacheSet-4 1393 12763995 ns/op 5.13 MB/s 6691115 B/op 8 allocs/op
BenchmarkBigCacheGet-4 2526 4342561 ns/op 15.09 MB/s 650870 B/op 131074 allocs/op
BenchmarkBigCacheSetGet-4 1063 11180201 ns/op 11.72 MB/s 4778699 B/op 131081 allocs/op
// standard map
BenchmarkStdMapSet-4 1484 7299296 ns/op 8.98 MB/s 270603 B/op 65537 allocs/op
BenchmarkStdMapGet-4 4278 2480523 ns/op 26.42 MB/s 2998 B/op 15 allocs/op
BenchmarkStdMapSetGet-4 343 39367319 ns/op 3.33 MB/s 298764 B/op 65543 allocs/op
// sync.map
BenchmarkSyncMapSet-4 756 15951363 ns/op 4.11 MB/s 3420214 B/op 262320 allocs/op
BenchmarkSyncMapGet-4 11826 1010283 ns/op 64.87 MB/s 1075 B/op 33 allocs/op
BenchmarkSyncMapSetGet-4 1910 5507036 ns/op 23.80 MB/s 3412764 B/op 262213 allocs/op
PASS
ok gotest 215.182s
上面的測試是 GoCache、BigCache、Map、sync.Map 的情況。下面是本篇文章中所開發的快取庫的測試:
// myCachce
BenchmarkCacheSet-4 4371 2723208 ns/op 24.07 MB/s 1306 B/op 2 allocs/op
BenchmarkCacheGet-4 6003 1884611 ns/op 34.77 MB/s 951 B/op 1 allocs/op
BenchmarkCacheSetGet-4 2044 6611759 ns/op 19.82 MB/s 2797 B/op 5 allocs/op
可以看到記憶體分配是幾乎就不存在,操作速度在上面的庫中也是佼佼者的存在。
總結
在本文中根據其他快取庫,並分析瞭如果用 Map 作為快取所存在的問題,然後引出存在這個問題的原因,並提出解決方案;在我們的快取庫中,第一是通過使用索引加記憶體塊的方式來存放快取資料,再來是通過 OS 系統呼叫來進行記憶體分配讓我們的快取資料塊脫離了 GC 的控制,從而做到降低 GC 頻率提高併發的目的。
其實不只是快取庫,在我們的專案中當遇到需要使用大量的帶指標的資料結構並需要長時間保持引用的時候,也是需要注意這樣做可能會引發 GC 問題,從而給系統帶來隱患。
Reference
https://github.com/VictoriaMetrics/fastcache
Further Dangers of Large Heaps in Go https://syslog.ravelin.com/further-dangers-of-large-heaps-in-go-7a267b57d487
Avoiding high GC overhead with large heaps https://blog.gopheracademy.com/advent-2018/avoid-gc-overhead-large-heaps/
Go 的 GC 如何調優?https://www.bookstack.cn/read/qcrao-Go-Questions/spilt.14.GC-GC.md