經過前兩篇的學習與實操,也大致掌握了一個k8s資源的Controller寫法了,如有不熟,可回顧

先說說CRD-controller的作用,本CR原意是記錄雲主機ECS及node節點對映資訊,而本controller則把這個對映操作省略掉,只為所有建立成功的CR打一個Label。而本篇為達成此目的,需要執行的步驟有以下幾個:

  • 往k8s建立一個CRD
  • 定義對應CRD的api,包含了struct
  • 給CRD的api註冊到scheme
  • 實現這個CRD的clinet,封裝其請求apiserver的相關操作
  • 實現一個Informer和Lister
  • 實現一個controller

通過上述步驟,可以繞開ApiBuilder腳手架,自己手捏一個CRD-Controller出來。可以更好的理解整個Informer機制的架構

建立一個CRD

建立CRD的manifest如下所示

apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.2.5
creationTimestamp: null
name: ecsbinding.hopegi.com
spec:
group: hopegi.com #此處定義api group
names:
kind: EcsBinding #本行和下一行定義CRD的單體kind和集合型別的Kind
listKind: EcsBindingList
plural: ecsbinding #複數寫法 #本行和下行僅填寫CRD的小寫即可
singular: ecsbinding #單數寫法
shortNames:
- ecsb #
preserveUnknownFields: false #允許儲存未知欄位,就是下面沒宣告的
additionalPrinterColumns: #在kubectl顯示的欄位
- JSONPath: .metadata.creationTimestamp
name: Age
type: date
- JSONPath: .spec.nodename
name: NodeName
type: string
- JSONPath: .spec.innerip
name: InnerIp
type: string
scope: Cluster #資源的作用域,一般是名稱空間下和叢集級別,名稱空間是Namespaced,叢集則Cluster
validation: #本行和下行都是固定
openAPIV3Schema:
description: hopegi test crd
properties: #後面開始描述各個欄位,API和metadata是固定,spec也是固定,spec往下則是自定義的欄位,每個欄位需要制定
#type,name屬性,description可選,type有常用的string,object,bool,int等,object則會多一級properties
#用於定義下一級的子欄位
apiVersion:
type: string
metadata:
type: object
spec:
properties:
id:
type: string
name:
type: string
nodename:
type: string
innerip:
type: string
type: object
status:
type: object
type: object
version: v1 #到此處往下皆固定
versions:
- name: v1
served: true
storage: true
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []

這裡比較值得注意是ApiGroup需要定好,這個group到後續給scheme註冊資源型別時用到,影響往apiserver去互動管理資源。

定義CRD的api

這個api可能容易給人造成誤解,實際是定義CR的struct,包含什麼欄位,檔案路徑api/v1/ecs-bing.go

type EcsBinding struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"` Spec EcsBindSpec `json:"spec,omitempty"`
Status EcsBindStatus `json:"status,omitempty"`
} type EcsBindingList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"` Items []EcsBinding `json:"items"`
} type EcsBindSpec struct {
Id string `json:"id"`
Name string `json:"name"`
NodeName string `json:"node_name"`
InnerIp string `json:"inner_ip"`
} type EcsBindStatus struct {
}

自上而下定義EcsBinding和EcsBindingList兩個struct,由於要實現runtime.Object的介面,需要實現DeepCopyObject方法,如果用腳手架生成的程式碼,這部分實現介面的程式碼就不用手敲

註冊到Scheme

scheme註冊這裡分兩部分,一部分是定義一個scheme,另一部分是在各個api裡面提供AddToScheme函式,這個函式用於把各種型別各種版本的api(也就是GVK)註冊到scheme

先看第一部分,檔案路徑client/versiond/scheme/register.go

var scheme   = runtime.NewScheme()
var Codecs = serializer.NewCodecFactory(scheme)
var ParameterCodec = runtime.NewParameterCodec(scheme) func init() {
metav1.AddToGroupVersion(scheme,schema.GroupVersion{Version:"v1"})
if err:=AddToScheme(scheme);err!=nil{
fmt.Println("error to AddToScheme ",err)
}
} func AddToScheme(scheme *runtime.Scheme)error {
return ecsv1.AddToScheme(scheme)
}

在AddToScheme中就是呼叫各個kind的AddToScheme,儘管這裡只有一個Kind。第二部分的又回去api/v1/ecs-bing.go

