1. 程式人生 > >11.深入k8s:kubelet工作原理及其初始化原始碼分析

11.深入k8s:kubelet工作原理及其初始化原始碼分析

![62953793_p0](https://img.luozhiyun.com/20200920121011.jpg) > 轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com > > 原始碼版本是[1.19](https://github.com/kubernetes/kubernetes/tree/release-1.19) kubelet資訊量是很大的,通過我這一篇文章肯定是講不全的,大家可以根據自己的情況到Reference或文章的連結補錄自己缺失的知識。 ## kubelet 主要功能 在kubernetes叢集中,每個Node節點都會啟動kubelet程序,用來處理Master節點下發到本節點的任務,管理Pod和其中的容器。 ### pod 管理 Kubelet 以 PodSpec 的方式工作。PodSpec 是描述一個 Pod 的 YAML 或 JSON 物件。 kubelet 採用一組通過各種機制提供的 PodSpecs(主要通過 apiserver),並確保這些 PodSpecs 中描述的 Pod 正常健康執行。 官方提供了4中方式來獲取容器資訊: * apiserver:通過 API Server 監聽 etcd 目錄獲取資料; * File:啟動引數 --config 指定的配置目錄下的檔案; * 通過 url 從網路上某個地址來獲取資訊 拿apiserver來說,如果Kubelet 監聽到etcd中有新的繫結到本節點的 Pod,則按照 Pod 清單的要求建立該 Pod;如果發現本地的 Pod 被修改,則 Kubelet 會做出相應的修改。 ### 容器健康檢查 容器健康檢查這個我們在前面已經聊過,主要是通過LivenessProbe 與ReadinessProbe來判斷容器是否健康。 * LivenessProbe :用於判斷容器是否健康,告訴 Kubelet 一個容器什麼時候處於不健康的狀態。如果 LivenessProbe 探針探測到容器不健康,則 Kubelet 將刪除該容器,並根據容器的重啟策略做相應的處理。如果一個容器不包含 LivenessProbe 探針,那麼 Kubelet 認為該容器的 LivenessProbe 探針返回的值永遠是 “Success”; * ReadinessProbe:用於判斷容器是否啟動完成且準備接收請求。如果 ReadinessProbe 探針探測到失敗,則 Pod 的狀態將被修改。Endpoint Controller 將從 Service 的 Endpoint 中刪除包含該容器所在 Pod 的 IP 地址的 Endpoint 條目。 ### 容器監控 Kubelet 通過 cAdvisor 獲取其所在節點及容器的資料。cAdvisor 是一個開源的分析容器資源使用率和效能特性的代理工具,整合到 Kubelet中,當Kubelet啟動時會同時啟動cAdvisor,且一個cAdvisor只監控一個Node節點的資訊。cAdvisor 自動查詢所有在其所在節點上的容器,自動採集 CPU、記憶體、檔案系統和網路使用的統計資訊。cAdvisor 通過它所在節點機的 Root 容器,採集並分析該節點機的全面使用情況。 ### kubelet 工作原理 這裡借用網上的一張圖來說明情況: ![img](https://img.luozhiyun.com/20200920120525.png) 由圖我們可以看到kubelet 的工作核心,就是一個控制迴圈,即:SyncLoop。驅動整個控制迴圈的事件有:pod更新事件、pod生命週期變化、kubelet本身設定的執行週期、定時清理事件等。 在SyncLoop迴圈上還有很多xxManager,例如probeManager 會定時去監控 pod 中容器的健康狀況,當前支援兩種型別的探針:livenessProbe 和readinessProbe;statusManager 負責維護狀態資訊,並把 pod 狀態更新到 apiserver;containerRefManager 容器引用的管理,相對簡單的Manager,用來報告容器的建立,失敗等事件等等。 kubelet 呼叫下層容器執行時的執行過程,並不會直接呼叫 Docker 的 API,而是通過一組叫作 CRI(Container Runtime Interface,容器執行時介面)的 gRPC 介面來間接執行的。 ![img](https://img.luozhiyun.com/20200920120529.png) CRI是k8s對容器的操作抽離出的一系列的介面,kubelet 就只需要跟這個介面打交道,而不需要關注底層的容器時docker還是rkt,底層的容器只需要自己提供一個該介面的實現,然後對 kubelet 暴露出 gRPC 服務即可。有關CRI的可以內容可以看看這篇:[Introducing Container Runtime Interface](https://kubernetes.io/blog/2016/12/container-runtime-interface-cri-in-kubernetes/)。 一般來說CRI介面可以分為兩組: 一組是ImageService,主要是容器映象相關的操作,比如拉取映象、刪除映象等。 另一組是RuntimeService,主要是跟容器相關的操作,比如建立、啟動、刪除Container、Exec等。 如下圖(沒有列全): ![image-20200919210145733](https://img.luozhiyun.com/20200920120534.png) ## kubelet執行原始碼分析 ### **Run** ![image-20200920115529322](https://img.luozhiyun.com/20200920120537.png) 檔案地址:kubernetes\pkg\kubelet\kubelet.go ```go func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { //註冊 logServer if kl.logServer == nil { kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/"))) } if kl.kubeClient == nil { klog.Warning("No api server defined - no node status update will be sent.") } //Cloud Provider 擴充套件相關:https://kubernetes.feisky.xyz/extension/cloud-provider if kl.cloudResourceSyncManager != nil { go kl.cloudResourceSyncManager.Run(wait.NeverStop) } //呼叫 kl.initializeModules 首先啟動不依賴 container runtime 的一些模組 if err := kl.initializeModules(); err != nil { kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error()) klog.Fatal(err) } //啟動 volume manager go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop) if kl.kubeClient != nil { //執行 kl.syncNodeStatus 定時同步 Node 狀態 go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop) //呼叫 kl.fastStatusUpdateOnce 更新容器執行時啟動時間以及執行首次狀態同步 go kl.fastStatusUpdateOnce() // start syncing lease //NodeLease 機制 go kl.nodeLeaseController.Run(wait.NeverStop) } //執行 kl.updateRuntimeUp 定時更新 Runtime 狀態 go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) // Set up iptables util rules //執行 kl.syncNetworkUtil 定時同步 iptables 規則 if kl.makeIPTablesUtilChains { kl.initNetworkUtil() } //獲取 pk.podKillingCh異常pod, 並定時清理異常 pod go wait.Until(kl.podKiller.PerformPodKillingWork, 1*time.Second, wait.NeverStop) // Start component sync loops. //啟動 statusManager、probeManager、runtimeClassManager kl.statusManager.Start() kl.probeManager.Start() // Start syncing RuntimeClasses if enabled. if kl.runtimeClassManager != nil { kl.runtimeClassManager.Start(wait.NeverStop) } // Start the pod lifecycle event generator. //啟動 pleg 該模組主要用於週期性地向 container runtime 重新整理當前所有容器的狀態 //https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/pod-lifecycle-event-generator.md kl.pleg.Start() kl.syncLoop(updates, kl) } ``` 這個方法會做以下事情: 1. 註冊logServer; 2. 如果設定了Cloud Provider,那麼會啟動雲資源管理器,具體的可以檢視文章:[cloud-provider](https://kubernetes.feisky.xyz/extension/cloud-provider); 3. 呼叫kl.initializeModules啟動不依賴 container runtime 的一些模組,這個方法我們下面再分析; 4. 啟動 volume manager; 5. 執行 kl.syncNodeStatus 定時同步 Node 狀態; 6. 呼叫kl.fastStatusUpdateOnce啟動一個迴圈更新pod CIDR、runtime狀態以及node狀態; 7. 呼叫kl.nodeLeaseController.Run啟動NodeLease機制,NodeLease機制是一種上報心跳的方式,可以通過更加輕量化節約資源的方式,並提升效能上報node的心跳資訊,具體看: [Lease object](https://kubernetes.io/docs/concepts/architecture/nodes/#heartbeats); 8. 執行 kl.updateRuntimeUp 定時更新 Runtime 狀態; 9. 執行 kl.syncNetworkUtil 定時同步 iptables 規則; 10. 獲取 pk.podKillingCh異常pod, 並定時清理異常 pod; 11. 然後啟動 statusManager、probeManager、runtimeClassManager; 12. 啟動 pleg模組,該模組主要用於週期性地向 container runtime 上報當前所有容器的狀態,具體可以看:[Pod Lifecycle Event Generator (PLEG)](https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/pod-lifecycle-event-generator.md); 13. 呼叫kl.syncLoop啟動kublet事件迴圈; #### initializeModules 下面我們看看initializeModules方法做了些什麼。 ```go func (kl *Kubelet) initializeModules() error { ... //建立檔案目錄 if err := kl.setupDataDirs(); err != nil { return err } //建立 ContainerLogsDir if _, err := os.Stat(ContainerLogsDir); err != nil { if err := kl.os.MkdirAll(ContainerLogsDir, 0755); err != nil { return fmt.Errorf("failed to create directory %q: %v", ContainerLogsDir, err) } } //啟動 imageManager kl.imageManager.Start() //啟動 certificate manager ,證書相關 if kl.serverCertificateManager != nil { kl.serverCertificateManager.Start() } //啟動 oomWatcher. if err := kl.oomWatcher.Start(kl.nodeRef); err != nil { return fmt.Errorf("failed to start OOM watcher %v", err) } //啟動 resource analyzer,重新整理volume stats到快取中 kl.resourceAnalyzer.Start() return nil } ``` initializeModules方法主要做了以下幾件事: 1. 建立建立檔案目錄、Container的log目錄; 2. 啟動 imageManager,這個管理器實際上是realImageGCManager,我們待會看; 3. 啟動 certificate manager ,證書相關; 4. 啟動 oomWatcher監視器; 5. 啟動 resource analyzer,定時重新整理volume stats到快取中; **realImageGCManager#Start** 檔案路徑:pkg/kubelet/images/image_gc_manager.go ```go func (im *realImageGCManager) Start() { go wait.Until(func() { var ts time.Time if im.initialized { ts = time.Now() } //找出所有的image,並刪除不再使用的image _, err := im.detectImages(ts) if err != nil { klog.Warningf("[imageGCManager] Failed to monitor images: %v", err) } else { im.initialized = true } }, 5*time.Minute, wait.NeverStop) //更新image的快取 go wait.Until(func() { //呼叫容器介面,獲取最新的image images, err := im.runtime.ListImages() if err != nil { klog.Warningf("[imageGCManager] Failed to update image list: %v", err) } else { im.imageCache.set(images) } }, 30*time.Second, wait.NeverStop) } ``` realImageGCManager的start方法會啟動兩個協程,然後分別定時呼叫detectImages方法與imageCache的set方法。detectImages方法裡面主要就是呼叫ImageService和RuntimeService的方法找出所有正在使用的image,然後刪除不再使用的image。 這裡ListImages和detectImages裡面用到的GetPods方法都是呼叫了CRI的方法, #### fastStatusUpdateOnce ```go func (kl *Kubelet) fastStatusUpdateOnce() { for { time.Sleep(100 * time.Millisecond) node, err := kl.GetNode() if err != nil { klog.Errorf(err.Error()) continue } if len(node.Spec.PodCIDRs) != 0 { podCIDRs := strings.Join(node.Spec.PodCIDRs, ",") if _, err := kl.updatePodCIDR(podCIDRs); err != nil { klog.Errorf("Pod CIDR update to %v failed %v", podCIDRs, err) continue } //更新 Runtime 狀態 kl.updateRuntimeUp() //更新 節點 狀態 kl.syncNodeStatus() return } } } ``` FastStatusUpdateOnce 函式啟動一個迴圈,嘗試立即更新POD CIDR。更新pod CIDR後,它會觸發執行時更新和節點狀態更新。函式在一次成功的節點狀態更新後直接返回。該功能僅在 kubelet 啟動期間執行,通過儘快更新 pod cidr、執行時狀態和節點狀態來提高準備就緒節點的延遲。 **updateRuntimeUp** ```go //首次執行的時候會初始化runtime依賴模組,然後更新runtimeState func (kl *Kubelet) updateRuntimeUp() { kl.updateRuntimeMux.Lock() defer kl.updateRuntimeMux.Unlock() //獲取 containerRuntime Status s, err := kl.containerRuntime.Status() if err != nil { klog.Errorf("Container runtime sanity check failed: %v", err) return } if s == nil { klog.Errorf("Container runtime status is nil") return } klog.V(4).Infof("Container runtime status: %v", s) //檢查 network 和 runtime 是否處於 ready 狀態 networkReady := s.GetRuntimeCondition(kubecontainer.NetworkReady) if networkReady == nil || !networkReady.Status { klog.Errorf("Container runtime network not ready: %v", networkReady) kl.runtimeState.setNetworkState(fmt.Errorf("runtime network not ready: %v", networkReady)) } else { // Set nil if the container runtime network is ready. kl.runtimeState.setNetworkState(nil) } // information in RuntimeReady condition will be propagated to NodeReady condition. //獲取執行時狀態 runtimeReady := s.GetRuntimeCondition(kubecontainer.RuntimeReady) // If RuntimeReady is not set or is false, report an error. if runtimeReady == nil || !runtimeReady.Status { err := fmt.Errorf("Container runtime not ready: %v", runtimeReady) klog.Error(err) kl.runtimeState.setRuntimeState(err) return } kl.runtimeState.setRuntimeState(nil) //呼叫 kl.initializeRuntimeDependentModules 啟動依賴模組 kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules) kl.runtimeState.setRuntimeSync(kl.clock.Now()) } ``` updateRuntimeUp會獲取container執行狀態資訊,然後根據返回RuntimeStatus檢查網路、runtime是不是已經處於ready狀態;接著呼叫kl.initializeRuntimeDependentModules初始化依賴模組,這裡會啟動cadvisor、containerManager、evictionManager、containerLogManager、pluginManager;最後設定Runtime同步時間。 最後看看syncLoop方法 #### syncLoop ```go func (kl *Kubelet) syncLoop(updates <-chan kubetypes.PodUpdate, handler SyncHandler) { ... syncTicker := time.NewTicker(time.Second) defer syncTicker.Stop() housekeepingTicker := time.NewTicker(housekeepingPeriod) defer housekeepingTicker.Stop() plegCh := kl.pleg.Watch() for { ... kl.syncLoopMonitor.Store(kl.clock.Now()) if !kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) { break } kl.syncLoopMonitor.Store(kl.clock.Now()) } } ``` syncLoop方法在一個迴圈中不斷的呼叫syncLoopIteration方法執行主要邏輯。 #### **syncLoopIteration** syncLoopIteration方法比較長,拆開來看。 #### syncCh ```go func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, //方法會監聽多個 channel,當發現任何一個 channel 有資料就交給 handler 去處理,在 handler 中通過呼叫 dispatchWork 分發任務 syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { select { //該模組將同時 watch 3 個不同來源的 pod 資訊的變化(file,http,apiserver), //一旦某個來源的 pod 資訊發生了更新(建立/更新/刪除),這個 channel 中就會出現被更新的 pod 資訊和更新的具體操作; case u, open := <-configCh: if !open { klog.Errorf("Update channel is closed. Exiting the sync loop.") return false } switch u.Op { case kubetypes.ADD: klog.V(2).Infof("SyncLoop (ADD, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodAdditions(u.Pods) case kubetypes.UPDATE: klog.V(2).Infof("SyncLoop (UPDATE, %q): %q", u.Source, format.PodsWithDeletionTimestamps(u.Pods)) handler.HandlePodUpdates(u.Pods) case kubetypes.REMOVE: klog.V(2).Infof("SyncLoop (REMOVE, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodRemoves(u.Pods) case kubetypes.RECONCILE: klog.V(4).Infof("SyncLoop (RECONCILE, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodReconcile(u.Pods) case kubetypes.DELETE: klog.V(2).Infof("SyncLoop (DELETE, %q): %q", u.Source, format.Pods(u.Pods)) handler.HandlePodUpdates(u.Pods) case kubetypes.SET: klog.Errorf("Kubelet does not support snapshot update") default: klog.Errorf("Invalid event type received: %d.", u.Op) } kl.sourcesReady.AddSource(u.Source) ... } ``` configCh讀取配置事件的管道,該模組將同時 watch 3 個不同來源的 pod 資訊的變化(file,http,apiserver),一旦某個來源的 pod 資訊發生了更新(建立/更新/刪除),這個 channel 中就會出現被更新的 pod 資訊和更新的具體操作。這裡對於pod的操作我們下一篇再講。 #### plegCh ```go func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, //方法會監聽多個 channel,當發現任何一個 channel 有資料就交給 handler 去處理,在 handler 中通過呼叫 dispatchWork 分發任務 syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { ... case e := <-plegCh: if e.Type == pleg.ContainerStarted { kl.lastContainerStartedTime.Add(e.ID, time.Now()) } if isSyncPodWorthy(e) { if pod, ok := kl.podManager.GetPodByUID(e.ID); ok { klog.V(2).Infof("SyncLoop (PLEG): %q, event: %#v", format.Pod(pod), e) handler.HandlePodSyncs([]*v1.Pod{pod}) } else { klog.V(4).Infof("SyncLoop (PLEG): ignore irrelevant event: %#v", e) } } if e.Type == pleg.ContainerDied { if containerID, ok := e.Data.(string); ok { kl.cleanUpContainersInPod(e.ID, containerID) } } ... } ``` PLEG.Start的時候會每秒鐘啟動呼叫一次relist,根據最新的PodStatus生成PodLiftCycleEvent,然後存入到PLE Channel中。 syncLoop會呼叫pleg.Watch方法獲取PLE Channel管道,然後傳給syncLoopIteration方法,在syncLoopIteration方法中也就是plegCh這個管道,syncLoopIteration會消費plegCh中的資料,在 handler 中通過呼叫 dispatchWork 分發任務。 #### syncCh ```go func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { ... // 每秒鐘會執行到一次 case <-syncCh: // Sync pods waiting for sync podsToSync := kl.getPodsToSync() if len(podsToSync) == 0 { break } klog.V(4).Infof("SyncLoop (SYNC): %d pods; %s", len(podsToSync), format.Pods(podsToSync)) //同步最新儲存的 pod 狀態 handler.HandlePodSyncs(podsToSync) ... } ``` syncCh是由syncLoop方法裡面建立的一個定時任務,每秒鐘會向syncCh新增一個數據,然後就會執行到這裡。這個方法會同步所有等待同步的pod。 #### livenessManager.Updates ```go func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { ... case update := <-kl.livenessManager.Updates(): //如果探針檢測失敗,需要更新pod的狀態 if update.Result == proberesults.Failure { pod, ok := kl.podManager.GetPodByUID(update.PodUID) if !ok { klog.V(4).Infof("SyncLoop (container unhealthy): ignore irrelevant update: %#v", update) break } klog.V(1).Infof("SyncLoop (container unhealthy): %q", format.Pod(pod)) handler.HandlePodSyncs([]*v1.Pod{pod}) } ... } ``` 對失敗的pod或者liveness檢查失敗的pod進行sync操作。 #### housekeepingCh ```go func (kl *Kubelet) syncLoopIteration(configCh <-chan kubetypes.PodUpdate, handler SyncHandler, syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool { ... // 每兩秒鐘執行一次 case <-housekeepingCh: if !kl.sourcesReady.AllReady() { klog.V(4).Infof("SyncLoop (housekeeping, skipped): sources aren't ready yet.") } else { klog.V(4).Infof("SyncLoop (housekeeping)") //執行一些清理工作,包括終止pod workers、刪除不想要的pod,移除volumes、pod目錄 if err := handler.HandlePodCleanups(); err != nil { klog.Errorf("Failed cleaning pods: %v", err) } } ... } ``` housekeepingCh這個管道也是由syncLoop建立,每兩秒鐘會觸發清理。 ## 總結 kubelet.Run部分主要執行kubelet包含的各種manager的執行,大部分會以一部執行緒的方式定時執行。瞭解了CRI是怎麼一回事,通過CRI介面可以做什麼。 接下來看了syncLoop主函式,這個函式主要對pod的生命週期進行管理,包括對pod進行add 、update、remove、delete等操作,這些具體的程式碼執行過程留到下一篇,pod的初始化時再講,syncLoop還需要更新根據不同的channel觸發不同的操作,如更新runtime快取、同步pod、觸發清理pod、liveness檢查失敗的pod進行sync操作等。 ## Reference https://kubernetes.io/docs/reference/command-line-tools-reference/kubelet/ https://kubernetes.io/docs/tasks/configure-pod-container/configure-liveness-readiness-startup-probes/ https://developer.ibm.com/technologies/containers/blogs/kube-cri-overview/ https://kubernetes.io/docs/concepts/architecture/cloud-controller/ https://kubernetes.feisky.xyz/extension/cloud-provider https://kubernetes.io/blog/2016/12/container-runtime-interface-cri-in-kubernetes/ https://developers.redhat.com/blog/2019/11/13/pod-lifecycle-event-generator-understanding-the-pleg-is-not-healthy-issue-in-kubernetes/ https://zhuanlan.zhihu.com/p/110980720 https://kubernetes.io/docs/concepts/architecture/nodes/#heartbeats https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/pod-lifecycle-event-gene