
[kubernetes/k8s source code analysis] kubelet source code analysis: statusManager and probeManager

Introduction

   When kubelet is initialized, the statusManager and probeManager are created in the NewMainKubelet function.

   statusManager maintains status information and pushes pod status to the apiserver. It does not watch for pod status changes itself; instead it exposes interfaces for other components, such as probeManager, to call.

  probeManager periodically checks the containers in each pod; as soon as it detects a state change, it calls the methods provided by statusManager to update the pod status.

  readinessProbe checks whether a container can accept requests: if the probe fails, the container is removed from the service endpoints, so subsequent requests are no longer sent to it. livenessProbe checks whether the container is alive: if the probe fails, kubelet kills the container and starts a new one (unless RestartPolicy is set to Never).
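
   For reference, here is a minimal sketch (not taken from the kubelet source) of how these two probes are declared on a container, written with the v1 API types used throughout the code below. The import paths and the embedded Handler field are assumptions; both have shifted slightly across Kubernetes versions (newer releases call the field ProbeHandler).

// Hypothetical example: declaring readiness and liveness probes on a container.
// Import paths assume a tree where the core types live in k8s.io/api/core/v1.
package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
    c := v1.Container{
        Name:  "web",
        Image: "nginx",
        ReadinessProbe: &v1.Probe{
            Handler: v1.Handler{
                HTTPGet: &v1.HTTPGetAction{Path: "/healthz", Port: intstr.FromInt(80)},
            },
            InitialDelaySeconds: 5,
            PeriodSeconds:       10,
            FailureThreshold:    3,
        },
        LivenessProbe: &v1.Probe{
            Handler: v1.Handler{
                TCPSocket: &v1.TCPSocketAction{Port: intstr.FromInt(80)},
            },
            InitialDelaySeconds: 15,
            PeriodSeconds:       20,
        },
    }
    fmt.Printf("%+v\n", c)
}

   Back in NewMainKubelet, the related manager wiring (livenessManager, podManager, probeManager) looks like this: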

klet.livenessManager = proberesults.NewManager()

klet.podCache = kubecontainer.NewCache()
// podManager is also responsible for keeping secretManager and configMapManager contents up-to-date.
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient), secretManager, configMapManager)
  The probeManager is then initialized; readiness and liveness should look familiar:
	klet.probeManager = prober.NewManager(
		klet.statusManager,
		klet.livenessManager,
		klet.runner,
		containerRefManager,
		kubeDeps.Recorder)

1. StatusManager

  The statusManager code lives in pkg/kubelet/status/status_manager.go; its main interfaces are as follows:

type PodStatusProvider interface {
       // GetPodStatus returns the cached status for the provided pod UID, as well as whether it
       // was a cache hit.
       GetPodStatus(uid types.UID) (v1.PodStatus, bool)
}

    PodStatusProvider mainly exposes an interface for other components; for example, the prober workers call GetPodStatus in doProbe (see section 2.2).

type Manager interface {
       PodStatusProvider

       // Start the API server status sync loop.
       Start()

       // SetPodStatus caches updates the cached status for the given pod, and triggers a status update.
       SetPodStatus(pod *v1.Pod, status v1.PodStatus)

       // SetContainerReadiness updates the cached container status with the given readiness, and
       // triggers a status update.
       SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool)

       // TerminatePod resets the container status for the provided pod to terminated and triggers
       // a status update.
       TerminatePod(pod *v1.Pod)

       // RemoveOrphanedStatuses scans the status cache and removes any entries for pods not included in
       // the provided podUIDs.
       RemoveOrphanedStatuses(podUIDs map[types.UID]bool)
}

     The interface covers three things: reading pod status, the goroutine-based sync loop, and modifying pod status. There are several status-modifying methods, each with its own purpose:

  • SetPodStatus: called when a pod's status changes, pushing the new status to the apiserver; typically invoked while kubelet maintains the pod lifecycle
  • SetContainerReadiness: called when a health check detects a change in a container's state, updating the pod's readiness
  • TerminatePod: called when a pod is deleted, setting all of its containers to terminated
  • RemoveOrphanedStatuses: removes orphaned pods by simply deleting the corresponding entries from the status cache

  Both managers are started when the kubelet runs:
kl.statusManager.Start()
kl.probeManager.Start()

 1.1 The Start function

  Path: pkg/kubelet/status/status_manager.go

  The Start() method is called when kubelet runs; it starts a goroutine that performs the update work:

func (m *manager) Start() {
       // Don't start the status manager if we don't have a client. This will happen
       // on the master, where the kubelet is responsible for bootstrapping the pods
       // of the master components.
       if m.kubeClient == nil {
              glog.Infof("Kubernetes client is nil, not starting status manager.")
              return
       }

       glog.Info("Starting to sync pod status with apiserver")
       syncTicker := time.Tick(syncPeriod)
       // syncPod and syncBatch share the same go routine to avoid sync races.
       go wait.Forever(func() {
              select {
              case syncRequest := <-m.podStatusChannel:
                     glog.V(5).Infof("Status Manager: syncing pod: %q, with status: (%d, %v) from podStatusChannel",
                            syncRequest.podUID, syncRequest.status.version, syncRequest.status.status)
                     m.syncPod(syncRequest.podUID, syncRequest.status)
              case <-syncTicker:
                     m.syncBatch()
              }
       }, 0)
}

The loop listens on two channels: syncTicker is a timer, so the apiserver is periodically reconciled with the latest pod status held in the local cache; podStatusChannel is where every pod status update is sent. Callers never write to this channel directly; they call the status-modifying methods listed above, which write to the channel internally.
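
   To make the producer side concrete, here is a small self-contained sketch with hypothetical names: the setters cache the new status first, then do a non-blocking send into the channel, so a full channel simply leaves the work to the next periodic tick. The real updateStatusInternal in status_manager.go follows essentially this shape, with per-pod version numbers added on top.

// Hypothetical sketch of the cache + non-blocking-send + periodic-tick pattern.
package main

import (
    "fmt"
    "sync"
    "time"
)

type syncRequest struct {
    uid    string
    status string
}

func main() {
    var (
        mu       sync.Mutex
        statuses = map[string]string{}          // local status cache
        updates  = make(chan syncRequest, 1000) // stands in for podStatusChannel
    )

    setStatus := func(uid, status string) {
        mu.Lock()
        statuses[uid] = status
        mu.Unlock()
        select {
        case updates <- syncRequest{uid, status}: // fast path: wake the sync loop
        default: // channel full: the periodic batch sync catches up from the cache
        }
    }

    go func() {
        ticker := time.NewTicker(2 * time.Second) // stands in for syncPeriod
        defer ticker.Stop()
        for {
            select {
            case req := <-updates:
                fmt.Println("syncPod:", req.uid, req.status)
            case <-ticker.C:
                mu.Lock()
                fmt.Println("syncBatch over", len(statuses), "cached statuses")
                mu.Unlock()
            }
        }
    }()

    setStatus("pod-1", "Running")
    time.Sleep(3 * time.Second)
}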

  1.2 The syncBatch function

        syncBatch periodically synchronizes pod status with the apiserver: it cleans up orphaned version entries and ultimately calls syncPod to push the updates.

func (m *manager) syncBatch() {
       var updatedStatuses []podStatusSyncRequest
       podToMirror, mirrorToPod := m.podManager.GetUIDTranslations()
       func() { // Critical section
              m.podStatusesLock.RLock()
              defer m.podStatusesLock.RUnlock()

              // Clean up orphaned versions.
              for uid := range m.apiStatusVersions {
                     _, hasPod := m.podStatuses[uid]
                     _, hasMirror := mirrorToPod[uid]
                     if !hasPod && !hasMirror {
                            delete(m.apiStatusVersions, uid)
                     }
              }

              for uid, status := range m.podStatuses {
                     syncedUID := uid
                     if mirrorUID, ok := podToMirror[uid]; ok {
                            if mirrorUID == "" {
                                   glog.V(5).Infof("Static pod %q (%s/%s) does not have a corresponding mirror pod; skipping", uid, status.podName, status.podNamespace)
                                   continue
                            }
                            syncedUID = mirrorUID
                     }
                     if m.needsUpdate(syncedUID, status) {
                            updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
                     } else if m.needsReconcile(uid, status.status) {
                            // Delete the apiStatusVersions here to force an update on the pod status
                            // In most cases the deleted apiStatusVersions here should be filled
                            // soon after the following syncPod() [If the syncPod() sync an update
                            // successfully].
                            delete(m.apiStatusVersions, syncedUID)
                            updatedStatuses = append(updatedStatuses, podStatusSyncRequest{uid, status})
                     }
              }
       }()

       for _, update := range updatedStatuses {
              glog.V(5).Infof("Status Manager: syncPod in syncbatch. pod UID: %q", update.podUID)
              m.syncPod(update.podUID, update.status)
       }
}
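
   The needsUpdate check and the apiStatusVersions map seen above implement a simple version comparison: each cached status carries a monotonically increasing version, and apiStatusVersions remembers the last version successfully written to the apiserver. A tiny self-contained illustration with hypothetical names:

// Hypothetical illustration of the version comparison behind needsUpdate.
package main

import "fmt"

func main() {
    apiStatusVersions := map[string]uint64{} // last version synced to the apiserver
    cachedVersions := map[string]uint64{}    // current version in the local cache

    needsUpdate := func(uid string) bool {
        latest, ok := apiStatusVersions[uid]
        return !ok || latest < cachedVersions[uid]
    }

    cachedVersions["pod-1"] = 1
    fmt.Println(needsUpdate("pod-1")) // true: never synced

    apiStatusVersions["pod-1"] = 1    // syncPod succeeded
    fmt.Println(needsUpdate("pod-1")) // false: up to date

    cachedVersions["pod-1"] = 2       // a setter bumped the cached version
    fmt.Println(needsUpdate("pod-1")) // true: cache is ahead again
}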

  1.3 The syncPod function

     syncPod takes a pod and its status and updates the data in the apiserver (by calling kubeClient.Core().Pods(namespace).UpdateStatus); if it finds that the pod has been deleted, it also removes it from the internal data structures.

func (m *manager) syncPod(uid types.UID, status versionedPodStatus) {
       if !m.needsUpdate(uid, status) {
              glog.V(1).Infof("Status for pod %q is up-to-date; skipping", uid)
              return
       }

       // TODO: make me easier to express from client code
       pod, err := m.kubeClient.Core().Pods(status.podNamespace).Get(status.podName, metav1.GetOptions{})
       if errors.IsNotFound(err) {
              glog.V(3).Infof("Pod %q (%s) does not exist on the server", status.podName, uid)
              // If the Pod is deleted the status will be cleared in
              // RemoveOrphanedStatuses, so we just ignore the update here.
              return
       }
       if err != nil {
              glog.Warningf("Failed to get status for pod %q: %v", format.PodDesc(status.podName, status.podNamespace, uid), err)
              return
       }

       translatedUID := m.podManager.TranslatePodUID(pod.UID)
       if len(translatedUID) > 0 && translatedUID != uid {
              glog.V(2).Infof("Pod %q was deleted and then recreated, skipping status update; old UID %q, new UID %q", format.Pod(pod), uid, translatedUID)
              m.deletePodStatus(uid)
              return
       }
       pod.Status = status.status
       if err := podutil.SetInitContainersStatusesAnnotations(pod); err != nil {
              glog.Error(err)
       }
       // TODO: handle conflict as a retry, make that easier too.
       newPod, err := m.kubeClient.Core().Pods(pod.Namespace).UpdateStatus(pod)
       if err != nil {
              glog.Warningf("Failed to update status for pod %q: %v", format.Pod(pod), err)
              return
       }
       pod = newPod

       glog.V(3).Infof("Status for pod %q updated successfully: (%d, %+v)", format.Pod(pod), status.version, status.status)
       m.apiStatusVersions[pod.UID] = status.version

       // We don't handle graceful deletion of mirror pods.
       if m.canBeDeleted(pod, status.status) {
              deleteOptions := metav1.NewDeleteOptions(0)
              // Use the pod UID as the precondition for deletion to prevent deleting a newly created pod with the same name and namespace.
              deleteOptions.Preconditions = metav1.NewUIDPreconditions(string(pod.UID))
              err = m.kubeClient.Core().Pods(pod.Namespace).Delete(pod.Name, deleteOptions)
              if err != nil {
                     glog.Warningf("Failed to delete status for pod %q: %v", format.Pod(pod), err)
                     return
              }
              glog.V(3).Infof("Pod %q fully terminated and removed from etcd", format.Pod(pod))
              m.deletePodStatus(uid)
       }
}

2. ProbeManager

  2.1 The Manager interface

    Path: pkg/kubelet/prober/prober_manager.go

type Manager interface {
       // AddPod creates new probe workers for every container probe. This should be called for every
       // pod created.
       AddPod(pod *v1.Pod)

       // RemovePod handles cleaning up the removed pod state, including terminating probe workers and
       // deleting cached results.
       RemovePod(pod *v1.Pod)

       // CleanupPods handles cleaning up pods which should no longer be running.
       // It takes a list of "active pods" which should not be cleaned up.
       CleanupPods(activePods []*v1.Pod)

       // UpdatePodStatus modifies the given PodStatus with the appropriate Ready state for each
       // container based on container running status, cached probe results and worker states.
       UpdatePodStatus(types.UID, *v1.PodStatus)

       // Start starts the Manager sync loops.
       Start()
}

     probeManager checks the health of the containers in a pod. There are currently two probe types: readiness and liveness.

const (
       liveness probeType = iota
       readiness
)

      As described in the introduction: a failed readinessProbe removes the container from the service endpoints so it no longer receives requests, while a failed livenessProbe causes kubelet to kill the container and start a new one (unless RestartPolicy is set to Never).

      Not every container in a pod defines health-check probes; if none is defined, no probing takes place and the container is simply considered healthy.
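
      A self-contained sketch with hypothetical names of that rule, roughly mirroring what probeManager's UpdatePodStatus does when it fills in each container's Ready field:

// Hypothetical sketch: a running container without a readiness probe is treated
// as ready; otherwise the cached probe result decides.
package main

import "fmt"

type probeResult int

const (
    resultUnknown probeResult = iota
    resultSuccess
    resultFailure
)

func containerReady(running, hasReadinessProbe bool, cached probeResult) bool {
    if !running {
        return false // a container that is not running is never ready
    }
    if !hasReadinessProbe {
        return true // no probe defined: assume healthy
    }
    return cached == resultSuccess
}

func main() {
    fmt.Println(containerReady(true, false, resultUnknown)) // true: no probe configured
    fmt.Println(containerReady(true, true, resultFailure))  // false: probe failing
    fmt.Println(containerReady(false, true, resultSuccess)) // false: not running
}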

  Data flow: Run -> syncLoop (kubelet) -> syncLoopIteration -> HandlePodAdditions

      Every time a new pod is created, kubelet calls probeManager.AddPod(pod); the call site is:

func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
       start := kl.clock.Now()
       sort.Sort(sliceutils.PodsByCreationTime(pods))
       for _, pod := range pods {
              existingPods := kl.podManager.GetPods()
              // Always add the pod to the pod manager. Kubelet relies on the pod
              // manager as the source of truth for the desired state. If a pod does
              // not exist in the pod manager, it means that it has been deleted in
              // the apiserver and no action (other than cleanup) is required.
              kl.podManager.AddPod(pod)

              if kubepod.IsMirrorPod(pod) {
                     kl.handleMirrorPod(pod, start)
                     continue
              }

              if !kl.podIsTerminated(pod) {
                     // Only go through the admission process if the pod is not
                     // terminated.

                     // We failed pods that we rejected, so activePods include all admitted
                     // pods that are alive.
                     activePods := kl.filterOutTerminatedPods(existingPods)

                     // Check if we can admit the pod; if not, reject it.
                     if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
                            kl.rejectPod(pod, reason, message)
                            continue
                     }
              }
              mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
              kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
              kl.probeManager.AddPod(pod)
       }
}

       The corresponding implementation is in pkg/kubelet/prober/prober_manager.go:

func (m *manager) AddPod(pod *v1.Pod) {
       m.workerLock.Lock()
       defer m.workerLock.Unlock()

       key := probeKey{podUID: pod.UID}
       for _, c := range pod.Spec.Containers {
              key.containerName = c.Name

              if c.ReadinessProbe != nil {
                     key.probeType = readiness
                     if _, ok := m.workers[key]; ok {
                            glog.Errorf("Readiness probe already exists! %v - %v",
                                   format.Pod(pod), c.Name)
                            return
                     }
                      w := newWorker(m, readiness, pod, c)
                      m.workers[key] = w
                      go w.run()
              }

              if c.LivenessProbe != nil {
                     key.probeType = liveness
                     if _, ok := m.workers[key]; ok {
                            glog.Errorf("Liveness probe already exists! %v - %v",
                                   format.Pod(pod), c.Name)
                            return
                     }
                     w := newWorker(m, liveness, pod, c)
                     m.workers[key] = w
                     go w.run()
              }
       }
}

      AddPod iterates over the pod's containers; whenever a readiness or liveness probe is defined, it creates a worker and runs it in a background goroutine. The worker's run() function probes the container periodically, driven by a ticker plus an initial random delay.

func (w *worker) run() {
       probeTickerPeriod := time.Duration(w.spec.PeriodSeconds) * time.Second
       probeTicker := time.NewTicker(probeTickerPeriod)

       defer func() {
              // Clean up.
              probeTicker.Stop()
              if !w.containerID.IsEmpty() {
                     w.resultsManager.Remove(w.containerID)
              }

              w.probeManager.removeWorker(w.pod.UID, w.container.Name, w.probeType)
       }()

       // If kubelet restarted the probes could be started in rapid succession.
       // Let the worker wait for a random portion of tickerPeriod before probing.
       time.Sleep(time.Duration(rand.Float64() * float64(probeTickerPeriod)))

probeLoop:
       for w.doProbe() {
              // Wait for next probe tick.
              select {
              case <-w.stopCh:
                     break probeLoop
              case <-probeTicker.C:
                     // continue
              }
       }
}
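
    The random sleep before the loop spreads the probe start times out, so that a kubelet restart does not fire every probe at the same instant. A quick self-contained illustration:

// Hypothetical illustration of the jittered first probe.
package main

import (
    "fmt"
    "math/rand"
    "time"
)

func main() {
    period := 10 * time.Second // corresponds to PeriodSeconds: 10
    for i := 0; i < 3; i++ {
        delay := time.Duration(rand.Float64() * float64(period))
        fmt.Printf("worker %d: first probe after %v, then every %v\n",
            i, delay.Round(time.Millisecond), period)
    }
}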

    2.2 doProbe: probes the container once and reports the result; the return value indicates whether the worker should keep running

func (w *worker) doProbe() (keepGoing bool) {
       defer func() { recover() }() // Actually eat panics (HandleCrash takes care of logging)
       defer runtime.HandleCrash(func(_ interface{}) { keepGoing = true })

       status, ok := w.probeManager.statusManager.GetPodStatus(w.pod.UID)
       if !ok {
              // Either the pod has not been created yet, or it was already deleted.
              glog.V(3).Infof("No status for pod: %v", format.Pod(w.pod))
              return true
       }

       // Worker should terminate if pod is terminated.
       if status.Phase == v1.PodFailed || status.Phase == v1.PodSucceeded {
              glog.V(3).Infof("Pod %v %v, exiting probe worker",
                     format.Pod(w.pod), status.Phase)
              return false
       }

       c, ok := podutil.GetContainerStatus(status.ContainerStatuses, w.container.Name)
       if !ok || len(c.ContainerID) == 0 {
              // Either the container has not been created yet, or it was deleted.
              glog.V(3).Infof("Probe target container not found: %v - %v",
                     format.Pod(w.pod), w.container.Name)
              return true // Wait for more information.
       }

       if w.containerID.String() != c.ContainerID {
              if !w.containerID.IsEmpty() {
                     w.resultsManager.Remove(w.containerID)
              }
              w.containerID = kubecontainer.ParseContainerID(c.ContainerID)
              w.resultsManager.Set(w.containerID, w.initialValue, w.pod)
              // We've got a new container; resume probing.
              w.onHold = false
       }

       if w.onHold {
              // Worker is on hold until there is a new container.
              return true
       }

       if c.State.Running == nil {
              glog.V(3).Infof("Non-running container probed: %v - %v",
                     format.Pod(w.pod), w.container.Name)
              if !w.containerID.IsEmpty() {
                     w.resultsManager.Set(w.containerID, results.Failure, w.pod)
              }
              // Abort if the container will not be restarted.
              return c.State.Terminated == nil ||
                     w.pod.Spec.RestartPolicy != v1.RestartPolicyNever
       }

       if int32(time.Since(c.State.Running.StartedAt.Time).Seconds()) < w.spec.InitialDelaySeconds {
              return true
       }

       // TODO: in order for exec probes to correctly handle downward API env, we must be able to reconstruct
       // the full container environment here, OR we must make a call to the CRI in order to get those environment
       // values from the running container.
       result, err := w.probeManager.prober.probe(w.probeType, w.pod, status, w.container, w.containerID)
       if err != nil {
              // Prober error, throw away the result.
              return true
       }

       if w.lastResult == result {
              w.resultRun++
       } else {
              w.lastResult = result
              w.resultRun = 1
       }

       if (result == results.Failure && w.resultRun < int(w.spec.FailureThreshold)) ||
              (result == results.Success && w.resultRun < int(w.spec.SuccessThreshold)) {
              // Success or failure is below threshold - leave the probe state unchanged.
              return true
       }

       w.resultsManager.Set(w.containerID, result, w.pod)

       if w.probeType == liveness && result == results.Failure {
              // The container fails a liveness check, it will need to be restarted.
              // Stop probing until we see a new container ID. This is to reduce the
              // chance of hitting #21751, where running `docker exec` when a
              // container is being stopped may lead to corrupted container state.
              w.onHold = true
              w.resultRun = 1
       }

       return true
}
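
    The threshold checks near the end deserve a worked example. The following self-contained sketch with hypothetical names shows why, with FailureThreshold: 3, only the third consecutive failure flips the recorded result (SuccessThreshold works symmetrically):

// Hypothetical worked example of the consecutive-result threshold logic.
package main

import "fmt"

func main() {
    const failureThreshold = 3
    lastResult, resultRun := "success", 0

    observe := func(result string) (report bool) {
        if result == lastResult {
            resultRun++
        } else {
            lastResult, resultRun = result, 1
        }
        // Only report a failure once it has occurred failureThreshold times in a row.
        return result == "failure" && resultRun >= failureThreshold
    }

    for i := 1; i <= 4; i++ {
        fmt.Printf("probe %d failed, report failure: %v\n", i, observe("failure"))
    }
    // prints: false, false, true, true
}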

    2.3 The runProbe function

    This is where the actual probing happens: depending on which handler is defined, it dispatches to one of three mechanisms (exec, HTTPGet, or TCPSocket). The code is as follows:

func (pb *prober) runProbe(p *v1.Probe, pod *v1.Pod, status v1.PodStatus, container v1.Container, containerID kubecontainer.ContainerID) (probe.Result, string, error) {
       timeout := time.Duration(p.TimeoutSeconds) * time.Second
       if p.Exec != nil {
              glog.V(4).Infof("Exec-Probe Pod: %v, Container: %v, Command: %v", pod, container, p.Exec.Command)
              command := kubecontainer.ExpandContainerCommandOnlyStatic(p.Exec.Command, container.Env)
              return pb.exec.Probe(pb.newExecInContainer(container, containerID, command, timeout))
       }
       if p.HTTPGet != nil {
              scheme := strings.ToLower(string(p.HTTPGet.Scheme))
              host := p.HTTPGet.Host
              if host == "" {
                     host = status.PodIP
              }
              port, err := extractPort(p.HTTPGet.Port, container)
              if err != nil {
                     return probe.Unknown, "", err
              }
              path := p.HTTPGet.Path
              glog.V(4).Infof("HTTP-Probe Host: %v://%v, Port: %v, Path: %v", scheme, host, port, path)
              url := formatURL(scheme, host, port, path)
              headers := buildHeader(p.HTTPGet.HTTPHeaders)
              glog.V(4).Infof("HTTP-Probe Headers: %v", headers)
              return pb.http.Probe(url, headers, timeout)
       }
       if p.TCPSocket != nil {
              port, err := extractPort(p.TCPSocket.Port, container)
              if err != nil {
                     return probe.Unknown, "", err
              }
              host := p.TCPSocket.Host
              if host == "" {
                     host = status.PodIP
              }
              glog.V(4).Infof("TCP-Probe Host: %v, Port: %v, Timeout: %v", host, port, timeout)
              return pb.tcp.Probe(host, port, timeout)
       }
       glog.Warningf("Failed to find probe builder for container: %v", container)
       return probe.Unknown, "", fmt.Errorf("Missing probe handler for %s:%s", format.Pod(pod), container.Name)
}

  2.3.1 Probe function: exec

    Path: pkg/probe/exec/exec.go

    It runs the configured command and maps its exit status to a probe result:

type ExecProber interface {
	Probe(e exec.Cmd) (probe.Result, string, error)
}

type execProber struct{}

func (pr execProber) Probe(e exec.Cmd) (probe.Result, string, error) {
	data, err := e.CombinedOutput()
	glog.V(4).Infof("Exec probe response: %q", string(data))
	if err != nil {
		exit, ok := err.(exec.ExitError)
		if ok {
			if exit.ExitStatus() == 0 {
				return probe.Success, string(data), nil
			} else {
				return probe.Failure, string(data), nil
			}
		}
		return probe.Unknown, "", err
	}
	return probe.Success, string(data), nil
}

  2.3.2 Probe function: HTTPGet

    Path: pkg/probe/http/http.go

    An HTTP response code in the range [200, 400) counts as success:

// DoHTTPProbe checks if a GET request to the url succeeds.
// If the HTTP response code is successful (i.e. 400 > code >= 200), it returns Success.
// If the HTTP response code is unsuccessful or HTTP communication fails, it returns Failure.
// This is exported because some other packages may want to do direct HTTP probes.
func DoHTTPProbe(url *url.URL, headers http.Header, client HTTPGetInterface) (probe.Result, string, error) {
	req, err := http.NewRequest("GET", url.String(), nil)
	if err != nil {
		// Convert errors into failures to catch timeouts.
		return probe.Failure, err.Error(), nil
	}
	if _, ok := headers["User-Agent"]; !ok {
		if headers == nil {
			headers = http.Header{}
		}
		// explicitly set User-Agent so it's not set to default Go value
		v := version.Get()
		headers.Set("User-Agent", fmt.Sprintf("kube-probe/%s.%s", v.Major, v.Minor))
	}
	req.Header = headers
	if headers.Get("Host") != "" {
		req.Host = headers.Get("Host")
	}
	res, err := client.Do(req)
	if err != nil {
		// Convert errors into failures to catch timeouts.
		return probe.Failure, err.Error(), nil
	}
	defer res.Body.Close()
	b, err := ioutil.ReadAll(res.Body)
	if err != nil {
		return probe.Failure, "", err
	}
	body := string(b)
	if res.StatusCode >= http.StatusOK && res.StatusCode < http.StatusBadRequest {
		glog.V(4).Infof("Probe succeeded for %s, Response: %v", url.String(), *res)
		return probe.Success, body, nil
	}
	glog.V(4).Infof("Probe failed for %s with request headers %v, response body: %v", url.String(), headers, body)
	return probe.Failure, fmt.Sprintf("HTTP probe failed with statuscode: %d", res.StatusCode), nil
}
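
    Since the comment notes that DoHTTPProbe is exported for direct use by other packages, here is a hedged usage sketch; the import path is assumed from the file path above, and *http.Client is used because it satisfies the HTTPGetInterface parameter:

// Hypothetical usage of the exported DoHTTPProbe.
package main

import (
    "fmt"
    "net/http"
    "net/url"
    "time"

    httprobe "k8s.io/kubernetes/pkg/probe/http"
)

func main() {
    // Probe the kubelet's own healthz endpoint as an example target.
    u, err := url.Parse("http://127.0.0.1:10248/healthz")
    if err != nil {
        panic(err)
    }
    client := &http.Client{Timeout: 3 * time.Second}
    result, body, err := httprobe.DoHTTPProbe(u, http.Header{}, client)
    fmt.Println(result, body, err)
}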

  2.3.3 Probe function: TCPSocket

    Path: pkg/probe/tcp/tcp.go

    Successfully establishing a TCP connection counts as success:

// DoTCPProbe checks that a TCP socket to the address can be opened.
// If the socket can be opened, it returns Success
// If the socket fails to open, it returns Failure.
// This is exported because some other packages may want to do direct TCP probes.
func DoTCPProbe(addr string, timeout time.Duration) (probe.Result, string, error) {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		// Convert errors to failures to handle timeouts.
		return probe.Failure, err.Error(), nil
	}
	err = conn.Close()
	if err != nil {
		glog.Errorf("Unexpected error closing TCP probe socket: %v (%#v)", err, err)
	}
	return probe.Success, "", nil
}
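
    DoTCPProbe is exported as well; a hedged usage sketch (import path assumed from the file path above):

// Hypothetical usage of the exported DoTCPProbe.
package main

import (
    "fmt"
    "time"

    tcprobe "k8s.io/kubernetes/pkg/probe/tcp"
)

func main() {
    // Succeeds as long as a TCP connection to the address can be opened.
    result, output, err := tcprobe.DoTCPProbe("127.0.0.1:3306", 2*time.Second)
    fmt.Println(result, output, err)
}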

   w.resultsManager.Set(w.containerID, result, w.pod) stores the probe result. Results are kept in a cache and also sent to the manager's updates channel. For liveness, the consumer of that channel is kubelet itself; syncLoopIteration selects on it:

case update := <-kl.livenessManager.Updates():
       if update.Result == proberesults.Failure {
              // The liveness manager detected a failure; sync the pod.

              // We should not use the pod from livenessManager, because it is never updated after
              // initialization.
              pod, ok := kl.podManager.GetPodByUID(update.PodUID)
              if !ok {
                     // If the pod no longer exists, ignore the update.
                     glog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update)
                     break
              }
              glog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod))
              handler.HandlePodSyncs([]*v1.Pod{pod})
       }

     Liveness decides whether the pod lives or dies, so kubelet has to react to it. A readiness failure never causes the pod to be rebuilt; it is handled differently, entirely inside probeManager's own Start loop:

func (m *manager) Start() {
       // Start syncing readiness.
       go wait.Forever(m.updateReadiness, 0)
}
func (m *manager) updateReadiness() {
       update := <-m.readinessManager.Updates()

       ready := update.Result == results.Success
       m.statusManager.SetContainerReadiness(update.PodUID, update.ContainerID, ready)
}

When probeManager starts, it runs a goroutine that keeps reading from the readinessManager updates channel and calls statusManager to push the pod's status to the apiserver. The component responsible for Service logic then uses that status to decide whether to update the endpoints, i.e. whether requests to the service are forwarded to this pod.

func (m *manager) SetContainerReadiness(podUID types.UID, containerID kubecontainer.ContainerID, ready bool) {
       m.podStatusesLock.Lock()
       defer m.podStatusesLock.Unlock()

       pod, ok := m.podManager.GetPodByUID(podUID)
       if !ok {
              glog.V(4).Infof("Pod %q has been deleted, no need to update readiness", string(podUID))
              return
       }

       oldStatus, found := m.podStatuses[pod.UID]
       if !found {
              glog.Warningf("Container readiness changed before pod has synced: %q - %q",
                     format.Pod(pod), containerID.String())
              return
       }

       // Find the container to update.
       containerStatus, _, ok := findContainerStatus(&oldStatus.status, containerID.String())
       if !ok {
              glog.Warningf("Container readiness changed for unknown container: %q - %q",
                     format.Pod(pod), containerID.String())
              return
       }

       if containerStatus.Ready == ready {
              glog.V(4).Infof("Container readiness unchanged (%v): %q - %q", ready,
                     format.Pod(pod), containerID.String())
              return
       }

       // Make sure we're not updating the cached version.
       status, err := copyStatus(&oldStatus.status)
       if err != nil {
              return
       }
       containerStatus, _, _ = findContainerStatus(&status, containerID.String())
       containerStatus.Ready = ready

       // Update pod condition.
       readyConditionIndex := -1
       for i, condition := range status.Conditions {
              if condition.Type == v1.PodReady {
                     readyConditionIndex = i
                     break
              }
       }
       readyCondition := GeneratePodReadyCondition(&pod.Spec, status.ContainerStatuses, status.Phase)
       if readyConditionIndex != -1 {
              status.Conditions[readyConditionIndex] = readyCondition
       } else {
              glog.Warningf("PodStatus missing PodReady condition: %+v", status)
              status.Conditions = append(status.Conditions, readyCondition)
       }

       m.updateStatusInternal(pod, status, false)
}

    SetContainerReadiness updates the cached container readiness and triggers a status update: it first checks whether an update is actually needed (the pod still exists, its status is cached, and the readiness value really changed), then regenerates the PodReady condition and hands the new status to updateStatusInternal.
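
    GeneratePodReadyCondition then recomputes the pod-level condition from the container statuses. A self-contained sketch with hypothetical names of the basic aggregation rule (the real function also handles missing statuses and the pod phase):

// Hypothetical sketch: PodReady is True only when every container reports Ready.
package main

import "fmt"

func podReady(containerReady map[string]bool) bool {
    for _, ready := range containerReady {
        if !ready {
            return false
        }
    }
    return true
}

func main() {
    fmt.Println(podReady(map[string]bool{"web": true, "sidecar": true}))  // true
    fmt.Println(podReady(map[string]bool{"web": true, "sidecar": false})) // false
}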