1. 程式人生 > >圖解kubernetes排程器搶佔流程與演算法設計

圖解kubernetes排程器搶佔流程與演算法設計

搶佔排程是分散式排程中一種常見的設計,其核心目標是當不能為高優先順序的任務分配資源的時候,會通過搶佔低優先順序的任務來進行高優先順序的排程,本文主要學習k8s的搶佔排程以及裡面的一些有趣的演算法

1. 搶佔排程設計

1.1 搶佔原理

搶佔排程原理其實很簡單就是通過高優先順序的pod搶佔低優先順序的pod資源,從而滿足高優先pod的排程

1.2 中斷預算

在kubernetes中為了保證服務儘可能的高可用,設計PDB(PodDisruptionBudget)其核心目標就是在保證對應pod在指定的數量,主要是為了保證服務的可用性,在進行搶佔的過程中,應儘可能遵守該設計,儘量不去搶佔有PDB的資源,避免因為搶佔導致服務的不可用

1.3 優先順序反轉

優先順序反轉是訊號量裡面的一種機制即因為低優先順序任務的執行阻塞高優先順序的任務執行

在k8s中搶佔排程是通過高優先順序搶佔低優先順序pod,如果高優先順序pod依賴低優先順序pod, 則會因為依賴問題,導致優先順序失效,所以應該儘可能減少高優先順序pod對低優先順序的pod的依賴, 後面進行篩選原始碼分析時可以看到

1.4 搶佔選擇演算法

搶佔選擇演算法是指的通過搶佔部分節點後,如何從被搶佔的node陣列中篩選出一個node節點,目前k8s中主要實現了5個演算法

1.4.1 最少違反PDB

即最少違反PDB規則

1.4.2 最高優先順序最小優先

比較所有node的最高優先順序的pod,找到優先順序最低的node

1.4.3 優先順序總和最低優先

計算每個node上面的被搶佔的pod優先順序之和,選擇優先順序和最低的節點

1.4.4 最少搶佔數量優先

計算需要搶佔的節點數量最少的節點優先

1.4.5 最近更新節點優先

比較每個node中被驅逐的pod中最早啟動的pod的啟動時間,最近啟動的pod的節點,會被選擇

2. 原始碼設計

2.1 搶佔核心流程


搶佔的流程主要是通過Preempt來實現,其針對預選失敗的節點來進行驅逐某些低優先順序的pod來滿足高優先順序pod

func (g *genericScheduler) Preempt(pluginContext *framework.PluginContext, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
    // 只允許預選失敗的pod進行重試
    fitError, ok := scheduleErr.(*FitError)
    if !ok || fitError == nil {
        return nil, nil, nil, nil
    }
    // 是否允許搶佔其他提議的pod
    if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfoMap, g.enableNonPreempting) {
        klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
        return nil, nil, nil, nil
    }
    // 獲取當前叢集中的所有node
    allNodes := g.cache.ListNodes()
    if len(allNodes) == 0 {
        return nil, nil, nil, ErrNoNodesAvailable
    }
    // 初步篩選潛在的可以進行搶佔操作的node
    potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
    if len(potentialNodes) == 0 {
        klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
        // In this case, we should clean-up any existing nominated node name of the pod.
        return nil, nil, []*v1.Pod{pod}, nil
    }
    // 獲取所有pdb
    pdbs, err := g.pdbLister.List(labels.Everything())
    if err != nil {
        return nil, nil, nil, err
    }
    // 針對之前初步篩選的node嘗試進行搶佔和預選操作,返回結果中包含所有可以通過搶佔低優先順序pod完成pod排程的node節點與搶佔的pod
    nodeToVictims, err := g.selectNodesForPreemption(pluginContext, pod, g.nodeInfoSnapshot.NodeInfoMap, potentialNodes, g.predicates,
        g.predicateMetaProducer, g.schedulingQueue, pdbs)
    if err != nil {
        return nil, nil, nil, err
    }

    // 呼叫extenders進行再一輪的篩選
    nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
    if err != nil {
        return nil, nil, nil, err
    }

    // 從篩選結果中選擇最適合搶佔的node
    candidateNode := pickOneNodeForPreemption(nodeToVictims)
    if candidateNode == nil {
        return nil, nil, nil, nil
    }

    // 如果candidateNode不為nil,則找到一個最優的執行搶佔操作的node, 返回低優先的提議的pod
    // 還有搶佔的pod和當前節點
    nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
    if nodeInfo, ok := g.nodeInfoSnapshot.NodeInfoMap[candidateNode.Name]; ok {
        return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, nil
    }

    return nil, nil, nil, fmt.Errorf(
        "preemption failed: the target node %s has been deleted from scheduler cache",
        candidateNode.Name)
}

