標準Controller

上一篇通過一個簡單的例子,編寫了一個controller-manager,以及一個極簡單的controller。從而對controller的開發有個最基本的認識,但是細心觀察前一篇實現的controller僅僅是每次全量獲取了所有資源,雖然都是從快取中獲取速度是比較快的,如果單次處理一個資源時的時間比較長,而且沒必要每次都把所有資源都掃描一遍,上一篇實現的controller顯然不符合使用場景了,本篇將繼續用另一種方式開發一個結構較為標準的Controller,並介紹支撐controller功能實現的Informer架構。

先介紹一下本次實現的controller需要實現的功能,本controller通過監控Node節點的新增,發現新增是通過特定方式加入叢集的,就給該node打上label。

看下本次controller的結構包含的成員

type NodeController struct {
kubeClient *kubernetes.Clientset //用於給符合條件的node打標記用
nodeLister corelisters.NodeLister //用於獲取被監控的node資源
nodeListerSynced cache.InformerSynced
nodesQueue workqueue.DelayingInterface //一個延時佇列,用於記錄需要controller的node的key
cloudProvider cloudproviders.CloudProvider //用於判定node是否符合條件打標記,此成員並非controller關鍵結構的成員
}

下面是Controller的建構函式

func NewNodeController(kubeClient *kubernetes.Clientset, nodeInformer coreinformers.NodeInformer, cp cloudproviders.CloudProvider) *NodeController {

	n := &NodeController{
kubeClient: kubeClient,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
nodesQueue: workqueue.NewNamedDelayingQueue("nodes"),
cloudProvider: cp,
} nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(cur interface{}) {
node := cur.(*v1.Node)
fmt.Printf("controller: Add event, nodes [%s]\n", node.Name)
n.syncNodes(node)
},
DeleteFunc: func(cur interface{}) {
node := cur.(*v1.Node)
fmt.Printf("controller: Delete event, nodes [%s]\n", node.Name)
n.syncNodes(node)
},
}) return n
}

傳入建構函式的Informer除了提供Lister和HasSynced函式以外,多了一個事件註冊的操作,註冊了node資源的Add事件和Delete事件。這個AddEventHandler除了註冊這兩種事件外,還可以註冊Update事件,由於在這次例子中不需要處理這方面的事件,因此沒有用上。此處通過Informer的事件回撥實現了哪個資源有變更,就會觸發事件,通知到controller來,與前一個controller相比這種就能立馬定位到有變更的資源,無需將資源全量掃描才找到對應的資源。程式碼中兩個事件處理函式都是獲取到變更的node,打印出node的名字和變更事件,最後呼叫syncNodes方法

func (n *NodeController) syncNodes(node *v1.Node) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(node)
if err != nil {
fmt.Printf("Couldn't get key for object %#v: %v \n", node, err)
return
}
fmt.Printf("working queue add node %s\n", key)
n.nodesQueue.Add(key)
}

syncNodes單純給資源求了一個key,再把這個key塞到workqueue中。接收informer傳遞過來的變更事件處理也就此結束。

然後到啟動controller的Run方法

func (n *NodeController) Run(stopCh <-chan struct{}) {

	defer runtime.HandleCrash()
defer n.nodesQueue.ShutDown() fmt.Println("Starting service controller")
defer fmt.Println("Shutting down service controller") if !cache.WaitForCacheSync(stopCh, n.nodeListerSynced) {
runtime.HandleError(fmt.Errorf("time out waiting for caches to sync"))
return
} for i := 0; i < WorkerCount; i++ {
go wait.Until(n.worker, time.Second, stopCh)
} <-stopCh
}
func (n *NodeController) worker() {
for {
func() {
key, quit := n.nodesQueue.Get()
if quit {
return
}
defer n.nodesQueue.Done(key)
err := n.handleNode(key.(string))
if err != nil {
fmt.Printf("controller: error syncing node, %v \n", err)
}
}()
}
}

接收終止訊號和快取處理與之前的controller沒多大區別,區別就在於啟動了若干個worker協程,worker方法就從workqueue中取出key,然後處理node

func (n *NodeController) handleNode(key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
node, err := n.nodeLister.Get(name)
switch {
case errors.IsNotFound(err):
fmt.Printf("Node has been deleted %v \n", key)
return nil
case err != nil:
fmt.Printf("Unable to retrieve node %v from store: %v \n", key, err)
n.nodesQueue.AddAfter(key, time.Second*30)
return err
default:
err := n.processNodeAddIntoCluster(node)
if err != nil {
n.nodesQueue.AddAfter(key, time.Second*30)
}
return err
}
return nil
}