var (

	// GroupVersion is group version used to register these objects
GroupVersion = schema.GroupVersion{Group: "hopegi.com", Version: "v1"} // SchemeBuilder is used to add go types to the GroupVersionKind scheme
SchemeBuilder = &scheme.Builder{GroupVersion: GroupVersion}
//SchemeBuilder = runtime.NewSchemeBuilder(AddKnownTypes) // AddToScheme adds the types in this group-version to the given scheme.
AddToScheme = SchemeBuilder.AddToScheme EscBindVersionKind = schema.GroupVersionKind{Group: GroupVersion.Group, Version: GroupVersion.Version, Kind: "EcsBinding"}
)
func init() { SchemeBuilder.Register(&EcsBinding{},&EcsBindingList{}) }

此處的Group需要和之前定義CRD時的group一致

實現CRD的clinet

這裡實際定義了一個clientSet,clientset應該包含多個version,一個version包含多個資源型別,但是這裡只有一個version,一個kind。clientSet的結構如下所示

clientSet
|---Discovery
|---EcsV1
|---RESTClient
|---EcsBindingClient

clientSet位於client/versiond/clientset.go

EcsV1位於client/versiond/typed/ecsbinding/v1/ecs_client.go中,它的RESTClient也用於傳遞給EcsClient,用於EcsClient對apiserver通訊的http客戶端

EcsBindingClient位於client/version/typed/ecsbingding/v1/ecs-binding.go中,定義了client的各種操作方法,封裝了對apiserver的各個http請求。

各個client的初始化,則是由最外層把Config一層一層的往裡面具體的Client傳。整套client的程式碼不在這裡展示,僅展示一下呼叫鏈

versiond.NewForConfig->ecsbindv1.NewForConfig
創建出clientSet 創建出EcsV1

當呼叫EcsV1的EcsBinding方法(也就是獲取EcsClient)時,才呼叫newEcsbindings建構函式構造一個client

ecsbindv1.NewForConfig的程式碼如下:

func NewForConfig(c *rest.Config)(*EcsV1Client,error)  {
config:=*c
if err:=setConfigDefaults(&config);err!=nil{
return nil,err
}
client,err:=rest.RESTClientFor(&config)
if err!=nil{
return nil,err
}
return &EcsV1Client{client},nil }

在這個函式中先給config設定預設引數,最後按照這些預設引數構造出一個RESTClient,這個RESTClient傳遞給EcsV1Client,一個作用是把它自己的一個成員restClient,另一個作用就是用於構造EcsClient所需的RESTClient。

setConfigDefaults函式定義如下

func setConfigDefaults(config *rest.Config) error {
gv := ecsv1.GroupVersion
config.GroupVersion = &gv
config.APIPath = "/apis"
config.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: scheme.Codecs} if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
} return nil
}

函式給config指定了groudversion這個gv就是hopegi.com v1;api的地址固定是"/apis",通過這兩句可以確定客戶端跟apiserver通訊時的地址是/apis/hopegi.com/v1,後面設定scheme的序列化器,用於把apiserver響應的json資料反序列化成struct資料。

EcsBindingClient介面定義的函式如下

type EcsBindingInterface interface {
Create(*v1.EcsBinding)(*v1.EcsBinding,error)
Update(*v1.EcsBinding)(*v1.EcsBinding,error)
Delete(string,*metav1.DeleteOptions)error
DeleteCollection(options *metav1.DeleteOptions, listOptions metav1.ListOptions) error
List(metav1.ListOptions)(*v1.EcsBindingList,error)
Get(name string,options metav1.GetOptions)(*v1.EcsBinding,error)
Watch(options metav1.ListOptions)(watch.Interface,error)
Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1.EcsBinding, err error)
}

以List方法實現作例子

func (c *ecsBinding)List(opts metav1.ListOptions)(*v1.EcsBindingList,error){
result := &v1.EcsBindingList{}
err := c.client.Get().
Resource("ecsbinding").
VersionedParams(&opts, scheme.ParameterCodec).
Do().
Into(result)
for _,o:=range result.Items{
o.SetGroupVersionKind(v1.EscBindVersionKind)
}
return result,err
}

client成員則是先前構造時傳入的RESTClient,Resource指定資源的名ecsbingding,當有CR返回時需要執行SetGroupVersionKind,否則拿到的CR結構體會丟失GroupVersion和Kind資訊

實現一個Informer和Lister

