Kubernetes Node Controller Source Code Analysis: Run
Author: [email protected]
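Summary: In my view, the Node Controller is one of the most important of Kubernetes' dozens of controllers, easily in the top three, and probably also one of the most complex. I will therefore analyze its source code in a series of posts, in the hope of building a thorough yet accessible understanding of it. This post takes the NodeController's Run method as its entry point and traces how it works.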
Running the Node Controller
The Node Controller's Run method is shown below; it is the entry point for all of the Node Controller's actual processing logic.
pkg/controller/node/nodecontroller.go :550
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *NodeController) Run() {
go func() {
defer utilruntime.HandleCrash()
if !cache.WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync" ))
return
}
// Incorporate the results of node status pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, nc.nodeMonitorPeriod, wait.NeverStop)
if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}
}()
}
WaitForCacheSync(wait.NeverStop, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced)
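- The Node Controller first calls WaitForCacheSync and waits until the HasSynced functions of the PodInformer, NodeInformer and DaemonSetInformer all return true, i.e. until the caches for these three API object types have finished syncing.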
vendor/k8s.io/client-go/tools/cache/shared_informer.go:100
// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
// if the contoller should shutdown
func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
// Every 100ms, call each InformerSynced function in cacheSyncs.
// Once all of them return true, meaning all required caches
// have synced, WaitForCacheSync returns true;
// otherwise keep polling.
err := wait.PollUntil(syncedPollPeriod,
func() (bool, error) {
for _, syncFunc := range cacheSyncs {
if !syncFunc() {
return false, nil
}
}
return true, nil
},
stopCh)
if err != nil {
glog.V(2).Infof("stop requested")
return false
}
glog.V(4).Infof("caches populated")
return true
}
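WaitForCacheSync works as follows:
every 100ms it calls each InformerSynced function in cacheSyncs. Once all of them return true, meaning all required caches have synced, WaitForCacheSync returns true.
Otherwise it keeps polling every 100ms until either all of them return true or a stop signal is received (in which case it returns false).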
Start a goroutine that runs monitorNodeStatus every nodeMonitorPeriod (5s by default) to monitor node status
pkg/controller/node/nodecontroller.go:586
// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// not reachable for a long period of time.
func (nc *NodeController) monitorNodeStatus() error {
// We are listing nodes from local cache as we can tolerate some small delays
// comparing to state from etcd and there is eventual consistency anyway.
nodes, err := nc.nodeLister.List(labels.Everything())
if err != nil {
return err
}
// Diff knownNodeSet against nodes to get the lists of added and deleted nodes
added, deleted := nc.checkForNodeAddedDeleted(nodes)
// Iterate over the added nodes: each one means the Node Controller observed a new node joining the cluster
for i := range added {
...
// Add the new node to knownNodeSet
nc.knownNodeSet[added[i].Name] = added[i]
// When adding new Nodes we need to check if new zone appeared, and if so add new evictor.
zone := utilnode.GetZoneKey(added[i])
if _, found := nc.zoneStates[zone]; !found {
// Set the state of the node's new zone to "Initial"
nc.zoneStates[zone] = stateInitial
// If useTaintBasedEvictions is false (set via --feature-gates; TaintBasedEvictions=false by default),
// create a zonePodEvictor for this zone, rate limited by evictionLimiterQPS (set by --node-eviction-rate, default 0.1)
if !nc.useTaintBasedEvictions {
nc.zonePodEvictor[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
} else {
// If useTaintBasedEvictions is true,
// create a zoneNotReadyOrUnreachableTainer for this zone, rate limited by evictionLimiterQPS
nc.zoneNotReadyOrUnreachableTainer[zone] =
NewRateLimitedTimedQueue(
flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, evictionRateLimiterBurst))
}
...
}
// If useTaintBasedEvictions is true, markNodeAsHealthy calls RemoveTaintOffNode to remove the node's taints (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable)
// and removes the node from the zoneNotReadyOrUnreachableTainer queue (if it is there)
if nc.useTaintBasedEvictions {
nc.markNodeAsHealthy(added[i])
} else {
// If useTaintBasedEvictions is false, i.e. zonePodEvictor is used,
// remove the node from its zone's zonePodEvictor queue
nc.cancelPodEviction(added[i])
}
}
// Iterate over the deleted nodes and remove them from knownNodeSet
for i := range deleted {
...
delete(nc.knownNodeSet, deleted[i].Name)
}
zoneToNodeConditions := map[string][]*v1.NodeCondition{}
for i := range nodes {
...
// PollImmediate tries a condition func until it returns true, an error, or the timeout is reached.
// retrySleepTime is 20ms and the timeout (retrySleepTime*nodeStatusUpdateRetry) is 100ms.
if err := wait.PollImmediate(retrySleepTime, retrySleepTime*nodeStatusUpdateRetry, func() (bool, error) {
// nc.tryUpdateNodeStatus - For a given node checks its conditions and tries to update it.
// Returns grace period to which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeStatus(node)
if err == nil {
return true, nil
}
...
}); err != nil {
...
}
// For non-master nodes, append the node's current Ready condition to the zoneToNodeConditions map.
// We do not treat a master node as a part of the cluster for network disruption checking.
if !system.IsMasterNode(node.Name) {
zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
}
decisionTimestamp := nc.now()
if currentReadyCondition != nil {
// When the observed Ready condition is False (NotReady), handle it according to useTaintBasedEvictions
// Check eviction timeout against decisionTimestamp
if observedReadyCondition.Status == v1.ConditionFalse {
// useTaintBasedEvictions is true:
if nc.useTaintBasedEvictions {
// If the node already has the UnreachableTaint, swap it for the NotReadyTaint
// We want to update the taint straight away if Node is already tainted with the UnreachableTaint
if v1.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
taintToAdd := *NotReadyTaintTemplate
if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, UnreachableTaintTemplate, node) {
...
}
// Otherwise, add the node to the tainter queue (zoneNotReadyOrUnreachableTainer) so that it gets tainted
} else if nc.markNodeForTainting(node) {
...
}
// useTaintBasedEvictions is false: pod eviction is used.
} else {
// Evict only once decisionTimestamp (the current time) is later than readyTransitionTimestamp + podEvictionTimeout (default 5min)
if decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
// Add the node to the pod evictor queue (zonePodEvictor), to be handled by the pod evictor
if nc.evictPods(node) {
...
}
}
}
}
// Likewise, when the observed Ready condition is Unknown, handle it according to useTaintBasedEvictions
if observedReadyCondition.Status == v1.ConditionUnknown {
// useTaintBasedEvictions is true:
if nc.useTaintBasedEvictions {
// If the node already has the NotReadyTaint, swap it for the UnreachableTaint
// We want to update the taint straight away if Node is already tainted with the NotReadyTaint
if v1.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
taintToAdd := *UnreachableTaintTemplate
if !swapNodeControllerTaint(nc.kubeClient, &taintToAdd, NotReadyTaintTemplate, node) {
...
}
// Otherwise, add the node to the tainter queue (zoneNotReadyOrUnreachableTainer) so that it gets tainted
} else if nc.markNodeForTainting(node) {
...
}
// useTaintBasedEvictions is false: pod eviction is used.
} else {
// Evict only once decisionTimestamp (the current time) is later than probeTimestamp + podEvictionTimeout (default 5min)
if decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout)) {
// Add the node to the pod evictor queue (zonePodEvictor), to be handled by the pod evictor
if nc.evictPods(node) {
...
}
}
}
}
// Likewise, when the observed Ready condition is True, handle it according to useTaintBasedEvictions
if observedReadyCondition.Status == v1.ConditionTrue {
// useTaintBasedEvictions is true:
if nc.useTaintBasedEvictions {
// mark the node healthy: remove its taints and remove it from the zoneNotReadyOrUnreachableTainer queue (if it is there)
removed, err := nc.markNodeAsHealthy(node)
...
} else {
// useTaintBasedEvictions is false: remove the node from its zone's zonePodEvictor queue
if nc.cancelPodEviction(node) {
...
}
}
}
// If the node's status changed from Ready to NotReady, set the status of all pods on the node to NotReady
// Report node event.
if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
recordNodeStatusChange(nc.recorder, node, "NodeNotReady")
if err = markAllPodsNotReady(nc.kubeClient, node); err != nil {
utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
}
}
// Check with the cloud provider to see if the node still exists. If it
// doesn't, delete the node immediately.
...
}
}
// Handle disruption
nc.handleDisruption(zoneToNodeConditions, nodes)
return nil
}
- Diff knownNodeSet against nodes to get the lists of added and deleted nodes.
- Iterate over the added nodes; each one means the Node Controller observed a new node joining the cluster:
  - Add the node to knownNodeSet.
  - When adding new Nodes we need to check if new zone appeared, and if so add new evictor. If the zone is new:
    - Set the new zone's state to "Initial".
    - If useTaintBasedEvictions is false (set via --feature-gates; TaintBasedEvictions=false by default), create a zonePodEvictor for the zone, rate limited by evictionLimiterQPS (set by --node-eviction-rate, default 0.1).
    - If useTaintBasedEvictions is true, create a zoneNotReadyOrUnreachableTainer for the zone, rate limited by evictionLimiterQPS.
  - If useTaintBasedEvictions is true, call RemoveTaintOffNode (through markNodeAsHealthy) to remove the node's taints (node.alpha.kubernetes.io/notReady and node.alpha.kubernetes.io/unreachable), and remove the node from the zoneNotReadyOrUnreachableTainer queue (if it is there).
  - If useTaintBasedEvictions is false, i.e. zonePodEvictor is used, remove the node from its zone's zonePodEvictor queue.
- Iterate over the deleted nodes and remove them from knownNodeSet.
- Iterate over all nodes:
  - Update the node status, obtaining the last observed and the current Ready NodeCondition.
  - For non-master nodes, add the node's NodeCondition to the zoneToNodeConditions map.
  - When the observed condition is NotReady (False), handle it according to useTaintBasedEvictions:
    - If useTaintBasedEvictions is true:
      - If the node already has the UnreachableTaint, swap it for the NotReadyTaint.
      - Otherwise, add the node to the tainter queue.
    - If useTaintBasedEvictions is false, pod eviction is used:
      - Once decisionTimestamp (the current time) is later than readyTransitionTimestamp + podEvictionTimeout (default 5min), add the node to the pod evictor queue, to be handled by the pod evictor.
  - Likewise, when the observed condition is Unknown, handle it according to useTaintBasedEvictions:
    - If useTaintBasedEvictions is true:
      - If the node already has the NotReadyTaint, swap it for the UnreachableTaint.
      - Otherwise, add the node to the tainter queue.
    - If useTaintBasedEvictions is false, pod eviction is used:
      - Once decisionTimestamp is later than probeTimestamp + podEvictionTimeout (default 5min), add the node to the pod evictor queue, to be handled by the pod evictor.
  - Likewise, when the observed condition is True (Ready), handle it according to useTaintBasedEvictions:
    - If useTaintBasedEvictions is true, mark the node healthy: remove its taints and remove it from the zoneNotReadyOrUnreachableTainer queue (if it is there).
    - If useTaintBasedEvictions is false, remove the node from its zone's zonePodEvictor queue.
  - If the node's status changed from Ready to NotReady, set the status of all pods on the node to NotReady.
- Call handleDisruption.
Next, let's look at the logic of handleDisruption:
pkg/controller/node/nodecontroller.go:772
func (nc *NodeController) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
newZoneStates := map[string]zoneState{}
// From zoneToNodeConditions, compute allAreFullyDisrupted: whether, based on the node conditions observed in this pass, every zone in the cluster is in FullDisruption
allAreFullyDisrupted := true
for k, v := range zoneToNodeConditions {
ZoneSize.WithLabelValues(k).Set(float64(len(v)))
unhealthy, newState := nc.computeZoneStateFunc(v)
ZoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
UnhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
if newState != stateFullDisruption {
allAreFullyDisrupted = false
}
newZoneStates[k] = newState
if _, had := nc.zoneStates[k]; !had {
glog.Errorf("Setting initial state for unseen zone: %v", k)
nc.zoneStates[k] = stateInitial
}
}
// From zoneStates, compute allWasFullyDisrupted: whether, based on the previous pass, every zone in the cluster was in FullDisruption
allWasFullyDisrupted := true
for k, v := range nc.zoneStates {
if _, have := zoneToNodeConditions[k]; !have {
ZoneSize.WithLabelValues(k).Set(0)
ZoneHealth.WithLabelValues(k).Set(100)
UnhealthyNodes.WithLabelValues(k).Set(0)
delete(nc.zoneStates, k)
continue
}
if v != stateFullDisruption {
allWasFullyDisrupted = false
break
}
}
// At least one node was responding in previous pass or in the current pass. Semantics is as follows:
// - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
// - if new state is "fullDisruption" we restore normal eviction rate,
// - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
if !allAreFullyDisrupted || !allWasFullyDisrupted {
// If allAreFullyDisrupted is true and allWasFullyDisrupted is false, the cluster has gone from non-FullDisruption to FullDisruption; iterate over all nodes:
// We're switching to full disruption mode
if allAreFullyDisrupted {
glog.V(0).Info("NodeController detected that all Nodes are not-Ready. Entering master disruption mode.")
for i := range nodes {
// If useTaintBasedEvictions is true, mark the node healthy (remove its taints and remove it from the tainter queue)
if nc.useTaintBasedEvictions {
_, err := nc.markNodeAsHealthy(nodes[i])
if err != nil {
glog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
}
} else {
// If useTaintBasedEvictions is false, cancel pod eviction on the node (remove it from its zone's pod evictor queue)
nc.cancelPodEviction(nodes[i])
}
}
// Set the rate limiter of every zone's tainter queue or pod evictor queue to 0, which stops all evictions.
// We stop all evictions.
for k := range nc.zoneStates {
if nc.useTaintBasedEvictions {
nc.zoneNotReadyOrUnreachableTainer[k].SwapLimiter(0)
} else {
nc.zonePodEvictor[k].SwapLimiter(0)
}
}
// Set the state of every zone (nc.zoneStates) to FullDisruption
for k := range nc.zoneStates {
nc.zoneStates[k] = stateFullDisruption
}
// All rate limiters are updated, so we can return early here.
return
}
// If allWasFullyDisrupted is true and allAreFullyDisrupted is false, the cluster has gone from FullDisruption back to non-FullDisruption; iterate over all nodes:
// We're exiting full disruption mode
if allWasFullyDisrupted {
glog.V(0).Info("NodeController detected that some Nodes are Ready. Exiting master disruption mode.")
// When exiting disruption mode update probe timestamps on all Nodes.
now := nc.now()
for i := range nodes {
v := nc.nodeStatusMap[nodes[i].Name]
v.probeTimestamp = now
v.readyTransitionTimestamp = now
nc.nodeStatusMap[nodes[i].Name] = v
}
// Reset each zone's rate limiter according to its zone size and zone state.
// We reset all rate limiters to settings appropriate for the given state.
for k := range nc.zoneStates {
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
nc.zoneStates[k] = newZoneStates[k]
}
return
}
// If both allWasFullyDisrupted and allAreFullyDisrupted are false, the cluster stays out of FullDisruption: reset the rate limiter of each zone whose state changed, according to its zone size and zone state.
// We know that there's at least one not-fully disrupted so,
// we can use default behavior for rate limiters
for k, v := range nc.zoneStates {
newState := newZoneStates[k]
if v == newState {
continue
}
glog.V(0).Infof("NodeController detected that zone %v is now in state %v.", k, newState)
nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
nc.zoneStates[k] = newState
}
}
}
So handleDisruption works as follows:
- From zoneToNodeConditions, compute allAreFullyDisrupted: whether, based on the node conditions observed in this pass, every zone in the cluster is in FullDisruption.
- From zoneStates, compute allWasFullyDisrupted: whether, based on the previous pass, every zone in the cluster was in FullDisruption.
- If allAreFullyDisrupted is true and allWasFullyDisrupted is false, the cluster has gone from non-FullDisruption to FullDisruption (switching to full disruption mode):
  - For every node: if useTaintBasedEvictions is true, mark the node healthy (remove its taints and remove it from the tainter queue); if it is false, cancel pod eviction on the node (remove it from its zone's pod evictor queue).
  - Set the rate limiter of every zone's tainter queue or pod evictor queue to 0, which stops all evictions.
  - Set every zone's state (nc.zoneStates) to FullDisruption.
- If allWasFullyDisrupted is true and allAreFullyDisrupted is false, the cluster has gone from FullDisruption back to non-FullDisruption (exiting disruption mode):
  - Update the probe timestamps (probeTimestamp and readyTransitionTimestamp) on all nodes.
  - Call setLimiterInZone to reset each zone's rate limiter according to its zone size and zone state.
- If both allWasFullyDisrupted and allAreFullyDisrupted are false, the cluster stays out of FullDisruption: call setLimiterInZone to reset the rate limiter of each zone whose state changed, according to its zone size and zone state.
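The three cases above boil down to comparing the previous and the new zone states. A minimal sketch, assuming plain string-valued zone states; `disruptionAction` and its return labels are hypothetical, and the sketch ignores zones appearing or disappearing between passes, which the real function also handles.

```go
package main

// zoneState mirrors the NodeController zone states.
type zoneState string

const (
	stateInitial           zoneState = "Initial"
	stateNormal            zoneState = "Normal"
	stateFullDisruption    zoneState = "FullDisruption"
	statePartialDisruption zoneState = "PartialDisruption"
)

// allFullyDisrupted reports whether every zone is in FullDisruption.
// Like the loops in handleDisruption, it yields true for an empty map.
func allFullyDisrupted(states map[string]zoneState) bool {
	for _, s := range states {
		if s != stateFullDisruption {
			return false
		}
	}
	return true
}

// disruptionAction classifies one handleDisruption pass from the previous
// (nc.zoneStates) and the newly computed (newZoneStates) zone states.
func disruptionAction(previous, current map[string]zoneState) string {
	allWas := allFullyDisrupted(previous)
	allAre := allFullyDisrupted(current)
	switch {
	case allWas && allAre:
		return "no-op" // still in master disruption mode: evictions stay stopped
	case allAre:
		return "enter-full-disruption" // cancel evictions, set every limiter to 0
	case allWas:
		return "exit-full-disruption" // reset timestamps, re-set every limiter
	default:
		return "update-changed-zones" // setLimiterInZone only where the state changed
	}
}
```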
Next, let's look at setLimiterInZone to see how it maps zone size and zone state to different rate limiters.
pkg/controller/node/nodecontroller.go:870
func (nc *NodeController) setLimiterInZone(zone string, zoneSize int, state zoneState) {
switch state {
// If the zone state is Normal, set the rate limiter to evictionLimiterQPS (default 0.1)
case stateNormal:
if nc.useTaintBasedEvictions {
nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(nc.evictionLimiterQPS)
} else {
nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
}
// If the zone state is PartialDisruption, use nc.enterPartialDisruptionFunc to choose the rate.
case statePartialDisruption:
if nc.useTaintBasedEvictions {
nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
} else {
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterPartialDisruptionFunc(zoneSize))
}
// If the zone state is FullDisruption, use nc.enterFullDisruptionFunc to choose the rate.
case stateFullDisruption:
if nc.useTaintBasedEvictions {
nc.zoneNotReadyOrUnreachableTainer[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
} else {
nc.zonePodEvictor[zone].SwapLimiter(
nc.enterFullDisruptionFunc(zoneSize))
}
}
}
nc.enterFullDisruptionFunc and nc.enterPartialDisruptionFunc are assigned when NewNodeController creates the Node Controller:
pkg/controller/node/nodecontroller.go:270
func NewNodeController(...) (*NodeController, error) {
...
nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
...
}
pkg/controller/node/nodecontroller.go:1132
// Default value for cluster eviction rate - we take nodeNum for consistency with ReducedQPSFunc.
func (nc *NodeController) HealthyQPSFunc(nodeNum int) float32 {
return nc.evictionLimiterQPS
}
// If the cluster is large make evictions slower, if they're small stop evictions altogether.
func (nc *NodeController) ReducedQPSFunc(nodeNum int) float32 {
if int32(nodeNum) > nc.largeClusterThreshold {
return nc.secondaryEvictionLimiterQPS
}
return 0
}
So setLimiterInZone works as follows:
- When the zone state is PartialDisruption, the rate limiter of the tainter queue or pod evictor queue is set to:
  - secondaryEvictionLimiterQPS (default 0.01) if the zone size is larger than nc.largeClusterThreshold (default 50);
  - 0 otherwise.
- When the zone state is FullDisruption, it is set to evictionLimiterQPS (default 0.1).
- When the zone state is Normal, it is set to evictionLimiterQPS (default 0.1).
Run TaintManager
Next, the Node Controller's Run method starts the Taint Manager:
if nc.runTaintManager {
go nc.taintManager.Run(wait.NeverStop)
}
nc.runTaintManager is set by --enable-taint-manager, which defaults to true, so the Taint Manager is started by default.
Now let's look at what the Taint Manager's Run method does.
pkg/controller/node/taint_controller.go:179
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
glog.V(0).Infof("Starting NoExecuteTaintManager")
// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
nodeUpdate := item.(*nodeUpdateItem)
select {
case <-stopCh:
break
case tc.nodeUpdateChannel <- nodeUpdate:
}
}
}(stopCh)
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
podUpdate := item.(*podUpdateItem)
select {
case <-stopCh:
break
case tc.podUpdateChannel <- podUpdate:
}
}
}(stopCh)
// When processing events we want to prioritize Node updates over Pod updates,
// as NodeUpdates that interest NoExecuteTaintManager should be handled as soon as possible -
// we don't want user (or system) to wait until PodUpdate queue is drained before it can
// start evicting Pods from tainted Nodes.
for {
select {
case <-stopCh:
break
case nodeUpdate := <-tc.nodeUpdateChannel:
tc.handleNodeUpdate(nodeUpdate)
case podUpdate := <-tc.podUpdateChannel:
// If we found a Pod update we need to empty Node queue first.
priority:
for {
select {
case nodeUpdate := <-tc.nodeUpdateChannel:
tc.handleNodeUpdate(nodeUpdate)
default:
break priority
}
}
// After Node queue is emptied we process podUpdate.
tc.handlePodUpdate(podUpdate)
}
}
}
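- Start a goroutine that takes nodeUpdate items from NoExecuteTaintManager's nodeUpdateQueue and sends them to tc.nodeUpdateChannel.
- Start a goroutine that takes podUpdate items from NoExecuteTaintManager's podUpdateQueue and sends them to tc.podUpdateChannel.
- In the main loop, node updates take priority: when a podUpdate arrives, all pending nodeUpdates are first handled by tc.handleNodeUpdate, and only then is the podUpdate handled by tc.handlePodUpdate.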
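The node-over-pod priority can be reproduced in isolation. This sketch uses hypothetical names (`processWithPriority`, plain string items instead of nodeUpdateItem/podUpdateItem). One difference from the code above is deliberate: in Go, a bare `break` inside a `select` only leaves the `select`, not the enclosing `for`, so the sketch returns on `stopCh` instead.

```go
package main

// processWithPriority drains nodeCh before handling each pod update,
// following the same pattern as NoExecuteTaintManager.Run.
// It returns when stopCh is closed.
func processWithPriority(stopCh <-chan struct{}, nodeCh <-chan string, podCh <-chan string, handle func(kind, item string)) {
	for {
		select {
		case <-stopCh:
			return // a bare break here would only leave the select
		case n := <-nodeCh:
			handle("node", n)
		case p := <-podCh:
			// Before handling the pod update, handle every node update already queued.
		drain:
			for {
				select {
				case n := <-nodeCh:
					handle("node", n)
				default:
					break drain
				}
			}
			handle("pod", p)
		}
	}
}
```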
I will analyze NoExecuteTaintManager's handleNodeUpdate and handlePodUpdate in a dedicated post on the Taint Manager, so I won't go deeper here.
doTaintingPass
If useTaintBasedEvictions is true, i.e. TaintBasedEvictions=true is set in --feature-gates (it defaults to false), doTaintingPass is called every 100ms.
Depending on whether a node's condition is NotReady or Unknown, doTaintingPass calls the apiserver to give the node the corresponding taint: node.alpha.kubernetes.io/notReady or node.alpha.kubernetes.io/unreachable.
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doTaintingPass, nodeEvictionPeriod, wait.NeverStop)
}
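The effect of one doTaintingPass on a queued node's taints can be sketched as a pure function over taint keys. This rests on assumptions: `taintsAfterPass` is a hypothetical helper, taints are reduced to their keys (effects and timestamps are ignored), the API update that swapNodeControllerTaint performs is left out, and it assumes doTaintingPass, like monitorNodeStatus, keeps the notReady and unreachable taints mutually exclusive.

```go
package main

const (
	notReadyTaint    = "node.alpha.kubernetes.io/notReady"
	unreachableTaint = "node.alpha.kubernetes.io/unreachable"
)

// taintsAfterPass returns the taint keys a queued node would carry after a pass:
// the taint matching its Ready status is added, the opposite one removed,
// and a node that is Ready again is left untouched.
func taintsAfterPass(taints []string, readyStatus string) []string {
	var add, opposite string
	switch readyStatus {
	case "False":
		add, opposite = notReadyTaint, unreachableTaint
	case "Unknown":
		add, opposite = unreachableTaint, notReadyTaint
	default:
		return taints // Ready again: nothing to taint
	}
	result := make([]string, 0, len(taints)+1)
	hasAdd := false
	for _, t := range taints {
		if t == opposite {
			continue // drop the mutually exclusive taint
		}
		if t == add {
			hasAdd = true
		}
		result = append(result, t)
	}
	if !hasAdd {
		result = append(result, add)
	}
	return result
}
```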
doEvictionPass
If useTaintBasedEvictions is false (TaintBasedEvictions defaults to false), doEvictionPass is called every 100ms.
else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, nodeEvictionPeriod, wait.NeverStop)
}
Next, let's look at the code of doEvictionPass:
pkg/controller/node/nodecontroller.go:481
func (nc *NodeController) doEvictionPass() {
...
// Iterate over every zone's pod evictor, take node names from the pod evictor queue, and call deletePods to delete all pods on each node (except DaemonSet pods)
for k := range nc.zonePodEvictor {
// Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
nc.zonePodEvictor[k].Try(func(value TimedValue) (bool, time.Duration) {
node, err := nc.nodeLister.Get(value.Value)
...
nodeUid, _ := value.UID.(string)
remaining, err := deletePods(nc.kubeClient, nc.recorder, value.Value, nodeUid, nc.daemonSetStore)
...
return true, 0
})
}
}
The logic of doEvictionPass:
- Iterate over every zone's pod evictor and take node names from its queue.
- Call deletePods to delete all pods on each such node (except pods belonging to a DaemonSet).
The code of deletePods is as follows:
pkg/controller/node/controller_utils.go:49
// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted, or were found pending deletion.
func deletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore extensionslisters.DaemonSetLister) (bool, error) {
...
// List the pod objects from the apiserver.
pods, err := kubeClient.Core().Pods(metav1.NamespaceAll).List(options)
...
// Iterate over the pods and keep only those on this node
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
...
// if the pod has already been marked for deletion, we still return true that there are remaining pods.
if pod.DeletionGracePeriodSeconds != nil {
remaining = true
continue
}
// if the pod is managed by a daemonset, ignore it
_, err := daemonStore.GetPodDaemonSets(&pod)
if err == nil { // No error means at least one daemonset was found
continue
}
...
// Call the apiserver to delete the pod
if err := kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
return false, err
}
remaining = true
}
...
return remaining, nil
}
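The main logic of deletePods:
- List the pod objects from the apiserver.
- Iterate over them, keeping only the pods on this node:
  - If a pod is already marked for deletion (pod.DeletionGracePeriodSeconds != nil), skip it, but still report that pods remain.
  - If a pod belongs to a DaemonSet, skip it.
  - Otherwise, call the apiserver to delete the pod.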
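The filtering rules above can be exercised without a cluster. In this sketch `pod` is a hypothetical trimmed-down type and `OwnedByDaemonSet` stands in for a successful `daemonStore.GetPodDaemonSets` lookup; no API calls are made, and the function only reports which pods would be deleted.

```go
package main

// pod is a hypothetical, trimmed-down pod with only the fields deletePods inspects.
type pod struct {
	Name                       string
	NodeName                   string
	DeletionGracePeriodSeconds *int64
	OwnedByDaemonSet           bool // stands in for GetPodDaemonSets finding a DaemonSet
}

// podsToDelete mirrors the loop in deletePods: it returns the names of the pods
// that would be deleted and whether any pods remain on the node.
func podsToDelete(pods []pod, nodeName string) (toDelete []string, remaining bool) {
	for _, p := range pods {
		if p.NodeName != nodeName {
			continue // defensive check: pod is on another node
		}
		if p.DeletionGracePeriodSeconds != nil {
			remaining = true // already terminating, but still counts as remaining
			continue
		}
		if p.OwnedByDaemonSet {
			continue // DaemonSet pods are left alone
		}
		toDelete = append(toDelete, p.Name)
		remaining = true
	}
	return toDelete, remaining
}
```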
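This completes the analysis of the main logic of the Node Controller's Run method.
The Taint Manager's Run logic was not analyzed in depth in this post; I will analyze the Taint Manager in a dedicated follow-up post.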
微信公眾號:I am CR7如有問題或建議,請在下方留言;最近更新:2018-12-29 前言 繼上一篇Spring Cloud Netflix Zuul原始碼分析之預熱篇,我們知道了兩個重要的類:ZuulHandlerMapping和SimpleControllerHandlerA