由於workqueue裡存的也只是代表資源的key,此時需要將key轉換回資源的name,還是通過lister從cache中把資源取出。當取不出來,就表明資源已經被刪除了,如果有相應的刪除相關的邏輯就可以在此處執行;如果能取出來,當前的資源已經是最新版本的資源(更新時會有新舊兩個版本的資源,這裡拿到的是新版本的),當然在本例中沒有更新這個事件的處理;整個過程中出現了非期望的錯誤(如因資源被刪除導致的IsNotFound error),處理邏輯需要重試,重試的方式就是將延時重新入隊,延時佇列的作用在此處得以發揮,因為有可能出現這種極端場景:當前只有一個資源需要被處理,而且該資源在剛建立的時候因為其狀態未就緒確實會一直處理失敗,假設當其被處理失敗時馬上又重新入隊,那整個controller就會陷入一個類似於死迴圈的狀態中,直到資源狀態就緒,這樣會浪費不少計算資源。

最後的processNodeAddIntoCluster就是給node打上label並呼叫kubeclient更新之,這裡就不展示了。

看到上面使用了workqueue,頗有生產者消費者的模式,從informer的事件處理函式中獲取到變更的資源將其放入佇列,這個是生產者;worker處理方法從workqueue裡取出變更資源處理,這個是消費者。使用了這個workqueue而不是直接在事件處理函式直接處理的目的在於事件觸發的速度與資源處理的速度不一致,有workqueue在其中起到緩衝作用,免除了因實際處理變更的邏輯造成了事件觸發方的阻塞導致影響了事件的實時性。

類似的佇列和生產者消費者模式在informer中也有出現,作為controller處理資源變更調諧資源狀態的上游,它又如何提供及時的變更事件給下游的controller,以及給下游的controller提供與apiserver一致的資源快取。

Informer機制

Informer機制架構如下圖所示



從該架構中還涉及了幾個相關元件:

  • Reflector:用於監控apiserver相關資源的變化,及時把變更獲取回來,觸發相關的變更事件,變更的資源存放到Delta FIFO裡面;
  • Delta FIFO:一個存放資源變更的FIFO佇列,裡面元素是以 Add/Update/Delete 為key,變更的資源為value這樣的一個物件;
  • Indexder:用於存放從Delta FIFO佇列取出後,經過處理好的值,是apiserver的一個快取,無需每次從apiserver以及Etcd中獲取,減輕兩者的壓力,裡面的快取應與apiserver(實質是Etcd)保持一致。

從上面的結構中,尤其是Delta FIFO,模式與上面使用了WorkQueue的controller是一致的,生產者消費者模式:

  • 生產者方面:主要操作由Reflector執行,它主要依靠ListAndWatch方法對apiserver的資源進行監控,ListAndWatch方法中就呼叫了由Informer提供的List方法和Watch方法,一旦獲得變更的資源,就將資源及其變更的方式(Add,Update,Delete)一同存入Delta FIFO,如上面介紹Delta FIFO時所述;
  • 消費者方面:主要操作由Controller執行,它主要呼叫Delta FIFO的Pop方法,從佇列中取出變更的資源及其變更方式,呼叫之前註冊到Informer裡面相關的事件(即例子中NewNodeController函式中AddEventHandler方法註冊進去的事件處理函式),將變更分發出去,在這裡Delta FIFO除了記錄變更的資源本身資料,也附帶記錄變更方式的作用體現出來了。最後將這個資源快取到Indexer中,快取時也是通過變更方式對快取進行操作,比如是Add變更,則直接往快取中Add一個記錄;是Update則把快取記錄更新;是Delete則刪除記錄。這也是另一個體現記錄變更方式的地方。

Informer和自定義的Controller(這個controller並非消費者的controller)的互動主要是兩處,其一是AddEventHandler時,把變更資源收到自己的workerqueue中供後續執行調諧操作;其二是在調諧期間從Indexer(這個並非是直接呼叫,是通過呼叫Informer時巢狀呼叫的)獲取快取的資源值。整個過程所有涉及的成員如下圖所示

小結Informer

Informer在整個過程中起到銜接各方的作用。Reflector對apiserver的監控是由Informer提供的ListAndWatch方法,資源從apiserver介面拿回來資源資料由註冊進去Informer的反序列化器進行反序列化(這部分跟Schema有關)。而資料進入Indexer快取前,的事件分發也是通過Informer呼叫各個已經註冊上來的事件處理函式。

瞭解了Informer機制,後面則進行自定義資源的Controller開發,裡面還需要給自定義資源擴充套件一個Informer,及封裝跟apiserver訪問自定義資源的client,實現完整的手捏Controller.