在實現某個資源的Informer之前,要實現一個Informer的Factory。這個Factory的作用有幾個,其一是用於構造一個Informer;另外就是在start一個Controller的時候呼叫它Start方法,Start方法內部就會把它管理的所有Informer Run起來。

實現一個Informer的Factory

SharedInformerFactory介面的定義如下所示,程式碼位於controller-demo/informers/factory.go

type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool EcsBind() ecsbind.Interface
}

這裡主要是暴露一個構造並獲取各個Group的Interface,Start方法的介面則來源於它繼承的internalinterfaces.SharedInformerFactory介面,程式碼位於 controller-demo/informers/internalinterface/factory_interfaces.go

type SharedInformerFactory interface {
Start(stopCh <-chan struct{})
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}

除了Start方法,InformerFor跟構造一個Informer有關,實現Informer的時候會呼叫到factory的方法,後續會再介紹

EcsBind()返回的是這個Group的Interface,程式碼位於controller-demo/informers/ecsbind/interface.go
type Interface interface {
// V1 provides access to shared informers for resources in V1.
V1() v1.Interface
}

V1的Interface則是涵蓋了這個版本下各個資源的客戶端介面,程式碼位於controller-demo/informers/ecsbind/v1/interface.go

type Interface interface {
EcsBinding() EcsBindingInformer
}

這樣也剛好跟k8s的api的層級相呼應,先是ApiGroup,再到Version,最後到Kind,就是GVK

實現一個Informer

一個Informer的最核心邏輯是List和Watch方法,不過我們實現一個Infomer時只需要給SharedIndexInformer提供這兩個方法就可以,呼叫這兩個方法的邏輯由SharedIndexInformer統一實現

func NewFilteredEcsBindingInformer(clientset *versiond.Clientset, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(&cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (object runtime.Object, e error) {
if tweakListOptions!=nil{
tweakListOptions(&options)
}
return clientset.EcsV1().EcsBinding().List(options)
},
WatchFunc: func(options meta_v1.ListOptions) (i watch.Interface, e error) {
if tweakListOptions!=nil{
tweakListOptions(&options)
}
return clientset.EcsV1().EcsBinding().Watch(options)
},
},&ecsv1.EcsBinding{},resyncPeriod,indexers)
}

實際上僅僅是呼叫了client而已,client則是來源於這個CR的Informer——EcsBindingInformer,看看它的介面定義和結構

type EcsBindingInformer interface {
Informer() cache.SharedIndexInformer
Lister() demov1.EcsBindingLister
} type ecsBindingInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
}

對外暴露的EcsBindingInformer僅僅是一個介面,暴露Informer和LIster兩個方法,實現則交由一個內部的結構實現,縱觀這個CRD的client,CR的client,clientset,Informer乃至後續的lister都是這樣的模式。

EcsBindingInformer的Informer()實現如下

func (e *ecsBindingInformer) Informer() cache.SharedIndexInformer {
return e.factory.InformerFor(&ecsv1.EcsBinding{}, e.defaultInformer)
}
func (e *ecsBindingInformer) defaultInformer(client versiond.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredEcsBindingInformer(client, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, e.tweakListOptions)
}

如前面介紹Factory的時候所介紹的,Informer建立時需要呼叫factory的InformerFor方法,傳入資源的指標以及一個函式回撥

func (e *ecsBindingInformer) Informer() cache.SharedIndexInformer {
return e.factory.InformerFor(&ecsv1.EcsBinding{}, e.defaultInformer)
}

回撥的宣告在internalinterface處,controller-demo/informers/internalinterface/factory_interfaces.go

type NewInformerFunc func(versiond.Interface, time.Duration) cache.SharedIndexInformer

在這裡就是ecsBindingInformer.defaultInformer,呼叫這個方法時就會把factory的client傳遞到構造SharedIndexInformer函式,這樣List函式和Watch函式就有client使用,相當於整個構造過程是

  • 建立一個client,將這個client傳遞給Factory
  • 建立一個Informer時,會通過Factory經過GVK三個層次的介面調到對應資源的Informer,同時factory的例項也會經過每一級往下傳遞
  • 呼叫Inform()方法獲得SharedIndexInformer,依次經過EcsBindingInformer.Informer()-->d.defaultInformer(即:NewInformerFunc回撥)-->NewFilteredEcsBindingInformer

實現一個Lister