2.2 搶佔條件檢測

如果發現需要執行搶佔的pod有提名的node並且對應node上面存在比自己優先順序低的pod正在進行刪除, 則不允許進行搶佔

func podEligibleToPreemptOthers(pod *v1.Pod, nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo, enableNonPreempting bool) bool {
    if enableNonPreempting && pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
        klog.V(5).Infof("Pod %v/%v is not eligible for preemption because it has a preemptionPolicy of %v", pod.Namespace, pod.Name, v1.PreemptNever)
        return false
    }
    nomNodeName := pod.Status.NominatedNodeName
    if len(nomNodeName) > 0 {
        if nodeInfo, found := nodeNameToInfo[nomNodeName]; found {
            podPriority := util.GetPodPriority(pod)
            for _, p := range nodeInfo.Pods() {
                if p.DeletionTimestamp != nil && util.GetPodPriority(p) < podPriority {
                    // 正在終止的優先順序低於當前pod的pod就不會進行搶佔
                    return false
                }
            }
        }
    }
    return true
}

2.3 篩選潛在節點


每個node在預選階段都會進行一個標記,標記當前node執行預選失敗的原因,篩選潛在節點主要是根據對應的錯誤來進行篩選,如果不是不可解決的預選錯誤,則該node節點就可以參與接下來的搶佔階段

func nodesWherePreemptionMightHelp(nodes []*v1.Node, fitErr *FitError) []*v1.Node {
    potentialNodes := []*v1.Node{}
    // 根據預選階段的錯誤原因,如果不存在無法解決的錯誤,則這些node可能在接下來的搶佔流程中被使用
    for _, node := range nodes {
        if fitErr.FilteredNodesStatuses[node.Name].Code() == framework.UnschedulableAndUnresolvable {
            continue
        }
        failedPredicates, _ := fitErr.FailedPredicates[node.Name]
        if !unresolvablePredicateExists(failedPredicates) { 
            // 如果我們發現並不是不可解決的排程錯誤的時候,就講這個節點加入到這裡
            // 可能通過後續的調整會讓這些node重新滿足
            klog.V(3).Infof("Node %v is a potential node for preemption.", node.Name)
            potentialNodes = append(potentialNodes, node)
        }
    }
    return potentialNodes
}

不可通過調整的預選失敗原因

var unresolvablePredicateFailureErrors = map[predicates.PredicateFailureReason]struct{}{
    predicates.ErrNodeSelectorNotMatch:      {},
    predicates.ErrPodAffinityRulesNotMatch:  {},
    predicates.ErrPodNotMatchHostName:       {},
    predicates.ErrTaintsTolerationsNotMatch: {},
    predicates.ErrNodeLabelPresenceViolated: {},
    // 省略大部分,感興趣的可以自己關注下
}

2.4 並行篩選節點


篩選搶佔節點主要是並行對之前篩選潛在node進行嘗試,通過驅逐低優先順序pod滿足高優先順序pod排程,最終會篩選一批可以通過搶佔來滿足pod排程需要的節點, 其核心實現時通過selectVictimsOnNode來進行檢測,繼續往下看

