1. 程式人生 > >基於1.1.2版本kubelet原始碼閱讀(1)

基於1.1.2版本kubelet原始碼閱讀(1)

kubelet的主體流程部分,主要是為了理解kubelet元件的主體流程是如何執行的,是如何獲取pod資訊,然後如何對pod進行部署和update等操作。
(主要是根據網上一些公開的資訊,再根據自己的理解來進行整合,加深自己對k8s原始碼的學習)

1、從cmd/kubelet/kubelet.go開始main()函式

  func main() {
    runtime.GOMAXPROCS(runtime.NumCPU())
    //kubelet 的main函式入口,新建一個NewKubeletServer
    //NewKubeletServer()初始化kubelet Server的fields為預設值。然後在初始化flags和logs之後就可開始Run kubelet Server
    s := app.NewKubeletServer
() //解析flag s.AddFlags(pflag.CommandLine) //future work:後面要了解引數是如何寫到系統裡面的?是InitFlags如何運作麼? util.InitFlags() util.InitLogs() defer util.FlushLogs() verflag.PrintAndExitIfRequested() if err := s.Run(nil); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit
(1) } }

main函式主要執行NewKubeletServer()、AddFlags(pflag.CommandLine)、 s.Run(nil)這三個步驟,下面具體分析這三個函式

2、NewKubeletServer()解析

定義在cmd/kubelet/server.go
NewKubeletServer()建立一個KubeletServer,主要是完成引數的初始化—with default values
return &KubeletServer{…}
下面 給出部分引數的含義:

        Address:                     net.ParseIP("0.0.0.0"),
ContainerRuntime: "docker", DockerExecHandlerName: "native", HealthzBindAddress: net.ParseIP("127.0.0.1"), HealthzPort: 10248, ImageGCHighThresholdPercent: 90,//表示此盤利用率超過90%的時候,會一直對image資源進行回收 ImageGCLowThresholdPercent: 80,//表示此盤利用率低於80%的時候,不會進行GC KubeConfig: util.NewStringFlag("/var/lib/kubelet/kubeconfig"), MaxContainerCount: 100, //每個節點最多保留100個死亡例項 MaxPerPodContainerCount: 2, //每個容器最多保留2個死亡例項 RegisterNode: true, // will be ignored if no apiserver is configured ResourceContainer: "/kubelet", RootDirectory: defaultRootDir, //預設值是/var/lib/kubelet,存放配置及VM卷等資料

3、 s.Run(nil)

定義在cmd/kubelet/server.go

