
How to Implement In-Place Container Upgrades in Kubernetes

Author: [email protected], based on Kubernetes 1.12

Abstract: In Kubernetes, the Pod is the basic unit of scheduling and also the basic unit managed by all built-in workloads. Whether it is a Deployment or a StatefulSet, applications are updated Pod by Pod, with the Pod treated as an immutable unit. In real deployments, however, a Pod often contains not only the business container but also one or even several sidecar containers. How can we upgrade a sidecar container in place without disturbing the business container? That is the technical question this article explores.

Why In-Place Container Upgrades Are Needed

In the Docker world, container images serve as immutable infrastructure and solve the problem of environment dependencies. Kubernetes lifts this idea to the Pod level and expects every application update to be done by recreating the Pod. The principle is sound: every recreation yields a brand-new, clean application environment. For microservice deployments this carries little overhead, but for traditional applications a Pod may bundle the main business container with inseparable dependent containers and sidecar containers, making the Pod rather bulky. If updating a single sidecar container still requires recreating the whole Pod, the cost is considerable:

  • the Pod's graceful-termination period (30 seconds by default);
  • the time spent re-pulling multiple container images after the Pod is rescheduled, which can be long;
  • the application's startup time.

Updating a lightweight sidecar thus triggers a minutes-long rebuild of each Pod. With hundreds or thousands of replicas, the total time adds up quickly, and if the update is rolled out by a StatefulSet with the OrderedReady PodManagementPolicy, the cost becomes hard to accept.

What we really want is the ability to upgrade a single container inside a Pod without rebuilding the whole Pod; this is what we call in-place container upgrade.

Does Kubernetes Already Support In-Place Container Upgrades?

The answer is: yes! The corresponding code logic has existed since Kubernetes v1.5, more than two years ago. This article reads through the code of Kubernetes 1.12.

Many readers will be skeptical: Kubernetes does not even offer a real restart, everything is done by recreating Pods, so how could it update only a container? That is indeed how the controllers of the built-in workloads behave. Kubernetes implements in-place container upgrade only at the kubelet layer and does not expose it to users through controllers such as Deployment or StatefulSet. The reason is simple: the Pod is still recommended as the complete unit of deployment.

After the kubelet starts, it enters its main loop via syncLoop to process Pod change events on the node. It watches events from three sources, file, apiserver and http, aggregates them into the kubetypes.PodUpdate channel (the config channel), and syncLoopIteration continuously consumes from that channel.
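
For reference, the unit of work flowing through that config channel looks roughly like the following paraphrase of kubetypes.PodUpdate (pkg/kubelet/types/pod_update.go). Field names follow the 1.12 source, but treat this as a simplified sketch rather than the authoritative definition:

package kubetypes

import v1 "k8s.io/api/core/v1"

// PodOperation tells syncLoopIteration what kind of change an update carries
// (ADD, UPDATE, REMOVE, DELETE, ...). Editing a container image of an existing
// Pod produces an UPDATE.
type PodOperation int

// PodUpdate bundles the affected Pods with the operation and the source
// ("file", "http" or "api") that observed the change; syncLoopIteration
// consumes these from the config channel and, for UPDATE, dispatches them
// to HandlePodUpdates.
type PodUpdate struct {
	Pods   []*v1.Pod    // full desired state of the affected Pods
	Op     PodOperation // e.g. kubetypes.UPDATE for an in-place image change
	Source string       // one of "file", "http", "api"
}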

  • To perform an in-place container upgrade, we change the image of the corresponding container in Pod.Spec. This generates an event of type kubetypes.UPDATE, which syncLoopIteration handles by calling HandlePodUpdates.
pkg/kubelet/kubelet.go:1870

func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		...
		switch u.Op {
		...
		case kubetypes.UPDATE:
			glog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods))
			handler.HandlePodUpdates(u.Pods)
		...
	...
	}
...
}	
  • HandlePodUpdates dispatches the work via dispatchWork to podWorker.UpdatePod, which handles the Pod update. Each Pod is managed by its own per-pod goroutine, podWorker.managePodLoop, and managePodLoop calls Kubelet.syncPod to sync the Pod.

  • Kubelet.syncPod performs, as needed, killing the Pod, configuring cgroups, creating a mirror Pod for a static Pod, creating the Pod's data directories, waiting for volumes to mount, and so on. Most importantly, it calls KubeGenericRuntimeManager.SyncPod to maintain and reconcile the Pod's state.

  • KubeGenericRuntimeManager.SyncPod drives a running Pod toward its desired state and mainly performs the following steps. The core mechanism behind in-place container upgrade starts here.

    1. Compute sandbox and container changes.
    2. Kill pod sandbox if necessary.
    3. Kill any containers that should not be running.
    4. Create sandbox if necessary.
    5. Create init containers.
    6. Create normal containers.
  • KubeGenericRuntimeManager.SyncPod first calls kubeGenericRuntimeManager.computePodActions to check whether the Pod spec has changed, returning a PodActions value that records the changes required to reach the desired state.

pkg/kubelet/kuberuntime/kuberuntime_manager.go:451

// computePodActions checks whether the pod spec has changed and returns the changes if true.
func (m *kubeGenericRuntimeManager) computePodActions(pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions {
	glog.V(5).Infof("Syncing Pod %q: %+v", format.Pod(pod), pod)

	createPodSandbox, attempt, sandboxID := m.podSandboxChanged(pod, podStatus)
	changes := podActions{
		KillPod:           createPodSandbox,
		CreateSandbox:     createPodSandbox,
		SandboxID:         sandboxID,
		Attempt:           attempt,
		ContainersToStart: []int{},
		ContainersToKill:  make(map[kubecontainer.ContainerID]containerToKillInfo),
	}

	// If we need to (re-)create the pod sandbox, everything will need to be
	// killed and recreated, and init containers should be purged.
	if createPodSandbox {
		if !shouldRestartOnFailure(pod) && attempt != 0 {
			// Should not restart the pod, just return.
			return changes
		}
		if len(pod.Spec.InitContainers) != 0 {
			// Pod has init containers, return the first one.
			changes.NextInitContainerToStart = &pod.Spec.InitContainers[0]
			return changes
		}
		// Start all containers by default but exclude the ones that succeeded if
		// RestartPolicy is OnFailure.
		for idx, c := range pod.Spec.Containers {
			if containerSucceeded(&c, podStatus) && pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure {
				continue
			}
			changes.ContainersToStart = append(changes.ContainersToStart, idx)
		}
		return changes
	}

	// Check initialization progress.
	initLastStatus, next, done := findNextInitContainerToRun(pod, podStatus)
	if !done {
		if next != nil {
			initFailed := initLastStatus != nil && isContainerFailed(initLastStatus)
			if initFailed && !shouldRestartOnFailure(pod) {
				changes.KillPod = true
			} else {
				changes.NextInitContainerToStart = next
			}
		}
		// Initialization failed or still in progress. Skip inspecting non-init
		// containers.
		return changes
	}

	// Number of running containers to keep.
	keepCount := 0
	// check the status of containers.
	for idx, container := range pod.Spec.Containers {
		containerStatus := podStatus.FindContainerStatusByName(container.Name)

		// Call internal container post-stop lifecycle hook for any non-running container so that any
		// allocated cpus are released immediately. If the container is restarted, cpus will be re-allocated
		// to it.
		if containerStatus != nil && containerStatus.State != kubecontainer.ContainerStateRunning {
			if err := m.internalLifecycle.PostStopContainer(containerStatus.ID.ID); err != nil {
				glog.Errorf("internal container post-stop lifecycle hook failed for container %v in pod %v with error %v",
					container.Name, pod.Name, err)
			}
		}

		// If container does not exist, or is not running, check whether we
		// need to restart it.
		if containerStatus == nil || containerStatus.State != kubecontainer.ContainerStateRunning {
			if kubecontainer.ShouldContainerBeRestarted(&container, pod, podStatus) {
				message := fmt.Sprintf("Container %+v is dead, but RestartPolicy says that we should restart it.", container)
				glog.V(3).Infof(message)
				changes.ContainersToStart = append(changes.ContainersToStart, idx)
			}
			continue
		}
		// The container is running, but kill the container if any of the following condition is met.
		reason := ""
		restart := shouldRestartOnFailure(pod)
		if expectedHash, actualHash, changed := containerChanged(&container, containerStatus); changed {
			reason = fmt.Sprintf("Container spec hash changed (%d vs %d).", actualHash, expectedHash)
			// Restart regardless of the restart policy because the container
			// spec changed.
			restart = true
		} else if liveness, found := m.livenessManager.Get(containerStatus.ID); found && liveness == proberesults.Failure {
			// If the container failed the liveness probe, we should kill it.
			reason = "Container failed liveness probe."
		} else {
			// Keep the container.
			keepCount += 1
			continue
		}

		// We need to kill the container, but if we also want to restart the
		// container afterwards, make the intent clear in the message. Also do
		// not kill the entire pod since we expect container to be running eventually.
		message := reason
		if restart {
			message = fmt.Sprintf("%s. Container will be killed and recreated.", message)
			changes.ContainersToStart = append(changes.ContainersToStart, idx)
		}

		changes.ContainersToKill[containerStatus.ID] = containerToKillInfo{
			name:      containerStatus.Name,
			container: &pod.Spec.Containers[idx],
			message:   message,
		}
		glog.V(2).Infof("Container %q (%q) of pod %s: %s", container.Name, containerStatus.ID, format.Pod(pod), message)
	}

	if keepCount == 0 && len(changes.ContainersToStart) == 0 {
		changes.KillPod = true
	}

	return changes
}
  • computePodActions decides whether the whole Pod must be recreated based on whether the Pod sandbox has changed, the state of each container (including init containers), and similar factors.

  • It then iterates over all containers in the Pod:

    • If a container has not started yet, its restart policy decides whether it is added to the list of containers to start (PodActions.ContainersToStart);
    • If a container's spec has changed (detected by comparing hash values; a sketch of this comparison follows this list), the container is rebuilt from the new spec and added to the list of containers to start (PodActions.ContainersToStart), regardless of the restart policy;
    • If the container's spec has not changed and its liveness probe succeeds, the container is left untouched; otherwise it is added to the list of containers to kill (PodActions.ContainersToKill).
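
The "spec hash" above is computed over the entire v1.Container struct, so changing the image (or any other field) produces a different hash, which is what computePodActions detects. The following self-contained sketch mimics the spirit of the kubelet's HashContainer / containerChanged helpers; it is an approximation for illustration only (the image tags are made up), not the real kubelet code, which lives in the kubecontainer package.

package main

import (
	"fmt"
	"hash/fnv"

	"github.com/davecgh/go-spew/spew"
	v1 "k8s.io/api/core/v1"
)

// hashContainerSpec hashes the full container spec: any field change
// (image, env, resources, ...) yields a different value.
func hashContainerSpec(c *v1.Container) uint64 {
	h := fnv.New32a()
	printer := spew.ConfigState{Indent: " ", SortKeys: true, DisableMethods: true, SpewKeys: true}
	printer.Fprintf(h, "%#v", *c)
	return uint64(h.Sum32())
}

func main() {
	oldSpec := v1.Container{Name: "nginx", Image: "nginx:1.14"}
	newSpec := oldSpec
	newSpec.Image = "nginx:1.16" // only the image differs

	// The hash recorded when the container was started no longer matches the
	// hash of the desired spec, so computePodActions marks the container to be
	// killed and recreated.
	fmt.Println(hashContainerSpec(&oldSpec) != hashContainerSpec(&newSpec)) // true
}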

PodActions records the operations to perform on the Pod:

pkg/kubelet/kuberuntime/kuberuntime_manager.go:369
// podActions keeps information what to do for a pod.
type podActions struct {
	// Stop all running (regular and init) containers and the sandbox for the pod.
	KillPod bool
	// Whether need to create a new sandbox. If needed to kill pod and create a
	// a new pod sandbox, all init containers need to be purged (i.e., removed).
	CreateSandbox bool
	// The id of existing sandbox. It is used for starting containers in ContainersToStart.
	SandboxID string
	// The attempt number of creating sandboxes for the pod.
	Attempt uint32

	// The next init container to start.
	NextInitContainerToStart *v1.Container
	// ContainersToStart keeps a list of indexes for the containers to start,
	// where the index is the index of the specific container in the pod spec (
	// pod.Spec.Containers.
	ContainersToStart []int
	// ContainersToKill keeps a map of containers that need to be killed, note that
	// the key is the container ID of the container, while
	// the value contains necessary information to kill a container.
	ContainersToKill map[kubecontainer.ContainerID]containerToKillInfo
}

The key result of computePodActions is therefore the lists of containers to start and containers to kill. Next, KubeGenericRuntimeManager.SyncPod calls KubeGenericRuntimeManager.killContainer and startContainer, respectively, to kill and start those containers.

func (m *kubeGenericRuntimeManager) SyncPod(pod *v1.Pod, _ v1.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
	podContainerChanges := m.computePodActions(pod, podStatus)
	...

	// Step 2: Kill the pod if the sandbox has changed.
	if podContainerChanges.KillPod {
		...
	} else {
		// Step 3: kill any running containers in this pod which are not to keep.
		for containerID, containerInfo := range podContainerChanges.ContainersToKill {
			glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerInfo.name, containerID, format.Pod(pod))
			killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerInfo.name)
			result.AddSyncResult(killContainerResult)
			if err := m.killContainer(pod, containerID, containerInfo.name, containerInfo.message, nil); err != nil {
				killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
				glog.Errorf("killContainer %q(id=%q) for pod %q failed: %v", containerInfo.name, containerID, format.Pod(pod), err)
				return
			}
		}
	}

	...

	// Step 4: Create a sandbox for the pod if necessary.
	podSandboxID := podContainerChanges.SandboxID
	if podContainerChanges.CreateSandbox {
		...
	}

	...

	// Step 5: start the init container.
	if container := podContainerChanges.NextInitContainerToStart; container != nil {
	...		

	}

	// Step 6: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		container := &pod.Spec.Containers[idx]
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
			continue
		}

		glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
		if msg, err := m.startContainer(podSandboxID, podSandboxConfig, container, pod, podStatus, pullSecrets, podIP, kubecontainer.ContainerTypeRegular); err != nil {
			startContainerResult.Fail(err, msg)
			// known errors that are logged in other places are logged at higher levels here to avoid
			// repetitive log spam
			switch {
			case err == images.ErrImagePullBackOff:
				glog.V(3).Infof("container start failed: %v: %s", err, msg)
			default:
				utilruntime.HandleError(fmt.Errorf("container start failed: %v: %s", err, msg))
			}
			continue
		}
	}

	return
}

We have only looked at the code paths in this flow that are relevant to in-place container upgrade. In short: the kubelet computes the pod actions, kills the containers whose spec hash has changed, and then starts new containers from the updated spec.

Verification

Deploy a demo application with a StatefulSet, then modify the image version of the nginx container in one Pod's spec. The kubelet log confirms exactly the behavior described above:

  kubelet[1121]: I0412 16:34:28.356083    1121 kubelet.go:1868] SyncLoop (UPDATE, "api"): "web-2_default(2813f459-59cc-11e9-a1f7-525400e7b58a)"
  kubelet[1121]: I0412 16:34:28.657836    1121 kuberuntime_manager.go:549] Container "nginx" ({"docker" "8d16517eb4b7b5b84755434eb25c7ab83667bca44318cbbcd89cf8abd232973f"}) of pod web-2_default(2813f459-59cc-11e9-a1f7-525400e7b58a): Container spec hash changed (3176550502 vs 1676109989).. Container will be killed and recreated.
  kubelet[1121]: I0412 16:34:28.658529    1121 kuberuntime_container.go:548] Killing container "docker://8d16517eb4b7b5b84755434eb25c7ab83667bca44318cbbcd89cf8abd232973f" with 10 second grace period
  kubelet[1121]: I0412 16:34:28.814944    1121 kuberuntime_manager.go:757] checking backoff for container "nginx" in pod "web-2_default(2813f459-59cc-11e9-a1f7-525400e7b58a)"
  kubelet[1121]: I0412 16:34:29.179953    1121 kubelet.go:1906] SyncLoop (PLEG): "web-2_default(2813f459-59cc-11e9-a1f7-525400e7b58a)", event: &pleg.PodLifecycleEvent{ID:"2813f459-59cc-11e9-a1f7-525400e7b58a", Type:"ContainerDied", Data:"8d16517eb4b7b5b84755434eb25c7ab83667bca44318cbbcd89cf8abd232973f"}
  kubelet[1121]: I0412 16:34:29.182257    1121 kubelet.go:1906] SyncLoop (PLEG): "web-2_default(2813f459-59cc-11e9-a1f7-525400e7b58a)", event: &pleg.PodLifecycleEvent{ID:"2813f459-59cc-11e9-a1f7-525400e7b58a", Type:"ContainerStarted", Data:"52e30b1aa621a20ae2eae5accf98c451c1be3aed781609d5635a79e48eb98222"}

The output of docker ps -a on the node confirms it as well: the old container was terminated and a new one was started, and watching the Pod shows that the Pod itself was never recreated.
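
For completeness, the image change itself can be applied with kubectl edit/patch or programmatically. Below is a minimal client-go sketch of such a strategic-merge patch; it assumes a recent client-go (older releases take no context argument), and the namespace, Pod name and target image (nginx:1.16) are placeholders taken from or invented for this demo:

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (adjust the path as needed).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		panic(err)
	}

	// Strategic-merge patch: the container name is the merge key, so only the
	// "nginx" container's image is replaced; other containers are untouched.
	patch := []byte(`{"spec":{"containers":[{"name":"nginx","image":"nginx:1.16"}]}}`)

	// Patch the Pod in place; the kubelet will kill and recreate only the
	// changed container, and the Pod object itself is not recreated.
	pod, err := clientset.CoreV1().Pods("default").Patch(
		context.TODO(), "web-2", types.StrategicMergePatchType, patch, metav1.PatchOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("patched pod %s/%s\n", pod.Namespace, pod.Name)
}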

Summary

To summarize: when a user modifies the image of a container in the Pod spec, KubeGenericRuntimeManager.computePodActions detects that the container's spec hash has changed and calls KubeGenericRuntimeManager.killContainer to gracefully terminate the old container. Once the old container is gone, computePodActions finds that a container defined in the Pod spec is not running and calls KubeGenericRuntimeManager.startContainer to start a new one. This completes an in-place container upgrade without rebuilding the Pod. With this mechanism understood, we can build a CRD/Operator whose logic implements canary or rolling in-place container upgrades at the workload level, solving the problem of updating a single image in a bulky Pod without affecting its other containers. A rough sketch of such a rolling, in-place upgrade loop follows.
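
As a closing thought experiment, here is what the core of such an Operator's rollout could look like: patch one Pod at a time and wait for the upgraded container to come back Ready before moving on. Everything here is a hypothetical sketch (the function names, timeouts and readiness check are my own assumptions, and a recent client-go is assumed); a real Operator would add requeueing, partitions, pause/resume and status reporting.

package inplace

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// containerID returns the runtime ID of the named container, or "" if it is
// not (yet) reported in the Pod status.
func containerID(pod *corev1.Pod, name string) string {
	for _, cs := range pod.Status.ContainerStatuses {
		if cs.Name == name {
			return cs.ContainerID
		}
	}
	return ""
}

// waitForRecreated polls the Pod until the named container reports a new
// ContainerID and Ready=true, or the timeout expires.
func waitForRecreated(ctx context.Context, c kubernetes.Interface, ns, podName, name, oldID string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		p, err := c.CoreV1().Pods(ns).Get(ctx, podName, metav1.GetOptions{})
		if err != nil {
			return err
		}
		for _, cs := range p.Status.ContainerStatuses {
			if cs.Name == name && cs.ContainerID != oldID && cs.Ready {
				return nil
			}
		}
		time.Sleep(2 * time.Second)
	}
	return fmt.Errorf("timed out waiting for container %q in pod %q", name, podName)
}

// rollingInPlaceUpgrade patches the image of one container in every Pod matched
// by selector, one Pod at a time, waiting for the in-place restart to finish
// before moving to the next Pod.
func rollingInPlaceUpgrade(ctx context.Context, c kubernetes.Interface, ns, selector, name, image string) error {
	pods, err := c.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return err
	}
	patch := []byte(fmt.Sprintf(`{"spec":{"containers":[{"name":%q,"image":%q}]}}`, name, image))

	for i := range pods.Items {
		pod := &pods.Items[i]
		oldID := containerID(pod, name)

		if _, err := c.CoreV1().Pods(ns).Patch(ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}); err != nil {
			return err
		}
		// Block the rollout until the kubelet has killed the old container and
		// the recreated one is Ready again.
		if err := waitForRecreated(ctx, c, ns, pod.Name, name, oldID, 2*time.Minute); err != nil {
			return err
		}
	}
	return nil
}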