func (g *genericScheduler) selectNodesForPreemption(
    pluginContext *framework.PluginContext,
    pod *v1.Pod,
    nodeNameToInfo map[string]*schedulernodeinfo.NodeInfo,
    potentialNodes []*v1.Node,
    fitPredicates map[string]predicates.FitPredicate,
    metadataProducer predicates.PredicateMetadataProducer,
    queue internalqueue.SchedulingQueue,
    pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*schedulerapi.Victims, error) {
    nodeToVictims := map[*v1.Node]*schedulerapi.Victims{}
    var resultLock sync.Mutex

    // We can use the same metadata producer for all nodes.
    meta := metadataProducer(pod, nodeNameToInfo)
    checkNode := func(i int) {
        nodeName := potentialNodes[i].Name
        var metaCopy predicates.PredicateMetadata
        if meta != nil {
            metaCopy = meta.ShallowCopy()
        }
        pods, numPDBViolations, fits := g.selectVictimsOnNode(pluginContext, pod, metaCopy, nodeNameToInfo[nodeName], fitPredicates, queue, pdbs)
        if fits {
            resultLock.Lock()
            victims := schedulerapi.Victims{
                Pods:             pods,
                NumPDBViolations: numPDBViolations,
            }
            nodeToVictims[potentialNodes[i]] = &victims
            resultLock.Unlock()
        }
    }
    workqueue.ParallelizeUntil(context.TODO(), 16, len(potentialNodes), checkNode)
    return nodeToVictims, nil
}

2.5 單點篩選流程

selectVictimsOnNode即單點篩選流程是針對單個node來指向具體的驅逐搶佔決策的流程, 其核心流程如下

2.5.1 優先順序篩選

優先順序篩選首先會對當前node上面的所有節點進行優先順序排序,移除所有比當前pod低的pod

potentialVictims := util.SortableList{CompFunc: util.MoreImportantPod}
    nodeInfoCopy := nodeInfo.Clone()

    removePod := func(rp *v1.Pod) {
        nodeInfoCopy.RemovePod(rp)
        if meta != nil {
            meta.RemovePod(rp, nodeInfoCopy.Node())
        }
    }
    addPod := func(ap *v1.Pod) {
        nodeInfoCopy.AddPod(ap)
        if meta != nil {
            meta.AddPod(ap, nodeInfoCopy)
        }
    }
    podPriority := util.GetPodPriority(pod)
    for _, p := range nodeInfoCopy.Pods() {
        if util.GetPodPriority(p) < podPriority {
            // 移除所有優先順序比自己低的pod
            potentialVictims.Items = append(potentialVictims.Items, p)
            removePod(p)
        }
    }

2.5.2 預選判斷

對移除所有優先順序比自己的pod之後,會嘗試進行預選流程,如果發現預選流程失敗,則當前node即使通過移除所有比自己優先順序低的pod也不能滿足排程需求,則就進行下一個node判斷

if fits, _, _, err := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false); !fits {
        if err != nil {
            klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
        }

        return nil, 0, false
    }

2.5.3 PDB分組與分組演算法

PDB分組就是對當前節點上篩選出來的低優先順序pod按照是否有PDB匹配來進行分組,分為違反PDB和未違反PDB的兩組

violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims.Items, pdbs)

分組演算法其實也不難,只需要遍歷所有的pdb和pod就可以得到最終的分組

func filterPodsWithPDBViolation(pods []interface{}, pdbs []*policy.PodDisruptionBudget) (violatingPods, nonViolatingPods []*v1.Pod) {
    for _, obj := range pods {
        pod := obj.(*v1.Pod)
        pdbForPodIsViolated := false
        // A pod with no labels will not match any PDB. So, no need to check.
        if len(pod.Labels) != 0 {
            for _, pdb := range pdbs {
                if pdb.Namespace != pod.Namespace {
                    continue
                }
                selector, err := metav1.LabelSelectorAsSelector(pdb.Spec.Selector)
                if err != nil {
                    continue
                }
                // A PDB with a nil or empty selector matches nothing.
                if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
                    continue
                }
                // We have found a matching PDB.
                if pdb.Status.PodDisruptionsAllowed <= 0 {
                    pdbForPodIsViolated = true
                    break
                }
            }
        }
        if pdbForPodIsViolated {
            violatingPods = append(violatingPods, pod)
        } else {
            nonViolatingPods = append(nonViolatingPods, pod)
        }
    }
    return violatingPods, nonViolatingPods
}