/*
    Run 基於指定的KubeletConfig執行一個KubeletServer,never exist!
    KubeletConfig可能是nil的。If nil, 在KubeletServer中利用預設值完成初始化設定
    If not nil,the default values 會被忽略
*/
func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
    if kcfg == nil {
        //第一次是通過s.Run(nil)來呼叫
        //準備一個KubeletConfig,配置一些引數
        //KubeletConfig會返回適合執行的KubeletConfig,如果伺服器設定無效,則返回錯誤。 它不會啟動任何後臺程序。
        cfg, err := s.KubeletConfig()**[1]**
        if err != nil {
            return err
        }
        kcfg = cfg
        //呼叫CreateAPIServerClientConfig初始化clientConfig
        //CreateAPIServerClientConfig使用command line flags來生成 client.Config,including api-server-list
        //KubeClient是kubelet 和 Api server傳遞資訊的唯一途徑。
        ////CreateAPIServerClientConfig根據使用者的傳遞--kubeconfig或者預設.kubeconfig檔案來解析client配置。
        clientConfig, err := s.CreateAPIServerClientConfig()
        if err == nil {
            /*
                最終初始化KubeClient。
                定義在client "k8s.io/kubernetes/pkg/client/unversioned/helper.go"
                New(c *Config)為指定的配置建立一個Kubernetes client。
                該client可以對pods,replication controllers, daemons, and services這些物件執行list, get, update and delete操作。
                如果提供的配置無效,則返回錯誤。
            */
            kcfg.KubeClient, err = client.New(clientConfig)
        }
        if err != nil && len(s.APIServerList) > 0 {
            glog.Warningf("No API client: %v", err)
        }
        //cloudprovider 和IaaS雲商有關
        cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
        if err != nil {
            return err
        }
        glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
        kcfg.Cloud = cloud
    }
    /*
        啟動cadvisor來監控本地的容器
        通過CAdvisor我們可以看到kuberlet管理的機器上container的資源統計資訊
        cadvisor原始碼在pkg/kubelet/cadvisor/cadvisor_linux.go中
    */
    if kcfg.CAdvisorInterface == nil {
        ca, err := cadvisor.New(s.CAdvisorPort)
        if err != nil {
            return err
        }
        kcfg.CAdvisorInterface = ca
    }

    util.ReallyCrash = s.ReallyCrashForTesting
    rand.Seed(time.Now().UTC().UnixNano())
    /*
        設定preferred Dockercfg 檔案path,這個是為了讀取.dockercfg檔案,
        該檔案中的secret可用於pull/push docker registry中的docker映象,預設的路徑為:/var/lib/kubelet
    */
    credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)

    glog.V(2).Infof("Using root directory: %v", s.RootDirectory)

    // TODO(vmarmol): Do this through container config.
    oomAdjuster := oom.NewOomAdjuster()
    /*
         linux的oom機制,當系統發生OOM時,oom_adj值越小,越不容易被系統kill掉    kubelet一般設定自身oom值為 -900  取值範圍是[-1000,+1000]
        主要是通過設定/proc/ /oom_score_adj來控制
        設定為-1000,該程序就被排除在OOM Killer強制終止的物件外
    */
    if err := oomAdjuster.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil {
        glog.Warning(err)
    }
    /*
         ---程式碼中RunKubelet是下一個入口******
        RunKubelet中會新起一個goroutine來非同步執行kubelet;
        接下來的程式碼還會執行:if s.HealthzPort > 0提供健康檢查的http服務
    */
    if err := RunKubelet(kcfg, nil); err != nil {**[2]**
        return err
    }

    if s.HealthzPort > 0 {
        /*
            這段code會啟動一個http server來提供health check功能,其實目前就提供了一個/healthz/ping的endpoint,返回的http status code為200就正常。

            Kubernetes預設提供的http埠如下(在pkg/master/ports/ports.go中定義):
            KubeletStatusPort = 10248 // kubelet的health port,就是上面的HealthzPort。
            ProxyStatusPort = 10249 // kube proxy的health port。
            KubeletPort = 10250 // kubelet server的port。
            SchedulerPort = 10251 // kube scheduler的port。
            ControllerManagerPort = 10252 // kube controller manager 的port。
            KubeletReadOnlyPort = 10255 // 用於heapster監控和收集kubelet資訊使用。
        */

        healthz.DefaultHealthz()
        go util.Until(func() {
            err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil)
            if err != nil {
                glog.Errorf("Starting health server failed: %v", err)
            }
        }, 5*time.Second, util.NeverStop)
    }

    if s.RunOnce {
        return nil
    }

    **// run forever**
    select {}
}   

