kubernetes的rolling update機制解析
commit:d577db99873cbf04b8e17b78f17ec8f3a27eca30 Date: Fri Apr 10 23:45:36 2015 -0700 ##0.命令列和依賴的基礎知識 Synopsis Perform a rolling update of the given ReplicationController.
Replaces the specified controller with new controller, updating one pod at a time to use the new PodTemplate. The new-controller.json must specify the same namespace as the existing controller and overwrite at least one (common) label in its replicaSelector.
kubectl rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC
Examples // Update pods of frontend-v1 using new controller data in frontend-v2.json. $ kubectl rolling-update frontend-v1 -f frontend-v2.json
// Update pods of frontend-v1 using JSON data passed into stdin. $ cat frontend-v2.json | kubectl rolling-update frontend-v1 -f -
ReplicationController,簡稱rc,是kubernet體系中某一種型別pod的集合,rc有一個關鍵引數叫做replicas,也是就是pod的數量。 那麼rc有什麼用呢?這是為了解決在叢集上一堆pod中有些如果掛了,那麼就在別的宿主機上把容器啟動起來,並讓業務流量匯入到正確啟動的pod上。也就是說,rc保證了叢集服務的可用性,當你有很多個服務啟動在一個叢集中,你需要用程式去監控這些服務的執行狀況,並動態保證服務可用。 rc和pod的對應關係是怎麼樣的?rc通過selector來選擇一些pod作為他的控制範圍。只要pod的標籤(label)符合seletor,則屬於這個rc,下面是pod和rc的示例。 xx-controller.json "spec":{ "replicas":1, "selector":{ "name":"redis", "role":"master" },
xx-pod.json "labels": { "name": "redis" },
kubernetes被我們簡稱為k8s,如果對其中的基礎概念有興趣可以看這篇 ##1.kubctl入口 /cmd/kubectl/kubctl.go func main() { runtime.GOMAXPROCS(runtime.NumCPU()) cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr) if err := cmd.Execute(); err != nil { os.Exit(1) } }
##2.實際呼叫 原始碼在pkg包內,/pkg/kubectl/cmd/cmd.go,每個子命令都實現統一的介面,rollingupdate這行是: cmds.AddCommand(NewCmdRollingUpdate(f, out))
這個函式的實現在:/pkg/kubectl/cmd/rollingupdate.go func NewCmdRollingUpdate(f *cmdutil.Factory, out io.Writer) *cobra.Command { cmd := &cobra.Command{ Use: "rolling-update OLD_CONTROLLER_NAME -f NEW_CONTROLLER_SPEC", // rollingupdate is deprecated. Aliases: []string{"rollingupdate"}, Short: "Perform a rolling update of the given ReplicationController.", Long: rollingUpdate_long, Example: rollingUpdate_example, Run: func(cmd *cobra.Command, args []string) { err := RunRollingUpdate(f, out, cmd, args) cmdutil.CheckErr(err) }, } }
可以看到實際呼叫時的執行函式是RunRollingUpdate,算是進入正題了 func RunRollingUpdate(f *cmdutil.Factory, out io.Writer, cmd *cobra.Command, args []string) error { ... mapper, typer := f.Object() // TODO: use resource.Builder instead obj, err := resource.NewBuilder(mapper, typer, f.ClientMapperForCommand()). NamespaceParam(cmdNamespace).RequireNamespace(). FilenameParam(filename). Do(). Object() if err != nil { return err } newRc, ok := obj.(*api.ReplicationController) if !ok { return cmdutil.UsageError(cmd, "%s does not specify a valid ReplicationController", filename) }
這是建立一個新的rc的程式碼,其中resource是kubneter所有資源(pod,service,rc)的基類。可以看到新的rc從json引數檔案中獲取所有資訊,然後轉義為ReplicationController這個類。 if oldName == newName { return cmdutil.UsageError(cmd, "%s cannot have the same name as the existing ReplicationController %s", filename, oldName) }
var hasLabel bool for key, oldValue := range oldRc.Spec.Selector { if newValue, ok := newRc.Spec.Selector[key]; ok && newValue != oldValue { hasLabel = true break } } if !hasLabel { return cmdutil.UsageError(cmd, "%s must specify a matching key with non-equal value in Selector for %s", filename, oldName) }
這裡可以看到,對於新的rc和舊的rc,有2項限制,一個是新舊名字需要不同,另一個是rc的selector中需要至少有一項的值不一樣。 updater := kubectl.NewRollingUpdater(newRc.Namespace, client)
// fetch rc oldRc, err := client.ReplicationControllers(newRc.Namespace).Get(oldName) if err != nil { return err }
... err = updater.Update(out, oldRc, newRc, period, interval, timeout) if err != nil { return err }
在做rolling update的時候,有兩個條件限制,一個是新的rc的名字需要和舊的不一樣,第二是至少有個一個標籤的值不一樣。其中namespace是k8s用來做多租戶資源隔離的,可以先忽略不計。 ##3. 資料結構和實現 這段程式碼出現了NewRollingUpdater,是在上一層的/pkg/kubectl/rollingupdate.go這個檔案中,更加接近主體了 // RollingUpdater provides methods for updating replicated pods in a predictable, // fault-tolerant way. type RollingUpdater struct { // Client interface for creating and updating controllers c client.Interface // Namespace for resources ns string }
可以看到這裡的RollingUpdater裡面是一個k8s的client的結構來向api server傳送命令 func (r *RollingUpdater) Update(out io.Writer, oldRc, newRc *api.ReplicationController, updatePeriod, interval, timeout time.Duration) error { oldName := oldRc.ObjectMeta.Name newName := newRc.ObjectMeta.Name retry := &RetryParams{interval, timeout} waitForReplicas := &RetryParams{interval, timeout} if newRc.Spec.Replicas <= 0 { return fmt.Errorf("Invalid controller spec for %s; required: > 0 replicas, actual: %s\n", newName, newRc.Spec) } desired := newRc.Spec.Replicas sourceId := fmt.Sprintf("%s:%s", oldName, oldRc.ObjectMeta.UID)
// look for existing newRc, incase this update was previously started but interrupted rc, existing, err := r.getExistingNewRc(sourceId, newName) if existing { fmt.Fprintf(out, "Continuing update with existing controller %s.\n", newName) if err != nil { return err } replicas := rc.ObjectMeta.Annotations[desiredReplicasAnnotation] desired, err = strconv.Atoi(replicas) if err != nil { return fmt.Errorf("Unable to parse annotation for %s: %s=%s", newName, desiredReplicasAnnotation, replicas) } newRc = rc } else { fmt.Fprintf(out, "Creating %s\n", newName) if newRc.ObjectMeta.Annotations == nil { newRc.ObjectMeta.Annotations = map[string]string{} } newRc.ObjectMeta.Annotations[desiredReplicasAnnotation] = fmt.Sprintf("%d", desired) newRc.ObjectMeta.Annotations[sourceIdAnnotation] = sourceId newRc.Spec.Replicas = 0 newRc, err = r.c.ReplicationControllers(r.ns).Create(newRc) if err != nil { return err } } // +1, -1 on oldRc, newRc until newRc has desired number of replicas or oldRc has 0 replicas for newRc.Spec.Replicas < desired && oldRc.Spec.Replicas != 0 { newRc.Spec.Replicas += 1 oldRc.Spec.Replicas -= 1 fmt.Printf("At beginning of loop: %s replicas: %d, %s replicas: %d\n", oldName, oldRc.Spec.Replicas, newName, newRc.Spec.Replicas) fmt.Fprintf(out, "Updating %s replicas: %d, %s replicas: %d\n", oldName, oldRc.Spec.Replicas, newName, newRc.Spec.Replicas) newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas) if err != nil { return err } time.Sleep(updatePeriod) oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas) if err != nil { return err } fmt.Printf("At end of loop: %s replicas: %d, %s replicas: %d\n", oldName, oldRc.Spec.Replicas, newName, newRc.Spec.Replicas) } // delete remaining replicas on oldRc if oldRc.Spec.Replicas != 0 { fmt.Fprintf(out, "Stopping %s replicas: %d -> %d\n", oldName, oldRc.Spec.Replicas, 0) oldRc.Spec.Replicas = 0 oldRc, err = r.resizeAndWait(oldRc, retry, waitForReplicas) // oldRc, err = r.resizeAndWait(oldRc, interval, timeout) if err != nil { return err } } // add remaining replicas on newRc if newRc.Spec.Replicas != desired { fmt.Fprintf(out, "Resizing %s replicas: %d -> %d\n", newName, newRc.Spec.Replicas, desired) newRc.Spec.Replicas = desired newRc, err = r.resizeAndWait(newRc, retry, waitForReplicas) if err != nil { return err } } // Clean up annotations if newRc, err = r.c.ReplicationControllers(r.ns).Get(newName); err != nil { return err } delete(newRc.ObjectMeta.Annotations, sourceIdAnnotation) delete(newRc.ObjectMeta.Annotations, desiredReplicasAnnotation) newRc, err = r.updateAndWait(newRc, interval, timeout) if err != nil { return err } // delete old rc fmt.Fprintf(out, "Update succeeded. Deleting %s\n", oldName) return r.c.ReplicationControllers(r.ns).Delete(oldName)
}
這段程式碼很長,但做的事情很簡單:
如果新的rc沒有被建立,就先創一下,如果已經建立了(在上次的rolling_update中建立了但超時了) 用幾個迴圈,把新的rc的replicas增加上去,舊的rc的replicas降低下來,主要呼叫的函式是resizeAndWait和updateAndWait
##4. 底層呼叫 接上一節的resizeAndWait,程式碼在/pkg/kubectl/resize.go,這裡的具體程式碼就不貼了 其餘的所有呼叫都發生/pkg/client這個目錄下,這是一個http/json的client,主要功能就是向api-server傳送請求 整體來說,上面的wait的實現都是比較土的,就是發一個update請求過去,後面輪詢的呼叫get來檢測狀態是否符合最終需要的狀態。 ##5. 總結 先說一下這三個時間引數的作用: update-period:新rc增加一個pod後,等待這個period,然後從舊rc縮減一個pod poll-interval:這個函式名來源於linux上的poll呼叫,就是每過一個poll-interval,向服務端發起請求,直到這個請求成功或者報失敗 timeout:總操作的超時時間 rolling update主要是客戶端這邊實現的,分析完了,但還是有一些未知的問題,例如:
api-server, cadvisor, kubelet, proxy, etcd這些服務端元件是怎麼互動的?怎麼保證在服務一直可用的情況下增減pod? 是否有可能在pod增減的時候插入自己的一些程式碼或者過程?因為我們目前的架構中沒有使用k8s的proxy,需要自己去呼叫負載均衡的系統給這些pod導流量 對於具體的pod,我們怎麼去做內部程式的健康檢查?在業務不可用的情況下向k8s系統傳送訊息,幹掉這個pod,在別的機器上建立新的來替代。
本文轉移開源中國-kubernetes的rolling update機制解析