2.5.4 違反PDB計數與最少驅逐彙總

會分別對違反PDB和不違反的pod集合來進行reprievePod檢測,如果加入當前pod後,不能滿足預選篩選流程,則該pod則必須被進行移除加入到victims中, 同時如果是違反PDB的pod則需要進行違反pdb計數numViolatingVictim

  reprievePod := func(p *v1.Pod) bool { 
        // 我們首先將pod加入到meta中
        addPod(p)
        fits, _, _, _ := g.podFitsOnNode(pluginContext, pod, meta, nodeInfoCopy, fitPredicates, queue, false)
        // 
        if !fits {
            // 如果我們加入了pod然後導致了預選不成功,則這個pod必須給移除
            removePod(p)
            victims = append(victims, p) // 新增到我們需要移除的列表裡面
            klog.V(5).Infof("Pod %v/%v is a potential preemption victim on node %v.", p.Namespace, p.Name, nodeInfo.Node().Name)
        }
        return fits
    }
    for _, p := range violatingVictims {
        if !reprievePod(p) {
            numViolatingVictim++
        }
    }
    // Now we try to reprieve non-violating victims.
    for _, p := range nonViolatingVictims {
        // 嘗試移除未違反pdb的pod
        reprievePod(p)
    }
    return victims, numViolatingVictim, true

2.6 篩選最優搶佔

最優篩選主要是通過pickOneNodeForPreemption實現,其中篩選資料儲存結構主要是通過重用minNodes1和minNodes2兩段記憶體來進行實現,這兩個node陣列分別配有兩個計數器lenNodes1和lenNodes2, 針對具有相同優先順序、相同數量的node,每增加一個會進行一次計數器累加, 核心演算法流程如下

2.6.1 最少違反PDB

最少違反PDB是根據前面統計的違反PDB的計數統計,找到最少違反的node,如果是單個node則直接返回篩選結束

    minNumPDBViolatingPods := math.MaxInt32
    var minNodes1 []*v1.Node
    lenNodes1 := 0
    for node, victims := range nodesToVictims {
        if len(victims.Pods) == 0 {
            // 如果發現一個noed不需要任何搶佔,則返回它
            return node
        }
        numPDBViolatingPods := victims.NumPDBViolations
        if numPDBViolatingPods < minNumPDBViolatingPods { 
            // 如果小於最小pdb數量, 如果數量發生變化,就重置
            minNumPDBViolatingPods = numPDBViolatingPods
            minNodes1 = nil
            lenNodes1 = 0
        }
        if numPDBViolatingPods == minNumPDBViolatingPods { 
            // 多個相同的node會進行追加,並累加計數器 
            minNodes1 = append(minNodes1, node)
            lenNodes1++
        }
    }
    if lenNodes1 == 1 {
        return minNodes1[0]
    }

2.6.2 最高優先順序最小優先

最高優先順序最小優先是指通過對比多個node的最高優先順序的pod,優先順序最低的那個node被選中,如果多個則進行下一個演算法

minHighestPriority := int32(math.MaxInt32)
    var minNodes2 = make([]*v1.Node, lenNodes1)
    lenNodes2 := 0
    for i := 0; i < lenNodes1; i++ {
        node := minNodes1[i]
        victims := nodesToVictims[node]
        // highestPodPriority is the highest priority among the victims on this node.
        // 返回優先順序最高的pod
        highestPodPriority := util.GetPodPriority(victims.Pods[0])
        if highestPodPriority < minHighestPriority {
            // 重置狀態
            minHighestPriority = highestPodPriority
            lenNodes2 = 0
        }
        
        if highestPodPriority == minHighestPriority {
            // 如果優先順序相等則加入進去
            minNodes2[lenNodes2] = node
            lenNodes2++
        }
    }
    if lenNodes2 == 1 {
        return minNodes2[0]
    }