可以看到s.Run(nil)函式中有cfg, err := s.KubeletConfig()[1]
if err := RunKubelet(kcfg, nil); err != nil {[2]
三個地方需要進一步瞭解

4、cfg, err := s.KubeletConfig()解析

官方解析KubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup is not valid. It will not start any background processes.

func (s *KubeletServer) KubeletConfig() (*KubeletConfig, error) {
    /*
        HostNetworkSources=api,http,file   3種類型 定義取得pod的途徑
        default=\"*\"   all的意思
    */
    //GetValidatedSources()--->Gets all validated sources from the specified sources. 定義在pkg/kubelet/type.go中
    hostNetworkSources, err := kubelet.GetValidatedSources(strings.Split(s.HostNetworkSources, ","))
    if err != nil {
        return nil, err
    }
    //設定HostPIDSources,HostIPCSources,也就是設定kubelet的PID和IPC的namespace
    hostPIDSources, err := kubelet.GetValidatedSources(strings.Split(s.HostPIDSources, ","))
    if err != nil {
        return nil, err
    }
    hostIPCSources, err := kubelet.GetValidatedSources(strings.Split(s.HostIPCSources, ","))
    if err != nil {
        return nil, err
    }

    //mount.New(),建立一個mounter物件,後面會用該mounter來裝載pod裡的volumns
    mounter := mount.New()
    var writer io.Writer = &io.StdWriter{}
    /*
        決定kubelet是否要跑在container內部
        Containerized的預設值是false,即不在container中執行kubelet
    */
    if s.Containerized {
        glog.V(2).Info("Running kubelet in containerized mode (experimental)")
        // 獲得mount,findmnt,umount在機器上的實際路徑
        mounter = mount.NewNsenterMounter()
        //新建一個檔案writer,如果是要寫入到container內部的話,必須先使用[nsenter](https://github.com/jpetazzo/nsenter)進入到對應的container的namespace裡面
        writer = &io.NsenterWriter{}
    }
    //TLS 數字證書 相關配置
    tlsOptions, err := s.InitializeTLS()
    if err != nil {
        return nil, err
    }
    /*
        設定在docker container裡面執行命令的方式, 預設是native(也就是docker exec),還支援nsenter
        建立dockerExecHandler,這個物件用於在docker container裡執行命令

        NativeExecHandler使用github.com/fsouza/go-dockerclient的docker client來執行命令,
        NsenterExecHandler,顧名思義,就是使用nsenter命令在container的namespace裡執行命令。程式碼在:pkg/kubelet/dockertools/exec.go。
    */
    var dockerExecHandler dockertools.ExecHandler
    switch s.DockerExecHandlerName {
    case "native":
        dockerExecHandler = &dockertools.NativeExecHandler{}
    case "nsenter":
        dockerExecHandler = &dockertools.NsenterExecHandler{}
    default:
        glog.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName)
        dockerExecHandler = &dockertools.NativeExecHandler{}
    }
    /*
        設定image回收策略,HighThresholdPercent表示磁碟使用率最高超過多大(預設90%)的時候,不停地清理image
        在後面的資源回收的manager中使用
        imageGCPolicy針對images,highThresholdPecent預設為90%,lowThresholdPercent預設為80%。
        當images所佔儲存低於lowThresholdPercent時不會GC images,如果大於等於highThresholdPecent就會一直做GC
    */
    imageGCPolicy := kubelet.ImageGCPolicy{
        HighThresholdPercent: s.ImageGCHighThresholdPercent,
        LowThresholdPercent:  s.ImageGCLowThresholdPercent,
    }
    // 設定磁碟絕對保留大小,使用者決定是否可以排程pod進來
    //當磁碟空間低於LowDiskSpaceThresholdMB(預設為256M)就不會再接受建立新的pod
    diskSpacePolicy := kubelet.DiskSpacePolicy{
        DockerFreeDiskMB: s.LowDiskSpaceThresholdMB,
        RootFreeDiskMB:   s.LowDiskSpaceThresholdMB,
    }
    //ManifestURL:獲取pod定義的url地址
    manifestURLHeader := make(http.Header)
    if s.ManifestURLHeader != "" {
        pieces := strings.Split(s.ManifestURLHeader, ":")
        if len(pieces) != 2 {
            return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader)
        }
        manifestURLHeader.Set(pieces[0], pieces[1])
    }
    //最後返回KubeletConfig物件
    return &KubeletConfig{
        .......
    }, nil
}

5、if err := RunKubelet(kcfg, nil); err != nil ,這是個重點函式

