1. 程式人生 > >kubernetes1.9原始碼閱讀 List

kubernetes1.9原始碼閱讀 List

List-Watch是kubernetes的核心機制。元件kubelet、kube-controller-manager、kube-scheduler需要監控各種資源(pod、service等)的變化,當這些物件發生變化時(add、delete、update),kube-apiserver會主動通知這些元件。這個過程類似一個釋出-訂閱系統。本文章將從程式碼角度探究一下list-watch的實現方式。

本次分析是基於kubernetes tag v1.9.0

kube-apiserver對etcd的List-watch機制

構建PodStorage

pkg/registry/core/pod/storage.go NewStorage()

  1. kube-apiserver針對每一類資源(pod、service、endpoint、replication controller、depolyments),都會建立Storage物件,如:PodStorage。PodStorage.Pod.Store封裝了對etcd的操作;
  2. RESTOptions包括Decorator物件,它是針對storage的一個裝飾器;

建立StorageDecorator

apiserver/registry/storage_factory.go

  1. 通過StorageFactory工廠模式建立Cacher物件,返回StorageDecorator,StorageDecorator即為帶有cache的storage;

建立Cacher

apiserver/pkg/storage/cacher.go NewCacherFromConfig()

  1. Cacher物件,主要的資料成員:storage、watchCache、reflector、watchers及incoming channel;
    1. watchCache是一個cache,用來儲存apiserver從etcd那裡watch到的物件;
    2. watchers是一個map,map的值型別為cacheWatcher,當kubelet、kube-scheduler需要watch某類資源時,他們會向kube-apiserver發起watch請求,kube-apiserver就會生成一個cacheWatcher,cacheWatcher負責將watch的資源通過http從apiserver傳遞到kubelet、kube-scheduler;
    3. Reflector物件,主要資料成員:ListerWatcher,ListerWatcher是介面物件,包括方法List()和Watch();listerWatcher包裝了Storage,主要是將watch到的物件存到watchCache中;
    4. incoming channel接收watchCacheEvent;
  2. 註冊cacher.processEvent方法,協程呼叫cacher.dispatchEvents();
  3. 協程呼叫cacher.startCaching();

StartCaching

client-go/tools/cache/reflector.go ListAndWatch()

  1. ListAndWatch()方法
    1. 首先,建立watchCache物件和cacheListerWatcher物件,cacheListWatcher物件是ListerWatcher介面實現,實現了List()和Watch()方法;
    2. 之後,執行cacheListerWatcher的List()方法和Watch()方法;
    3. 最後,呼叫reflector的watchHandler()方法;

List/Watch

apiserver/pkg/storage/cacher.go List()和Watch();

  1. List()方法將呼叫storage.List()方法;
  2. Watch()方法將呼叫storage.watch()方法;

Storage List/Watch

apiserver/pkg/storage/etcd/etcd_helper.go

  1. etcdHelper物件是Storage介面物件的實現;
  2. etcdHelper的List()方法:
    1. 獲取etcd的物件,包括resourceVersion資訊;
  3. etcdHelper的WatchList()方法:
    1. 建立etcdWatcher;
    2. etcdWatcher物件,實現了Watch介面;
    3. etcdWatcher物件,主要的資料成員是etcdIncoming channel和outgoing channel;
  4. 協程執行etcdWatcher.translate()
  5. 最後,協程執行etcdWatcher.etcdWatch()

etcdWatcher.etcdWatch

apiserver/pkg/storage/etcd/etcd_watcher.go etcdWatch()

  1. 如果resourceVersion==0, 執行etcdGetInitialWatchState(),獲取所有的pods,並將結果輸入到etcdIncoming channel;
  2. 之後,不停的呼叫watcher.Next(),並將結果輸入到etcdIncoming channel;

etcdWatcher.translate

apiserver/pkg/storage/etcd/etcd_watcher.go translate()

  1. 讀取etcdIncoming channel資訊;
  2. 呼叫etcdWatcher.sendResult()進行轉化;
  3. 輸入到outgoing channel;

reflector.watchHandler

client-go/tools/cache/reflector.go watchHandler()

  1. 讀取outgoing channel資訊,更新watchCache;

更新watchCache

apiserver/pkg/storage/watch_cache.go Add()/Delete()/Get()/Update()

處理事件processEvent

apiserver/pkg/storage/watch_cache.go processEvent()

  1. 建立watchCacheEvent
  2. 呼叫watchCache.updateCache(),更新cache;

kube-apiserver的watch功能

kube-apiserver的watch功能是作為一個restful api提供給其他元件(kubelet、kube-controller-manager、kube-scheduler、kube-proxy)。watch的處理流程和PUT、DELETE、GET等REST API處理流程類似。

registerResourceHandlers

apiserver/pkg/endpoints/installer.go registerResourceHandlers()

ListResource

apiserver/pkg/endpoints/handlers/rest.go ListResource()

1.呼叫rest.watcher.watch()方法,這裡將會呼叫Store.watch()

2.呼叫serveWatch()方法;

Store.watch

apiserver/pkg/registry/generic/registry/store.go watch()/watchPredicate()

  1. 會呼叫Storage.Watch()方法,這裡將會呼叫Cacher.watch()

Cacher.watch

apiserver/pkg/storage/cacher.go watch()/watchList()

  1. 首先,呼叫newCacheWatcher生成一個watcher,並將watcher插入到cacher.watchers中去;
  2. 協程呼叫cacheWatcher.process()方法,此方法將會操作cacheWatcher的input channel的訊息;

3. 會將watchCacheEvent通過add()新增到cacheWatcher的input channel中;

操作input channel

apiserver/pkg/storage/cacher.go process()

  1. 讀取input channel的資訊,並呼叫sendWatchCacheEvent()方法;

sendWatchCacheEvent

apiserver/pkg/storage/cacher.go sendWatchCacheEvent()

  1. kube-apiserver的watch會帶過濾的功能;
  2. 對watchCacheEvent進行Filter,輸出到cacheWatcher的result channel中;

serveWatch

apiserver/pkg/endpoints/handlers/rest.go serveWatch()

對result Channel資訊進行序列化,併發送給呼叫者;

相關閱讀:

1. http://licyhust.com/