2.6.3 優先順序總和最低優先

統計每個node上的所有被搶佔的pod的優先順序的總和,然後在多個node之間進行比較,優先順序總和最低的節點被選中

minSumPriorities := int64(math.MaxInt64)
    lenNodes1 = 0
    for i := 0; i < lenNodes2; i++ {
        var sumPriorities int64
        node := minNodes2[i]
        // 統計所有優先順序
        for _, pod := range nodesToVictims[node].Pods {
            
            sumPriorities += int64(util.GetPodPriority(pod)) + int64(math.MaxInt32+1)
        }
        if sumPriorities < minSumPriorities {
            minSumPriorities = sumPriorities
            lenNodes1 = 0
        }
        if sumPriorities == minSumPriorities {
            minNodes1[lenNodes1] = node
            lenNodes1++
        }
    }
    // 最少優先順序的node
    if lenNodes1 == 1 {
        return minNodes1[0]
    }

2.6.4 最少搶佔數量優先

最少搶佔數量優先即統計每個node被搶佔的節點數量,數量最少得被選中

    minNumPods := math.MaxInt32
    lenNodes2 = 0
    for i := 0; i < lenNodes1; i++ {
        node := minNodes1[i]
        numPods := len(nodesToVictims[node].Pods)
        if numPods < minNumPods {
            minNumPods = numPods
            lenNodes2 = 0
        }
        if numPods == minNumPods {
            minNodes2[lenNodes2] = node
            lenNodes2++
        }
    }
    // 最少節點數量
    if lenNodes2 == 1 {
        return minNodes2[0]
    }

2.6.5 最近更新節點優先

該演算法會篩選每個node驅逐的pod中優先順序最高的pod的最早更新時間(其實就是說這個pod早就被建立了),然後在多個node之間進行比較,如果誰上面的時間越新(即這個node上的pod可能是最近被排程上去的),則就選中這個節點

 latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
    if latestStartTime == nil {
        // If the earliest start time of all pods on the 1st node is nil, just return it,
        // which is not expected to happen.
        // 如果第一個節點上所有pod的最早開始時間為零,那麼返回它
        klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", minNodes2[0])
        return minNodes2[0]
    }
    nodeToReturn := minNodes2[0]
    for i := 1; i < lenNodes2; i++ {
        node := minNodes2[i]
        // Get earliest start time of all pods on the current node.
        // 獲取當前node最早啟動時間
        earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
        if earliestStartTimeOnNode == nil {
            klog.Errorf("earliestStartTime is nil for node %s. Should not reach here.", node)
            continue
        }
        if earliestStartTimeOnNode.After(latestStartTime.Time) {
            latestStartTime = earliestStartTimeOnNode
            nodeToReturn = node
        }
    }

    return nodeToReturn

閱讀總結

因為是純的演算法流程,並沒有複雜的資料結構,大家看看就好,排程器的設計可能就看到這了,後面把之前的都串起來,算是一個總結,如果有興趣我就再看看 SchedulerExtender和framework的設計, 其實學習scheduler排程器部分只是因為自己對分散式排程這塊比較好奇,而且自己有運維開發的經驗,這對pod排程類似場景並不陌生,看起來總的來說相對容易一點,而且我只分析了核心的資料結構和演算法,還有幾個階段,為了避免陷入對kubenretes一些複雜邏輯的處理,我都儘量簡化邏輯,就是希望即時不去看k8s scheduler的程式碼,也能有所收穫

微訊號:baxiaoshi2020
關注公告號閱讀更多原始碼分析文章
更多文章關注 www.sreguide.com
本文由部落格一文多發平臺 OpenWrite 釋出