1. 程式人生 > >Kubernetes CRD (CustomResourceDefinition) 自定義資源型別

Kubernetes CRD (CustomResourceDefinition) 自定義資源型別

目錄

1、CRD (CustomResourceDefinition) 介紹

我們知道,Kubernetes 中一切都可視為資源,它提供了很多預設資源型別,如 Pod、Deployment、Service、Volume等一系列資源,能夠滿足大多數日常系統部署和管理的需求。但是,在一些特殊的需求場景下,這些現有資源型別就滿足不了,那麼這些就可以抽象為 Kubernetes 的自定義資源,在 Kubernetes 1.7 之後增加了對 CRD 自定義資源二次開發能力來擴充套件 Kubernetes API,通過 CRD 我們可以向 Kubernetes API 中增加新資源型別,而不需要修改 Kubernetes 原始碼或建立自定義的 API server,該功能大大提高了 Kubernetes 的擴充套件能力。它是 (TPR) ThirdPartyResource 的替代者,在 1.9 以上版本 TPR 將被廢棄。

下圖展示了 client-go 庫各元件如何工作以及同自定義 Container 的互動。
client-go-controller

通過圖示,我們可以看到幾個核心元件以及互動流程,以上藍色部分是 client-go 元件,黃色部分是自定義 Controller 元件,各元件作用介紹如下:

1.1 client-go 元件

  • Reflector:該元件是用來監測指定資源型別的 Kubernetes API,當監測 API 接收到新資源型別例項變化時,它將通過 List API 來獲取新建立的 Object,並將其放入到 Delta Fifo queue(一個先進先出佇列)中。
  • Informer:該元件是用來將 Delta Fifo queue 中的 Object 迴圈取出,並且儲存 Object 供後邊索引,並呼叫我們自定義 Controller 傳遞 Object。
  • Indexer:該元件是為 Object 提供索引功能,典型的用例就是通過 Object 的標籤建立索引,並且使用執行緒安全的資料儲存來儲存 Object 以及它的 Keys。

1.2 Custom Controller 元件

  • Informer reference:該元件是知道如何使用自定義資源 Object 的 Informer 例項的引用,我們需要在自定義 Controller 程式碼中建立適當的 Informer。
  • Indexer reference:該元件是知道如何使用自定義資源 Object 的 Indexer 例項的引用,我們需要在自定義 Controller 程式碼中建立適當的 Indexer,並且將使用該引用處理後續檢索 Object。
  • Resource Event Handlers:該元件是當 Informer 要部署 Object 到我們自定義 Controller 時,呼叫的 Callback 函式。這些函式可以獲取被排程 Object 的 Key,並將 Key 存入工作佇列以便進行下一步處理。
  • Work queue:該元件是我們自定義 Controller 中建立用來解耦一個處理中的 Object,也是上邊 Resource Event Handlers 儲存 Key 的地方。
  • Process Item:該元件是我們自定義 Controller 中建立用來處理 Work queue 的一些列函式,這些方法通常使用 Indexer reference 並檢索該 Object 對應的 Key。

簡單的說,整個處理流程大概為:Reflector 通過檢測 Kubernetes API 來跟蹤該擴充套件資源型別的變化,一旦發現有變化,就將該 Object 儲存佇列中,Informer 迴圈取出該 Object 並將其存入 Indexer 進行檢索,同時觸發 Callback 回撥函式,並將變更的 Object Key 資訊放入到工作佇列中,此時自定義 Controller 裡面的 Process Item 就會獲取工作佇列裡面的 Key,並從 Indexer 中獲取 Key 對應的 Object,從而進行相關的業務處理。

2、環境、軟體準備

本次演示環境,我是在本機 MAC OS 上操作,以下是安裝的軟體及版本:

  • Docker: 17.09.0-ce
  • Oracle VirtualBox: 5.1.20 r114628 (Qt5.6.2)
  • Minikube: v0.28.2
  • Kubernetes: v1.10.0
  • Kubectl:
    • Client Version: v1.10.0
    • Server Version: v1.10.0

注意:這裡 Kubernetes 叢集搭建使用 Minikube 來完成,Minikube 啟動的單節點 k8s Node 例項是需要執行在本機的 VM 虛擬機器裡面,所以需要提前安裝好 VM,這裡我選擇 Oracle VirtualBox。k8s 執行底層使用 Docker 容器,所以本機需要安裝好 Docker 環境,這裡忽略 Docker、VirtualBox、Minikube、Kubectl 的安裝過程,可以參考之前文章 Minikube & kubectl 升級並配置, 這裡著重介紹下 Kubernetes CRD 示例 sample-controller 的使用以及原始碼分析。

