Kubernetes client-go Indexer / ThreadSafeStore 原始碼分析
 
請閱讀原文:原文地址
 

概述

原始碼版本資訊

  • Project: kubernetes
  • Branch: master
  • Last commit id: d25d741c
  • Date: 2021-09-26

Indexer 主要依賴於 ThreadSafeStore 實現,是 client-go 提供的一種快取機制,通過檢索本地快取可以有效降低 apiserver 的壓力,我們在自定義控制器裡呼叫一個 List() 方法帶一個 ListOption 本質就是加上條件檢索了 Indexer,掌握 Indexer 的實現可以在 Operator 編碼時更加心裡有底。

Indexer 介面

Indexer 介面主要是在 Store 介面的基礎上拓展了物件的檢索功能

  • client-go/tools/cache/index.go:35
1
2
3
4
5
6
7
8
9
type Indexer interface {
Store
Index(indexName string, obj interface{}) ([]interface{}, error) // 根據索引名和給定的物件返回符合條件的所有物件
IndexKeys(indexName, indexedValue string) ([]string, error) // 根據索引名和索引值返回符合條件的所有物件的 key
ListIndexFuncValues(indexName string) []string // 列出索引函式計算出來的所有索引值
ByIndex(indexName, indexedValue string) ([]interface{}, error) // 根據索引名和索引值返回符合條件的所有物件
GetIndexers() Indexers // 獲取所有的 Indexers,對應 map[string]IndexFunc 型別
AddIndexers(newIndexers Indexers) error // 這個方法要在資料加入儲存前呼叫,新增更多的索引方法,預設只通過 namespace 檢索
}

Indexer 的預設實現是 cache

1
2
3
4
type cache struct {
cacheStorage ThreadSafeStore
keyFunc KeyFunc
}

cache 對應兩個方法體實現完全一樣的 New 函式:

 1
2
3
4
5
6
7
8
9
10
11
12
13
func NewStore(keyFunc KeyFunc) Store {
return &cache{
cacheStorage: NewThreadSafeStore(Indexers{}, Indices{}),
keyFunc: keyFunc,
}
} func NewIndexer(keyFunc KeyFunc, indexers Indexers) Indexer {
return &cache{
cacheStorage: NewThreadSafeStore(indexers, Indices{}),
keyFunc: keyFunc,
}
}

這裡涉及到兩個型別:

  • KeyFunc
  • ThreadSafeStore

我們先看一下 Indexer 的 Add()、Update() 等方法是怎麼實現的:

 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (c *cache) Add(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Add(key, obj)
return nil
} func (c *cache) Update(obj interface{}) error {
key, err := c.keyFunc(obj)
if err != nil {
return KeyError{obj, err}
}
c.cacheStorage.Update(key, obj)
return nil
}

可以看到這裡的邏輯就是呼叫 keyFunc() 方法獲取 key,然後呼叫 cacheStorage.Xxx() 方法完成對應增刪改查過程。KeyFunc 型別時這樣定義的:

1
type KeyFunc func(obj interface{}) (string, error)

也就是給一個物件,返回一個字串型別的 key。KeyFunc 的一個預設實現如下:

 1
2
3
4
5
6
7
8
9
10
11
12
13
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok {
return string(key), nil
}
meta, err := meta.Accessor(obj)
if err != nil {
return "", fmt.Errorf("object has no meta: %v", err)
}
if len(meta.GetNamespace()) > 0 {
return meta.GetNamespace() + "/" + meta.GetName(), nil
}
return meta.GetName(), nil
}

可以看到一般情況下返回值是 <namespace><name> ,如果 namespace 為空則直接返回 name。類似的還有一個叫做 IndexFunc 的型別,定義如下:

1
type IndexFunc func(obj interface{}) ([]string, error)

這是給一個物件生成 Index 用的,一個通用實現如下,直接返回物件的 namespace 欄位作為 Index