func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error {
    //把前面構建好的KubeletConfig作為引數傳遞進來

    /*
        一開始先設定node name,如果沒有在config中設定就會找host的hostname ,如果有cloudprovider,就去獲取cloud instance的node name
        如果不指定的話,就是用當前機器名字(uname -n獲得)
        kcfg.HostnameOverride 的值是kubelet --hostname-override xxxx中的xxxx
    */
    kcfg.Hostname = nodeutil.GetHostname(kcfg.HostnameOverride)

    if len(kcfg.NodeName) == 0 {
        // Query the cloud provider for our node name, default to Hostname
        nodeName := kcfg.Hostname
        if kcfg.Cloud != nil {
            var err error
            instances, ok := kcfg.Cloud.Instances()
            if !ok {
                return fmt.Errorf("failed to get instances from cloud provider")
            }

            nodeName, err = instances.CurrentNodeName(kcfg.Hostname)
            if err != nil {
                return fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
            }

            glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
        }

        kcfg.NodeName = nodeName
    }
    /*
        新建一個廣播事件通道
        建立一個eventBroadcaster(在pkg/client/record/event.go),該物件用於向api server傳送kubelet管理pods時的各種事件
    */
    eventBroadcaster := record.NewBroadcaster()
    //建eventRecord並且賦值給kubelet cfg,後面會用到,eventRecord會把event傳送到eventBroadcaster中的watcher
    kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName})
    eventBroadcaster.StartLogging(glog.V(3).Infof)
    if kcfg.KubeClient != nil {
        //這地方表明kubelet會把自己的事情通知 api server
        glog.V(4).Infof("Sending events to api server.")
        if kcfg.EventRecordQPS == 0.0 {
            //eventBroadcaster開始從watcher的result channel中獲取event,傳送給api server
            eventBroadcaster.StartRecordingToSink(kcfg.KubeClient.Events(""))
        } else {
            eventClient := *kcfg.KubeClient
            eventClient.Throttle = util.NewTokenBucketRateLimiter(kcfg.EventRecordQPS, kcfg.EventBurst)
            eventBroadcaster.StartRecordingToSink(eventClient.Events(""))
        }
    } else {
        glog.Warning("No api server defined - no events will be sent to API server.")
    }

    privilegedSources := capabilities.PrivilegedSources{
        HostNetworkSources: kcfg.HostNetworkSources,
        HostPIDSources:     kcfg.HostPIDSources,
        HostIPCSources:     kcfg.HostIPCSources,
    }
    //先設定所有containers都要遵從的capabilities,後面在run pod的時候會用到capabilities裡的這些constraints
    capabilities.Setup(kcfg.AllowPrivileged, privilegedSources, 0)
    //設定PreferredDockercfgPath
    credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)

    if builder == nil {
        /*
            預設情況下,會建立一個CreateAndInitKubelet的builder,
            然後執行CreateAndInitKubelet,設定當前kuberlet管理的node上container的停掉之後回收時間間隔,保留個數,最大container個數。
            同時獲得一個podCfg物件(pc = makePodSourceConfig(kc)),
            podCfg按照如下方式進行初始化:
                config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)
        */
        builder = createAndInitKubelet**[3]**
    }
    if kcfg.OSInterface == nil {
        kcfg.OSInterface = kubecontainer.RealOS{}
    }
    /*
        *********k, podCfg, err := builder(kcfg)********
        由於run kubelet時build為nil,所以builder函式就是createAndInitKubelet。
        podCfg存放取回來的pod資源資訊
    */
    k, podCfg, err := builder(kcfg)**[3]**
    if err != nil {
        return fmt.Errorf("failed to create kubelet: %v", err)
    }

    util.ApplyRLimitForSelf(kcfg.MaxOpenFiles)

    /*
        上面的createAndInitKubelet中 k.BirthCry() 向Api server傳送一個"Starting kubelet"的訊息
        啟動起來,process pods and exit.
        最後根據是否執行一次,執行k.RunOnce(podCfg.Updates())或者startKubelet(k, podCfg, kcfg),
        startKubelet(k, podCfg, kcfg)中是執行k.Run(podCfg.Updates())

        這個Run()方法在pkg/kubelet/kubelet.go 中定義:func (kl *Kubelet) Run(updates <-chan PodUpdate)
    */
    // process pods and exit.

    if kcfg.Runonce {
        /*
            把podCfg.Updates()傳遞進去
            RunOnce函式從一個配置中 輪詢更新,並執行關聯的pod。
            定義在pkg/kubeletrunonce.go中 func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error)
        */
        if _, err := k.RunOnce(podCfg.Updates()); err != nil {**[4]**
            return fmt.Errorf("runonce failed: %v", err)
        }
        glog.Infof("Started kubelet as runonce")
    } else {
        //---下面進入startKubelet
        startKubelet(k, podCfg, kcfg)**[5]**
        glog.Infof("Started kubelet")
    }
    return nil
}

