基於1.1.2版本kubelet原始碼閱讀(1)
kubelet的主體流程部分,主要是為了理解kubelet元件的主體流程是如何執行的,是如何獲取pod資訊,然後如何對pod進行部署和update等操作。
(主要是根據網上一些公開的資訊,再根據自己的理解來進行整合,加深自己對k8s原始碼的學習)
1、從cmd/kubelet/kubelet.go開始main()函式
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
//kubelet 的main函式入口,新建一個NewKubeletServer
//NewKubeletServer()初始化kubelet Server的fields為預設值。然後在初始化flags和logs之後就可開始Run kubelet Server
s := app.NewKubeletServer ()
//解析flag
s.AddFlags(pflag.CommandLine)
//future work:後面要了解引數是如何寫到系統裡面的?是InitFlags如何運作麼?
util.InitFlags()
util.InitLogs()
defer util.FlushLogs()
verflag.PrintAndExitIfRequested()
if err := s.Run(nil); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit (1)
}
}
main函式主要執行NewKubeletServer()、AddFlags(pflag.CommandLine)、 s.Run(nil)這三個步驟,下面具體分析這三個函式
2、NewKubeletServer()解析
定義在cmd/kubelet/server.go
NewKubeletServer()建立一個KubeletServer,主要是完成引數的初始化—with default values
return &KubeletServer{…}
下面 給出部分引數的含義:
Address: net.ParseIP("0.0.0.0"),
ContainerRuntime: "docker",
DockerExecHandlerName: "native",
HealthzBindAddress: net.ParseIP("127.0.0.1"),
HealthzPort: 10248,
ImageGCHighThresholdPercent: 90,//表示此盤利用率超過90%的時候,會一直對image資源進行回收
ImageGCLowThresholdPercent: 80,//表示此盤利用率低於80%的時候,不會進行GC
KubeConfig: util.NewStringFlag("/var/lib/kubelet/kubeconfig"),
MaxContainerCount: 100, //每個節點最多保留100個死亡例項
MaxPerPodContainerCount: 2, //每個容器最多保留2個死亡例項
RegisterNode: true, // will be ignored if no apiserver is configured
ResourceContainer: "/kubelet",
RootDirectory: defaultRootDir, //預設值是/var/lib/kubelet,存放配置及VM卷等資料
3、 s.Run(nil)
定義在cmd/kubelet/server.go
/*
Run 基於指定的KubeletConfig執行一個KubeletServer,never exist!
KubeletConfig可能是nil的。If nil, 在KubeletServer中利用預設值完成初始化設定
If not nil,the default values 會被忽略
*/
func (s *KubeletServer) Run(kcfg *KubeletConfig) error {
if kcfg == nil {
//第一次是通過s.Run(nil)來呼叫
//準備一個KubeletConfig,配置一些引數
//KubeletConfig會返回適合執行的KubeletConfig,如果伺服器設定無效,則返回錯誤。 它不會啟動任何後臺程序。
cfg, err := s.KubeletConfig()**[1]**
if err != nil {
return err
}
kcfg = cfg
//呼叫CreateAPIServerClientConfig初始化clientConfig
//CreateAPIServerClientConfig使用command line flags來生成 client.Config,including api-server-list
//KubeClient是kubelet 和 Api server傳遞資訊的唯一途徑。
////CreateAPIServerClientConfig根據使用者的傳遞--kubeconfig或者預設.kubeconfig檔案來解析client配置。
clientConfig, err := s.CreateAPIServerClientConfig()
if err == nil {
/*
最終初始化KubeClient。
定義在client "k8s.io/kubernetes/pkg/client/unversioned/helper.go"
New(c *Config)為指定的配置建立一個Kubernetes client。
該client可以對pods,replication controllers, daemons, and services這些物件執行list, get, update and delete操作。
如果提供的配置無效,則返回錯誤。
*/
kcfg.KubeClient, err = client.New(clientConfig)
}
if err != nil && len(s.APIServerList) > 0 {
glog.Warningf("No API client: %v", err)
}
//cloudprovider 和IaaS雲商有關
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
return err
}
glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
kcfg.Cloud = cloud
}
/*
啟動cadvisor來監控本地的容器
通過CAdvisor我們可以看到kuberlet管理的機器上container的資源統計資訊
cadvisor原始碼在pkg/kubelet/cadvisor/cadvisor_linux.go中
*/
if kcfg.CAdvisorInterface == nil {
ca, err := cadvisor.New(s.CAdvisorPort)
if err != nil {
return err
}
kcfg.CAdvisorInterface = ca
}
util.ReallyCrash = s.ReallyCrashForTesting
rand.Seed(time.Now().UTC().UnixNano())
/*
設定preferred Dockercfg 檔案path,這個是為了讀取.dockercfg檔案,
該檔案中的secret可用於pull/push docker registry中的docker映象,預設的路徑為:/var/lib/kubelet
*/
credentialprovider.SetPreferredDockercfgPath(s.RootDirectory)
glog.V(2).Infof("Using root directory: %v", s.RootDirectory)
// TODO(vmarmol): Do this through container config.
oomAdjuster := oom.NewOomAdjuster()
/*
linux的oom機制,當系統發生OOM時,oom_adj值越小,越不容易被系統kill掉 kubelet一般設定自身oom值為 -900 取值範圍是[-1000,+1000]
主要是通過設定/proc/ /oom_score_adj來控制
設定為-1000,該程序就被排除在OOM Killer強制終止的物件外
*/
if err := oomAdjuster.ApplyOomScoreAdj(0, s.OOMScoreAdj); err != nil {
glog.Warning(err)
}
/*
---程式碼中RunKubelet是下一個入口******
RunKubelet中會新起一個goroutine來非同步執行kubelet;
接下來的程式碼還會執行:if s.HealthzPort > 0提供健康檢查的http服務
*/
if err := RunKubelet(kcfg, nil); err != nil {**[2]**
return err
}
if s.HealthzPort > 0 {
/*
這段code會啟動一個http server來提供health check功能,其實目前就提供了一個/healthz/ping的endpoint,返回的http status code為200就正常。
Kubernetes預設提供的http埠如下(在pkg/master/ports/ports.go中定義):
KubeletStatusPort = 10248 // kubelet的health port,就是上面的HealthzPort。
ProxyStatusPort = 10249 // kube proxy的health port。
KubeletPort = 10250 // kubelet server的port。
SchedulerPort = 10251 // kube scheduler的port。
ControllerManagerPort = 10252 // kube controller manager 的port。
KubeletReadOnlyPort = 10255 // 用於heapster監控和收集kubelet資訊使用。
*/
healthz.DefaultHealthz()
go util.Until(func() {
err := http.ListenAndServe(net.JoinHostPort(s.HealthzBindAddress.String(), strconv.Itoa(s.HealthzPort)), nil)
if err != nil {
glog.Errorf("Starting health server failed: %v", err)
}
}, 5*time.Second, util.NeverStop)
}
if s.RunOnce {
return nil
}
**// run forever**
select {}
}
可以看到s.Run(nil)函式中有cfg, err := s.KubeletConfig()[1]
if err := RunKubelet(kcfg, nil); err != nil {[2]
三個地方需要進一步瞭解
4、cfg, err := s.KubeletConfig()解析
官方解析KubeletConfig returns a KubeletConfig suitable for being run, or an error if the server setup is not valid. It will not start any background processes.
func (s *KubeletServer) KubeletConfig() (*KubeletConfig, error) {
/*
HostNetworkSources=api,http,file 3種類型 定義取得pod的途徑
default=\"*\" all的意思
*/
//GetValidatedSources()--->Gets all validated sources from the specified sources. 定義在pkg/kubelet/type.go中
hostNetworkSources, err := kubelet.GetValidatedSources(strings.Split(s.HostNetworkSources, ","))
if err != nil {
return nil, err
}
//設定HostPIDSources,HostIPCSources,也就是設定kubelet的PID和IPC的namespace
hostPIDSources, err := kubelet.GetValidatedSources(strings.Split(s.HostPIDSources, ","))
if err != nil {
return nil, err
}
hostIPCSources, err := kubelet.GetValidatedSources(strings.Split(s.HostIPCSources, ","))
if err != nil {
return nil, err
}
//mount.New(),建立一個mounter物件,後面會用該mounter來裝載pod裡的volumns
mounter := mount.New()
var writer io.Writer = &io.StdWriter{}
/*
決定kubelet是否要跑在container內部
Containerized的預設值是false,即不在container中執行kubelet
*/
if s.Containerized {
glog.V(2).Info("Running kubelet in containerized mode (experimental)")
// 獲得mount,findmnt,umount在機器上的實際路徑
mounter = mount.NewNsenterMounter()
//新建一個檔案writer,如果是要寫入到container內部的話,必須先使用[nsenter](https://github.com/jpetazzo/nsenter)進入到對應的container的namespace裡面
writer = &io.NsenterWriter{}
}
//TLS 數字證書 相關配置
tlsOptions, err := s.InitializeTLS()
if err != nil {
return nil, err
}
/*
設定在docker container裡面執行命令的方式, 預設是native(也就是docker exec),還支援nsenter
建立dockerExecHandler,這個物件用於在docker container裡執行命令
NativeExecHandler使用github.com/fsouza/go-dockerclient的docker client來執行命令,
NsenterExecHandler,顧名思義,就是使用nsenter命令在container的namespace裡執行命令。程式碼在:pkg/kubelet/dockertools/exec.go。
*/
var dockerExecHandler dockertools.ExecHandler
switch s.DockerExecHandlerName {
case "native":
dockerExecHandler = &dockertools.NativeExecHandler{}
case "nsenter":
dockerExecHandler = &dockertools.NsenterExecHandler{}
default:
glog.Warningf("Unknown Docker exec handler %q; defaulting to native", s.DockerExecHandlerName)
dockerExecHandler = &dockertools.NativeExecHandler{}
}
/*
設定image回收策略,HighThresholdPercent表示磁碟使用率最高超過多大(預設90%)的時候,不停地清理image
在後面的資源回收的manager中使用
imageGCPolicy針對images,highThresholdPecent預設為90%,lowThresholdPercent預設為80%。
當images所佔儲存低於lowThresholdPercent時不會GC images,如果大於等於highThresholdPecent就會一直做GC
*/
imageGCPolicy := kubelet.ImageGCPolicy{
HighThresholdPercent: s.ImageGCHighThresholdPercent,
LowThresholdPercent: s.ImageGCLowThresholdPercent,
}
// 設定磁碟絕對保留大小,使用者決定是否可以排程pod進來
//當磁碟空間低於LowDiskSpaceThresholdMB(預設為256M)就不會再接受建立新的pod
diskSpacePolicy := kubelet.DiskSpacePolicy{
DockerFreeDiskMB: s.LowDiskSpaceThresholdMB,
RootFreeDiskMB: s.LowDiskSpaceThresholdMB,
}
//ManifestURL:獲取pod定義的url地址
manifestURLHeader := make(http.Header)
if s.ManifestURLHeader != "" {
pieces := strings.Split(s.ManifestURLHeader, ":")
if len(pieces) != 2 {
return nil, fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader)
}
manifestURLHeader.Set(pieces[0], pieces[1])
}
//最後返回KubeletConfig物件
return &KubeletConfig{
.......
}, nil
}
5、if err := RunKubelet(kcfg, nil); err != nil ,這是個重點函式
func RunKubelet(kcfg *KubeletConfig, builder KubeletBuilder) error {
//把前面構建好的KubeletConfig作為引數傳遞進來
/*
一開始先設定node name,如果沒有在config中設定就會找host的hostname ,如果有cloudprovider,就去獲取cloud instance的node name
如果不指定的話,就是用當前機器名字(uname -n獲得)
kcfg.HostnameOverride 的值是kubelet --hostname-override xxxx中的xxxx
*/
kcfg.Hostname = nodeutil.GetHostname(kcfg.HostnameOverride)
if len(kcfg.NodeName) == 0 {
// Query the cloud provider for our node name, default to Hostname
nodeName := kcfg.Hostname
if kcfg.Cloud != nil {
var err error
instances, ok := kcfg.Cloud.Instances()
if !ok {
return fmt.Errorf("failed to get instances from cloud provider")
}
nodeName, err = instances.CurrentNodeName(kcfg.Hostname)
if err != nil {
return fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
}
glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
}
kcfg.NodeName = nodeName
}
/*
新建一個廣播事件通道
建立一個eventBroadcaster(在pkg/client/record/event.go),該物件用於向api server傳送kubelet管理pods時的各種事件
*/
eventBroadcaster := record.NewBroadcaster()
//建eventRecord並且賦值給kubelet cfg,後面會用到,eventRecord會把event傳送到eventBroadcaster中的watcher
kcfg.Recorder = eventBroadcaster.NewRecorder(api.EventSource{Component: "kubelet", Host: kcfg.NodeName})
eventBroadcaster.StartLogging(glog.V(3).Infof)
if kcfg.KubeClient != nil {
//這地方表明kubelet會把自己的事情通知 api server
glog.V(4).Infof("Sending events to api server.")
if kcfg.EventRecordQPS == 0.0 {
//eventBroadcaster開始從watcher的result channel中獲取event,傳送給api server
eventBroadcaster.StartRecordingToSink(kcfg.KubeClient.Events(""))
} else {
eventClient := *kcfg.KubeClient
eventClient.Throttle = util.NewTokenBucketRateLimiter(kcfg.EventRecordQPS, kcfg.EventBurst)
eventBroadcaster.StartRecordingToSink(eventClient.Events(""))
}
} else {
glog.Warning("No api server defined - no events will be sent to API server.")
}
privilegedSources := capabilities.PrivilegedSources{
HostNetworkSources: kcfg.HostNetworkSources,
HostPIDSources: kcfg.HostPIDSources,
HostIPCSources: kcfg.HostIPCSources,
}
//先設定所有containers都要遵從的capabilities,後面在run pod的時候會用到capabilities裡的這些constraints
capabilities.Setup(kcfg.AllowPrivileged, privilegedSources, 0)
//設定PreferredDockercfgPath
credentialprovider.SetPreferredDockercfgPath(kcfg.RootDirectory)
if builder == nil {
/*
預設情況下,會建立一個CreateAndInitKubelet的builder,
然後執行CreateAndInitKubelet,設定當前kuberlet管理的node上container的停掉之後回收時間間隔,保留個數,最大container個數。
同時獲得一個podCfg物件(pc = makePodSourceConfig(kc)),
podCfg按照如下方式進行初始化:
config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)
*/
builder = createAndInitKubelet**[3]**
}
if kcfg.OSInterface == nil {
kcfg.OSInterface = kubecontainer.RealOS{}
}
/*
*********k, podCfg, err := builder(kcfg)********
由於run kubelet時build為nil,所以builder函式就是createAndInitKubelet。
podCfg存放取回來的pod資源資訊
*/
k, podCfg, err := builder(kcfg)**[3]**
if err != nil {
return fmt.Errorf("failed to create kubelet: %v", err)
}
util.ApplyRLimitForSelf(kcfg.MaxOpenFiles)
/*
上面的createAndInitKubelet中 k.BirthCry() 向Api server傳送一個"Starting kubelet"的訊息
啟動起來,process pods and exit.
最後根據是否執行一次,執行k.RunOnce(podCfg.Updates())或者startKubelet(k, podCfg, kcfg),
startKubelet(k, podCfg, kcfg)中是執行k.Run(podCfg.Updates())
這個Run()方法在pkg/kubelet/kubelet.go 中定義:func (kl *Kubelet) Run(updates <-chan PodUpdate)
*/
// process pods and exit.
if kcfg.Runonce {
/*
把podCfg.Updates()傳遞進去
RunOnce函式從一個配置中 輪詢更新,並執行關聯的pod。
定義在pkg/kubeletrunonce.go中 func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error)
*/
if _, err := k.RunOnce(podCfg.Updates()); err != nil {**[4]**
return fmt.Errorf("runonce failed: %v", err)
}
glog.Infof("Started kubelet as runonce")
} else {
//---下面進入startKubelet
startKubelet(k, podCfg, kcfg)**[5]**
glog.Infof("Started kubelet")
}
return nil
}
RunKubelet(kcfg, nil)函式中重要的點在於[3][4][5],其中
[3]builder = createAndInitKubelet
k, podCfg, err := builder(cfg)
[3]預設情況下,會建立一個CreateAndInitKubelet的builder,
由於run kubelet時build為nil,所以builder函式就是createAndInitKubelet。
即執行builder(cfg),也就是CreateAndInitKubelet(cfg)
[4]k.RunOnce(podCfg.Updates())
[5]startKubelet(k, podCfg, cfg) 利用podCfg的資訊執行相關的pod
下面對這三個點進行詳細解析。
6、createAndInitKubelet(kc *KubeletConfig) 解析
重點函式
func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.PodConfig, err error) {
var kubeClient client.Interface
if kc.KubeClient != nil {
kubeClient = kc.KubeClient
}
gcPolicy := kubelet.ContainerGCPolicy{
//k8s已經做了images所佔空間和containers數量的限制,可以很大程度上降低disk被撐爆的可能性
MinAge: kc.MinimumGCAge, //預設10s
MaxPerPodContainer: kc.MaxPerPodContainerCount, //預設2,每個容器最多保留2個死亡例項
MaxContainers: kc.MaxContainerCount, //預設100,每個節點最多保留100個死亡例項
}
daemonEndpoints := &api.NodeDaemonEndpoints{
KubeletEndpoint: api.DaemonEndpoint{Port: int(kc.Port)},
}
/*
pc = makePodSourceConfig(kc),kubelet從三個渠道來獲取pods的資訊。
pod資源的生產者,通過chan來交付給消費者kubelet
獲得一個podCfg物件(pc = makePodSourceConfig(kc))
pc存放著剛獲取到的pod資源
************************************
************************************
************pod資源的生產者****************
************************************
************************************
*/
pc = makePodSourceConfig(kc)**[6]**
/*
k, err = kubelet.NewMainKubelet(...)新建一個Kubelet例項,在NewMainKubelet函式裡面進行容器管理和節點相關的初始化
kubelet.NewMainKubelet在pkg/kubelet/kubelet.go中定義了 函式NewMainKubelet完成了真正的初始化工作
開始真正建立kubelet物件!!
*/
k, err = kubelet.NewMainKubelet(
.......
)
if err != nil {
return nil, nil, err
}
//得到一個建立好的kubelet物件
//k.BirthCry(),向api server傳送一個"Starting kubelet"的訊息
k.BirthCry()
//觸發kubelet開啟垃圾回收協程以清理無用的容器和映象,釋放磁碟空間
k.StartGarbageCollection()
return k, pc, nil
//回到RunKubelet,執行一條startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig),開始啟動剛才建立好的kubelet。
}
createAndInitKubelet裡面重要的是pc = makePodSourceConfig(kc)[6],主要負責取得三個途徑定義的pod資源,從而通過chan交付給kubelet元件
&&&&&&&&&&&&&&&&&&&&&&&&
6.1 pc = makePodSourceConfig(kc) 解析
見後面第8點解析,這是pod資源的生產者,通過chan來交付給消費者kubelet
7、下面開始解析 標註[4]和[5]
[4]k.RunOnce(podCfg.Updates())
//從本地 manifest 或者 遠端URL 讀取並執行pod,結束就退出。和 –api-servers 、–enable-server 引數不相容
[5]startKubelet(k, podCfg, kcfg)
//---看到Kubelet.Run,這個才是kubelet的真正的入口,進入pkg的真正業務原始碼部分 引數k就是kubelet*******
func startKubelet(k KubeletBootstrap, podCfg *config.PodConfig, kc *KubeletConfig) {
//startKubelet方法首先啟動一個協程,讓kubelet處理來自於Pod Source的Pod Update訊息,然後啟動kubelet server
// start the kubelet
/*
**********重點了解************
podCfg就是 通過上面RunKubelet()函式的createAndInitKubelet的makePodSourceConfig取得的filesource,urlsource和api server source的pod資訊的源頭
Updates()方法會返回一個channel,該channel會不斷pop up出從三個sources接收到的pod的change state
Run函式定義在pkg\kubelet\kubelet.go func (kl *Kubelet) Run(updates <-chan PodUpdate)
*/
go util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop)**[7]**
// start the kubelet server
if kc.EnableServer {
go util.Until(func() {
//這裡建立了一個http server,預設監聽在0.0.0.0:10250上,可以通過http的get request獲取不少資訊
k.ListenAndServe(kc.Address, kc.Port, kc.TLSOptions, kc.EnableDebuggingHandlers)
}, 0, util.NeverStop)
}
if kc.ReadOnlyPort > 0 {
go util.Until(func() {
//這裡也建立了一個http server,該server提供給heapster來收集當前kubelet的metris資訊
k.ListenAndServeReadOnly(kc.Address, kc.ReadOnlyPort)
}, 0, util.NeverStop)
}
}
startKubelet(k, podCfg, kcfg)函式重要的點在於go util.Until(func() { k.Run(podCfg.Updates()) }, 0, util.NeverStop)**[7]**,啟動一個協程,讓kubelet處理來自於Pod Source的Pod Update訊息
8、pc = makePodSourceConfig(kc) 解析
pc存放著剛獲取到的pod資源;
重點---pod 的管理過程:(生產過程)
apiServer負責接收server發過來的Pod管理資訊,通過channel推送到PodConfig。
PodConfig的mux使用Storage的Merge方法,Merge方法又會通過Updates 這個channel將Pod資料推給Kubelet,真正的Pod處理在Kubelet包中。
Kubelet通過syncLoop監聽channel,收到資料後就執行關於Pod和容器的處理,真正操作容器的方法是呼叫dockertools包中的方法。
生產方式有3種,定義在pkg/kubelet/config/apiserver.go file.go http.go 三個檔案中。
生產者:通過三種方式進行pod資訊的獲取,也就是生產者,通過chan的方式法送給消費者。
func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
//這裡構建了一個chan,並作為返回值返回給了上一級呼叫者。
/*
建立一個PodConfig物件,並根據啟動引數中的Pod Source是否提供,來建立對應型別的Pod Source物件
Pod Source在各種協程中執行,拉去pod資訊並彙總輸出到同一個Pod Channel中等待kubelet處理。
makePodSourceConfig(kc *KubeletConfig) 負責建立PodConfig物件
*/
/*
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)
定義在pkg/kubelet/config/config.go中
cfg是一個傳送管道!!!!生產者->消費者kubelet
此函式建立PodConfig物件。它建立起了apiServer到後端kubelet處理訊息之間的聯絡。
建立一個pods source的storage然後封裝在config裡!
完成podcfg的初始化工作
PodConfigNotificationIncremental是傳遞增加,更新和刪除的訊息的標識,這樣建立完成一個PodConfig例項。
*/
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kc.Recorder)
//下面分別定義了3中模式,HostNetworkSources=api,http,file
// define file config source
if kc.ConfigFile != "" {
glog.Infof("Adding manifest file: %v", kc.ConfigFile)
/*
加入一個來自本地file/directory的pods源頭,這個configFile預設是沒有的,
如果需要在kubelet初始化的時候建立一些static pods,就可以使用這種方式。
這種pods可以被更新,使用FileCheckFrequency和HTTPCheckFrequency來控制更新後被重新建立的頻率
cfg.Channel(kubelet.FileSource) 把資源放進cfg中
*/
config.NewSourceFile(kc.ConfigFile, kc.NodeName, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))
}
//ManifestURL:獲取pod定義的url地址
// define url config source
if kc.ManifestURL != "" {
glog.Infof("Adding manifest url %q with HTTP header %v", kc.ManifestURL, kc.ManifestURLHeader)
/*
從一個url來獲取pod,但只能有一個pod
這種pods可以被更新,使用FileCheckFrequency和HTTPCheckFrequency來控制更新後被重新建立的頻率
*/
config.NewSourceURL(kc.ManifestURL, kc.ManifestURLHeader, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
}
if kc.KubeClient != nil {
glog.Infof("Watching apiserver")
/*
!!!重頭戲,所有來自API Server建立的pods都會通過這條路徑被kubelet監測到並做相關操作。
*/
config.NewSourceApiserver(kc.KubeClient, kc.NodeName, cfg.Channel(kubelet.ApiserverSource))
}
return cfg
}
三種獲取pod方式的具體實現在後面再進行解析,定義在pkg/kubelet/config/apiserver.go file.go http.go 三個檔案中。
9、解析上面[7]k.Run(podCfg.Updates())
Run函式定義在pkg\kubelet\kubelet.go func (kl *Kubelet) Run(updates <-chan PodUpdate)
/*
重點---pod 的管理過程(消費):
Kubelet的Run方法迴圈監聽updates channel上的訊息。當收到訊息時就進行處理。
以增加一個pod為例,HandlePodAddition的dispatchWork最終會呼叫到dockertools包中的方法,進行Pod內容器的操作。
此處的呼叫關係比較深
*/
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan PodUpdate) {
//真正的Run的執行體,這個是在之前的k8s.io\kubernetes\cmd\kubelet\app\中的startKubelet中執行的
//啟動一個http的log server,可以用於檢視/var/log目錄下的檔案;
if kl.logServer == nil {
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
}
if kl.kubeClient == nil {
glog.Warning("No api server defined - no node status update will be sent.")
}
//把kubelet遷移到一個cgroups的容器裡;
// Move Kubelet to a container.
if kl.resourceContainer != "" {
// Fixme: I need to reside inside ContainerManager interface.
err := util.RunInResourceContainer(kl.resourceContainer)
if err != nil {
glog.Warningf("Failed to move Kubelet to container %q: %v", kl.resourceContainer, err)
}
glog.Infof("Running in container %q", kl.resourceContainer)
}
/*
start各種上面建立的manager
分別啟動如下元件(下面說明以docker container作為管理物件為主):
imageManager, 同步本地快取的image資訊和記憶體中的快取資訊一致
cadvisor,啟動並且通過埠暴露api
containerManager,確保container是否正常狀態
oomWatcher 通過cadvisor獲得記憶體的使用資訊,並且廣播給api-server
*/
if err := kl.imageManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start ImageManager %v", err)
glog.Errorf("Failed to start ImageManager, images may not be garbage collected: %v", err)
}
if err := kl.cadvisor.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start CAdvisor %v", err)
glog.Errorf("Failed to start CAdvisor, system may not be properly monitored: %v", err)
}
if err := kl.containerManager.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start ContainerManager %v", err)
glog.Errorf("Failed to start ContainerManager, system may not be properly isolated: %v", err)
}
if err := kl.oomWatcher.Start(kl.nodeRef); err != nil {
kl.recorder.Eventf(kl.nodeRef, "KubeletSetupFailed", "Failed to start OOM watcher %v", err)
glog.Errorf("Failed to start OOM watching: %v", err)
}
/*
The container runtime to use. Possible values: 'docker', 'rkt'
啟動對應的runtime
開一個goroutine來更新docker daemon的啟動狀態:go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
*/
go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
// Start a goroutine responsible for killing pods (that are not properly
// handled by pod workers).
go util.Until(kl.podKiller, 1*time.Second, util.NeverStop)
// Run the system oom watcher forever.
/*
kl.statusManager.Start()
從apiserver 同步pod的資訊 ;Starting to sync pod status with apiserver
定義在pkg/kubelet/status/manager.go
*/
kl.statusManager.Start()
/*
*******重點了解**********
kl.syncLoop(updates, kl) 處理updates 也就是PodConfig產生的updates
這個方法會使用一個forever loop 永不結束
*/
kl.syncLoop(updates, kl)
}
Run(updates <-chan PodUpdate)函式中重要的是最後的kl.syncLoop(updates, kl)
10、kl.syncLoop(updates, kl)解析
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
/*
譯:syncLoop是處理更改的主迴圈。 它監視來自三個通道(file,apiserver和http)的更改,並建立它們的並集(merge)。
對於看到的任何新更改,將針對所需狀態和執行狀態運行同步。
如果配置沒有發生變化,將在每個同步頻率秒同步最後一個已知的期望狀態。 永遠不會返回。
*/
func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
//其中的update是被傳遞下來的pod資訊,handler其實就是kubelet自身
glog.Info("Starting kubelet main sync loop.")
kl.resyncTicker = time.NewTicker(kl.resyncInterval)
var housekeepingTimestamp time.Time
for {
//先判斷docker deamon和network是否就緒,如果沒有,就一直等待直到就緒
if !kl.containerRuntimeUp() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, container runtime is not up.")
continue
}
if !kl.doneNetworkConfigure() {
time.Sleep(5 * time.Second)
glog.Infof("Skipping pod synchronization, network is not configured")
continue
}
// Make sure we sync first to receive the pods from the sources before
// performing housekeeping.
// before performing housekeeping,確保一開始先從三個源頭同步獲取到pod的資料
//重要的函式,真正處理chan的地方
if !kl.syncLoopIteration(updates, handler) {**[8]**
break
}
// We don't want to perform housekeeping too often, so we set a minimum
// period for it. Housekeeping would be performed at least once every
// kl.resyncInterval, and *no* more than once every
// housekeepingMinimumPeriod.
// TODO (#13418): Investigate whether we can/should spawn a dedicated
// goroutine for housekeeping
if !kl.sourcesReady() {
// If the sources aren't ready, skip housekeeping, as we may
// accidentally delete pods from unready sources.
glog.V(4).Infof("Skipping cleanup, sources aren't ready yet.")
} else if housekeepingTimestamp.IsZero() {
housekeepingTimestamp = time.Now()
} else if time.Since(housekeepingTimestamp) > housekeepingMinimumPeriod {
glog.V(4).Infof("SyncLoop (housekeeping)")
if err := handler.HandlePodCleanups(); err != nil {
glog.Errorf("Failed cleaning pods: %v", err)
}
housekeepingTimestamp = time.Now()
}
}
}
重要的地方在於[8]syncLoopIteration(updates, handler)
11、syncLoopIteration(updates, handler)解析
/*
重點---pod 的管理過程:
apiServer負責接收server發過來的Pod管理資訊,通過channel推送到PodConfig。
PodConfig的mux使用Storage的Merge方法,Merge方法又會通過Updates 這個channel將Pod資料推給Kubelet,真正的Pod處理在Kubelet包中。
Kubelet通過syncLoop監聽channel,收到資料後就執行關於Pod和容器的處理,真正操作容器的方法呼叫dockertools包中的方法
*/
func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandler) bool {
//負責消費PodUpdate,handler其實就是kubelet自身
//從傳送管道中,獲取到pod資訊,然後根據pod的型別,分別呼叫了不同的處理介面。
kl.syncLoopMonitor.Store(time.Now())
select {
case u, open := <-updates:
if !open {
glog.Errorf("Update channel is closed. Exiting the sync loop.")
return false
}
switch u.Op {
case ADD:
glog.V(2).Infof("SyncLoop (ADD): %q", kubeletUtil.FormatPodNames(u.Pods))
handler.HandlePodAdditions(u.Pods)
case UPDATE:
glog.V(2).Infof("SyncLoop (UPDATE): %q", kubeletUtil.FormatPodNames(u.Pods))
//定義在下面的func (kl *Kubelet) HandlePodUpdates(pods []*api.Pod)
handler.HandlePodUpdates(u.Pods)
case REMOVE:
glog.V(2).Infof("SyncLoop (REMOVE): %q", kubeletUtil.FormatPodNames(u.Pods))
handler.HandlePodDeletions(u.Pods)
case SET:
// TODO: Do we want to support this?
glog.Errorf("Kubelet does not support snapshot update")
}
case <-kl.resyncTicker.C:
// Periodically syncs all the pods and performs cleanup tasks.
glog.V(4).Infof("SyncLoop (periodic sync)")
handler.HandlePodSyncs(kl.podManager.GetPods())
}
kl.syncLoopMonitor.Store(time.Now())
return true
}
這就是kubelet的主體流程部分。
後面將進一步解析pod獲取的三種方式、syncLoopIteration中如何具體執行相關的pod
一個生產消費者模型(通過一個通道chan)