EcsBindingInformer介面的另一個方法就是獲取Lister,僅僅需要把SharedIndexInformer的Indexer傳遞過去則可,Lister的快取機構已由SharedIndexInformer實現

func (e *ecsBindingInformer) Lister() demov1.EcsBindingLister {
return demov1.NewEcsBindingLister(e.Informer().GetIndexer())
}

作為apiserver的快取,供controller呼叫快速獲取資源,因此它需要提供兩個查詢的方法,程式碼位於controller-demo/listers/ecsbind/v1/ecs-binding-lister.go

type EcsBindingLister interface {
List(selector labels.Selector)([]*ecsv1.EcsBinding,error)
Get(name string)(*ecsv1.EcsBinding,error)
} func NewEcsBindingLister(indexer cache.Indexer) EcsBindingLister {
return &ecsBindingLister{
indexer:indexer,
}
} func (e *ecsBindingLister)List(selector labels.Selector)(ret []*ecsv1.EcsBinding,err error) {
err= cache.ListAll(e.indexer,selector, func(i interface{}) {
ret= append(ret, i.(*ecsv1.EcsBinding))
})
return
} func (e *ecsBindingLister)Get(name string)(*ecsv1.EcsBinding,error) {
obj, exists, err := e.indexer.GetByKey(name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(v1.Resource("ecsbind"), name)
}
return obj.(*ecsv1.EcsBinding), nil
}

實現一個controller

controller所依賴的各個元件都已經實現完畢,現在可以實現這個crd的controller,完整的實現不展示,大致跟上一篇NodeController類似。僅展示他的欄位和建構函式

type EcsBindingController struct {
kubeClient *versiond.Clientset
clusterName string
ecsbingdingLister list_and_watch.EcsBindingLister
ecsbingdingListerSynced cache.InformerSynced
broadcaster record.EventBroadcaster
recorder record.EventRecorder ecsQueue workqueue.DelayingInterface
lock sync.Mutex
} func NewEcsBindingController(kubeclient *versiond.Clientset,informer list_and_watch.EcsBindingInformer,clusterName string)*EcsBindingController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ecsbinding_controller"}) ec:= &EcsBindingController{
kubeClient:kubeclient,
clusterName:clusterName,
ecsbingdingLister:informer.Lister(),
ecsbingdingListerSynced:informer.Informer().HasSynced,
broadcaster:eventBroadcaster,
recorder:recorder, ecsQueue:workqueue.NewNamedDelayingQueue("EcsBinding"),
} informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ecsbindingg:=obj.(*ecsV1.EcsBinding)
fmt.Printf("controller: Add event, ecsbinding [%s]\n",ecsbindingg.Name)
ec.syncEcsbinding(ecsbindingg)
},
UpdateFunc: func(oldObj, newObj interface{}) {
ecs1,ok1:=oldObj.(*ecsV1.EcsBinding)
ecs2,ok2:=newObj.(*ecsV1.EcsBinding)
if ok1&&ok2 && !reflect.DeepEqual(ecs1,ecs2){
fmt.Printf("controller: Update event, ecsbinding [%s]\n",ecs1.Name)
ec.syncEcsbinding(ecs1)
}
},
DeleteFunc: func(obj interface{}) {
ecsbindingg:=obj.(*ecsV1.EcsBinding)
fmt.Printf("controller: Delete event, ecsbinding [%s]\n",ecsbindingg.Name)
ec.syncEcsbinding(ecsbindingg)
},
}) return ec
}

最後把controller加到controller的Start方法中,統一啟動

	demoCli, _ := versiond.NewForConfig(cfg)

	ecsbindFactory := ecsbindInformer.NewSharedInformerFactory(demoCli, 0)
ecsBindingInformer := ecsbindFactory.EcsBind().V1().EcsBinding()
ec := controller.NewEcsBindingController(demoCli, ecsBindingInformer, "k8s-cluster")
go ec.Run(stopCh)

小結

本篇雖然是說定義個CRD的controller,然而卻把更多的篇幅放到的controller外的一些元件上:api,client,informer。但正事如此自己編碼過一次,才會加深印象,後續在檢視K8S原始碼時遇到controller的原始碼摳出其核心邏輯,通過client去翻查api地址,才會快速上手。本篇的目的也就如此。

參考

client-go原始碼分析--informer機制流程分析

kubernetes client-go解析

深入淺出kubernetes之client-go的SharedInformer