RunKubelet(kcfg, nil)函式中重要的點在於[3][4][5],其中
[3]builder = createAndInitKubelet
k, podCfg, err := builder(cfg)
[3]預設情況下,會建立一個CreateAndInitKubelet的builder,
由於run kubelet時build為nil,所以builder函式就是createAndInitKubelet。
即執行builder(cfg),也就是CreateAndInitKubelet(cfg)
[4]k.RunOnce(podCfg.Updates())
[5]startKubelet(k, podCfg, cfg) 利用podCfg的資訊執行相關的pod
下面對這三個點進行詳細解析。

6、createAndInitKubelet(kc *KubeletConfig) 解析

重點函式
func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
    var kubeClient client.Interface
    if kc.KubeClient != nil {
        kubeClient = kc.KubeClient
    }
    gcPolicy := kubelet.ContainerGCPolicy{
        //k8s已經做了images所佔空間和containers數量的限制,可以很大程度上降低disk被撐爆的可能性
        MinAge:             kc.MinimumGCAge,            //預設10s
        MaxPerPodContainer: kc.MaxPerPodContainerCount, //預設2,每個容器最多保留2個死亡例項
        MaxContainers:      kc.MaxContainerCount,       //預設100,每個節點最多保留100個死亡例項
    }
    daemonEndpoints := &api.NodeDaemonEndpoints{
        KubeletEndpoint: api.DaemonEndpoint{Port: int(kc.Port)},
    }
    /*
        pc = makePodSourceConfig(kc),kubelet從三個渠道來獲取pods的資訊。
        pod資源的生產者,通過chan來交付給消費者kubelet
        獲得一個podCfg物件(pc = makePodSourceConfig(kc))
        pc存放著剛獲取到的pod資源

    ************************************
    ************************************
    ************pod資源的生產者****************
    ************************************
    ************************************
    */

    pc = makePodSourceConfig(kc)**[6]**
    /*
        k, err = kubelet.NewMainKubelet(...)新建一個Kubelet例項,在NewMainKubelet函式裡面進行容器管理和節點相關的初始化
        kubelet.NewMainKubelet在pkg/kubelet/kubelet.go中定義了    函式NewMainKubelet完成了真正的初始化工作
        開始真正建立kubelet物件!!
    */
    k, err = kubelet.NewMainKubelet(
        .......
    )
    if err != nil {
        return nil, nil, err
    }
    //得到一個建立好的kubelet物件
    //k.BirthCry(),向api server傳送一個"Starting kubelet"的訊息
    k.BirthCry()
    //觸發kubelet開啟垃圾回收協程以清理無用的容器和映象,釋放磁碟空間
    k.StartGarbageCollection()

    return k, pc, nil
    //回到RunKubelet,執行一條startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig),開始啟動剛才建立好的kubelet。
}

createAndInitKubelet裡面重要的是pc = makePodSourceConfig(kc)[6],主要負責取得三個途徑定義的pod資源,從而通過chan交付給kubelet元件

&&&&&&&&&&&&&&&&&&&&&&&&
6.1 pc = makePodSourceConfig(kc) 解析
見後面第8點解析,這是pod資源的生產者,通過chan來交付給消費者kubelet

7、下面開始解析 標註[4]和[5]

[4]k.RunOnce(podCfg.Updates())
//從本地 manifest 或者 遠端URL 讀取並執行pod,結束就退出。和 –api-servers 、–enable-server 引數不相容

[5]startKubelet(k, podCfg, kcfg)

