
7. Deep Dive into k8s: Running Tasks with Job and CronJob, with Source Code Analysis

> Please credit the source when reposting. This article was published on luozhiyun's blog: https://www.luozhiyun.com. While going through Job I will refer to the source code where it helps, so we can see for ourselves how k8s handles the details and get a feel for its design. The source code version is [1.19](https://github.com/kubernetes/kubernetes/tree/release-1.19).

![img](https://img.luozhiyun.com/20200823163612.png)

## Job

### Basic usage of Job

A Job is used to run tasks to completion: it creates one or more Pods and ensures that the specified number of Pods run successfully until their processes exit normally.

Create a Job:

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: pi
spec:
  template:
    spec:
      containers:
      - name: pi
        image: perl
        command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]
      restartPolicy: Never
  backoffLimit: 4
```

This Job creates one container that runs a command to compute π. Let's create it:

```shell
$ kubectl create -f job.yaml

$ kubectl describe jobs/pi
Name:           pi
Namespace:      default
Selector:       controller-uid=cf78ebe4-07f9-4234-b8f9-2fe92df352ea
Labels:         controller-uid=cf78ebe4-07f9-4234-b8f9-2fe92df352ea
                job-name=pi
Annotations:
Parallelism:    1
Completions:    1
...
Pods Statuses:  0 Running / 1 Succeeded / 0 Failed
Pod Template:
  Labels:  controller-uid=cf78ebe4-07f9-4234-b8f9-2fe92df352ea
           job-name=pi
  Containers:
   pi:
    Image:      resouer/ubuntu-bc
    ...
Events:
  Type    Reason            Age   From            Message
  ----    ------            ----  ----            -------
  Normal  SuccessfulCreate  29m   job-controller  Created pod: pi-g9fs4
  Normal  Completed         27m   job-controller  Job completed
```

After the object is created, the Pod template is automatically given a label of the form controller-uid=<random string>, and the Job object itself is given a selector for that label. This guarantees the match between the Job and the Pods it manages, and the random uid ensures that Pods belonging to different Job objects never overlap.

```shell
$ kubectl get pod
NAME       READY   STATUS      RESTARTS   AGE
pi-g9fs4   0/1     Completed   0          33m

$ kubectl describe pod pi-g9fs4
...
Events:
  Type    Reason     Age   From                      Message
  ----    ------     ----  ----                      -------
  Normal  Scheduled  35m   default-scheduler         Successfully assigned default/pi-g9fs4 to 192.168.13.130
  Normal  Pulling    35m   kubelet, 192.168.13.130   Pulling image "resouer/ubuntu-bc"
  Normal  Pulled     35m   kubelet, 192.168.13.130   Successfully pulled image "resouer/ubuntu-bc"
  Normal  Created    35m   kubelet, 192.168.13.130   Created container pi
  Normal  Started    35m   kubelet, 192.168.13.130   Started container pi
```

We can see that the Pod enters the Completed state after it finishes running. restartPolicy=Never in the yaml above also guarantees that this Pod runs only once.

If the created Pod fails, the Job Controller keeps creating a new Pod:

```shell
$ kubectl get pods
NAME       READY   STATUS              RESTARTS   AGE
pi-55h89   0/1     ContainerCreating   0          2s
pi-tqbcz   0/1     Error               0          5s
```

#### Parameters

**spec.backoffLimit**

We set this field to 4 above, meaning the Job is retried at most 4 times.

**restartPolicy**

While a Job runs, its Pods can fail for all kinds of system-level reasons. If restartPolicy is set to OnFailure, the Job Controller restarts the containers inside the failed Pod instead of creating a new Pod. It can also be set to Never, meaning the containers are not restarted after a failure. See [Pod lifecycle](https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#example-states) for more details.

**spec.activeDeadlineSeconds**

The maximum time the Job may stay active, in seconds. For example:

```yaml
spec:
  backoffLimit: 5
  activeDeadlineSeconds: 100
```

With this set, pastActiveDeadline checks whether `job.Spec.ActiveDeadlineSeconds` is nil; if it is not, it compares the Job's running time (duration) with the value configured in `job.Spec.ActiveDeadlineSeconds`, and if the duration is larger the Job is marked as terminated with reason DeadlineExceeded.

We can find this logic in the Job Controller source: the controller first checks whether the number of failures has exceeded the BackoffLimit; if not, it checks whether ActiveDeadlineSeconds is set, and if so, whether the Job has already been active longer than that. If either limit is exceeded, the Job is marked as failed.

```go
...
jobHaveNewFailure := failed > job.Status.Failed
exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
	(int32(previousRetry)+1 > *job.Spec.BackoffLimit)

if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
	// check if the number of pod restart exceeds backoff (for restart OnFailure only)
	// OR if the number of failed jobs increased since the last syncJob
	jobFailed = true
	failureReason = "BackoffLimitExceeded"
	failureMessage = "Job has reached the specified backoff limit"
} else if pastActiveDeadline(&job) {
	jobFailed = true
	failureReason = "DeadlineExceeded"
	failureMessage = "Job was active longer than specified deadline"
}
...

func pastActiveDeadline(job *batch.Job) bool {
	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
		return false
	}
	now := metav1.Now()
	start := job.Status.StartTime.Time
	duration := now.Time.Sub(start)
	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}
```

### Parallel Jobs

Two fields of the Job object control parallelism:

1. `spec.parallelism`: the maximum number of Pods a Job may run at any one time;
2. `spec.completions`: the minimum number of Pods that must finish successfully for the Job to be complete.

For example:

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: pi
spec:
  parallelism: 2
  completions: 4
  template:
    spec:
      containers:
      - name: pi
        image: perl
        command: ["perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"]
      restartPolicy: Never
  backoffLimit: 4
```

After the Job is created, we can see that at most two Pods run at the same time:

```shell
$ kubectl get pod
NAME           READY   STATUS              RESTARTS   AGE
pi-8fsrn       0/1     ContainerCreating   0          30s
pi-job-67kwg   0/1     Completed           0          14h
pi-wlbm5       0/1     ContainerCreating   0          30s
```

Every time a Pod finishes its computation and enters the Completed state, a new Pod is created automatically and quickly moves from Pending to ContainerCreating.

Eventually the Job's COMPLETIONS column shows that all of them finished:

```shell
$ kubectl get job
NAME   COMPLETIONS   DURATION   AGE
pi     4/4           2m52s      2m52s
```

The Job Controller uses the configured parallelism to decide whether the current number of active pods is right, and adjusts it if it is not. If the number of active pods is greater than the Job's `job.Spec.Parallelism`, the excess active pods are deleted concurrently.

### Job source code analysis

The examples above show that using a Job is quite simple; now let's go through the source code to understand its execution logic.

The core logic lives in the Controller's syncJob method in job_controller.go. syncJob is a long method, so I will split it into pieces for the walkthrough.

**Controller#syncJob**

```go
func (jm *Controller) syncJob(key string) (bool, error) {
	...
	job := *sharedJob

	// if job was finished previously, we don't want to redo the termination
	// If the job has already finished, return right away to avoid running it again
	if IsJobFinished(&job) {
		return true, nil
	}

	// retrieve the previous number of retry
	// Get the job's retry count
	previousRetry := jm.queue.NumRequeues(key)

	jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
	// Get this job's pod list
	pods, err := jm.getPodsForJob(&job)
	if err != nil {
		return false, err
	}
	// Find the pods of this job that are still active
	activePods := controller.FilterActivePods(pods)
	active := int32(len(activePods))
	// Count the job's succeeded pods and failed pods
	succeeded, failed := getStatus(pods)
	conditions := len(job.Status.Conditions)
	// job first start
	// Set the job's start time
	if job.Status.StartTime == nil {
		now := metav1.Now()
		job.Status.StartTime = &now
		// enqueue a sync to check if job past ActiveDeadlineSeconds
		if job.Spec.ActiveDeadlineSeconds != nil {
			klog.V(4).Infof("Job %s has ActiveDeadlineSeconds will sync after %d seconds",
				key, *job.Spec.ActiveDeadlineSeconds)
			jm.queue.AddAfter(key, time.Duration(*job.Spec.ActiveDeadlineSeconds)*time.Second)
		}
	}
	...
}
```

This part checks whether the job has already finished and, if so, returns immediately. It then gets the job's retry count and the list of pods associated with the job, and from that list computes how many pods are active, how many succeeded, and how many failed. Finally, if this is the job's first start, it records the start time.

Continuing:

```go
func (jm *Controller) syncJob(key string) (bool, error) {
	...
	var manageJobErr error
	jobFailed := false
	var failureReason string
	var failureMessage string
	// failed being larger than job.Status.Failed means new pods have failed
	jobHaveNewFailure := failed > job.Status.Failed
	// new failures happen when status does not reflect the failures and active
	// is different than parallelism, otherwise the previous controller loop
	// failed updating status so even if we pick up failure it is not a new one
	// New pods have failed, the number of active pods differs from Parallelism,
	// and the retry count has exceeded BackoffLimit
	exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
		(int32(previousRetry)+1 > *job.Spec.BackoffLimit)
	// Has the retry limit been exceeded?
	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
		// check if the number of pod restart exceeds backoff (for restart OnFailure only)
		// OR if the number of failed jobs increased since the last syncJob
		jobFailed = true
		failureReason = "BackoffLimitExceeded"
		failureMessage = "Job has reached the specified backoff limit"
	// Has the job been active longer than ActiveDeadlineSeconds?
	} else if pastActiveDeadline(&job) {
		jobFailed = true
		failureReason = "DeadlineExceeded"
		failureMessage = "Job was active longer than specified deadline"
	}
	...
}
```

This section decides whether the job has failed. The criteria are whether the job's retry count has exceeded BackoffLimit and whether its running time has exceeded the configured ActiveDeadlineSeconds.

The failed count from the last sync (job.Status.Failed) is compared with the current failed count; if the latter is larger, new pods have failed in the meantime. When the job is judged to have failed, the failure reason is recorded and jobFailed is set to true.

The code above calls pastBackoffLimitOnFailure and pastActiveDeadline; let's look at each in turn.

**pastBackoffLimitOnFailure**

```go
func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
	// This check only applies when RestartPolicy is OnFailure; otherwise return false right away
	if job.Spec.Template.Spec.RestartPolicy != v1.RestartPolicyOnFailure {
		return false
	}
	result := int32(0)
	for i := range pods {
		po := pods[i]
		// For pods in the Running or Pending phase, add up the restart counts
		// of all containers, including the pod's init containers
		if po.Status.Phase == v1.PodRunning || po.Status.Phase == v1.PodPending {
			for j := range po.Status.InitContainerStatuses {
				stat := po.Status.InitContainerStatuses[j]
				result += stat.RestartCount
			}
			for j := range po.Status.ContainerStatuses {
				stat := po.Status.ContainerStatuses[j]
				result += stat.RestartCount
			}
		}
	}
	// If BackoffLimit is 0, a single restart already exceeds the limit
	if *job.Spec.BackoffLimit == 0 {
		return result > 0
	}
	// Compare the restart count with BackoffLimit
	return result >= *job.Spec.BackoffLimit
}
```

This method first checks the job's RestartPolicy and only continues when it is OnFailure. It then walks the pod list, adds up the container restart counts, and compares the total with BackoffLimit; if the limit is reached it returns true.

**pastActiveDeadline**

```go
func pastActiveDeadline(job *batch.Job) bool {
	if job.Spec.ActiveDeadlineSeconds == nil || job.Status.StartTime == nil {
		return false
	}
	now := metav1.Now()
	start := job.Status.StartTime.Time
	duration := now.Time.Sub(start)
	allowedDuration := time.Duration(*job.Spec.ActiveDeadlineSeconds) * time.Second
	return duration >= allowedDuration
}
```

This method computes how long the job has been running (duration) and compares it with ActiveDeadlineSeconds; it returns true once the deadline has passed.
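As a concrete illustration of that comparison, the following standalone snippet reproduces the same math with made-up values (it is not controller code): a Job that started 2 minutes ago with activeDeadlineSeconds set to 100 is already past its deadline.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical values: the Job allows 100 seconds and started 2 minutes ago.
	activeDeadlineSeconds := int64(100)
	start := time.Now().Add(-2 * time.Minute) // stands in for job.Status.StartTime

	duration := time.Since(start)
	allowed := time.Duration(activeDeadlineSeconds) * time.Second

	// Prints true: 120s >= 100s, so the controller would mark the Job DeadlineExceeded.
	fmt.Println(duration >= allowed)
}
```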
Back in syncJob, let's keep going:

```go
func (jm *Controller) syncJob(key string) (bool, error) {
	...
	// The job has failed
	if jobFailed {
		errCh := make(chan error, active)
		// Delete the job's active pods
		jm.deleteJobPods(&job, activePods, errCh)
		select {
		case manageJobErr = <-errCh:
			if manageJobErr != nil {
				break
			}
		default:
		}

		// update status values accordingly
		// Reset the active count
		failed += active
		active = 0
		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, failureReason, failureMessage))
		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
	} else {
		// If the job needs a sync and has not been deleted, call manageJob to do the sync work
		if jobNeedsSync && job.DeletionTimestamp == nil {
			active, manageJobErr = jm.manageJob(activePods, succeeded, &job)
		}
		// The completion count equals the number of pods that ran successfully
		completions := succeeded
		complete := false
		// If Completions is not set, the job is complete as soon as any pod has succeeded
		if job.Spec.Completions == nil {
			if succeeded > 0 && active == 0 {
				complete = true
			}
		} else {
			// If the actual completion count is greater than or equal to Completions
			if completions >= *job.Spec.Completions {
				complete = true
				// If there are still active pods, emit an EventTypeWarning event
				if active > 0 {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManyActivePods", "Too many active pods running after completion count reached")
				}
				// If the actual completion count is greater than Completions, emit an EventTypeWarning event
				if completions > *job.Spec.Completions {
					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
				}
			}
		}
		// If the job is complete, update the job.Status.Conditions and job.Status.CompletionTime fields
		if complete {
			job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, "", ""))
			now := metav1.Now()
			job.Status.CompletionTime = &now
			jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed")
		}
	}
	...
}
```

This part branches on jobFailed.

If jobFailed is true, the job has failed: all pods associated with the job are deleted and the active count is reset to zero.

If jobFailed is false, the job is still running normally. In that case, if the job needs a sync and has not been deleted, manageJob is called to do the synchronization work. Next the configured Completions is handled: if Completions is not set, the job counts as complete as soon as one pod has succeeded and no pods remain active; if the actual number of completed pods exceeds Completions, or pods are still active after the completion count is reached, the corresponding warning events are emitted. Finally the job's status is updated to complete.

Now let's go through manageJob in one pass. This sync method is the core of how a Job manages the number of running pods:

**Controller#manageJob**

```go
func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batch.Job) (int32, error) {
	...
	// If the number of active pods is greater than the job's configured parallelism job.Spec.Parallelism
	if active > parallelism {
		// How many pods too many
		diff := active - parallelism
		errCh = make(chan error, diff)
		jm.expectations.ExpectDeletions(jobKey, int(diff))
		klog.V(4).Infof("Too many pods running job %q, need %d, deleting %d", jobKey, parallelism, diff)
		// Sort the pods so that some can be deleted in preference to others:
		//   pod status:  not ready < ready
		//   scheduled?:  unscheduled < scheduled
		//   pod phase:   pending < running
		sort.Sort(controller.ActivePods(activePods))
		active -= diff
		wait := sync.WaitGroup{}
		wait.Add(int(diff))
		for i := int32(0); i < diff; i++ {
			// Delete the excess active pods concurrently
			go func(ix int32) {
				defer wait.Done()
				if err := jm.podControl.DeletePod(job.Namespace, activePods[ix].Name, job); err != nil {
					// Decrement the expected number of deletes because the informer won't observe this deletion
					jm.expectations.DeletionObserved(jobKey)
					if !apierrors.IsNotFound(err) {
						klog.V(2).Infof("Failed to delete %v, decremented expectations for job %q/%q", activePods[ix].Name, job.Namespace, job.Name)
						activeLock.Lock()
						active++
						activeLock.Unlock()
						errCh <- err
						utilruntime.HandleError(err)
					}
				}
			}(i)
		}
		wait.Wait()

	// If the number of active pods is smaller than the job's configured parallelism, new pods must be created
	} else if active < parallelism {
		wantActive := int32(0)
		// If Completions is not declared, the number of active pods should equal parallelism;
		// if some pod has already completed, no new pods are created.
		if job.Spec.Completions == nil {
			if succeeded > 0 {
				wantActive = active
			} else {
				wantActive = parallelism
			}
		// If Completions is declared, Completions has to be compared with succeeded;
		// if wantActive exceeds parallelism, the number of pods to create is capped at parallelism
		} else {
			// Job specifies a specific number of completions.  Therefore, number
			// active should not ever exceed number of remaining completions.
			wantActive = *job.Spec.Completions - succeeded
			if wantActive > parallelism {
				wantActive = parallelism
			}
		}
		// Work out diff
		diff := wantActive - active
		if diff < 0 {
			utilruntime.HandleError(fmt.Errorf("More active than wanted: job %q, want %d, have %d", jobKey, wantActive, active))
			diff = 0
		}
		// There are already enough pods, nothing more to create
		if diff == 0 {
			return active, nil
		}
		jm.expectations.ExpectCreations(jobKey, int(diff))
		errCh = make(chan error, diff)
		klog.V(4).Infof("Too few pods running job %q, need %d, creating %d", jobKey, wantActive, diff)

		active += diff
		wait := sync.WaitGroup{}

		// Pods are created in batches of 1, 2, 4, 8, ... — the batch size grows exponentially
		for batchSize := int32(integer.IntMin(int(diff), controller.SlowStartInitialBatchSize)); diff > 0; batchSize = integer.Int32Min(2*batchSize, diff) {
			errorCount := len(errCh)
			wait.Add(int(batchSize))
			for i := int32(0); i < batchSize; i++ {
				// Create the pods concurrently
				go func() {
					defer wait.Done()
					// Create a pod
					err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
					if err != nil {
						...
					}
					// Handle creation failures
					if err != nil {
						defer utilruntime.HandleError(err)
						klog.V(2).Infof("Failed creation, decrementing expectations for job %q/%q", job.Namespace, job.Name)
						jm.expectations.CreationObserved(jobKey)
						activeLock.Lock()
						active--
						activeLock.Unlock()
						errCh <- err
					}
				}()
			}
			wait.Wait()
			...
			diff -= batchSize
		}
	}
	...
	return active, nil
}
```

The logic of this method is quite clear; let's walk through it.

The code starts with an if that checks whether the number of active pods exceeds parallelism; if it does, the excess is computed and stored in diff, and the extra pods have to be deleted. There is a nice detail here: the pods are sorted first, so that pods that are not ready, not yet scheduled, or still pending are deleted before the others.

If the number of active pods is smaller than parallelism, Completions is examined first. If it is not set and some pod has already succeeded, no new pods are created; otherwise pods are created up to the number specified by parallelism. If Completions is set, the number of new pods to create is also derived from how many pods have already completed.

If no new pods need to be created (diff is 0), the method simply returns.

Otherwise a for loop creates the pods concurrently, with the batch size growing exponentially from round to round, as shown in the sketch below, so that the controller does not create too many pods at once.
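Here is a minimal, runnable sketch of just that batch-size progression. The starting batch size of 1 plays the role of controller.SlowStartInitialBatchSize, the min32 helper stands in for the integer helpers used above, and the diff value of 20 is made up for illustration.

```go
package main

import "fmt"

// min32 stands in for the integer.Int32Min helper used by the controller.
func min32(a, b int32) int32 {
	if a < b {
		return a
	}
	return b
}

func main() {
	diff := int32(20)                 // pretend 20 pods still need to be created
	const initialBatchSize = int32(1) // plays the role of controller.SlowStartInitialBatchSize

	for batchSize := min32(diff, initialBatchSize); diff > 0; batchSize = min32(2*batchSize, diff) {
		fmt.Printf("creating a batch of %d pods, %d still missing\n", batchSize, diff)
		diff -= batchSize
	}
	// The batches come out as 1, 2, 4, 8, 5: exponential growth, capped by what is left.
}
```

The real loop (in the elided part after wait.Wait() above) also stops early when creations in a batch fail, so a misconfigured Job wastes at most a small batch of API calls rather than all `diff` of them.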
## Scheduled tasks: CronJob

### Basic usage

Let's start with an example:

```yaml
apiVersion: batch/v1beta1
kind: CronJob
metadata:
  name: hello
spec:
  schedule: "*/1 * * * *"
  jobTemplate:
    spec:
      template:
        spec:
          containers:
          - name: hello
            image: busybox
            args:
            - /bin/sh
            - -c
            - date; echo Hello from the Kubernetes cluster
          restartPolicy: OnFailure
```

This CronJob creates a Pod every minute:

```shell
$ kubectl get pod
NAME                     READY   STATUS              RESTARTS   AGE
hello-1596406740-tqnlb   0/1     ContainerCreating   0          8s
```

The CronJob records its most recent schedule time:

```shell
$ kubectl get cronjob hello
NAME    SCHEDULE      SUSPEND   ACTIVE   LAST SCHEDULE   AGE
hello   */1 * * * *   False     1        16s             2m33s
```

**spec.concurrencyPolicy**

If the schedule interval is too short, a new Pod may be created before the previous task has finished. We can set `spec.concurrencyPolicy` to define how this is handled:

* Allow, the default: the Jobs are allowed to exist at the same time;
* Forbid: no new Job is created and that scheduling cycle is skipped;
* Replace: the newly created Job replaces the old Job that has not finished yet.

If creating the Job for a given cycle fails, that cycle is counted as a "miss". When the number of misses within the specified time window reaches 100, the CronJob stops creating this Job. `spec.startingDeadlineSeconds` specifies that window: startingDeadlineSeconds=200 means that if 100 misses have accumulated over the past 200 seconds, this Job will no longer be created and run.
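To get a feel for how these missed start times are counted, here is a small standalone sketch of the idea behind getRecentUnmetScheduleTimes (which we will meet in syncOne below). It assumes the robfig/cron/v3 library for parsing the schedule; the 10-minute gap, the variable names, and the exact cutoff handling are illustrative rather than the controller's code.

```go
package main

import (
	"fmt"
	"time"

	"github.com/robfig/cron/v3"
)

func main() {
	// The same standard cron format as spec.schedule.
	sched, err := cron.ParseStandard("*/1 * * * *")
	if err != nil {
		panic(err)
	}

	// Pretend the CronJob was last scheduled 10 minutes ago (hypothetical value).
	lastSchedule := time.Now().Add(-10 * time.Minute)
	now := time.Now()

	// Collect every start time between the last schedule and now that was never acted on.
	var unmet []time.Time
	for t := sched.Next(lastSchedule); t.Before(now); t = sched.Next(t) {
		unmet = append(unmet, t)
		if len(unmet) > 100 {
			// Roughly what the controller does: after ~100 misses it stops guessing and refuses to schedule.
			fmt.Println("too many missed start times, not scheduling")
			return
		}
	}

	if len(unmet) == 0 {
		fmt.Println("no unmet start times, nothing to do")
		return
	}
	// Like syncOne, only the most recent unmet start time would be acted on.
	fmt.Printf("%d unmet start times, would start the one at %v\n", len(unmet), unmet[len(unmet)-1])
}
```

Roughly speaking, startingDeadlineSeconds shortens how far back this walk begins, which is why a small window combined with a long outage leads to skipped runs rather than a burst of catch-up Jobs.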
### CronJob source code analysis

The CronJob source is in cronjob_controller.go, and the main work happens in the Controller's syncAll method.

Let's see how a CronJob is created and run in the source:

**Controller#syncAll**

```go
func (jm *Controller) syncAll() {
	// List all jobs
	jobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1().Jobs(metav1.NamespaceAll).List(context.TODO(), opts)
	}

	js := make([]batchv1.Job, 0)
	// Page through jobListFunc and collect the well-formed jobs into the js slice
	err := pager.New(pager.SimplePageFunc(jobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		jobTmp, ok := object.(*batchv1.Job)
		if !ok {
			return fmt.Errorf("expected type *batchv1.Job, got type %T", jobTmp)
		}
		js = append(js, *jobTmp)
		return nil
	})
	...
	// List all cronJobs
	cronJobListFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		return jm.kubeClient.BatchV1beta1().CronJobs(metav1.NamespaceAll).List(context.TODO(), opts)
	}

	// Walk all jobs and use the ObjectMeta.OwnerReference field to determine whether each job was created by a cronJob;
	// the key is the cronJob's uid, the value is the set of its jobs
	jobsByCj := groupJobsByParent(js)
	klog.V(4).Infof("Found %d groups", len(jobsByCj))

	// Walk the cronJobs
	err = pager.New(pager.SimplePageFunc(cronJobListFunc)).EachListItem(context.Background(), metav1.ListOptions{}, func(object runtime.Object) error {
		cj, ok := object.(*batchv1beta1.CronJob)
		if !ok {
			return fmt.Errorf("expected type *batchv1beta1.CronJob, got type %T", cj)
		}
		// Do the sync
		syncOne(cj, jobsByCj[cj.UID], time.Now(), jm.jobControl, jm.cjControl, jm.recorder)
		// Clean up all finished jobs
		cleanupFinishedJobs(cj, jobsByCj[cj.UID], jm.jobControl, jm.cjControl, jm.recorder)
		return nil
	})

	if err != nil {
		utilruntime.HandleError(fmt.Errorf("Failed to extract cronJobs list: %v", err))
		return
	}
}
```

syncAll lists all Jobs and all CronJobs, groups the Jobs by the CronJob that owns them, and then walks the CronJob list, calling syncOne on each entry to do the sync and cleanupFinishedJobs afterwards to clean up the Jobs that have already finished.
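The grouping step can be pictured with a short sketch of what groupJobsByParent roughly does, based on the comment above: it buckets Jobs by the UID of the owner reference that controls them. This is a simplified illustration rather than the controller's exact code; the import paths are the standard k8s.io API packages it would need.

```go
import (
	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

// groupJobsByParent (simplified): bucket Jobs by the UID of the CronJob that
// controls them, using the OwnerReference marked as controller.
func groupJobsByParent(js []batchv1.Job) map[types.UID][]batchv1.Job {
	jobsByCj := make(map[types.UID][]batchv1.Job)
	for i := range js {
		if ref := metav1.GetControllerOf(&js[i]); ref != nil && ref.Kind == "CronJob" {
			jobsByCj[ref.UID] = append(jobsByCj[ref.UID], js[i])
		}
	}
	return jobsByCj
}
```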
"MissSchedule", "Missed scheduled time to start a job: %s", scheduledTime.Format(time.RFC1123Z)) return } //處理concurrencyPolicy策略 //如果設定的是Forbid,並且Active列表大於0,直接return if cj.Spec.ConcurrencyPolicy == batchv1beta1.ForbidConcurrent && len(cj.Status.Active) > 0 { klog.V(4).Infof("Not starting job for %s because of prior execution still running and concurrency policy is Forbid", nameForLog) return } //如果設定的是Replace,則刪除所有的Active列表,等後面重新建立 if cj.Spec.ConcurrencyPolicy == batchv1beta1.ReplaceConcurrent { for _, j := range cj.Status.Active { klog.V(4).Infof("Deleting job %s of %s that was still running at next scheduled start time", j.Name, nameForLog) job, err := jc.GetJob(j.Namespace, j.Name) if err != nil { recorder.Eventf(cj, v1.EventTypeWarning, "FailedGet", "Get job: %v", err) return } if !deleteJob(cj, job, jc, recorder) { return } } } //根據cronJob.spec.JobTemplate填充job的完整資訊 jobReq, err := getJobFromTemplate(cj, scheduledTime) if err != nil { klog.Errorf("Unable to make Job from template in %s: %v", nameForLog, err) return } //建立job jobResp, err := jc.CreateJob(cj.Namespace, jobReq) if err != nil { if !errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { recorder.Eventf(cj, v1.EventTypeWarning, "FailedCreate", "Error creating job: %v", err) } return } klog.V(4).Infof("Created Job %s for %s", jobResp.Name, nameForLog) recorder.Eventf(cj, v1.EventTypeNormal, "SuccessfulCreate", "Created job %v", jobResp.Name) ref, err := getRef(jobResp) if err != nil { klog.V(2).Infof("Unable to make object reference for job for %s", nameForLog) } else { //把建立好的job資訊放入到Active列表中 cj.Status.Active = append(cj.Status.Active, *ref) } cj.Status.LastScheduleTime = &metav1.Time{Time: scheduledTime} if _, err := cjc.UpdateStatus(cj); err != nil { klog.Infof("Unable to update status for %s (rv = %s): %v", nameForLog, cj.ResourceVersion, err) } return } ``` 在syncOne維護了cronJob的Active列表,在遍歷cronJob對應的job列表的時候會判斷該job是不是應該從Active列表中刪除,操作完之後會更新cronJob的狀態。 然後會檢視當千的cronJob是否已被刪除、是否處於suspend狀態、判斷是否最近有job被排程,並獲取最後一次排程時間判斷是否滿足StartingDeadlineSeconds條件等。 接下來會根據ConcurrencyPolicy來判斷是Forbid還是Replace。如果是Forbid那麼直接略過此次排程,如果是Replace那麼會刪除所有的Active列表,等後面重新建立。 最後呼叫CreateJob建立job。 ## 總結 這篇文章我們首先介紹了Job和CronJob的具體使用方法,以及其中需要注意的引數配置,然後通過原始碼來解釋相應的配置會產生什麼樣的結果。例如job來說,如果我們設定的completions小於parallelism,那麼在實際執行的時候實際完成的pod數量是可能超過completions的等等。通過原始碼我們對job以及cronjob也有了一個更好的理解。 ## Reference https://kubernetes.io/docs/concepts/workloads/controllers/job/ https://kubernetes.io/docs/concepts/workloads/controllers/cron-jobs/ https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#example-states https://kubernetes.feisky.xyz/concepts/objects/cronjob https://kubernetes.feisky.xyz/concepts/objects/job 《深入理解k8s》 《k8s in Ac