3、Kubernetes CRD 示例 sample-controller 使用

上一篇文章 部署 Prometheus Operator 監控 Kubernetes 叢集 中,我們講到 Prometheus Operator 部署了幾種自定義資源型別,如 Alertmanager、Prometheus、ServiceMonitor,通過這些 CRD 資源,很輕鬆就能部署完整個監控系統,當時,就勾引了我的興趣,通過幾天的摸索和實踐,也慢慢了解了 CRD 的工作機制和原理,接下來,我們通過官方示例 sample-controller 來演示下如何使用 Kubernetes CRD。

通過該示例 sample-controller,我們可以清楚的瞭解到:

  • 如何通過 CRD 建立一個新的自定義資源型別 Foo
  • 如何建立、獲取、List 該新資源型別 Foo 的例項
  • 如何在資源處理建立、更新、刪除事件上設定 Controller

廢話少說,直接操作一下吧!首先進入到本地 $GOPATH 目錄,編譯並啟動該專案。

$ cd $GOPATH/src/k8s.io/sample-controller/
$ go build -o sample-controller .
$./sample-controller -kubeconfig=$HOME/.kube/config

然後,直接使用該示例提供的 CRD 模板檔案來建立一個新資源型別 Foo,並建立一個 Foo 型別的資源例項。

# 建立 kind 為 Foo 的 CRD 型別 
$ kubectl create -f artifacts/examples/crd.yaml
customresourcedefinition.apiextensions.k8s.io/foos.samplecontroller.k8s.io created
$ kubectl get crd
NAME                                    CREATED AT
foos.samplecontroller.k8s.io            2018-08-17T06:53:25Z

# 建立一個 Foo 型別的資源例項
$ kubectl create -f artifacts/examples/example-foo.yaml
foo.samplecontroller.k8s.io/example-foo created
$ kubectl get deployments
NAME          DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
example-foo   1         1         1            1           1m
$ kubectl get pods
NAME                          READY     STATUS    RESTARTS   AGE
example-foo-d74cd7fbc-2dnwn   1/1       Running   0          1m
$ kubectl get foo
NAME          CREATED AT
example-foo   1m

啟動完畢,不過,大家肯定該是不懂,為什麼就這麼簡單操作,就可以完成 Foo 資源型別的建立,並且建立了 example-foo deployments 例項呢?那麼,我們看下這兩個配置檔案,到底配置了什麼?

$ cat artifacts/examples/crd.yaml
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
  name: foos.samplecontroller.k8s.io
spec:
  group: samplecontroller.k8s.io
  version: v1alpha1
  names:
    kind: Foo
    plural: foos
  scope: Namespaced