//---看到Kubelet.Run,這個才是kubelet的真正的入口,進入pkg的真正業務原始碼部分  引數k就是kubelet*******
func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
    //startKubelet方法首先啟動一個協程,讓kubelet處理來自於Pod Source的Pod Update訊息,然後啟動kubelet server
    // start the kubelet

    /*
        **********重點了解************
        podCfg就是 通過上面RunKubelet()函式的createAndInitKubelet的makePodSourceConfig取得的filesource,urlsource和api server source的pod資訊的源頭
        Updates()方法會返回一個channel,該channel會不斷pop up出從三個sources接收到的pod的change state
        Run函式定義在pkg\kubelet\kubelet.go   func (kl *Kubelet) Run(updates <-chan PodUpdate)
    */
    go util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop)**[7]**

    // start the kubelet server
    if kc.EnableServer {
        go util.Until(func() {
            //這裡建立了一個http server,預設監聽在0.0.0.0:10250上,可以通過http的get request獲取不少資訊
            k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers)
        }, 0, util.NeverStop)
    }
    if kc.ReadOnlyPort > 0 {
        go util.Until(func() {
            //這裡也建立了一個http server,該server提供給heapster來收集當前kubelet的metris資訊
            k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)
        }, 0, util.NeverStop)
    }
}
startKubelet(k, podCfg, kcfg)函式重要的點在於go util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop)**[7]**,啟動一個協程,讓kubelet處理來自於Pod Source的Pod Update訊息

8、pc = makePodSourceConfig(kc) 解析

pc存放著剛獲取到的pod資源;

重點---pod 的管理過程:(生產過程)
apiServer負責接收server發過來的Pod管理資訊,通過channel推送到PodConfig。
PodConfig的mux使用Storage的Merge方法,Merge方法又會通過Updates 這個channel將Pod資料推給Kubelet,真正的Pod處理在Kubelet包中。
Kubelet通過syncLoop監聽channel,收到資料後就執行關於Pod和容器的處理,真正操作容器的方法是呼叫dockertools包中的方法。

生產方式有3種,定義在pkg/kubelet/config/apiserver.go    file.go  http.go 三個檔案中。
生產者:通過三種方式進行pod資訊的獲取,也就是生產者,通過chan的方式法送給消費者。
func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
    //這裡構建了一個chan,並作為返回值返回給了上一級呼叫者。

    /*
        建立一個PodConfig物件,並根據啟動引數中的Pod Source是否提供,來建立對應型別的Pod Source物件
        Pod Source在各種協程中執行,拉去pod資訊並彙總輸出到同一個Pod Channel中等待kubelet處理。

        makePodSourceConfig(kc *KubeletConfig) 負責建立PodConfig物件
    */

    /*
        cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)
        定義在pkg/kubelet/config/config.go中
        cfg是一個傳送管道!!!!生產者->消費者kubelet
        此函式建立PodConfig物件。它建立起了apiServer到後端kubelet處理訊息之間的聯絡。
        建立一個pods source的storage然後封裝在config裡!
        完成podcfg的初始化工作
        PodConfigNotificationIncremental是傳遞增加,更新和刪除的訊息的標識,這樣建立完成一個PodConfig例項。
    */
    // source of all configuration

    cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)

    //下面分別定義了3中模式,HostNetworkSources=api,http,file
    // define file config source
    if kc.ConfigFile != "" {
        glog.Infof("Adding manifest file: %v", kc.ConfigFile)
        /*
            加入一個來自本地file/directory的pods源頭,這個configFile預設是沒有的,
            如果需要在kubelet初始化的時候建立一些static pods,就可以使用這種方式。
            這種pods可以被更新,使用FileCheckFrequency和HTTPCheckFrequency來控制更新後被重新建立的頻率

             cfg.Channel(kubelet.FileSource) 把資源放進cfg中
        */
        config.NewSourceFile(kc.ConfigFile, kc.NodeName, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))
    }
    //ManifestURL:獲取pod定義的url地址
    // define url config source
    if kc.ManifestURL != "" {
        glog.Infof("Adding manifest url %q with HTTP header %v", kc.ManifestURL, kc.ManifestURLHeader)
        /*
            從一個url來獲取pod,但只能有一個pod
            這種pods可以被更新,使用FileCheckFrequency和HTTPCheckFrequency來控制更新後被重新建立的頻率
        */
        config.NewSourceURL(kc.ManifestURL, kc.ManifestURLHeader, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
    }
    if kc.KubeClient != nil {
        glog.Infof("Watching apiserver")
        /*
            !!!重頭戲,所有來自API Server建立的pods都會通過這條路徑被kubelet監測到並做相關操作。
        */
        config.NewSourceApiserver(kc.KubeClient, kc.NodeName, cfg.Channel(kubelet.ApiserverSource))
    }
    return cfg
}

