
Kubernetes component kubelet: source code analysis of the startup flow

1. Introduction to kubelet

The kubelet is the primary "node agent" that runs on each node.
The kubelet works in terms of a PodSpec.
The kubelet takes a set of PodSpecs provided through various mechanisms (primarily through the apiserver) and ensures that the containers described in those PodSpecs are running and healthy.
The kubelet does not manage containers that were not created by Kubernetes.
Besides PodSpecs from the apiserver, there are three ways a container manifest can be provided to the kubelet:
File: a path passed as a flag on the command line. Files under this path are monitored periodically for updates. The monitoring period defaults to 20 seconds and is configurable via a flag (a sketch of this polling idea follows below).
HTTP endpoint: an HTTP endpoint passed as a parameter on the command line. This endpoint is checked every 20 seconds (also configurable with a flag).
HTTP server: the kubelet can also listen on HTTP and respond to a simple API for submitting a new manifest.
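To make the file source concrete, here is a minimal, hypothetical sketch (not the kubelet's actual pkg/kubelet/config code) of how polling a manifest directory on a fixed interval can feed an update channel; the path and 20-second period mirror the defaults described above:

package main

import (
    "fmt"
    "io/ioutil"
    "path/filepath"
    "time"
)

// pollManifestDir re-reads the manifest directory every interval and sends
// the manifest file paths it finds; the real kubelet decodes each file into
// a Pod object and pushes it through its pod update channel.
func pollManifestDir(dir string, interval time.Duration, updates chan<- []string) {
    for {
        if entries, err := ioutil.ReadDir(dir); err == nil {
            var files []string
            for _, e := range entries {
                if !e.IsDir() {
                    files = append(files, filepath.Join(dir, e.Name()))
                }
            }
            updates <- files
        }
        time.Sleep(interval)
    }
}

func main() {
    updates := make(chan []string)
    // 20 seconds matches the default file-check frequency described above.
    go pollManifestDir("/etc/kubernetes/manifests", 20*time.Second, updates)
    for files := range updates {
        fmt.Printf("current manifests: %v\n", files)
    }
}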

2. Code version analyzed: v1.5.0

3. The KubeletServer configuration object

Like the other components, the kubelet's entry file lives under cmd/; the actual implementation is in /pkg.

➜  kubernetes git:(master) ✗ git checkout v1.5.0
Note: checking out 'v1.5.0'.
➜  kubernetes git:(58b7c16a52) ✗ 
➜  cmd git:(58b7c16a52) ✗ tree kubelet 
kubelet
├── BUILD
├── OWNERS
├── app
│   ├── BUILD
│   ├── auth.go
│   ├── bootstrap.go
│   ├── bootstrap_test.go
│   ├── options
│   │   ├── BUILD
│   │   └── options.go
│   ├── plugins.go
│   ├── server.go
│   ├── server_linux.go
│   ├── server_test.go
│   └── server_unsupported.go
└── kubelet.go

2 directories, 14 files

The kubelet main function entry point

package main

import (
    "fmt"
"os" "k8s.io/kubernetes/cmd/kubelet/app" "k8s.io/kubernetes/cmd/kubelet/app/options" _ "k8s.io/kubernetes/pkg/client/metrics/prometheus" // for client metric registration "k8s.io/kubernetes/pkg/util/flag" "k8s.io/kubernetes/pkg/util/logs" _ "k8s.io/kubernetes/pkg/version/prometheus" // for version metric registration "k8s.io/kubernetes/pkg/version/verflag" "github.com/spf13/pflag" ) func main() { s := options.NewKubeletServer() s.AddFlags(pflag.CommandLine) flag.InitFlags() logs.InitLogs() defer logs.FlushLogs() verflag.PrintAndExitIfRequested() if err := app.Run(s, nil); err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) os.Exit(1) } }

What this code does:

Create a KubeletServer object, which holds all the configuration the kubelet needs to run
Parse the command line and update the KubeletServer from the arguments
Run the actual kubelet based on the KubeletServer configuration

options.NewKubeletServer() mainly initializes the configuration parameters needed to start the kubelet.

// KubeletServer encapsulates all of the parameters necessary for starting up
// a kubelet. These can either be set via command line or directly.
type KubeletServer struct {
    componentconfig.KubeletConfiguration

    KubeConfig          flag.StringFlag
    BootstrapKubeconfig string

    // If true, an invalid KubeConfig will result in the Kubelet exiting with an error.
    RequireKubeConfig bool
    AuthPath          flag.StringFlag // Deprecated -- use KubeConfig instead
    APIServerList     []string        // Deprecated -- use KubeConfig instead

    // Insert a probability of random errors during calls to the master.
    ChaosChance float64
    // Crash immediately, rather than eating panics.
    ReallyCrashForTesting bool

    // TODO(mtaufen): It is increasingly looking like nobody actually uses the
    //                Kubelet's runonce mode anymore, so it may be a candidate
    //                for deprecation and removal.
    // If runOnce is true, the Kubelet will check the API server once for pods,
    // run those in addition to the pods specified by the local manifest, and exit.
    RunOnce bool
}

// NewKubeletServer will create a new KubeletServer with default values.
func NewKubeletServer() *KubeletServer {
    versioned := &v1alpha1.KubeletConfiguration{}
    api.Scheme.Default(versioned)
    config := componentconfig.KubeletConfiguration{}
    api.Scheme.Convert(versioned, &config, nil)
    return &KubeletServer{
        KubeConfig:           flag.NewStringFlag("/var/lib/kubelet/kubeconfig"),
        RequireKubeConfig:    false, // in 1.5, default to true
        KubeletConfiguration: config,
    }
}

There is too much here to cover (DockerEndpoint and so on); interested readers can explore the source.
We won't explain it further here.

app.Run(s, nil) actually creates and starts the kubelet instance.

func Run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) error {
    if err := run(s, kubeDeps); err != nil {
        return fmt.Errorf("failed to run Kubelet: %v", err)

    }
    return nil
}

func run(s *options.KubeletServer, kubeDeps *kubelet.KubeletDeps) (err error) {
    // TODO: this should be replaced by a --standalone flag
    standaloneMode := (len(s.APIServerList) == 0 && !s.RequireKubeConfig)

    if s.ExitOnLockContention && s.LockFilePath == "" {
        return errors.New("cannot exit on lock file contention: no lock file specified")
    }

    done := make(chan struct{})
    if s.LockFilePath != "" {
        glog.Infof("acquiring file lock on %q", s.LockFilePath)
        if err := flock.Acquire(s.LockFilePath); err != nil {
            return fmt.Errorf("unable to acquire file lock on %q: %v", s.LockFilePath, err)
        }
        if s.ExitOnLockContention {
            glog.Infof("watching for inotify events for: %v", s.LockFilePath)
            if err := watchForLockfileContention(s.LockFilePath, done); err != nil {
                return err
            }
        }
    }

    // Set feature gates based on the value in KubeletConfiguration
    err = utilconfig.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
    if err != nil {
        return err
    }

    // Register current configuration with /configz endpoint
    cfgz, cfgzErr := initConfigz(&s.KubeletConfiguration)
    if utilconfig.DefaultFeatureGate.DynamicKubeletConfig() {
        // Look for config on the API server. If it exists, replace s.KubeletConfiguration
        // with it and continue. initKubeletConfigSync also starts the background thread that checks for new config.

        // Don't do dynamic Kubelet configuration in runonce mode
        if s.RunOnce == false {
            remoteKC, err := initKubeletConfigSync(s)
            if err == nil {
                // Update s (KubeletServer) with new config from API server
                s.KubeletConfiguration = *remoteKC
                // Ensure that /configz is up to date with the new config
                if cfgzErr != nil {
                    glog.Errorf("was unable to register configz before due to %s, will not be able to set now", cfgzErr)
                } else {
                    setConfigz(cfgz, &s.KubeletConfiguration)
                }
                // Update feature gates from the new config
                err = utilconfig.DefaultFeatureGate.Set(s.KubeletConfiguration.FeatureGates)
                if err != nil {
                    return err
                }
            }
        }
    }

    if kubeDeps == nil {
        var kubeClient, eventClient *clientset.Clientset
        var cloud cloudprovider.Interface

        if s.CloudProvider != componentconfigv1alpha1.AutoDetectCloudProvider {
            cloud, err = cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
            if err != nil {
                return err
            }
            if cloud == nil {
                glog.V(2).Infof("No cloud provider specified: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
            } else {
                glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
            }
        }

        if s.BootstrapKubeconfig != "" {
            nodeName, err := getNodeName(cloud, nodeutil.GetHostname(s.HostnameOverride))
            if err != nil {
                return err
            }
            if err := bootstrapClientCert(s.KubeConfig.Value(), s.BootstrapKubeconfig, s.CertDirectory, nodeName); err != nil {
                return err
            }
        }

        clientConfig, err := CreateAPIServerClientConfig(s)
        if err == nil {
            kubeClient, err = clientset.NewForConfig(clientConfig)
            if err != nil {
                glog.Warningf("New kubeClient from clientConfig error: %v", err)
            }
            // make a separate client for events
            eventClientConfig := *clientConfig
            eventClientConfig.QPS = float32(s.EventRecordQPS)
            eventClientConfig.Burst = int(s.EventBurst)
            eventClient, err = clientset.NewForConfig(&eventClientConfig)
        }
        if err != nil {
            if s.RequireKubeConfig {
                return fmt.Errorf("invalid kubeconfig: %v", err)
            }
            if standaloneMode {
                glog.Warningf("No API client: %v", err)
            }
        }

        kubeDeps, err = UnsecuredKubeletDeps(s)
        if err != nil {
            return err
        }

        kubeDeps.Cloud = cloud
        kubeDeps.KubeClient = kubeClient
        kubeDeps.EventClient = eventClient
    }

    if kubeDeps.Auth == nil {
        nodeName, err := getNodeName(kubeDeps.Cloud, nodeutil.GetHostname(s.HostnameOverride))
        if err != nil {
            return err
        }
        auth, err := buildAuth(nodeName, kubeDeps.KubeClient, s.KubeletConfiguration)
        if err != nil {
            return err
        }
        kubeDeps.Auth = auth
    }

    if kubeDeps.CAdvisorInterface == nil {
        kubeDeps.CAdvisorInterface, err = cadvisor.New(uint(s.CAdvisorPort), s.ContainerRuntime, s.RootDirectory)
        if err != nil {
            return err
        }
    }

    if kubeDeps.ContainerManager == nil {
        if s.SystemCgroups != "" && s.CgroupRoot == "" {
            return fmt.Errorf("invalid configuration: system container was specified and cgroup root was not specified")
        }
        kubeDeps.ContainerManager, err = cm.NewContainerManager(
            kubeDeps.Mounter,
            kubeDeps.CAdvisorInterface,
            cm.NodeConfig{
                RuntimeCgroupsName:    s.RuntimeCgroups,
                SystemCgroupsName:     s.SystemCgroups,
                KubeletCgroupsName:    s.KubeletCgroups,
                ContainerRuntime:      s.ContainerRuntime,
                CgroupsPerQOS:         s.ExperimentalCgroupsPerQOS,
                CgroupRoot:            s.CgroupRoot,
                CgroupDriver:          s.CgroupDriver,
                ProtectKernelDefaults: s.ProtectKernelDefaults,
                EnableCRI:             s.EnableCRI,
            },
            s.ExperimentalFailSwapOn)

        if err != nil {
            return err
        }
    }

    if err := checkPermissions(); err != nil {
        glog.Error(err)
    }

    utilruntime.ReallyCrash = s.ReallyCrashForTesting

    rand.Seed(time.Now().UTC().UnixNano())

    // TODO(vmarmol): Do this through container config.
    oomAdjuster := kubeDeps.OOMAdjuster
    if err := oomAdjuster.ApplyOOMScoreAdj(0, int(s.OOMScoreAdj)); err != nil {
        glog.Warning(err)
    }

    if err := RunKubelet(&s.KubeletConfiguration, kubeDeps, s.RunOnce, standaloneMode); err != nil {
        return err
    }

    if s.HealthzPort > 0 {
        healthz.DefaultHealthz()
        go wait.Until(func() {
            err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress, strconv.Itoa(int(s.HealthzPort))), nil)
            if err != nil {
                glog.Errorf("Starting health server failed: %v", err)
            }
        }, 5*time.Second, wait.NeverStop)
    }

    if s.RunOnce {
        return nil
    }

    <-done
    return nil
}

KubeDeps contains many components; some of them are listed below:

CAdvisorInterface: the component that provides the cAdvisor interface, used to obtain monitoring information
DockerClient: the docker client, used to interact with docker
KubeClient: the apiserver client, used to communicate with the API server
Mounter: performs mount-related operations
NetworkPlugins: network plugins, which perform network setup
VolumePlugins: volume plugins, which perform volume setup
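These interface-typed fields are what make the kubelet testable: callers can inject their own implementations, and anything left nil gets a default, as the next paragraph describes. A minimal sketch of that pattern, with hypothetical types rather than the real KubeletDeps:

package main

import "fmt"

// Deps mimics the shape of KubeletDeps: interface-typed dependencies that
// a caller may inject; a nil field means "build the default".
type Deps struct {
    CloudProvider fmt.Stringer
}

type defaultCloud struct{}

func (defaultCloud) String() string { return "default-cloud" }

// run fills in any nil dependencies before using them, the same shape as
// the nil checks on kubeDeps in the real run().
func run(deps *Deps) {
    if deps == nil {
        deps = &Deps{}
    }
    if deps.CloudProvider == nil {
        deps.CloudProvider = defaultCloud{}
    }
    fmt.Println("using cloud provider:", deps.CloudProvider)
}

func main() {
    run(nil) // defaults kick in, as when cmd/kubelet calls app.Run(s, nil)
}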

The run method allows the kubeDeps passed in to be nil, in which case it generates a default kubeDeps object itself, which is exactly the logic in the code above. We'll skip the code that runs the HTTP server and leave it for a later article; let's continue with RunKubelet, whose code is as follows:

func RunKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, runOnce bool, standaloneMode bool) error {
    hostname := nodeutil.GetHostname(kubeCfg.HostnameOverride)
    // Query the cloud provider for our node name, default to hostname if kcfg.Cloud == nil
    nodeName, err := getNodeName(kubeDeps.Cloud, hostname)
    if err != nil {
        return err
    }

    eventBroadcaster := record.NewBroadcaster()
    kubeDeps.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: string(nodeName)})
    eventBroadcaster.StartLogging(glog.V(3).Infof)
    if kubeDeps.EventClient != nil {
        glog.V(4).Infof("Sending events to api server.")
        eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{Interface: kubeDeps.EventClient.Events("")})
    } else {
        glog.Warning("No api server defined - no events will be sent to API server.")
    }

    // TODO(mtaufen): I moved the validation of these fields here, from UnsecuredKubeletConfig,
    //                so that I could remove the associated fields from KubeletConfig. I would
    //                prefer this to be done as part of an independent validation step on the
    //                KubeletConfiguration. But as far as I can tell, we don't have an explicit
    //                place for validation of the KubeletConfiguration yet.
    hostNetworkSources, err := kubetypes.GetValidatedSources(kubeCfg.HostNetworkSources)
    if err != nil {
        return err
    }

    hostPIDSources, err := kubetypes.GetValidatedSources(kubeCfg.HostPIDSources)
    if err != nil {
        return err
    }

    hostIPCSources, err := kubetypes.GetValidatedSources(kubeCfg.HostIPCSources)
    if err != nil {
        return err
    }

    privilegedSources := capabilities.PrivilegedSources{
        HostNetworkSources: hostNetworkSources,
        HostPIDSources:     hostPIDSources,
        HostIPCSources:     hostIPCSources,
    }
    capabilities.Setup(kubeCfg.AllowPrivileged, privilegedSources, 0)

    credentialprovider.SetPreferredDockercfgPath(kubeCfg.RootDirectory)
    glog.V(2).Infof("Using root directory: %v", kubeCfg.RootDirectory)

    builder := kubeDeps.Builder
    if builder == nil {
        builder = CreateAndInitKubelet
    }
    if kubeDeps.OSInterface == nil {
        kubeDeps.OSInterface = kubecontainer.RealOS{}
    }
    k, err := builder(kubeCfg, kubeDeps, standaloneMode)
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %v", err)
    }

    // NewMainKubelet should have set up a pod source config if one didn't exist
    // when the builder was run. This is just a precaution.
    if kubeDeps.PodConfig == nil {
        return fmt.Errorf("failed to create kubelet, pod source config was nil!")
    }
    podCfg := kubeDeps.PodConfig

    rlimit.RlimitNumFiles(uint64(kubeCfg.MaxOpenFiles))

    // TODO(dawnchen): remove this once we deprecated old debian containervm images.
    // This is a workaround for issue: https://github.com/opencontainers/runc/issues/726
    // The current chosen number is consistent with most of other os dist.
    const maxkeysPath = "/proc/sys/kernel/keys/root_maxkeys"
    const minKeys uint64 = 1000000
    key, err := ioutil.ReadFile(maxkeysPath)
    if err != nil {
        glog.Errorf("Cannot read keys quota in %s", maxkeysPath)
    } else {
        fields := strings.Fields(string(key))
        nkey, _ := strconv.ParseUint(fields[0], 10, 64)
        if nkey < minKeys {
            glog.Infof("Setting keys quota in %s to %d", maxkeysPath, minKeys)
            err = ioutil.WriteFile(maxkeysPath, []byte(fmt.Sprintf("%d", uint64(minKeys))), 0644)
            if err != nil {
                glog.Warningf("Failed to update %s: %v", maxkeysPath, err)
            }
        }
    }
    const maxbytesPath = "/proc/sys/kernel/keys/root_maxbytes"
    const minBytes uint64 = 25000000
    bytes, err := ioutil.ReadFile(maxbytesPath)
    if err != nil {
        glog.Errorf("Cannot read keys bytes in %s", maxbytesPath)
    } else {
        fields := strings.Fields(string(bytes))
        nbyte, _ := strconv.ParseUint(fields[0], 10, 64)
        if nbyte < minBytes {
            glog.Infof("Setting keys bytes in %s to %d", maxbytesPath, minBytes)
            err = ioutil.WriteFile(maxbytesPath, []byte(fmt.Sprintf("%d", uint64(minBytes))), 0644)
            if err != nil {
                glog.Warningf("Failed to update %s: %v", maxbytesPath, err)
            }
        }
    }

    // process pods and exit.
    if runOnce {
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {
            return fmt.Errorf("runonce failed: %v", err)
        }
        glog.Infof("Started kubelet %s as runonce", version.Get().String())
    } else {
        err := startKubelet(k, podCfg, kubeCfg, kubeDeps)
        if err != nil {
            return err
        }
        glog.Infof("Started kubelet %s", version.Get().String())
    }
    return nil
}

RunKubelet consists of three parts:

1. Initialize the various objects, e.g. the eventBroadcaster, so kubelet events can be sent to the apiserver
2. Create the Kubelet through the builder
3. Run the Kubelet according to the run mode

The creation happens in the line k, err := builder(kubeCfg, kubeDeps, standaloneMode); the default builder is CreateAndInitKubelet:

func CreateAndInitKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps, standaloneMode bool) (k kubelet.KubeletBootstrap, err error) {
    // TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
    // up into "per source" synchronizations

    k, err = kubelet.NewMainKubelet(kubeCfg, kubeDeps, standaloneMode)
    if err != nil {
        return nil, err
    }

    k.BirthCry()

    k.StartGarbageCollection()

    return k, nil
}

Creating the kubelet

func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *KubeletDeps, standaloneMode bool) (*Kubelet, error) {
    ......

    // PodConfig is very important: it is the source of pod information. The kubelet supports three channels (file, URL, and apiserver); PodConfig merges them and delivers the result through a channel
    if kubeDeps.PodConfig == nil {
        kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName)
    }
    ......

    // The exec handler: the mechanism for entering a container to run a command. nsenter was used in the past; docker later provided `docker exec`, which is now the default
    var dockerExecHandler dockertools.ExecHandler
    switch kubeCfg.DockerExecHandlerName {
    case "native":
        dockerExecHandler = &dockertools.NativeExecHandler{}
    case "nsenter":
        dockerExecHandler = &dockertools.NsenterExecHandler{}
    default:
        glog.Warningf("Unknown Docker exec handler %q; defaulting to native", kubeCfg.DockerExecHandlerName)
        dockerExecHandler = &dockertools.NativeExecHandler{}
    }

    // Use a reflector to sync service information obtained via ListWatch into the serviceStore object in real time
    serviceStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
    if kubeClient != nil {
        serviceLW := cache.NewListWatchFromClient(kubeClient.Core().RESTClient(), "services", api.NamespaceAll, fields.Everything())
        cache.NewReflector(serviceLW, &api.Service{}, serviceStore, 0).Run()
    }
    serviceLister := &cache.StoreToServiceLister{Indexer: serviceStore}

    // Use a reflector to sync node information obtained via ListWatch into the nodeStore object in real time
    nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
    if kubeClient != nil {
        fieldSelector := fields.Set{api.ObjectNameField: string(nodeName)}.AsSelector()
        nodeLW := cache.NewListWatchFromClient(kubeClient.Core().RESTClient(), "nodes", api.NamespaceAll, fieldSelector)
        cache.NewReflector(nodeLW, &api.Node{}, nodeStore, 0).Run()
    }
    nodeLister := &cache.StoreToNodeLister{Store: nodeStore}
    nodeInfo := &predicates.CachedNodeInfo{StoreToNodeLister: nodeLister}

    ......

    // Create the Kubelet instance from the configuration and the various objects
    klet := &Kubelet{
        hostname:                       hostname,
        nodeName:                       nodeName,
        dockerClient:                   kubeDeps.DockerClient,
        kubeClient:                     kubeClient,
        ......
        clusterDomain:                  kubeCfg.ClusterDomain,
        clusterDNS:                     net.ParseIP(kubeCfg.ClusterDNS),
        serviceLister:                  serviceLister,
        nodeLister:                     nodeLister,
        nodeInfo:                       nodeInfo,
        masterServiceNamespace:         kubeCfg.MasterServiceNamespace,
        streamingConnectionIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
        recorder:                       kubeDeps.Recorder,
        cadvisor:                       kubeDeps.CAdvisorInterface,
        diskSpaceManager:               diskSpaceManager,
        ......
    }

    ......

    // Initialize the network plugin
    if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &criNetworkHost{&networkHost{klet}}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
        return nil, err
    } else {
        klet.networkPlugin = plug
    }

    // Get information about the current machine from cAdvisor
    machineInfo, err := klet.GetCachedMachineInfo()
    ......

    procFs := procfs.NewProcFS()
    imageBackOff := flowcontrol.NewBackOff(backOffPeriod, MaxContainerBackOff)

    klet.livenessManager = proberesults.NewManager()

    // podManager manages the pod information on this node; it stores the content of every pod, including static pods.
    // The kubelet obtains pod content from three places: local files, a network address, and the apiserver.
    klet.podCache = kubecontainer.NewCache()
    klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
    ......

    // Create the runtime object; the CRI interface will be used to talk to the runtime in the future, but DockerManager is used for now
    if kubeCfg.EnableCRI {
        ......
    } else {
        switch kubeCfg.ContainerRuntime {
        case "docker":
            runtime := dockertools.NewDockerManager(
                kubeDeps.DockerClient,
                kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
                klet.livenessManager,
                containerRefManager,
                klet.podManager,
                machineInfo,
                kubeCfg.PodInfraContainerImage,
                float32(kubeCfg.RegistryPullQPS),
                int(kubeCfg.RegistryBurst),
                ContainerLogsDir,
                kubeDeps.OSInterface,
                klet.networkPlugin,
                klet,
                klet.httpClient,
                dockerExecHandler,
                kubeDeps.OOMAdjuster,
                procFs,
                klet.cpuCFSQuota,
                imageBackOff,
                kubeCfg.SerializeImagePulls,
                kubeCfg.EnableCustomMetrics,
                klet.hairpinMode == componentconfig.HairpinVeth && kubeCfg.NetworkPluginName != "kubenet",
                kubeCfg.SeccompProfileRoot,
                kubeDeps.ContainerRuntimeOptions...,
            )
            klet.containerRuntime = runtime
            klet.runner = kubecontainer.DirectStreamingRunner(runtime)
        case "rkt":
            ......
        default:
            return nil, fmt.Errorf("unsupported container runtime %q specified", kubeCfg.ContainerRuntime)
        }
    }

    ......

    klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache, clock.RealClock{})
    klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime)
    klet.updatePodCIDR(kubeCfg.PodCIDR)

    // Create the containerGC object, which performs periodic container cleanup
    containerGC, err := kubecontainer.NewContainerGC(klet.containerRuntime, containerGCPolicy)
    if err != nil {
        return nil, err
    }
    klet.containerGC = containerGC
    klet.containerDeletor = newPodContainerDeletor(klet.containerRuntime, integer.IntMax(containerGCPolicy.MaxPerPodContainer, minDeadContainerInPod))

    // Create the imageManager object, which manages images
    imageManager, err := images.NewImageGCManager(klet.containerRuntime, kubeDeps.CAdvisorInterface, kubeDeps.Recorder, nodeRef, imageGCPolicy)
    if err != nil {
        return nil, fmt.Errorf("failed to initialize image manager: %v", err)
    }
    klet.imageManager = imageManager

    // statusManager watches the status of pods on this node in real time and updates the corresponding pods in the apiserver
    klet.statusManager = status.NewManager(kubeClient, klet.podManager)

    // probeManager checks pod status and pushes updates through statusManager
    klet.probeManager = prober.NewManager(
        klet.statusManager,
        klet.livenessManager,
        klet.runner,
        containerRefManager,
        kubeDeps.Recorder)

    // volumeManager manages the volumes on this node

    klet.volumePluginMgr, err =
        NewInitializedVolumePluginMgr(klet, kubeDeps.VolumePlugins)
    if err != nil {
        return nil, err
    }
    ......
    // setup volumeManager
    klet.volumeManager, err = volumemanager.NewVolumeManager(
        kubeCfg.EnableControllerAttachDetach,
        nodeName,
        klet.podManager,
        klet.kubeClient,
        klet.volumePluginMgr,
        klet.containerRuntime,
        kubeDeps.Mounter,
        klet.getPodsDir(),
        kubeDeps.Recorder,
        kubeCfg.ExperimentalCheckNodeCapabilitiesBeforeMount)

    // Caches information about the pods currently running on this node
    runtimeCache, err := kubecontainer.NewRuntimeCache(klet.containerRuntime)
    if err != nil {
        return nil, err
    }
    klet.runtimeCache = runtimeCache
    klet.reasonCache = NewReasonCache()
    klet.workQueue = queue.NewBasicWorkQueue(klet.clock)

    // podWorkers are the actual executors
    klet.podWorkers = newPodWorkers(klet.syncPod, kubeDeps.Recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)

    ......
    klet.kubeletConfiguration = *kubeCfg
    return klet, nil
}

NewMainKubelet, as its name suggests, mainly creates the Kubelet object, which contains all the objects the kubelet needs to run. The code above is the initialization and assignment of those objects. Here we introduce only a few of the most important ones:

podConfig: merges the pods this node should run from three sources (file, network, and apiserver) and sends them out over a channel; reading that channel yields the latest pod configuration in real time
serviceLister: reads service information from kubernetes
nodeLister: reads node information from the apiserver
diskSpaceManager: returns information about container storage space
podManager: caches pod information; it is the place everything that needs that information goes
runtime: the container runtime, a wrapper around the container engine (docker or rkt); it calls the engine's interfaces to manage container state, e.g. starting, pausing, and killing containers
probeManager: if a pod has health checks configured, probeManager periodically checks whether the pod is working properly and updates the pod's status to the apiserver via statusManager
volumeManager: manages the volumes containers need, e.g. checking whether a volume is mounted and obtaining the volumes a pod uses
podWorkers: the actual executors; every pod update is sent to them

We won't expand on the implementation and exact function of every object here; later articles will analyze some of them further. Since podConfig is central to what follows, a simplified sketch of its source-merging idea is shown below.
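This is a hypothetical fan-in sketch, not the real pkg/kubelet/config code (which also tracks per-source state and computes diffs): several per-source channels are merged into the single updates channel that the sync loop reads.

package main

import "fmt"

// PodUpdate is a stand-in for the kubetypes.PodUpdate carried on the channel.
type PodUpdate struct {
    Source string
    Pods   []string
}

// mergeSources fans several per-source channels into the single updates
// channel that the sync loop reads.
func mergeSources(updates chan<- PodUpdate, sources ...<-chan PodUpdate) {
    for _, src := range sources {
        go func(c <-chan PodUpdate) {
            for u := range c {
                updates <- u
            }
        }(src)
    }
}

func main() {
    file := make(chan PodUpdate)
    apiserver := make(chan PodUpdate)
    updates := make(chan PodUpdate)
    mergeSources(updates, file, apiserver)

    go func() { file <- PodUpdate{Source: "file", Pods: []string{"static-web"}} }()
    go func() { apiserver <- PodUpdate{Source: "api", Pods: []string{"nginx"}} }()

    for i := 0; i < 2; i++ {
        u := <-updates
        fmt.Printf("update from %s: %v\n", u.Source, u.Pods)
    }
}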

Running the kubelet

func startKubelet(k kubelet.KubeletBootstrap, podCfg *config.PodConfig, kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *kubelet.KubeletDeps) error {
    // start the kubelet
    go wait.Until(func() { k.Run(podCfg.Updates()) }, 0, wait.NeverStop)

    // start the kubelet server
    if kubeCfg.EnableServer {
        go wait.Until(func() {
            k.ListenAndServe(net.ParseIP(kubeCfg.Address), uint(kubeCfg.Port), kubeDeps.TLSOptions, kubeDeps.Auth, kubeCfg.EnableDebuggingHandlers)
        }, 0, wait.NeverStop)
    }
    if kubeCfg.ReadOnlyPort > 0 {
        go wait.Until(func() {
            k.ListenAndServeReadOnly(net.ParseIP(kubeCfg.Address), uint(kubeCfg.ReadOnlyPort))
        }, 0, wait.NeverStop)
    }

    return nil
}
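Note the wait.Until calls above: each loop is restarted forever (a period of 0 means immediately) until the stop channel closes. A rough re-implementation sketch of that helper, for intuition only (the real pkg/util/wait.Until also recovers from panics and handles timing slightly differently):

package main

import (
    "fmt"
    "time"
)

// until runs f repeatedly, waiting period between invocations, and returns
// once stop is closed. With period 0, f is restarted immediately if it returns.
func until(f func(), period time.Duration, stop <-chan struct{}) {
    for {
        select {
        case <-stop:
            return
        default:
        }
        f()
        select {
        case <-stop:
            return
        case <-time.After(period):
        }
    }
}

func main() {
    stop := make(chan struct{})
    go until(func() { fmt.Println("serving...") }, time.Second, stop)
    time.Sleep(3 * time.Second)
    close(stop)
}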

Running the kubelet mainly starts two functions: k.Run() enters the main loop, and k.ListenAndServe() starts the kubelet's API server. The latter is not the focus of this article, so let's look at the former. Its entry point is k.Run(podCfg.Updates()). As mentioned above, podCfg.Updates() is a channel that delivers the latest pod configuration in real time; how it is implemented is a topic for another article, and knowing its role is enough here. The code of the Run method is as follows:

// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        glog.Warning("No api server defined - no node status update will be sent.")
    }
    if err := kl.initializeModules(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, api.EventTypeWarning, events.KubeletSetupFailed, err.Error())
        glog.Error(err)
        kl.runtimeState.setInitError(err)
    }

    // Start volume manager
    go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)

    if kl.kubeClient != nil {
        // Start syncing node status immediately, this may set up things the runtime needs to run.
        go wait.Until(kl.syncNodeStatus, kl.nodeStatusUpdateFrequency, wait.NeverStop)
    }
    go wait.Until(kl.syncNetworkStatus, 30*time.Second, wait.NeverStop)
    go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)

    // Start loop to sync iptables util rules
    if kl.makeIPTablesUtilChains {
        go wait.Until(kl.syncNetworkUtil, 1*time.Minute, wait.NeverStop)
    }

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    go wait.Until(kl.podKiller, 1*time.Second, wait.NeverStop)

    // Start component sync loops.
    kl.statusManager.Start()
    kl.probeManager.Start()

    // Start the pod lifecycle event generator.
    kl.pleg.Start()
    kl.syncLoop(updates, kl)
}

This is essentially the startup of the kubelet's various components, each running in its own goroutine, so we won't dwell on it. The last line, kl.syncLoop(updates, kl), is the main loop that handles all pod updates: it receives pod changes (creations, modifications, and deletions) and calls the corresponding handlers to keep the containers on the node consistent with the pod configuration.
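To close, here is a stripped-down, hypothetical sketch of what a syncLoop-style event loop looks like: select over the merged update channel and a housekeeping ticker. The real kl.syncLoop also watches the PLEG channel and liveness results, and dispatches each update to a per-pod worker (podWorkers).

package main

import (
    "fmt"
    "time"
)

// PodUpdate is a stand-in for kubetypes.PodUpdate.
type PodUpdate struct{ Op, Pod string }

// syncLoop dispatches pod updates and runs periodic housekeeping until
// stop is closed.
func syncLoop(updates <-chan PodUpdate, stop <-chan struct{}) {
    housekeeping := time.NewTicker(2 * time.Second)
    defer housekeeping.Stop()
    for {
        select {
        case u, ok := <-updates:
            if !ok {
                return
            }
            // The real kubelet hands this off to a per-pod worker.
            fmt.Printf("sync %s for pod %s\n", u.Op, u.Pod)
        case <-housekeeping.C:
            fmt.Println("housekeeping: clean up pods and containers")
        case <-stop:
            return
        }
    }
}

func main() {
    updates := make(chan PodUpdate, 2)
    updates <- PodUpdate{Op: "ADD", Pod: "nginx"}
    stop := make(chan struct{})
    go func() { time.Sleep(5 * time.Second); close(stop) }()
    syncLoop(updates, stop)
}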