該 CRD 模板定義了一個新的資源管理物件 Foo,在沒有修改任何 kubernetes 核心程式碼條件下,僅僅通過定義 CRD 型別就完成了,非常方便又木有。這樣一個新的名稱空間 RESTful API 端點就建立了,例如該示例: /apis/samplecontroller.k8s.io/v1alpha1/namespaces/*/foos/...

$ cat example-foo.yaml
apiVersion: samplecontroller.k8s.io/v1alpha1
kind: Foo
metadata:
  name: example-foo
spec:
  deploymentName: example-foo
  replicas: 1

通過該 yaml 檔案,可以建立新資源型別 Foo 的 Pod 例項 example-foo,注意這裡 apiVersion: samplecontroller.k8s.io/v1alpha1 要跟 crd.yaml 中配置要匹配 <group>/<version>Kind 指定為新資源型別 Foo。不過這裡有人會有疑問,該 yaml 檔案沒有指定 Deployment 型別,只是指定了 deploymentName 就建立了名稱為 example-foo 的 Deployment,而且通過詳情可以看到實際上該 Deployment 指定了 nginx:latest 的映象容器。

$ kubectl describe pod/example-foo-d74cd7fbc-2dnwn
Events:
  Type    Reason                 Age   From               Message
  ----    ------                 ----  ----               -------
  Normal  Scheduled              1m    default-scheduler  Successfully assigned example-foo-d74cd7fbc-2dnwn to minikube
  Normal  SuccessfulMountVolume  1m    kubelet, minikube  MountVolume.SetUp succeeded for volume "default-token-rnj54"
  Normal  Pulling                1m    kubelet, minikube  pulling image "nginx:latest"
  Normal  Pulled                 4s    kubelet, minikube  Successfully pulled image "nginx:latest"
  Normal  Created                4s    kubelet, minikube  Created container
  Normal  Started                4s    kubelet, minikube  Started container

那麼這個是怎麼實現的呢?其實這就是自定義 CRD Contorller 中定義實現的。接下來通過原始碼,我們簡單分析一下該自定義 Contorller 是如何實現的。

4、原始碼分析 CRD Contorller 的實現

我們通過原始碼簡要分析一下,自定義 CRD Controller 是如何實現的。首先,在該例項專案根目錄下存在兩個主要實現檔案:main.gocontroller.go

main.go 檔案主要是作為整個程式的入口主啟動程式,使用非同步處理,呼叫 controller.goRun 方法來啟動 Foo Controller。

4.1 main 主啟動程式

import (
    # 引入依賴包
    kubeinformers "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    clientset "k8s.io/sample-controller/pkg/client/clientset/versioned"
    informers "k8s.io/sample-controller/pkg/client/informers/externalversions"
    ......
)   
    # 設定訊號標示以便後邊非同步接收該訊號結束程序
    stopCh := signals.SetupSignalHandler()
    ......

    # 初始化 kubeInformer、exampleInformer Factory
    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
    exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

    # 初始化 Foo controller
    controller := NewController(kubeClient, exampleClient,
        kubeInformerFactory.Apps().V1().Deployments(),
        exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

    # 非同步啟動 Factory
    go kubeInformerFactory.Start(stopCh)
    go exampleInformerFactory.Start(stopCh)

    # 呼叫 Run 啟動函式並指定 replica 數量以及非同步啟動
    if err = controller.Run(2, stopCh); err != nil {
        glog.Fatalf("Error running controller: %s", err.Error())
    }
    .....

controller.go 是主處理檔案,包括 Controller 的定義、初始化、啟動、Callback 函式等等操作。

4.2 定義 Controller 結構體

首先需要定義一個 Controller 結構體,包含 deploymentsListerfoosListerworkqueue 等等

type Controller struct {
    kubeclientset kubernetes.Interface
    sampleclientset clientset.Interface

    deploymentsLister appslisters.DeploymentLister
    deploymentsSynced cache.InformerSynced
    foosLister        listers.FooLister
    foosSynced        cache.InformerSynced

    # 工作佇列
    workqueue workqueue.RateLimitingInterface
    recorder record.EventRecorder
}

4.3 初始化 Controller

最新版本里,把 kubeInformer 和 fooInformer 初始化放在了 main.go 中,這裡進行了 workqueue 初始化。


func NewController(
    kubeclientset kubernetes.Interface,
    sampleclientset clientset.Interface,
    deploymentInformer appsinformers.DeploymentInformer,
    fooInformer informers.FooInformer) *Controller {

    utilruntime.Must(samplescheme.AddToScheme(scheme.Scheme))
    glog.V(4).Info("Creating event broadcaster")
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    controller := &Controller{
        kubeclientset:     kubeclientset,
        sampleclientset:   sampleclientset,
        deploymentsLister: deploymentInformer.Lister(),
        deploymentsSynced: deploymentInformer.Informer().HasSynced,
        foosLister:        fooInformer.Lister(),
        foosSynced:        fooInformer.Informer().HasSynced,
        workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Foos"),
        recorder:          recorder,
    }

4.4 Deployment & Foo informer 監控資源 CRUD 操作的回撥函式

    fooInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueFoo,
        UpdateFunc: func(old, new interface{}) {
            controller.enqueueFoo(new)
        },
    })

    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.handleObject,
        UpdateFunc: func(old, new interface{}) {
            newDepl := new.(*appsv1.Deployment)
            oldDepl := old.(*appsv1.Deployment)
            if newDepl.ResourceVersion == oldDepl.ResourceVersion {
                return
            }
            controller.handleObject(new)
        },
        DeleteFunc: controller.handleObject,
    })

這裡要說下,fooInformer 的處理函式呼叫 enqueueFoo 方法將物件狀態變化事件存入到工作佇列中,deploymentInformer 的處理函式呼叫 handleObject 方法處理物件的 Add、Update、Del 事件,並最終將物件存入到工作佇列中。在這裡,可以看到 Controller 主要針對以下幾種資源事件進行了處理,一個是 Foo 資源的 Add、Update 事件處理,一個是 deployment 資源的 Add、Update、Delete 事件處理。這裡 deployment 事件呼叫 handleObject 方法對所有 deployment 進行過濾,將 Foo 資源例項對應的 deployment 過濾出來,並將對應的事件加入到工作佇列中。

4.5 啟動 Controller

// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
    defer runtime.HandleCrash()
    defer c.workqueue.ShutDown()

    // Start the informer factories to begin populating the informer caches
    glog.Info("Starting Foo controller")

    // Wait for the caches to be synced before starting workers
    glog.Info("Waiting for informer caches to sync")
    if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    glog.Info("Starting workers")
    // Launch two workers to process Foo resources
    for i := 0; i < threadiness; i++ {
        go wait.Until(c.runWorker, time.Second, stopCh)
    }

    glog.Info("Started workers")
    <-stopCh
    glog.Info("Shutting down workers")

    return nil
}

在該 Run 函式呼叫 runWorker 函式執行 Workers 前,需要等待狀態的同步完成,然後啟動多個 worker 併發的從工作佇列中獲取待處理的 Item,真正進行業務處理的函式為 runWorker 方法。

4.6 runWorker 佇列處理函式

runWorker函式是一個長期執行的函式,它呼叫 processNextWorkItem 函式來執行讀取並處理工作佇列上的訊息。

func (c *Controller) runWorker() {
    for c.processNextWorkItem() {
    }
}

func (c *Controller) processNextWorkItem() bool {
    obj, shutdown := c.workqueue.Get()

    if shutdown {
        return false
    }

    err := func(obj interface{}) error {
        defer c.workqueue.Done(obj)
        var key string
        var ok bool
        if key, ok = obj.(string); !ok {
            c.workqueue.Forget(obj)
            runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
            return nil
        }
        if err := c.syncHandler(key); err != nil {
            return fmt.Errorf("error syncing '%s': %s", key, err.Error())
        }
        c.workqueue.Forget(obj)
        glog.Infof("Successfully synced '%s'", key)
        return nil
    }(obj)
......
}

工作佇列中的每一個 item 都要呼叫 syncHandler 函式進行處理,其中就包括 Foo Deployment 的建立和更新。

func (c *Controller) syncHandler(key string) error {
    ......
    deployment, err := c.deploymentsLister.Deployments(foo.Namespace).Get(deploymentName)
    if errors.IsNotFound(err) {
        deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Create(newDeployment(foo))
    }
    ......
    if foo.Spec.Replicas != nil && *foo.Spec.Replicas != *deployment.Spec.Replicas {
        glog.V(4).Infof("Foo %s replicas: %d, deployment replicas: %d", name, *foo.Spec.Replicas, *deployment.Spec.Replicas)
        deployment, err = c.kubeclientset.AppsV1().Deployments(foo.Namespace).Update(newDeployment(foo))
    }
    ......
    err = c.updateFooStatus(foo, deployment)
    if err != nil {
        return err
    }

    c.recorder.Event(foo, corev1.EventTypeNormal, SuccessSynced, MessageResourceSynced)
}

每個 Deployment 事件都會呼叫 newDeployment 函式產生一個新的 Deployment 例項配置如下,這裡就是上邊提到疑問 沒有指定 Deployment 型別,只是指定了 deploymentName 就建立了名稱為 example-foo 容器映象為 nginx:latest 的 Deployment,就是在這裡定義的。

func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
    labels := map[string]string{
        "app":        "nginx",
        "controller": foo.Name,
    }
    return &appsv1.Deployment{
        ObjectMeta: metav1.ObjectMeta{
            Name:      foo.Spec.DeploymentName,
            Namespace: foo.Namespace,
            OwnerReferences: []metav1.OwnerReference{
                *metav1.NewControllerRef(foo, schema.GroupVersionKind{
                    Group:   samplev1alpha1.SchemeGroupVersion.Group,
                    Version: samplev1alpha1.SchemeGroupVersion.Version,
                    Kind:    "Foo",
                }),
            },
        },
        Spec: appsv1.DeploymentSpec{
            Replicas: foo.Spec.Replicas,
            Selector: &metav1.LabelSelector{
                MatchLabels: labels,
            },
            Template: corev1.PodTemplateSpec{
                ObjectMeta: metav1.ObjectMeta{
                    Labels: labels,
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  "nginx",
                            Image: "nginx:latest",
                        },
                    },
                },
            },
        },
    }

以上簡單分析了兩個主要的檔案,而其中它呼叫的一些核心自定義函式,位於專案 pkg 目錄下,該目錄下的函式是基於 client-go 進行的呼叫以及擴充套件,建議大家細細研究下。

import (
    ......
    samplev1alpha1 "k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1"
    clientset "k8s.io/sample-controller/pkg/client/clientset/versioned"
    samplescheme "k8s.io/sample-controller/pkg/client/clientset/versioned/scheme"
    informers "k8s.io/sample-controller/pkg/client/informers/externalversions/samplecontroller/v1alpha1"
    listers "k8s.io/sample-controller/pkg/client/listers/samplecontroller/v1alpha1"
)

參考資料