1
2
3
4
5
6
7
func MetaNamespaceIndexFunc(obj interface{}) ([]string, error) {
meta, err := meta.Accessor(obj)
if err != nil {
return []string{""}, fmt.Errorf("object has no meta: %v", err)
}
return []string{meta.GetNamespace()}, nil
}

下面我們直接來看 cacheStorage 是如果實現增刪改查的。

ThreadSafeStore

ThreadSafeStore 是 Indexer 的核心邏輯所在,Indexer 的多數方法是直接呼叫內部 cacheStorage 屬性的方法實現的,同樣先看介面定義:

  • client-go/tools/cache/thread_safe_store.go:41
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type ThreadSafeStore interface {
Add(key string, obj interface{})
Update(key string, obj interface{})
Delete(key string)
Get(key string) (item interface{}, exists bool)
List() []interface{}
ListKeys() []string
Replace(map[string]interface{}, string)
Index(indexName string, obj interface{}) ([]interface{}, error)
IndexKeys(indexName, indexKey string) ([]string, error)
ListIndexFuncValues(name string) []string
ByIndex(indexName, indexKey string) ([]interface{}, error)
GetIndexers() Indexers
AddIndexers(newIndexers Indexers) error
Resync() error // 過期了,沒有具體程式碼邏輯
}

對應實現:

1
2
3
4
5
6
type threadSafeMap struct {
lock sync.RWMutex
items map[string]interface{}
indexers Indexers
indices Indices
}

這裡的 Indexers 和 Indices 是:

1
2
3
type Index map[string]sets.String
type Indexers map[string]IndexFunc
type Indices map[string]Index

對照圖片理解一下這幾個欄位的關係:Indexers 裡存的是 Index 函式 map,一個典型的實現是字串 namespace 作為 key,IndexFunc 型別的實現 MetaNamespaceIndexFunc 函式作為 value,也就是我們希望通過 namespace 來檢索時,通過 Indexers 可以拿到對應的計算 Index 的函式,接著拿著這個函式,把物件穿進去,就可以計算出這個物件對應的 key,在這裡也就是具體的 namespace 值,比如 default、kube-system 這種。然後在 Indices 裡存的也是一個 map,key 是上面計算出來的 default 這種 namespace 值,value 是一個 set,而 set 表示的是這個 default namespace 下的一些具體 pod 的 <namespace>/<name> 這類字串。最後拿著這種 key,就可以在 items 裡檢索到對應的物件了。

threadSafeMap.Xxx()

比如 Add() 方法程式碼如下:

1
2
3
4
5
6
7
func (c *threadSafeMap) Add(key string, obj interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
oldObject := c.items[key] // c.items 是 map[string]interface{} 型別
c.items[key] = obj // 在 items map 裡新增這個物件
c.updateIndices(oldObject, obj, key) // 下面分析
}

可以看到更復雜的邏輯在 updateIndices 方法裡,我們繼續來看:

  • client-go/tools/cache/thread_safe_store.go:256
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) {
// 新增場景這裡是 nil,如果是更新,就需要刪除舊物件的索引了
if oldObj != nil {
c.deleteFromIndices(oldObj, key) // 刪除操作後面具體看
}
for name, indexFunc := range c.indexers { // 從 Indexers 裡拿到索引函式,比如 "namespace":MetaNamespaceIndexFunc
indexValues, err := indexFunc(newObj) // 通過 MetaNamespaceIndexFunc 計算得到 namespace,比如 "default"
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
}
index := c.indices[name] // 拿到一個 Index,對應型別 map[string]sets.String
if index == nil {
index = Index{}
c.indices[name] = index // 如果 map 不存在則初始化一個
} for _, indexValue := range indexValues { // "default"
set := index[indexValue] // 檢索 "default" 下的 set,對應一個集合,多個 pod 資訊
if set == nil {
set = sets.String{}
index[indexValue] = set // 如果為空則初始化一個
}
set.Insert(key) // key 也就是類似 "default/pod_1" 這樣的字串,儲存到 set 裡,也就完成了 key + obj 的 Add 過程
}
}
}