三種獲取pod方式的具體實現在後面再進行解析,定義在pkg/kubelet/config/apiserver.go file.go http.go 三個檔案中。

9、解析上面[7]k.Run(podCfg.Updates())

Run函式定義在pkg\kubelet\kubelet.go func (kl *Kubelet) Run(updates <-chan PodUpdate)

/*
    重點---pod 的管理過程(消費):
    Kubelet的Run方法迴圈監聽updates channel上的訊息。當收到訊息時就進行處理。
    以增加一個pod為例,HandlePodAddition的dispatchWork最終會呼叫到dockertools包中的方法,進行Pod內容器的操作。
    此處的呼叫關係比較深
*/
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
    //真正的Run的執行體,這個是在之前的k8s.io\kubernetes\cmd\kubelet\app\中的startKubelet中執行的

    //啟動一個http的log server,可以用於檢視/var/log目錄下的檔案;
    if kl.logServer == nil {
        kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
    }
    if kl.kubeClient == nil {
        glog.Warning("No api server defined - no node status update will be sent.")
    }
    //把kubelet遷移到一個cgroups的容器裡;
    // Move Kubelet to a container.
    if kl.resourceContainer != "" {
        // Fixme: I need to reside inside ContainerManager interface.
        err := util.RunInResourceContainer(kl.resourceContainer)
        if err != nil {
            glog.Warningf("Failed to move Kubelet to container %q: %v", kl.resourceContainer, err)
        }
        glog.Infof("Running in container %q", kl.resourceContainer)
    }
    /*
        start各種上面建立的manager
        分別啟動如下元件(下面說明以docker container作為管理物件為主):
        imageManager, 同步本地快取的image資訊和記憶體中的快取資訊一致
        cadvisor,啟動並且通過埠暴露api
        containerManager,確保container是否正常狀態
        oomWatcher 通過cadvisor獲得記憶體的使用資訊,並且廣播給api-server
    */
    if err := kl.imageManager.Start(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start ImageManager %v", err)
        glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
    }

    if err := kl.cadvisor.Start(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start CAdvisor %v", err)
        glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err)
    }

    if err := kl.containerManager.Start(); err != nil {
        kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start ContainerManager %v", err)
        glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err)
    }

    if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
        kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start OOM watcher %v", err)
        glog.Errorf("Failed to start OOM watching: %v", err)
    }
    /*
        The container runtime to use. Possible values: 'docker', 'rkt'
        啟動對應的runtime
        開一個goroutine來更新docker daemon的啟動狀態:go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
    */
    go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)

    // Start a goroutine responsible for killing pods (that are not properly
    // handled by pod workers).
    go util.Until(kl.podKiller, 1*time.Second, util.NeverStop)

    // Run the system oom watcher forever.
    /*
        kl.statusManager.Start()
        從apiserver 同步pod的資訊 ;Starting to sync pod status with apiserver
        定義在pkg/kubelet/status/manager.go
    */
    kl.statusManager.Start()
    /*
        *******重點了解**********
        kl.syncLoop(updates, kl) 處理updates 也就是PodConfig產生的updates
        這個方法會使用一個forever loop  永不結束
    */
    kl.syncLoop(updates, kl)

}

Run(updates <-chan PodUpdate)函式中重要的是最後的kl.syncLoop(updates, kl)

10、kl.syncLoop(updates, kl)解析

// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.

