Kubernetes Node Controller Source Code Analysis: The Run Loop

Author: [email protected]

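Abstract: In my view, the Node Controller is one of the most important of Kubernetes' dozens of controllers, arguably in the top three, and it may also be the most complex. I therefore plan a series of articles on its source code, hoping to build a thorough yet accessible understanding for myself. This post takes the NodeController's Run method as the entry point and traces how it works.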

Running the Node Controller

The Node Controller's Run method is shown below; it is the entry point to all of the controller's actual processing logic.

pkg/controller/node/nodecontroller.go:550

// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run() {
    go func() {
        defer utilruntime.HandleCrash()

        if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
            utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
            return
        }

        // Incorporate the results of node status pushed from kubelet to master.
        go wait.Until(func() {
            if err := nc.monitorNodeStatus(); err != nil {
                glog.Errorf("Error monitoring node status: %v", err)
            }
        }, nc.nodeMonitorPeriod, wait.NeverStop)

        if nc.runTaintManager {
            go nc.taintManager.Run(wait.NeverStop)
        }

        if nc.useTaintBasedEvictions {
            // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
            // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
            go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
        } else {
            // Managing eviction of nodes:
            // When we delete pods off a node, if the node was not empty at the time we then
            // queue an eviction watcher. If we hit an error, retry deletion.
            go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
        }
    }()
}

WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced)

  • The Node Controller first calls WaitForCacheSync and waits until HasSynced returns true for the PodInformer, NodeInformer and DaemonSetInformer, meaning all three kinds of API object have finished their initial sync.
vendor/k8s.io/client-go/tools/cache/shared_informer.go:100

// WaitForCacheSync waits for caches to populate.  It returns true if it was successful, false
// if the controller should shutdown
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {

    // Every 100ms, call each InformerSynced function in cacheSyncs.
    // Once all of them return true,
    // every required cache has synced and WaitForCacheSync returns true;
    // otherwise keep polling.
    err := wait.PollUntil(syncedPollPeriod,
        func() (bool, error) {
            for _, syncFunc := range cacheSyncs {
                if !syncFunc() {
                    return false, nil
                }
            }
            return true, nil
        },
        stopCh)
    if err != nil {
        glog.V(2).Infof("stop requested")
        return false
    }

    glog.V(4).Infof("caches populated")
    return true
}

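WaitForCacheSync works as follows:

  • Every 100ms it calls each InformerSynced function in cacheSyncs. Once all of them return true, every required cache has synced and WaitForCacheSync returns true.

  • Otherwise it keeps polling at the 100ms interval until they all return true or a stop signal is received, in which case it returns false.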

Start a goroutine that runs monitorNodeStatus every 5s to monitor node status

pkg/controller/node/nodecontroller.go:586

// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
func (nc *NodeController) monitorNodeStatus() error {

    // We are listing nodes from local cache as we can tolerate some small delays
    // comparing to state from etcd and there is eventual consistency anyway.
    nodes, err := nc.nodeLister.List(labels.Everything())
    if err != nil {
        return err
    }

    // Compare knownNodeSet with the listed nodes to obtain the added and deleted nodes.
    added, deleted := nc.checkForNodeAddedDeleted(nodes)

    // Iterate over the added nodes; each one is a node the Node Controller has just seen join the cluster.
    for i := range added {
        ...

        // Record the added node in knownNodeSet.
        nc.knownNodeSet[added[i].Name] = added[i]

        // When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
        zone := utilnode.GetZoneKey(added[i])
        if _, found := nc.zoneStates[zone]; !found {

            // Set the state of the node's new zone to "Initial".
            nc.zoneStates[zone] = stateInitial

            // If useTaintBasedEvictions is false (controlled through --feature-gates; TaintBasedEvictions defaults to false),
            // create the zone's zonePodEvictor, rate-limited by evictionLimiterQPS (set by --node-eviction-rate, default 0.1).
            if !nc.useTaintBasedEvictions {
                nc.zonePodEvictor[zone] =
                    NewRateLimitedTimedQueue(
                        flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
            } else {

                // If useTaintBasedEvictions is true,
                // create the zone's zoneNotReadyOrUnreachableTainer instead, rate-limited by evictionLimiterQPS.
                nc.zoneNotReadyOrUnreachableTainer[zone] =
                    NewRateLimitedTimedQueue(
                        flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
            }
            ...
        }

        // If useTaintBasedEvictions is true, call RemoveTaintOffNode to clear the node's taints
        // (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable),
        // and remove the node from the zoneNotReadyOrUnreachableTainer queue if it is queued there.
        if nc.useTaintBasedEvictions {
            nc.markNodeAsHealthy(added[i])
        } else {

            // If useTaintBasedEvictions is false, i.e. zonePodEvictor is in use,
            // remove the node from the corresponding zonePodEvictor queue.
            nc.cancelPodEviction(added[i])
        }
    }

    // Iterate over the deleted nodes and remove each one from knownNodeSet.
    for i := range deleted {
       ...
        delete(nc.knownNodeSet, deleted[i].Name)
    }


    zoneToNodeConditions := map[string][]*v1.NodeCondition{}
    for i := range nodes {
        ...
        // PollImmediate tries a condition func until it returns true, an error, or the timeout is reached.
        // retrySleepTime is 20ms and the timeout is 100ms.
        if err := wait.PollImmediate(retrySleepTime, retrySleepTime*nodeStatusUpdateRetry, func() (bool, error) {

          // nc.tryUpdateNodeStatus - For a given node checks its conditions and tries to update it. 
          // Returns grace period to which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
            gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
            if err == nil {
                return true, nil
            }
            ...
        }); 
        ...
        }

        // For non-master nodes, append the node's Ready condition to the zoneToNodeConditions map.
        // We do not treat a master node as a part of the cluster for network disruption checking.
        if !system.IsMasterNode(node.Name) {
            zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
        }


        decisionTimestamp := nc.now()
        if currentReadyCondition != nil {

            // When the observed Ready condition is False (NotReady), handling depends on useTaintBasedEvictions.
            // Check eviction timeout against decisionTimestamp
            if observedReadyCondition.Status == v1.ConditionFalse {

                // useTaintBasedEvictions is true:
                if nc.useTaintBasedEvictions {

                    // If the node already carries the Unreachable taint, swap it for the NotReady taint.
                    // We want to update the taint straight away if Node is already tainted with the UnreachableTaint
                    if v1.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
                        taintToAdd := *NotReadyTaintTemplate
                        if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
                            ...
                        }

                    // Otherwise add the node to the Tainter queue and leave it to the Taint Controller.
                    } else if nc.markNodeForTainting(node) {
                        ...
                    }

                // useTaintBasedEvictions is false: pods are evicted directly.
                } else {

                    // Evict only once decisionTimestamp (the current time) is later than
                    // readyTransitionTimestamp + podEvictionTimeout (default 5min).
                    if decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {

                        // Add the node to the PodEvictor queue and leave it to the PodEvictor.
                        if nc.evictPods(node) {
                            ...
                        }
                    }
                }
            }

            // Likewise, when the observed Ready condition is Unknown, handling depends on useTaintBasedEvictions.
            if observedReadyCondition.Status == v1.ConditionUnknown {

                // useTaintBasedEvictions is true:
                if nc.useTaintBasedEvictions {

                    // If the node already carries the NotReady taint, swap it for the Unreachable taint straight away.
                    if v1.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
                        taintToAdd := *UnreachableTaintTemplate
                        if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
                        ...
                        }

                    // Otherwise add the node to the Tainter queue and leave it to the Taint Controller.
                    } else if nc.markNodeForTainting(node) {
                        ...
                    }

                // useTaintBasedEvictions is false: pods are evicted directly.
                } else {

                    // Evict only once decisionTimestamp (the current time) is later than
                    // probeTimestamp + podEvictionTimeout (default 5min).
                    if decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {

                        // Add the node to the PodEvictor queue and leave it to the PodEvictor.
                        if nc.evictPods(node) {
                            ...
                        }
                    }
                }
            }

            // Likewise, when the observed Ready condition is True, handling depends on useTaintBasedEvictions.
            if observedReadyCondition.Status == v1.ConditionTrue {

                // useTaintBasedEvictions is true:
                if nc.useTaintBasedEvictions {

                    // Remove the node from the zoneNotReadyOrUnreachableTainer queue if it is queued there.
                    removed, err := nc.markNodeAsHealthy(node)
                    ...
                } else {

                    // useTaintBasedEvictions is false: remove the node from the corresponding zonePodEvictor queue.
                    if nc.cancelPodEviction(node) {
                        ...
                    }
                }
            }

            // If the node status changed from Ready to NotReady, set the status of every pod on the node to NotReady.
            // Report node event.
            if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
                recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
                if err = markAllPodsNotReady(nc.kubeClient, node); err != nil {
                    utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
                }
            }


            // Check with the cloud provider to see if the node still exists. If it
            // doesn't, delete the node immediately.
            ...
        }
    }

    // 處理Disruption
    nc.handleDisruption(zoneToNodeConditions, nodes)

    return nil
}
  • Compare knownNodeSet with the listed nodes to obtain the added and deleted nodes.
  • Iterate over the added nodes; each one is a node the Node Controller has just seen join the cluster.
    • Add the node to knownNodeSet.
    • When adding new nodes we need to check whether a new zone appeared, and if so add a new evictor. For a new zone:
      • Set the zone's state to "Initial".
      • If useTaintBasedEvictions is false (controlled through --feature-gates; TaintBasedEvictions defaults to false), create the zone's zonePodEvictor, rate-limited by evictionLimiterQPS (set by --node-eviction-rate, default 0.1).
      • If useTaintBasedEvictions is true, create the zone's zoneNotReadyOrUnreachableTainer, rate-limited by evictionLimiterQPS.
    • If useTaintBasedEvictions is true, call RemoveTaintOffNode to clear the node's taints (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable), and remove the node from the zoneNotReadyOrUnreachableTainer queue if it is queued there.
    • If useTaintBasedEvictions is false, i.e. zonePodEvictor is in use, remove the node from the corresponding zonePodEvictor queue.
  • Iterate over the deleted nodes and remove each one from knownNodeSet.
  • Iterate over all nodes:
    • Update the node status, obtaining the previously observed Ready condition and the current one.
    • For non-master nodes, append the node's Ready condition to the zoneToNodeConditions map.
    • When the observed condition is NotReady, handling depends on useTaintBasedEvictions:
      • If it is true:
        • If the node already carries the Unreachable taint, swap it for the NotReady taint.
        • Otherwise add the node to the Tainter queue and leave it to the Taint Controller.
      • If it is false, pods are evicted directly:
        • Once decisionTimestamp (the current time) is later than readyTransitionTimestamp + podEvictionTimeout (default 5min), add the node to the PodEvictor queue and leave it to the PodEvictor.
    • Likewise, when the observed condition is Unknown, handling depends on useTaintBasedEvictions:
      • If it is true:
        • If the node already carries the NotReady taint, swap it for the Unreachable taint.
        • Otherwise add the node to the Tainter queue and leave it to the Taint Controller.
      • If it is false, pods are evicted directly:
        • Once decisionTimestamp (the current time) is later than probeTimestamp + podEvictionTimeout (default 5min), add the node to the PodEvictor queue and leave it to the PodEvictor.
    • Likewise, when the observed condition is True, handling depends on useTaintBasedEvictions:
      • If it is true, remove the node from the zoneNotReadyOrUnreachableTainer queue if it is queued there.
      • If it is false, remove the node from the corresponding zonePodEvictor queue.
    • If the node status changed from Ready to NotReady, set the status of every pod on the node to NotReady.
  • Run handleDisruption.

Next, let's look at the logic of handleDisruption:

pkg/controller/node/nodecontroller.go:772


func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
    newZoneStates := map[string]zoneState{}

    // From zoneToNodeConditions, compute allAreFullyDisrupted: whether, judging by the node conditions observed in this pass, every zone in the cluster is currently in the FullDisruption state.
    allAreFullyDisrupted := true
    for k, v := range zoneToNodeConditions {
        ZoneSize.WithLabelValues(k).Set(float64(len(v)))
        unhealthy, newState := nc.computeZoneStateFunc(v)
        ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
        UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
        if newState != stateFullDisruption {
            allAreFullyDisrupted = false
        }
        newZoneStates[k] = newState
        if _, had := nc.zoneStates[k]; !had {
            glog.Errorf("Setting initial state for unseen zone: %v", k)
            nc.zoneStates[k] = stateInitial
        }
    }

    // From zoneStates, compute allWasFullyDisrupted: whether, judging by the node conditions observed in the previous pass, every zone in the cluster was in the FullDisruption state.
    allWasFullyDisrupted := true
    for k, v := range nc.zoneStates {
        if _, have := zoneToNodeConditions[k]; !have {
            ZoneSize.WithLabelValues(k).Set(0)
            ZoneHealth.WithLabelValues(k).Set(100)
            UnhealthyNodes.WithLabelValues(k).Set(0)
            delete(nc.zoneStates, k)
            continue
        }
        if v != stateFullDisruption {
            allWasFullyDisrupted = false
            break
        }
    }

    // At least one node was responding in previous pass or in the current pass. Semantics is as follows:
    // - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
    // - if the new state is "normal" we resume normal operation (go back to default limiter settings),
    // - if new state is "fullDisruption" we restore normal eviction rate,
    //   - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
    if !allAreFullyDisrupted || !allWasFullyDisrupted {

        // allAreFullyDisrupted is true and allWasFullyDisrupted is false: the cluster has just moved from non-FullDisruption to FullDisruption, so iterate over all nodes:
        // We're switching to full disruption mode
        if allAreFullyDisrupted {
            glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
            for i := range nodes {
                // If useTaintBasedEvictions is true, mark the node as healthy (remove its taints and remove it from the Tainter queue).
                if nc.useTaintBasedEvictions {
                    _, err := nc.markNodeAsHealthy(nodes[i])
                    if err != nil {
                        glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
                    }
                } else {
                    // If useTaintBasedEvictions is false, cancel pod eviction for the node (by removing it from the zone's PodEvictor queue).
                    nc.cancelPodEviction(nodes[i])
                }
            }

            // Set the rate limiter of every zone's Tainter queue or PodEvictor queue to 0, which stops all evictions.
            // We stop all evictions.
            for k := range nc.zoneStates {
                if nc.useTaintBasedEvictions {
                    nc.zoneNotReadyOrUnreachableTainer[k].SwapLimiter(0)
                } else {
                    nc.zonePodEvictor[k].SwapLimiter(0)
                }
            }

            // Set the state of every zone (nc.zoneStates) to FullDisruption.
            for k := range nc.zoneStates {
                nc.zoneStates[k] = stateFullDisruption
            }
            // All rate limiters are updated, so we can return early here.
            return
        }

        // allWasFullyDisrupted is true and allAreFullyDisrupted is false: the cluster has just moved from FullDisruption to non-FullDisruption, so iterate over all nodes:
        // We're exiting full disruption mode
        if allWasFullyDisrupted {
            glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")

            // When exiting disruption mode update probe timestamps on all Nodes.
            now := nc.now()
            for i := range nodes {
                v := nc.nodeStatusMap[nodes[i].Name]
                v.probeTimestamp = now
                v.readyTransitionTimestamp = now
                nc.nodeStatusMap[nodes[i].Name] = v
            }


            // Reset each zone's disruption rate limiter according to zone size and zone state.
            // We reset all rate limiters to settings appropriate for the given state.
            for k := range nc.zoneStates {
                nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
                nc.zoneStates[k] = newZoneStates[k]
            }
            return
        }


        // allWasFullyDisrupted and allAreFullyDisrupted are both false: the cluster stays non-FullDisruption, so for each zone whose state changed, reset its rate limiter according to zone size and zone state.
        // We know that there's at least one not-fully disrupted so,
        // we can use default behavior for rate limiters
        for k, v := range nc.zoneStates {
            newState := newZoneStates[k]
            if v == newState {
                continue
            }
            glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState)
            nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
            nc.zoneStates[k] = newState
        }
    }
}

The logic of handleDisruption is therefore:

  • From zoneToNodeConditions, compute allAreFullyDisrupted: whether, judging by the node conditions observed in this pass, every zone in the cluster is currently in the FullDisruption state.
  • From zoneStates, compute allWasFullyDisrupted: whether, judging by the node conditions observed in the previous pass, every zone in the cluster was in the FullDisruption state.
  • If allAreFullyDisrupted is true and allWasFullyDisrupted is false, the cluster is switching to full disruption mode. Iterate over all nodes:
    • If useTaintBasedEvictions is true, mark the node as healthy (remove its taints and remove it from the Tainter queue).
    • If useTaintBasedEvictions is false, cancel pod eviction for the node (by removing it from the zone's PodEvictor queue).
    • Set the rate limiter of every zone's Tainter queue or PodEvictor queue to 0, which stops all evictions.
    • Set the state of every zone (nc.zoneStates) to FullDisruption.
  • If allWasFullyDisrupted is true and allAreFullyDisrupted is false, the cluster is exiting full disruption mode. Iterate over all nodes:
    • Update the probe timestamps on all nodes.
    • Call setLimiterInZone to reset each zone's disruption rate limiter according to zone size and zone state.
  • If allWasFullyDisrupted and allAreFullyDisrupted are both false, the cluster stays non-FullDisruption; call setLimiterInZone to reset the rate limiter of each zone whose state changed, according to zone size and zone state.
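The branching above reduces to a two-flag decision. The sketch below models only that decision, with hypothetical names (allFull, transition, and the returned labels); it leaves out metrics, limiters and the "Initial" state, and is not the controller's own code.

```go
package main

import "fmt"

type zoneState string

const (
	stateNormal            zoneState = "Normal"
	statePartialDisruption zoneState = "PartialDisruption"
	stateFullDisruption    zoneState = "FullDisruption"
)

// allFull reports whether every zone in the map is fully disrupted.
// Like the loops in handleDisruption it starts from true and is
// falsified by the first zone that is not in FullDisruption.
func allFull(states map[string]zoneState) bool {
	for _, s := range states {
		if s != stateFullDisruption {
			return false
		}
	}
	return true
}

// transition names the branch taken for a previous and a current set of
// zone states. The labels are illustrative only.
func transition(previous, current map[string]zoneState) string {
	was, are := allFull(previous), allFull(current)
	switch {
	case was && are:
		return "stay-stopped" // nothing changes; evictions remain stopped
	case are:
		return "enter-full-disruption" // cancel evictions, set every limiter to 0
	case was:
		return "exit-full-disruption" // reset timestamps, restore limiters
	default:
		return "per-zone-update" // adjust only the zones whose state changed
	}
}

func main() {
	mixed := map[string]zoneState{"zone-a": stateNormal, "zone-b": stateFullDisruption}
	down := map[string]zoneState{"zone-a": stateFullDisruption, "zone-b": stateFullDisruption}

	fmt.Println(transition(mixed, down)) // enter-full-disruption
	fmt.Println(transition(down, mixed)) // exit-full-disruption
}
```

The first case corresponds to the outer `if !allAreFullyDisrupted || !allWasFullyDisrupted` being false, where handleDisruption does nothing.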

Next, let's look at setLimiterInZone and how it maps zone size and zone state to different rate limiters.

pkg/controller/node/nodecontroller.go:870

func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
    switch state {

    // If the zone state is normal, set the rate limiter to evictionLimiterQPS (default 0.1).
    case stateNormal:
        if nc.useTaintBasedEvictions {
            nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(nc.evictionLimiterQPS)
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
        }

    // If the zone state is PartialDisruption, call nc.enterPartialDisruptionFunc to obtain the rate limiter value.
    case statePartialDisruption:
        if nc.useTaintBasedEvictions {
            nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
                nc.enterPartialDisruptionFunc(zoneSize))
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(
                nc.enterPartialDisruptionFunc(zoneSize))
        }

    // If the zone state is FullDisruption, call nc.enterFullDisruptionFunc to obtain the rate limiter value.
    case stateFullDisruption:
        if nc.useTaintBasedEvictions {
            nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
                nc.enterFullDisruptionFunc(zoneSize))
        } else {
            nc.zonePodEvictor[zone].SwapLimiter(
                nc.enterFullDisruptionFunc(zoneSize))
        }
    }
}

nc.enterFullDisruptionFunc and nc.enterPartialDisruptionFunc are assigned when NewNodeController creates the Node Controller:

pkg/controller/node/nodecontroller.go:270
func NewNodeController(...) (*NodeController, error) {
    ...
    nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
    nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
    ...
}    

pkg/controller/node/nodecontroller.go:1132
// Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
    return nc.evictionLimiterQPS
}

// If the cluster is large make evictions slower, if they're small stop evictions altogether.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
    if int32(nodeNum) > nc.largeClusterThreshold {
        return nc.secondaryEvictionLimiterQPS
    }
    return 0
}

The logic of setLimiterInZone is therefore:

  • When the zone state is PartialDisruption, the rate limiter of the Tainter queue or PodEvictor queue is set as follows:

    • If the zone size is greater than nc.largeClusterThreshold (default 50), use secondaryEvictionLimiterQPS (default 0.01).
    • Otherwise use 0.
  • When the zone state is FullDisruption, the rate limiter of the Tainter queue or PodEvictor queue is set to evictionLimiterQPS (default 0.1).

  • When the zone state is normal, the rate limiter of the Tainter queue or PodEvictor queue is set to evictionLimiterQPS (default 0.1).
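As a compact illustration of that mapping, the sketch below folds HealthyQPSFunc and ReducedQPSFunc into a single function. The type evictionConfig and the method qpsFor are hypothetical; the default values are the ones quoted in this article.

```go
package main

import "fmt"

type zoneState int

const (
	stateNormal zoneState = iota
	statePartialDisruption
	stateFullDisruption
)

// evictionConfig holds the three settings the mapping depends on.
type evictionConfig struct {
	evictionQPS           float32 // evictionLimiterQPS
	secondaryQPS          float32 // secondaryEvictionLimiterQPS
	largeClusterThreshold int
}

// qpsFor returns the eviction rate for a zone in the given state.
// Only PartialDisruption depends on the zone size: large zones slow down,
// small zones stop evicting. Normal and FullDisruption use the primary rate.
func (c evictionConfig) qpsFor(state zoneState, zoneSize int) float32 {
	if state == statePartialDisruption {
		if zoneSize > c.largeClusterThreshold {
			return c.secondaryQPS
		}
		return 0
	}
	return c.evictionQPS
}

func main() {
	cfg := evictionConfig{evictionQPS: 0.1, secondaryQPS: 0.01, largeClusterThreshold: 50}

	fmt.Println(cfg.qpsFor(stateNormal, 10))             // primary rate
	fmt.Println(cfg.qpsFor(statePartialDisruption, 10))  // small zone: evictions stop
	fmt.Println(cfg.qpsFor(statePartialDisruption, 100)) // large zone: reduced rate
	fmt.Println(cfg.qpsFor(stateFullDisruption, 100))    // primary rate
}
```

Note that a single fully disrupted zone keeps the normal rate; evictions stop entirely only when handleDisruption finds every zone fully disrupted.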

Run TaintManager

Next in the Node Controller's Run method, the Taint Manager is started:

if nc.runTaintManager {
    go nc.taintManager.Run(wait.NeverStop)
}

nc.runTaintManager is set by --enable-taint-manager and defaults to true, so the Taint Manager is started by default.

Next, let's see what the Taint Manager's Run method does.

pkg/controller/node/taint_controller.go:179

// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
    glog.V(0).Infof("Starting NoExecuteTaintManager")
    // Functions that are responsible for taking work items out of the workqueues and putting them
    // into channels.
    go func(stopCh <-chan struct{}) {
        for {
            item, shutdown := tc.nodeUpdateQueue.Get()
            if shutdown {
                break
            }
            nodeUpdate := item.(*nodeUpdateItem)
            select {
            case <-stopCh:
                break
            case tc.nodeUpdateChannel <- nodeUpdate:
            }
        }
    }(stopCh)

    go func(stopCh <-chan struct{}) {
        for {
            item, shutdown := tc.podUpdateQueue.Get()
            if shutdown {
                break
            }
            podUpdate := item.(*podUpdateItem)
            select {
            case <-stopCh:
                break
            case tc.podUpdateChannel <- podUpdate:
            }
        }
    }(stopCh)

    // When processing events we want to prioritize Node updates over Pod updates,
    // as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
    // we don't want user (or system) to wait until PodUpdate queue is drained before it can
    // start evicting Pods from tainted Nodes.
    for {
        select {
        case <-stopCh:
            break
        case nodeUpdate := <-tc.nodeUpdateChannel:
            tc.handleNodeUpdate(nodeUpdate)
        case podUpdate := <-tc.podUpdateChannel:
            // If we found a Pod update we need to empty Node queue first.
        priority:
            for {
                select {
                case nodeUpdate := <-tc.nodeUpdateChannel:
                    tc.handleNodeUpdate(nodeUpdate)
                default:
                    break priority
                }
            }
            // After Node queue is emptied we process podUpdate.
            tc.handlePodUpdate(podUpdate)
        }
    }
}
  • Start a goroutine that takes nodeUpdate items from the NoExecuteTaintManager's nodeUpdateQueue and passes them to tc.nodeUpdateChannel.
  • Start a goroutine that takes podUpdate items from the NoExecuteTaintManager's podUpdateQueue and passes them to tc.podUpdateChannel.
  • tc.handlePodUpdate processes a podUpdate only after tc.handleNodeUpdate has handled every pending nodeUpdate.

I will cover the NoExecuteTaintManager's handleNodeUpdate and handlePodUpdate in a separate post dedicated to the Taint Manager, so I won't go deeper here.

doTaintingPass

If useTaintBasedEvictions is true, i.e. TaintBasedEvictions is set to true in --feature-gates (it defaults to false), doTaintingPass is called every 100ms.

doTaintingPass calls the apiserver to put the matching taint on a node, depending on whether its condition is NotReady or Unknown: node.alpha.kubernetes.io/notReady or node.alpha.kubernetes.io/unreachable respectively.

if nc.useTaintBasedEvictions {
    // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
    // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
    go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
}
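The condition-to-taint mapping described for doTaintingPass can be written down as a small lookup. This is a sketch under stated assumptions: the taint keys are the ones named in this article, while conditionStatus and taintFor are hypothetical names rather than the controller's own.

```go
package main

import "fmt"

// conditionStatus models the three values a node's Ready condition can take.
type conditionStatus string

const (
	conditionTrue    conditionStatus = "True"
	conditionFalse   conditionStatus = "False" // reported as NotReady
	conditionUnknown conditionStatus = "Unknown"
)

// Taint keys as quoted in the article.
const (
	notReadyTaintKey    = "node.alpha.kubernetes.io/notReady"
	unreachableTaintKey = "node.alpha.kubernetes.io/unreachable"
)

// taintFor returns the taint key matching a Ready condition status.
// The second result is false for a healthy node, which needs no taint.
func taintFor(status conditionStatus) (string, bool) {
	switch status {
	case conditionFalse:
		return notReadyTaintKey, true
	case conditionUnknown:
		return unreachableTaintKey, true
	default:
		return "", false
	}
}

func main() {
	for _, s := range []conditionStatus{conditionTrue, conditionFalse, conditionUnknown} {
		key, tainted := taintFor(s)
		fmt.Printf("Ready=%s tainted=%v key=%q\n", s, tainted, key)
	}
}
```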

doEvictionPass

If useTaintBasedEvictions is false (the default, since TaintBasedEvictions=false), doEvictionPass is called every 100ms.

else {
    // Managing eviction of nodes:
    // When we delete pods off a node, if the node was not empty at the time we then
    // queue an eviction watcher. If we hit an error, retry deletion.
    go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}

Let's look at the code of doEvictionPass:

pkg/controller/node/nodecontroller.go:481

func (nc *NodeController) doEvictionPass() {
    ...

    // Iterate over every zone's pod evictor, take node names from its queue, then call deletePods to delete all pods on the node (except pods that belong to a DaemonSet).
    for k := range nc.zonePodEvictor {
        // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
        nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
            node, err := nc.nodeLister.Get(value.Value)
            ...
            nodeUid, _ := value.UID.(string)
            remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
            ...
            return true, 0
        })
    }
}

The logic of doEvictionPass:

  • Iterate over every zone's pod evictor and take node names from its queue.
  • Call deletePods to delete all pods on that node (except pods that belong to a DaemonSet).

The code of deletePods is as follows:

pkg/controller/node/controller_utils.go:49

// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
    ...
    // List all pod objects from the apiserver.
    pods, err := kubeClient.Core().Pods(metav1.NamespaceAll).List(options)
    ...

    // Iterate over the pods and pick out those running on the given node.
    for _, pod := range pods.Items {
        // Defensive check, also needed for tests.
        if pod.Spec.NodeName != nodeName {
            continue
        }

        ...

        // if the pod has already been marked for deletion, we still return true that there are remaining pods.
        if pod.DeletionGracePeriodSeconds != nil {
            remaining = true
            continue
        }

        // if the pod is managed by a daemonset, ignore it
        _, err := daemonStore.GetPodDaemonSets(&pod)
        if err == nil { // No error means at least one daemonset was found
            continue
        }

        ...

        // Call the apiserver to delete the pod.
        if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
            return false, err
        }
        remaining = true
    }

    ...
    return remaining, nil
}

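The main logic of deletePods:

  • List all pod objects from the apiserver.
  • Iterate over them and pick out the pods running on the given node.
  • If a pod is already marked for deletion (pod.DeletionGracePeriodSeconds != nil), skip it, but still report that pods remain.
  • If a pod belongs to a DaemonSet, skip it.
  • Otherwise call the apiserver to delete the pod.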

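That completes the analysis of the main logic in the Node Controller's Run method.

The logic behind the Taint Manager's Run is not covered in depth in this post; I will analyse the Taint Manager in a dedicated follow-up.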