上面還提到了一個 deleteFromIndices 方法,前半段和上面邏輯上類似的,最後拿到 set 後不同於上面的 Insert 過程,這裡呼叫了一個 Delete。

 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (c *threadSafeMap) deleteFromIndices(obj interface{}, key string) {
for name, indexFunc := range c.indexers {
indexValues, err := indexFunc(obj)
if err != nil {
panic(fmt.Errorf("unable to calculate an index entry for key %q on index %q: %v", key, name, err))
} index := c.indices[name]
if index == nil {
continue
}
for _, indexValue := range indexValues {
set := index[indexValue]
if set != nil {
set.Delete(key) // set 中刪除這個 key
if len(set) == 0 {
delete(index, indexValue)
}
}
}
}
}

Index() 等實現

最後看幾個具體方法等實現

Index() 方法

來看一下 Index() 方法的實現,Index() 方法的作用是給定一個 obj 和 indexName,比如 pod1和 “namespace”,然後返回 pod1 所在 namespace 下的所有 pod。

  • client-go/tools/cache/thread_safe_store.go:141
 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func (c *threadSafeMap) Index(indexName string, obj interface{}) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock() indexFunc := c.indexers[indexName] // 提取索引函式,比如通過 "namespace" 提取到 MetaNamespaceIndexFunc
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
} indexedValues, err := indexFunc(obj) // 物件丟進去拿到索引值,比如 "default"
if err != nil {
return nil, err
}
index := c.indices[indexName] // indexName 例如 "namespace",這裡可以查到 Index var storeKeySet sets.String
if len(indexedValues) == 1 {
// 多數情況對應索引值為1到場景,比如用 namespace 時,值就是唯一的
storeKeySet = index[indexedValues[0]]
} else {
// 對應不為1場景
storeKeySet = sets.String{}
for _, indexedValue := range indexedValues {
for key := range index[indexedValue] {
storeKeySet.Insert(key)
}
}
} list := make([]interface{}, 0, storeKeySet.Len())
// storeKey 也就是 "default/pod_1" 這種字串,通過其就可以到 items map 裡提取需要的 obj 了
for storeKey := range storeKeySet {
list = append(list, c.items[storeKey])
}
return list, nil
}

ByIndex() 方法

相比 Index(),這個函式要簡單的多,直接傳遞 indexedValue,也就不需要通過 obj 去計算 key 了,例如 indexName == namespace & indexValue == default 就是直接檢索 default 下的資源物件。

 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (c *threadSafeMap) ByIndex(indexName, indexedValue string) ([]interface{}, error) {
c.lock.RLock()
defer c.lock.RUnlock() indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
} index := c.indices[indexName] set := index[indexedValue]
list := make([]interface{}, 0, set.Len())
for key := range set {
list = append(list, c.items[key])
} return list, nil
}

IndexKeys() 方法

和上面返回 obj 列表不同,這裡只返回 key 列表,就是 []string{“default/pod_1”} 這種資料

 1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (c *threadSafeMap) IndexKeys(indexName, indexedValue string) ([]string, error) {
c.lock.RLock()
defer c.lock.RUnlock() indexFunc := c.indexers[indexName]
if indexFunc == nil {
return nil, fmt.Errorf("Index with name %s does not exist", indexName)
} index := c.indices[indexName] set := index[indexedValue]
return set.List(), nil
}

Replace() 方法

Replace() 的實現簡單粗暴,給一個新 items map,直接替換到 threadSafeMap.items 中,然後重建索引。

 1
2
3
4
5
6
7
8
9
10
11
func (c *threadSafeMap) Replace(items map[string]interface{}, resourceVersion string) {
c.lock.Lock()
defer c.lock.Unlock()
c.items = items // rebuild any index
c.indices = Indices{}
for key, item := range c.items {
c.updateIndices(nil, item, key)
}
}

(轉載請保留本文原始連結)

https://www.danielhu.cn/post/k8s/client-go-indexer/