/*
    譯:syncLoop是處理更改的主迴圈。 它監視來自三個通道(file,apiserver和http)的更改,並建立它們的並集(merge)。
       對於看到的任何新更改,將針對所需狀態和執行狀態運行同步。
       如果配置沒有發生變化,將在每個同步頻率秒同步最後一個已知的期望狀態。 永遠不會返回。
*/
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
    //其中的update是被傳遞下來的pod資訊,handler其實就是kubelet自身
    glog.Info("Starting kubelet main sync loop.")
    kl.resyncTicker = time.NewTicker(kl.resyncInterval)
    var housekeepingTimestamp time.Time
    for {
        //先判斷docker deamon和network是否就緒,如果沒有,就一直等待直到就緒
        if !kl.containerRuntimeUp() {
            time.Sleep(5 * time.Second)
            glog.Infof("Skipping pod synchronization, container runtime is not up.")
            continue
        }
        if !kl.doneNetworkConfigure() {
            time.Sleep(5 * time.Second)
            glog.Infof("Skipping pod synchronization, network is not configured")
            continue
        }

        // Make sure we sync first to receive the pods from the sources before
        // performing housekeeping.

        // before performing housekeeping,確保一開始先從三個源頭同步獲取到pod的資料

        //重要的函式,真正處理chan的地方
        if !kl.syncLoopIteration(updates, handler) {**[8]**
            break
        }
        // We don't want to perform housekeeping too often, so we set a minimum
        // period for it. Housekeeping would be performed at least once every
        // kl.resyncInterval, and *no* more than once every
        // housekeepingMinimumPeriod.
        // TODO (#13418): Investigate whether we can/should spawn a dedicated
        // goroutine for housekeeping
        if !kl.sourcesReady() {
            // If the sources aren't ready, skip housekeeping, as we may
            // accidentally delete pods from unready sources.
            glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.")
        } else if housekeepingTimestamp.IsZero() {
            housekeepingTimestamp = time.Now()
        } else if time.Since(housekeepingTimestamp) > housekeepingMinimumPeriod {
            glog.V(4).Infof("SyncLoop (housekeeping)")
            if err := handler.HandlePodCleanups(); err != nil {
                glog.Errorf("Failed cleaning pods: %v", err)
            }
            housekeepingTimestamp = time.Now()
        }
    }
}

重要的地方在於[8]syncLoopIteration(updates, handler)

11、syncLoopIteration(updates, handler)解析

/*
    重點---pod 的管理過程:
    apiServer負責接收server發過來的Pod管理資訊,通過channel推送到PodConfig。
    PodConfig的mux使用Storage的Merge方法,Merge方法又會通過Updates 這個channel將Pod資料推給Kubelet,真正的Pod處理在Kubelet包中。
    Kubelet通過syncLoop監聽channel,收到資料後就執行關於Pod和容器的處理,真正操作容器的方法呼叫dockertools包中的方法
*/
func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandler) bool {
    //負責消費PodUpdate,handler其實就是kubelet自身
    //從傳送管道中,獲取到pod資訊,然後根據pod的型別,分別呼叫了不同的處理介面。
    kl.syncLoopMonitor.Store(time.Now())
    select {
    case u, open := <-updates:
        if !open {
            glog.Errorf("Update channel is closed. Exiting the sync loop.")
            return false
        }
        switch u.Op {
        case ADD:
            glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods))
            handler.HandlePodAdditions(u.Pods)
        case UPDATE:
            glog.V(2).Infof("SyncLoop (UPDATE): %q", kubeletUtil.FormatPodNames(u.Pods))
            //定義在下面的func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod)
            handler.HandlePodUpdates(u.Pods)
        case REMOVE:
            glog.V(2).Infof("SyncLoop (REMOVE): %q", kubeletUtil.FormatPodNames(u.Pods))
            handler.HandlePodDeletions(u.Pods)
        case SET:
            // TODO: Do we want to support this?
            glog.Errorf("Kubelet does not support snapshot update")
        }
    case <-kl.resyncTicker.C:
        // Periodically syncs all the pods and performs cleanup tasks.
        glog.V(4).Infof("SyncLoop (periodic sync)")
        handler.HandlePodSyncs(kl.podManager.GetPods())
    }
    kl.syncLoopMonitor.Store(time.Now())
    return true
}

這就是kubelet的主體流程部分。
後面將進一步解析pod獲取的三種方式、syncLoopIteration中如何具體執行相關的pod
一個生產消費者模型(通過一個通道chan)

參考: