1. 程式人生 > >13.深入k8s:Pod 水平自動擴縮HPA及其原始碼分析

13.深入k8s:Pod 水平自動擴縮HPA及其原始碼分析

![63200675_p0](https://img.luozhiyun.com/20201004174855.jpg) > 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 原始碼版本是[1.19](https://github.com/kubernetes/kubernetes/tree/release-1.19) ## Pod 水平自動擴縮 ### Pod 水平自動擴縮工作原理 Pod 水平自動擴縮全名是Horizontal Pod Autoscaler簡稱HPA。它可以基於 CPU 利用率或其他指標自動擴縮 ReplicationController、Deployment 和 ReplicaSet 中的 Pod 數量。 ![image-20200928160203334](https://img.luozhiyun.com/20201004174900.png) Pod 水平自動擴縮器由--horizontal-pod-autoscaler-sync-period 引數指定週期(預設值為 15 秒)。每個週期內,控制器管理器根據每個 HorizontalPodAutoscaler 定義中指定的指標查詢資源利用率。 Pod 水平自動擴縮控制器跟據當前指標和期望指標來計算擴縮比例,公式為: ``` desiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )] ``` currentReplicas表示當前度量值,desiredMetricValue表示期望度量值,desiredReplicas表示期望副本數。例如,當前度量值為 200m,目標設定值為 100m,那麼由於 200.0/100.0 == 2.0, 副本數量將會翻倍。 如果當前指標為 50m,副本數量將會減半,因為50.0/100.0 == 0.5。 我們可以通過使用kubectl來建立HPA。如通過 kubectl create 命令建立一個 HPA 物件, 通過 kubectl get hpa 命令來獲取所有 HPA 物件, 通過 kubectl describe hpa 命令來檢視 HPA 物件的詳細資訊。 最後,可以使用 kubectl delete hpa 命令刪除物件。 也可以通過kubectl autoscale來建立 HPA 物件。 例如,命令 kubectl autoscale rs foo --min=2 --max=5 --cpu-percent=80 將會為名 為 foo 的 ReplicationSet 建立一個 HPA 物件, 目標 CPU 使用率為 80%,副本數量配置為 2 到 5 之間。 如果指標變化太頻繁,我們也可以使用`--horizontal-pod-autoscaler-downscale-stabilization`指令設定擴縮容延遲時間,表示的是自從上次縮容執行結束後,多久可以再次執行縮容,預設是5m。 ### Pod 水平自動擴縮示例 編寫用於測試的Deployment: ```yaml apiVersion: apps/v1 kind: Deployment metadata: name: hpatest spec: replicas: 1 selector: matchLabels: app: hpatest template: metadata: labels: app: hpatest spec: containers: - name: hpatest image: nginx imagePullPolicy: IfNotPresent command: ["/bin/sh"] args: ["-c","/usr/sbin/nginx; while true;do echo `hostname -I` > /usr/share/nginx/html/index.html; sleep 120;done"] ports: - containerPort: 80 resources: requests: cpu: 1m memory: 100Mi limits: cpu: 3m memory: 400Mi --- apiVersion: v1 kind: Service metadata: name: hpatest-svc spec: selector: app: hpatest ports: - port: 80 targetPort: 80 protocol: TCP ``` 編寫HPA,用於水平擴充套件,當cpu達到50%的利用率的時候開始擴充套件: ```yaml apiVersion: autoscaling/v1 kind: HorizontalPodAutoscaler metadata: name: haptest-nginx spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: hpatest minReplicas: 2 maxReplicas: 6 targetCPUUtilizationPercentage: 50 ``` 寫一個簡單的壓測指令碼: ```sh [root@localhost HPA]# vim hpatest.sh while true do wget -q -O- http://10.68.50.65 done ``` 觀察一下hpa的TARGETS情況: ```sh [root@localhost ~]# kubectl get hpa -w NAME REFERENCE TARGETS MINPODS MAXPODS REPLICAS AGE hpatest Deployment/hpatest 0%/50% 1 5 1 5m47s hpatest Deployment/hpatest 400%/50% 1 5 1 5m49s hpatest Deployment/hpatest 400%/50% 1 5 4 6m4s hpatest Deployment/hpatest 400%/50% 1 5 5 6m19s hpatest Deployment/hpatest 500%/50% 1 5 5 6m49s ``` 觀察是否會自動擴容: ```sh [root@localhost ~]# kubectl get pods -o wide -w NAME READY STATUS RESTARTS AGE IP NODE NOMINATED NODE READINESS GATES hpatest-bbb44c476-jv8zr 0/1 ContainerCreating 0 0s 192.168.13.130 hpatest-bbb44c476-sk6qb 0/1 ContainerCreating 0 0s 192.168.13.130 hpatest-bbb44c476-7s5qn 0/1 ContainerCreating 0 0s 192.168.13.130 hpatest-bbb44c476-7s5qn 1/1 Running 0 6s 172.20.0.23 192.168.13.130 hpatest-bbb44c476-sk6qb 1/1 Running 0 6s 172.20.0.22 192.168.13.130 hpatest-bbb44c476-jv8zr 1/1 Running 0 6s 172.20.0.21 192.168.13.130 hpatest-bbb44c476-dstnf 0/1 Pending 0 0s hpatest-bbb44c476-dstnf 0/1 Pending 0 0s 192.168.13.130 hpatest-bbb44c476-dstnf 0/1 ContainerCreating 0 0s 192.168.13.130 hpatest-bbb44c476-dstnf 1/1 Running 0 6s 172.20.0.24 192.168.13.130 ``` 停止壓測之後,HPA開始自動縮容: ```sh [root@localhost HPA]# kubectl get pod -w hpatest-bbb44c476-dstnf 0/1 Terminating 0 9m52s hpatest-bbb44c476-jv8zr 0/1 Terminating 0 10m hpatest-bbb44c476-7s5qn 0/1 Terminating 0 10m hpatest-bbb44c476-sk6qb 0/1 Terminating 0 10m hpatest-bbb44c476-sk6qb 0/1 Terminating 0 10m hpatest-bbb44c476-dstnf 0/1 Terminating 0 10m hpatest-bbb44c476-dstnf 0/1 Terminating 0 10m hpatest-bbb44c476-7s5qn 0/1 Terminating 0 10m hpatest-bbb44c476-7s5qn 0/1 Terminating 0 10m hpatest-bbb44c476-jv8zr 0/1 Terminating 0 10m hpatest-bbb44c476-jv8zr 0/1 Terminating 0 10m ``` ## 原始碼分析 ### 初始化 檔案位置:cmd/kube-controller-manager/app/controllermanager.go ```go func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc { ... controllers["horizontalpodautoscaling"] = startHPAController ... } ``` HPA Controller和其他的Controller一樣,都在NewControllerInitializers方法中進行註冊,然後通過startHPAController來啟動。 #### startHPAController 檔案位置:cmd/kube-controller-manager/app/autoscaling.go ```go func startHPAController(ctx ControllerContext) (http.Handler, bool, error) { ... return startHPAControllerWithLegacyClient(ctx) } func startHPAControllerWithLegacyClient(ctx ControllerContext) (http.Handler, bool, error) { hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") metricsClient := metrics.NewHeapsterMetricsClient( hpaClient, metrics.DefaultHeapsterNamespace, metrics.DefaultHeapsterScheme, metrics.DefaultHeapsterService, metrics.DefaultHeapsterPort, ) return startHPAControllerWithMetricsClient(ctx, metricsClient) } func startHPAControllerWithMetricsClient(ctx ControllerContext, metricsClient metrics.MetricsClient) (http.Handler, bool, error) { hpaClient := ctx.ClientBuilder.ClientOrDie("horizontal-pod-autoscaler") hpaClientConfig := ctx.ClientBuilder.ConfigOrDie("horizontal-pod-autoscaler") scaleKindResolver := scale.NewDiscoveryScaleKindResolver(hpaClient.Discovery()) scaleClient, err := scale.NewForConfig(hpaClientConfig, ctx.RESTMapper, dynamic.LegacyAPIPathResolverFunc, scaleKindResolver) if err != nil { return nil, false, err } // 初始化 go podautoscaler.NewHorizontalController( hpaClient.CoreV1(), scaleClient, hpaClient.AutoscalingV1(), ctx.RESTMapper, metricsClient, ctx.InformerFactory.Autoscaling().V1().HorizontalPodAutoscalers(), ctx.InformerFactory.Core().V1().Pods(), ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerSyncPeriod.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerDownscaleStabilizationWindow.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerTolerance, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerCPUInitializationPeriod.Duration, ctx.ComponentConfig.HPAController.HorizontalPodAutoscalerInitialReadinessDelay.Duration, ).Run(ctx.Stop) return nil, true, nil } ``` 最後會呼叫到startHPAControllerWithMetricsClient方法,啟動一個執行緒來呼叫NewHorizontalController方法初始化一個HPA Controller,然後執行Run方法。 #### Run 檔案位置:pkg/controller/podautoscaler/horizontal.go ```go func (a *HorizontalController) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer a.queue.ShutDown() klog.Infof("Starting HPA controller") defer klog.Infof("Shutting down HPA controller") if !cache.WaitForNamedCacheSync("HPA", stopCh, a.hpaListerSynced, a.podListerSynced) { return } // 啟動非同步執行緒,每秒執行一次 go wait.Until(a.worker, time.Second, stopCh) <-stopCh } ``` 這裡會呼叫worker執行具體的擴縮容的邏輯。 ### 核心程式碼分析 worker裡面一路執行下來會走到reconcileAutoscaler方法裡面,這裡是HPA的核心。下面我們專注看看這部分。 #### reconcileAutoscaler:計算副本數 ```go func (a *HorizontalController) reconcileAutoscaler(hpav1Shared *autoscalingv1.HorizontalPodAutoscaler, key string) error { ... //副本數為0,不啟動自動擴縮容 if scale.Spec.Replicas == 0 && minReplicas != 0 { // Autoscaling is disabled for this resource desiredReplicas = 0 rescale = false setCondition(hpa, autoscalingv2.ScalingActive, v1.ConditionFalse, "ScalingDisabled", "scaling is disabled since the replica count of the target is zero") // 如果當前副本數大於最大期望副本數,那麼設定期望副本數為最大副本數 } else if currentReplicas >
hpa.Spec.MaxReplicas { rescaleReason = "Current number of replicas above Spec.MaxReplicas" desiredReplicas = hpa.Spec.MaxReplicas // 同上 } else if currentReplicas < minReplicas { rescaleReason = "Current number of replicas below Spec.MinReplicas" desiredReplicas = minReplicas } else { var metricTimestamp time.Time //計算需要擴縮容的數量 metricDesiredReplicas, metricName, metricStatuses, metricTimestamp, err = a.computeReplicasForMetrics(hpa, scale, hpa.Spec.Metrics) if err != nil { ... } klog.V(4).Infof("proposing %v desired replicas (based on %s from %s) for %s", metricDesiredReplicas, metricName, metricTimestamp, reference) rescaleMetric := "" if metricDesiredReplicas > desiredReplicas { desiredReplicas = metricDesiredReplicas rescaleMetric = metricName } if desiredReplicas > currentReplicas { rescaleReason = fmt.Sprintf("%s above target", rescaleMetric) } if desiredReplicas < currentReplicas { rescaleReason = "All metrics below target" } //從1.18開始支援behavior欄位 //可以在擴縮容的時候指定一個穩定視窗,以防止縮放目標中的副本數量出現波動 //doc:https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#support-for-configurable-scaling-behavior if hpa.Spec.Behavior == nil { desiredReplicas = a.normalizeDesiredReplicas(hpa, key, currentReplicas, desiredReplicas, minReplicas) } else { desiredReplicas = a.normalizeDesiredReplicasWithBehaviors(hpa, key, currentReplicas, desiredReplicas, minReplicas) } rescale = desiredReplicas != currentReplicas } ... } ``` 這一段程式碼是reconcileAutoscaler裡面的核心程式碼,在這裡會確定一個區間,首先根據當前的scale物件和當前hpa裡面配置的對應的引數的值,決策當前的副本數量,其中針對於超過設定的maxReplicas和小於minReplicas兩種情況,只需要簡單的修正為對應的值,直接更新對應的scale物件即可,而scale副本為0的物件,則hpa不會在進行任何操作。 對於當前副本數在maxReplicas和minReplicas之間的時候,則需要計算是否需要擴縮容,計算則是呼叫computeReplicasForMetrics方法來實現。 最後如果設定了Behavior則呼叫normalizeDesiredReplicasWithBehaviors函式來修正最後的結果,Behavior相關可以看文件:https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#support-for-configurable-scaling-behavior。 下面我們一步步分析。 #### computeReplicasForMetrics:遍歷度量目標 ```go func (a *HorizontalController) computeReplicasForMetrics(hpa *autoscalingv2.HorizontalPodAutoscaler, scale *autoscalingv1.Scale, metricSpecs []autoscalingv2.MetricSpec) (replicas int32, metric string, statuses []autoscalingv2.MetricStatus, timestamp time.Time, err error) { ... //這裡的度量目標可以是一個列表,所以遍歷之後取最大的需要擴縮容的數量 for i, metricSpec := range metricSpecs { //根據type型別計算需要擴縮容的數量 replicaCountProposal, metricNameProposal, timestampProposal, condition, err := a.computeReplicasForMetric(hpa, metricSpec, specReplicas, statusReplicas, selector, &statuses[i]) if err != nil { if invalidMetricsCount <= 0 { invalidMetricCondition = condition invalidMetricError = err } invalidMetricsCount++ } //記錄最大的需要擴縮容的數量 if err == nil && (replicas == 0 || replicaCountProposal >
replicas) { timestamp = timestampProposal replicas = replicaCountProposal metric = metricNameProposal } } ... return replicas, metric, statuses, timestamp, nil } ``` 因為我們在設定metrics的時候實際上是一個數組,如下: ```yaml apiVersion: autoscaling/v2beta2 kind: HorizontalPodAutoscaler metadata: name: php-apache spec: scaleTargetRef: apiVersion: apps/v1 kind: Deployment name: php-apache minReplicas: 1 maxReplicas: 10 metrics: - type: Resource resource: name: cpu target: type: Utilization averageUtilization: 50 - type: Pods pods: metric: name: packets-per-second target: type: AverageValue averageValue: 1k - type: Object object: metric: name: requests-per-second describedObject: apiVersion: networking.k8s.io/v1beta1 kind: Ingress name: main-route target: type: Value value: 10k ``` 例如這個官方的例子中,設定了三個metric,所以我們在上面的程式碼中遍歷所有的metrics,然後選取返回副本數最大的那個。主要計算邏輯都在computeReplicasForMetric中,下面我們看看這個方法。 #### computeReplicasForMetric:根據type計算副本數 ```go func (a *HorizontalController) computeReplicasForMetric(hpa *autoscalingv2.HorizontalPodAutoscaler, spec autoscalingv2.MetricSpec, specReplicas, statusReplicas int32, selector labels.Selector, status *autoscalingv2.MetricStatus) (replicaCountProposal int32, metricNameProposal string, timestampProposal time.Time, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) { //根據不同的型別來進行計量 switch spec.Type { //表示如果是一個k8s物件,如Ingress物件 case autoscalingv2.ObjectMetricSourceType: ... // 表示pod度量型別 case autoscalingv2.PodsMetricSourceType: metricSelector, err := metav1.LabelSelectorAsSelector(spec.Pods.Metric.Selector) if err != nil { condition := a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err) return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err) } //僅支援AverageValue度量目標,計算需要擴縮容的數量 replicaCountProposal, timestampProposal, metricNameProposal, condition, err = a.computeStatusForPodsMetric(specReplicas, spec, hpa, selector, status, metricSelector) if err != nil { return 0, "", time.Time{}, condition, fmt.Errorf("failed to get pods metric value: %v", err) } // 表示Resource度量型別 case autoscalingv2.ResourceMetricSourceType: ... case autoscalingv2.ExternalMetricSourceType: ... default: errMsg := fmt.Sprintf("unknown metric source type %q", string(spec.Type)) err = fmt.Errorf(errMsg) condition := a.getUnableComputeReplicaCountCondition(hpa, "InvalidMetricSourceType", err) return 0, "", time.Time{}, condition, err } return replicaCountProposal, metricNameProposal, timestampProposal, autoscalingv2.HorizontalPodAutoscalerCondition{}, nil } ``` 這裡會根據不同的度量型別來進行統計,目前度量型別有四種,分別是Pods、Object、Resource、External,解釋如下: ```go const ( // ObjectMetricSourceType is a metric describing a kubernetes object // (for example, hits-per-second on an Ingress object). // 這種度量專門用來描述k8s的內建物件 ObjectMetricSourceType MetricSourceType = "Object" // PodsMetricSourceType is a metric describing each pod in the current scale // target (for example, transactions-processed-per-second). The values // will be averaged together before being compared to the target value. // 這種度量描述在目前被統計的每個pod平均期望值 PodsMetricSourceType MetricSourceType = "Pods" // ResourceMetricSourceType is a resource metric known to Kubernetes, as // specified in requests and limits, describing each pod in the current // scale target (e.g. CPU or memory). Such metrics are built in to // Kubernetes, and have special scaling options on top of those available // to normal per-pod metrics (the "pods" source). // Resource描述的是每個pod中資源,如CPU或記憶體 ResourceMetricSourceType MetricSourceType = "Resource" // ExternalMetricSourceType is a global metric that is not associated // with any Kubernetes object. It allows autoscaling based on information // coming from components running outside of cluster // (for example length of queue in cloud messaging service, or // QPS from loadbalancer running outside of cluster). // External型別表示的是一種全域性的度量,和k8s物件無關,主要依賴外部叢集提供資訊 ExternalMetricSourceType MetricSourceType = "External" ) ``` 我們這裡不會全部都介紹,挑選pod度量型別作為例子。pod這個分支會呼叫computeStatusForPodsMetric方法來計算需要擴縮容的數量。 #### computeStatusForPodsMetric&GetMetricReplicas:計算需要擴縮容的數量 檔案位置:pkg/controller/podautoscaler/replica_calculator.go ```go func (a *HorizontalController) computeStatusForPodsMetric(currentReplicas int32, metricSpec autoscalingv2.MetricSpec, hpa *autoscalingv2.HorizontalPodAutoscaler, selector labels.Selector, status *autoscalingv2.MetricStatus, metricSelector labels.Selector) (replicaCountProposal int32, timestampProposal time.Time, metricNameProposal string, condition autoscalingv2.HorizontalPodAutoscalerCondition, err error) { //計算需要擴縮容的數量 replicaCountProposal, utilizationProposal, timestampProposal, err := a.replicaCalc.GetMetricReplicas(currentReplicas, metricSpec.Pods.Target.AverageValue.MilliValue(), metricSpec.Pods.Metric.Name, hpa.Namespace, selector, metricSelector) if err != nil { condition = a.getUnableComputeReplicaCountCondition(hpa, "FailedGetPodsMetric", err) return 0, timestampProposal, "", condition, err } ... return replicaCountProposal, timestampProposal, fmt.Sprintf("pods metric %s", metricSpec.Pods.Metric.Name), autoscalingv2.HorizontalPodAutoscalerCondition{}, nil } func (c *ReplicaCalculator) GetMetricReplicas(currentReplicas int32, targetUtilization int64, metricName string, namespace string, selector labels.Selector, metricSelector labels.Selector) (replicaCount int32, utilization int64, timestamp time.Time, err error) { //獲取pod中度量資料 metrics, timestamp, err := c.metricsClient.GetRawMetric(metricName, namespace, selector, metricSelector) if err != nil { return 0, 0, time.Time{}, fmt.Errorf("unable to get metric %s: %v", metricName, err) } //通過結合度量資料來計算希望擴縮容的數量是多少 replicaCount, utilization, err = c.calcPlainMetricReplicas(metrics, currentReplicas, targetUtilization, namespace, selector, v1.ResourceName("")) return replicaCount, utilization, timestamp, err } ``` 這裡會呼叫GetRawMetric方法來獲取pod對應的度量資料,然後再呼叫calcPlainMetricReplicas方法結合度量資料與目標期望來計算希望擴縮容的數量是多少。 #### calcPlainMetricReplicas:計算副本數具體實現 calcPlainMetricReplicas方法邏輯比較多,下面分開來講解。 ```go func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) { podList, err := c.podLister.Pods(namespace).List(selector) ... //將pod分成三類進行統計,得到ready的pod數量、ignored Pod集合、missing Pod集合 readyPodCount, ignoredPods, missingPods := groupPods(podList, metrics, resource, c.cpuInitializationPeriod, c.delayOfInitialReadinessStatus) //在度量的資料裡移除ignored Pods集合的資料 removeMetricsForPods(metrics, ignoredPods) //計算pod中container request 設定的資源之和 requests, err := calculatePodRequests(podList, resource) ... } ``` 這裡會呼叫groupPods將pod列表的進行一個分類統計。ignoredPods集合裡面包含了pod狀態為PodPending的資料;missingPods列表裡面包含了在度量資料裡面根據pod名找不到的資料。 因為missingPods的度量資料已經在metrics裡是找不到的,然後只需要剔除掉ignored Pods集合中度量的資源就好了。 接下來呼叫calculatePodRequests方法統計pod中container request 設定的資源之和。 我們繼續往下看: ```go func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) { ... //獲取資源使用率 usageRatio, utilization := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization) ... } ``` 到這裡會呼叫GetMetricUtilizationRatio方法計算資源使用率。 這個方法比較簡單: usageRatio=currentUtilization/targetUtilization; currentUtilization = metrics值之和metricsTotal/metrics的長度; 繼續往下: ```go func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) { ... rebalanceIgnored := len(ignoredPods) >
0 && usageRatio > 1.0 if !rebalanceIgnored && len(missingPods) == 0 { if math.Abs(1.0-usageRatio) <= c.tolerance { // return the current replicas if the change would be too small return currentReplicas, utilization, nil } //如果沒有unready 或 missing 的pod,那麼使用 usageRatio*readyPodCount計算需要擴縮容數量 return int32(math.Ceil(usageRatio * float64(readyPodCount))), utilization, nil } if len(missingPods) > 0 { if usageRatio < 1.0 { //如果是縮容,那麼將missing pod使用率設定為目標資源使用率 for podName := range missingPods { metrics[podName] = metricsclient.PodMetric{Value: targetUtilization} } } else { //如果是擴容,那麼將missing pod使用率設定為0 for podName := range missingPods { metrics[podName] = metricsclient.PodMetric{Value: 0} } } } if rebalanceIgnored { // 將unready pods使用率設定為0 for podName := range ignoredPods { metrics[podName] = metricsclient.PodMetric{Value: 0} } } ... } ``` 這裡邏輯比較清晰,首先是判斷如果missingPods和ignoredPods集合為空,那麼檢查一下是否在tolerance容忍度之內預設是0.1,如果在的話直接返回不進行擴縮容,否則返回usageRatio*readyPodCount表示需要擴縮容的容量; 如果missingPods集合不為空,那麼需要判斷一下是擴容還是縮容,相應調整metrics裡面的值; 最後如果是擴容,還需要將ignoredPods集合的pod在metrics集合裡設定為空。 接著看最後一部分: ```go func (c *ReplicaCalculator) calcPlainMetricReplicas(metrics metricsclient.PodMetricsInfo, currentReplicas int32, targetUtilization int64, namespace string, selector labels.Selector, resource v1.ResourceName) (replicaCount int32, utilization int64, err error) { ... //重新計算資源利用率 newUsageRatio, _ := metricsclient.GetMetricUtilizationRatio(metrics, targetUtilization) if math.Abs(1.0-newUsageRatio) <= c.tolerance || (usageRatio < 1.0 && newUsageRatio > 1.0) || (usageRatio > 1.0 && newUsageRatio < 1.0) { return currentReplicas, utilization, nil } return int32(math.Ceil(newUsageRatio * float64(len(metrics)))), utilization, nil } ``` 因為上面重新對missingPods列表和ignoredPods列表中的metrics值進行了重新設定,所以這裡需要重新計算資源利用率。 如果變化在容忍度之內,或者usageRatio與newUsageRatio一個大於一個小於零表示兩者伸縮方向不一致,那麼直接返回。否則返回newUsageRatio* metrics的長度作為擴縮容的具體值。 介紹完了這一塊我們再來看看整個邏輯流程圖: ![image-20201003154222404](https://img.luozhiyun.com/20201004174910.png) 講完了computeReplicasForMetrics方法,下面我們繼續回到reconcileAutoscaler方法中往下看。 繼續往下就到了檢查是否設定了Behavior,如果沒有設定那麼走的是normalizeDesiredReplicas方法,這個方法較為簡單,我們直接看看normalizeDesiredReplicasWithBehaviors方法做了什麼,以及是怎麼實現的。 #### normalizeDesiredReplicasWithBehaviors:Behavior限制 關於Behavior具體的例子可以到這裡看:https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/#default-behavior。 ```go func (a *HorizontalController) normalizeDesiredReplicasWithBehaviors(hpa *autoscalingv2.HorizontalPodAutoscaler, key string, currentReplicas, prenormalizedDesiredReplicas, minReplicas int32) int32 { //如果StabilizationWindowSeconds設定為空,那麼給一個預設的值,預設300s a.maybeInitScaleDownStabilizationWindow(hpa) normalizationArg := NormalizationArg{ Key: key, ScaleUpBehavior: hpa.Spec.Behavior.ScaleUp, ScaleDownBehavior: hpa.Spec.Behavior.ScaleDown, MinReplicas: minReplicas, MaxReplicas: hpa.Spec.MaxReplicas, CurrentReplicas: currentReplicas, DesiredReplicas: prenormalizedDesiredReplicas} //根據引數獲取建議副本數 stabilizedRecommendation, reason, message := a.stabilizeRecommendationWithBehaviors(normalizationArg) normalizationArg.DesiredReplicas = stabilizedRecommendation ... //根據scaleDown或scaleUp指定的引數做限制 desiredReplicas, reason, message := a.convertDesiredReplicasWithBehaviorRate(normalizationArg) ... return desiredReplicas } ``` 這個方法主要分為兩部分,一部分是呼叫stabilizeRecommendationWithBehaviors方法來根據時間視窗來獲取一個建議副本數;另一部分convertDesiredReplicasWithBehaviorRate方法是根據scaleDown或scaleUp指定的引數做限制。 **stabilizeRecommendationWithBehaviors** ```go func (a *HorizontalController) stabilizeRecommendationWithBehaviors(args NormalizationArg) (int32, string, string) { recommendation := args.DesiredReplicas foundOldSample := false oldSampleIndex := 0 var scaleDelaySeconds int32 var reason, message string var betterRecommendation func(int32, int32) int32 // 如果期望的副本數大於等於當前的副本數,則延遲時間=scaleUpBehaviro的穩定視窗時間 if args.DesiredReplicas >= args.CurrentReplicas { scaleDelaySeconds = *args.ScaleUpBehavior.StabilizationWindowSeconds betterRecommendation = min reason = "ScaleUpStabilized" message = "recent recommendations were lower than current one, applying the lowest recent recommendation" } else { // 期望副本數<當前的副本數 scaleDelaySeconds = *args.ScaleDownBehavior.StabilizationWindowSeconds betterRecommendation = max reason = "ScaleDownStabilized" message = "recent recommendations were higher than current one, applying the highest recent recommendation" } //獲取一個最大的時間視窗 maxDelaySeconds := max(*args.ScaleUpBehavior.StabilizationWindowSeconds, *args.ScaleDownBehavior.StabilizationWindowSeconds) obsoleteCutoff := time.Now().Add(-time.Second * time.Duration(maxDelaySeconds)) cutoff := time.Now().Add(-time.Second * time.Duration(scaleDelaySeconds)) for i, rec := range a.recommendations[args.Key] { if rec.timestamp.After(cutoff) { // 在截止時間之後,則當前建議有效, 則根據之前的比較函式來決策最終的建議副本數 recommendation = betterRecommendation(rec.recommendation, recommendation) } //如果被遍歷到的建議時間是在obsoleteCutoff之前,那麼需要重新設定建議 if rec.timestamp.Before(obsoleteCutoff) { foundOldSample = true oldSampleIndex = i } } //如果被遍歷到的建議時間是在obsoleteCutoff之前,那麼需要重新設定建議 if foundOldSample { a.recommendations[args.Key][oldSampleIndex] = timestampedRecommendation{args.DesiredReplicas, time.Now()} } else { a.recommendations[args.Key] = append(a.recommendations[args.Key], timestampedRecommendation{args.DesiredReplicas, time.Now()}) } return recommendation, reason, message } ``` 這個方法首先會去校驗當前是擴容還是縮容,如果是擴容,那麼將scaleDelaySeconds設定為ScaleUpBehavior的時間,並將betterRecommendation方法設定為min;如果是縮容那麼則相反。 然後會遍歷建議,如果建議時間在視窗時間cutoff之後,那麼需要呼叫betterRecommendation方法來獲取建議值,然後將獲取到的最終結果返回。 **convertDesiredReplicasWithBehaviorRate** ```go func (a *HorizontalController) convertDesiredReplicasWithBehaviorRate(args NormalizationArg) (int32, string, string) { var possibleLimitingReason, possibleLimitingMessage string //如果期望副本數大於當前副本數 if args.DesiredReplicas > args.CurrentReplicas { //獲取預期擴容的pod數量 scaleUpLimit := calculateScaleUpLimitWithScalingRules(args.CurrentReplicas, a.scaleUpEvents[args.Key], args.ScaleUpBehavior) if scaleUpLimit < args.CurrentReplicas { // We shouldn't scale up further until the scaleUpEvents will be cleaned up scaleUpLimit = args.CurrentReplicas } maximumAllowedReplicas := args.MaxReplicas if maximumAllowedReplicas > scaleUpLimit { maximumAllowedReplicas = scaleUpLimit possibleLimitingReason = "ScaleUpLimit" possibleLimitingMessage = "the desired replica count is increasing faster than the maximum scale rate" } else { possibleLimitingReason = "TooManyReplicas" possibleLimitingMessage = "the desired replica count is more than the maximum replica count" } if args.DesiredReplicas > maximumAllowedReplicas { return maximumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage } } else if args.DesiredReplicas < args.CurrentReplicas { //獲取預期縮容的pod數量 scaleDownLimit := calculateScaleDownLimitWithBehaviors(args.CurrentReplicas, a.scaleDownEvents[args.Key], args.ScaleDownBehavior) if scaleDownLimit > args.CurrentReplicas { // We shouldn't scale down further until the scaleDownEvents will be cleaned up scaleDownLimit = args.CurrentReplicas } minimumAllowedReplicas := args.MinReplicas if minimumAllowedReplicas < scaleDownLimit { minimumAllowedReplicas = scaleDownLimit possibleLimitingReason = "ScaleDownLimit" possibleLimitingMessage = "the desired replica count is decreasing faster than the maximum scale rate" } else { possibleLimitingMessage = "the desired replica count is less than the minimum replica count" possibleLimitingReason = "TooFewReplicas" } if args.DesiredReplicas < minimumAllowedReplicas { return minimumAllowedReplicas, possibleLimitingReason, possibleLimitingMessage } } return args.DesiredReplicas, "DesiredWithinRange", "the desired count is within the acceptable range" } ``` 這個方法和上面的方法有些類似,不過是根據behavior具體行為來做一個約束。如果是scaleUp,那麼需要呼叫calculateScaleUpLimitWithScalingRules來獲取預期擴容的pod數量,calculateScaleUpLimitWithScalingRules方法裡面會根據behavior設定的selectPolicy以及scaleUp.type引數來做一個計算,如下: ```go func calculateScaleUpLimitWithScalingRules(currentReplicas int32, scaleEvents []timestampedScaleEvent, scalingRules *autoscalingv2.HPAScalingRules) int32 { var result int32 var proposed int32 var selectPolicyFn func(int32, int32) int32 if *scalingRules.SelectPolicy == autoscalingv2.DisabledPolicySelect { return currentReplicas // Scaling is disabled } else if *scalingRules.SelectPolicy == autoscalingv2.MinPolicySelect { selectPolicyFn = min // For scaling up, the lowest change ('min' policy) produces a minimum value } else { selectPolicyFn = max // Use the default policy otherwise to produce a highest possible change } for _, policy := range scalingRules.Policies { //獲取最近變更的副本數 replicasAddedInCurrentPeriod := getReplicasChangePerPeriod(policy.PeriodSeconds, scaleEvents) periodStartReplicas := currentReplicas - replicasAddedInCurrentPeriod //根據不同的policy型別,決定不同的預期值 if policy.Type == autoscalingv2.PodsScalingPolicy { proposed = int32(periodStartReplicas + policy.Value) } else if policy.Type == autoscalingv2.PercentScalingPolicy { proposed = int32(math.Ceil(float64(periodStartReplicas) * (1 + float64(policy.Value)/100))) } result = selectPolicyFn(result, proposed) } return result } func getReplicasChangePerPeriod(periodSeconds int32, scaleEvents []timestampedScaleEvent) int32 { period := time.Second * time.Duration(periodSeconds) cutoff := time.Now().Add(-period) var replicas int32 //遍歷最近變更 for _, rec := range scaleEvents { if rec.timestamp.After(cutoff) { // 更新副本修改的數量, 會有正負,最終replicas就是最近變更的數量 replicas += rec.replicaChange } } return replicas } ``` 如果沒有設定selectPolicy那麼selectPolicyFn預設就是max方法,然後在遍歷Policies的時候,如果type是pod,那麼就加上一個具體值,如果是Percent,那麼就加上一個百分比。 如果當前的副本數已經大於scaleUpLimit,那麼則設定scaleUpLimit為當前副本數,如果期望副本數超過了最大允許副本數,那麼直接返回,否則返回期望副本數就好了。 下面來一張圖理一下邏輯: ![image-20201004164913743](https://img.luozhiyun.com/20201004174924.png) ## 總結 水平擴充套件大體邏輯可以用下面這兩張圖來進行一個概括,如果感興趣的話,其中有很多細微的邏輯還是需要參照文件和程式碼一起理解起來才會更加的好。 ![image-20201003154222404](https://img.luozhiyun.com/20201004174910.png) ![image-20201004164913743](https://img.luozhiyun.com/20201004174924.png) ## Reference https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/ https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale-walkthrough/ https://github.com/kubernetes/community/blob/master/contributors/design-proposals/instrumentation/custom-metrics-