[原始碼解析] 深度學習分散式訓練框架 horovod (19) --- kubeflow MPI-operator

目錄

0x00 摘要

Horovod 是一款基於 AllReduce 的分散式訓練框架。憑藉其對 TensorFlow、PyTorch 等主流深度學習框架的支援,以及通訊優化等特點,Horovod 被廣泛應用於資料並行的訓練中。

本文是 horovod on k8s 的中間階段,是 Horovod 的主要相關部分,看看 Horovod on K8S 在社群內如何實現。

本文目的是:藉著分析學習 Horovod on K8S 功能,把相關概念梳理一遍,期望可以從中找出設計思路。所以成文方式是:整理學習了很多網上文章,然後自己分析程式碼。特此對各位作者深表感謝。

本系列其他文章連結如下:

[原始碼解析] 深度學習分散式訓練框架 Horovod (1) --- 基礎知識

[原始碼解析] 深度學習分散式訓練框架 horovod (2) --- 從使用者角度切入

[原始碼解析] 深度學習分散式訓練框架 horovod (3) --- Horovodrun背後做了什麼

[原始碼解析] 深度學習分散式訓練框架 horovod (4) --- 網路基礎 & Driver

[原始碼解析] 深度學習分散式訓練框架 horovod (5) --- 融合框架

[原始碼解析] 深度學習分散式訓練框架 horovod (6) --- 後臺執行緒架構

[原始碼解析] 深度學習分散式訓練框架 horovod (7) --- DistributedOptimizer

[原始碼解析] 深度學習分散式訓練框架 horovod (8) --- on spark

[原始碼解析] 深度學習分散式訓練框架 horovod (9) --- 啟動 on spark

[原始碼解析] 深度學習分散式訓練框架 horovod (10) --- run on spark

[原始碼解析] 深度學習分散式訓練框架 horovod (11) --- on spark --- GLOO 方案

[原始碼解析] 深度學習分散式訓練框架 horovod (12) --- 彈性訓練總體架構

[原始碼解析] 深度學習分散式訓練框架 horovod (13) --- 彈性訓練之 Driver

[原始碼解析] 深度學習分散式訓練框架 horovod (14) --- 彈性訓練發現節點 & State

[原始碼解析] 深度學習分散式訓練框架 horovod (15) --- 廣播 & 通知

[原始碼解析] 深度學習分散式訓練框架 horovod (16) --- 彈性訓練之Worker生命週期

[原始碼解析] 深度學習分散式訓練框架 horovod (17) --- 彈性訓練之容錯

[原始碼解析] 深度學習分散式訓練框架 horovod (18) --- kubeflow tf-operator

0x01 背景知識

首先,K8S 和 Kube-flow 部分請參見前文。

1.1 MPI

MPI(Message Passing Interface) 是一種可以支援點對點和廣播的通訊協議,具體實現的庫有很多,使用比較流行的包括 Open MpiIntel MPI 等等。

MPI 是一種訊息傳遞程式設計模型。訊息傳遞指使用者必須通過顯式地傳送和接收訊息來實現處理器間的資料交換。在這種並行程式設計中,每個控制流均有自己獨立的地址空間,不同的控制流之間不能直接訪問彼此的地址空間,必須通過顯式的訊息傳遞來實現。這種程式設計方式是大規模並行處理機(MPP)和機群(Cluster)採用的主要程式設計方式。由於訊息傳遞程式設計要求使用者很好地分解問題,組織不同控制流間的資料交換,平行計算粒度大,特別適合於大規模可擴充套件並行演算法。

MPI 是基於程序的並行環境。程序擁有獨立的虛擬地址空間和處理器排程,並且執行相互獨立。MPI 設計為支援通過網路連線的機群系統,且通過訊息傳遞來實現通訊,訊息傳遞是 MPI 的最基本特色。

1.2 Open-MPI

OpenMPI 是一種高效能訊息傳遞庫,最初是作為融合的技術和資源從其他幾個專案(FT-MPI, LA-MPI, LAM/MPI, 以及 PACX-MPI),它是 MPI-2 標準的一個開源實現,由一些科研機構和企業一起開發和維護。

因此,OpenMPI 能夠從高效能社群中獲得專業技術、工業技術和資源支援,來建立最好的 MPI 庫。OpenMPI 提供給系統和軟體供應商、程式開發者和研究人員很多便利。易於使用,並執行本身在各種各樣的作業系統,網路互連,以及排程系統。

1.3 MPI Operator

MPI OperatorKubeflow 的一個元件,是 Kubeflow 社群貢獻的另一個關於深度/機器學習的一個 Operator,主要就是為了 MPI 任務或者 Horovod 任務提供了一個多機管理工作。

Kubeflow 提供 mpi-operator,可使 allreduce 樣式的分散式訓練像在單個節點上進行培訓一樣簡單。

我們可以輕鬆地在 Kubernetes 上執行 allreduce 樣式的分散式訓練。在作業系統上安裝ksonnet 後,可安裝 MPI Operator。其後將安裝 MPIJob 和作業控制器,最後可以將 MPIJob 提交到 Kubernetes 叢集。

對於使用者,只要建立一個 MPIJob 的自定義資源物件,在 Template 配置好 Launcher 和 Worker 的相關資訊,就相當於描述好一個分散式訓練程式的執行過程了。

Mpi-operator 可以做到開箱即用,但是在生產叢集的應用,面對一些固定場景和業務的時候會有一定的限制。

我們看看其 Dockerfile,可以看到安裝了 MPI,hovorod 等等軟體。

# Install TensorFlow, Keras, PyTorch and MXNet
RUN pip install future typing
RUN pip install numpy \
tensorflow==${TENSORFLOW_VERSION} \
keras \
h5py
RUN pip install torch==${PYTORCH_VERSION} torchvision==${TORCHVISION_VERSION}
RUN pip install mxnet==${MXNET_VERSION} # Install Open MPI
RUN mkdir /tmp/openmpi && \
cd /tmp/openmpi && \
wget https://www.open-mpi.org/software/ompi/v4.0/downloads/openmpi-4.0.0.tar.gz && \
tar zxf openmpi-4.0.0.tar.gz && \
cd openmpi-4.0.0 && \
./configure --enable-orterun-prefix-by-default && \
make -j $(nproc) all && \
make install && \
ldconfig && \
rm -rf /tmp/openmpi # Install Horovod
RUN HOROVOD_WITH_TENSORFLOW=1 HOROVOD_WITH_PYTORCH=1 HOROVOD_WITH_MXNET=1 \
pip install --no-cache-dir horovod==${HOROVOD_VERSION}

0x02 設計思路

目前社群在 mpi-operator 主要用於 allreduce-style 的分散式訓練,因為 mpi-operator 本質上就是給使用者管理好多個程序之間的關係,所以天然支援的框架很多,包括 Horovod, TensorFlow, PyTorch, Apache MXNet 等等。

而 mpi-operator 的基本架構是通過 Mpi-job 的這種自定義資源物件來描述分散式機器學習的訓練任務,同時實現了 Mpijob 的 Controller 來控制,其中分為 Launcher 和 Worker 這兩種型別的工作負荷

其特點如下:

  • 為Horovod/MPI多機訓練準備的Operator

  • 多機任務分為多種角色

    • Launcher
    • Worker-N
  • 每個任務通過特定的RBAC

  • 每個任務會設定rsh_agent以及hostfile

  • Launcher中init-container會等worker就位後

2.1 架構圖

其架構圖如下:

2.2 角色

主要分兩種角色。

  • Worker 本質上是 StatefulSet,在分散式訓練的過程中,訓練任務通常是有狀態的,StatefulSet 正是管理這些的 Workload 的物件。
  • Launcher 相當於一個啟動器的角色,它會等Worker都就位之後,去啟動MPI的任務。通常會是一個比較輕量化的 Job,他主要完成幾條命令的傳送就可以了,通常是把命令通過 ssh/rsh 來發送接受命令,在 mpi-operator 裡使用的是 kubectl 來給 Worker 傳送命令。

這裡我們有了一個疑問,為什麼 MPI-Operator 於 TF-Operator 相比沒有 service 概念?

應該是因為 MPI-Operator 都是內部運作,不需要外部訪問,所以不需要新增 Service。

即 MPI-Operator 用這個啟動,就不需要service 了。因為 MPI-Operator 利用 API 獲得了 pod 資訊,kubectl-delivery 的已經將 kubectl 放入到 Launcher 容器內,之後可以通過 kubectl 來給 Worker 傳送 mpirun 的命令

%s/kubectl cp %s/hosts ${POD_NAME}:/etc/hosts_of_nodes
%s/kubectl exec ${POD_NAME},

2.3 主要過程

其主要過程包括:

  1. MPIJob Controller 會根據每一份 MPIJob 的配置,生成一個 launcher pod 和對應個數的 worker pod;
  2. MPIJob Controller 會針對每一份 MPIJob 生成一份 ConfigMap,其中包含兩份指令碼,一為反應該任務所有 worker pod 的 hostfile,一為 kubexec.sh 指令碼;
  3. Launcher pod 上的 mpirun 會利用由 ConfigMap 中的 kubexel 在 worker pod 中拉起程序;需要注意的是,kubectl的執行有賴於 MPIJob Controller 預先建立的 RBAC 資源(如果對應的 Role 中沒有給 launcher pod 配置在 worker pod 上的執行許可權,launcher pod 在執行kubectl exec` 時會被拒絕);

2.4 CRD 的定義

Mpi-operator 裡面擴展出來的新 CRD,名為MPIJob,他的具體定義可以在這裡找到:mpijob-v1alpha2-crd.yaml.

對於使用者,只要建立一個 Mpijob 的自定義資源物件,在 Template 配置好 LauncherWorker 的相關資訊,就相當於描述好一個分散式訓練程式的執行過程了。

簡單介紹下該新 CRD Spec 的組成:

  • launcher:目前只是一個,只執行啟動 mpijob 的 pod,不執行 workload;
  • worker:可以是一個也可以是多個,真正執行 workload 的 Pod;
  • slotsPerWorker:每個 worker 執行的 slots 數目;
  • backoffLimit:最多重試次數;
  • cleanPodPolicy:任務結束時,清除 Pod 策略;
  • runPolicy:多機任務執行策略;

具體Spec樣例如下:

apiVersion: kubeflow.org/v1alpha2
kind: MPIJob
metadata:
name: tensorflow-mnist
spec:
slotsPerWorker: 1
cleanPodPolicy: Running
mpiReplicaSpecs:
Launcher:
replicas: 1
template:
spec:
containers:
- image: horovod-cpu:latest
name: mpi-launcher
command:
- mpirun
args:
- -np
- "2"
- --allow-run-as-root
- -bind-to
- none
- -map-by
- slot
- -x
- LD_LIBRARY_PATH
- -x
- PATH
- -mca
- pml
- ob1
- -mca
- btl
- ^openib
- python
- /examples/tensorflow_mnist.py
resources:
limits:
cpu: 1
memory: 2Gi
Worker:
replicas: 2
template:
spec:
containers:
- command:
- ""
image: horovod-cpu:latest
name: mpi-worker
resources:
limits:
cpu: 2
memory: 4Gi

有了 MPI-Operator 的定義,就可以具體執行。一般來說新的 CRD 都是無法複用 Kubernetes 現有資源型別的情況,那麼就會通過 operator 進行轉換,轉換成 Kubernetes 可以識別的資源型別。

  • 比如上面的Launcher 會被轉換成 Kubernetes 中的 job 資源型別。
  • worker 會被轉換成 Kubernetes 中的 Statefulset,進而通過 informers 的機制來監控 Kubernetes 中的 Job 和 Statefulset 這兩個資源更新 MPIJob 的資源狀態。

下面我們以兩個典型的操作來介紹如何執行的:

2.5 建立

當用戶建立了一個 MPIJob,其中包含一個 Launcher,2 個 Worker 這樣的配置,進行多機訓練時,當是如何進行的呢?下面依次介紹:

  • 與一般的 controller 寫法相同,監聽 MPIJob 建立,並將其放入佇列中;
  • 多執行緒從佇列中去處新的 mpijob,進行處理,判斷 launcher 和 worker 是否存在,如果不存在就進行建立,具體可以參考這個函式;
  • Mpijob 啟動的順序是先啟動 Worker 再啟動 Launcher
  • 建立 launcher 和 worker 的同時,會在 launcher job 建立時新增一個額外的 init container,這個 init container 主要的工作就是監控所有的 worker 都已經就位了,然後執行執行後面 launcher job 裡面定義的命令;
  • 除此之外,為了能夠幫使用者減少一些額外的配置,基於 worker pod 的名字,會將其加入到一個 configmap 中,並 mount 到每個 pod 中,這樣通過環境變數將 hostfile 設定為這個 mount 的 configmap 路徑,就可以發現多機程式,進而去連結了;
  • 前面也介紹過 rsh agent,預設是用 sshd,這個要設定面祕鑰登入,設定起來會稍顯麻煩,那麼在 Kubernetes 中執行有沒有更簡單的辦法?答案是有的。我們通過 kubectl 命令來達到同樣的效果,參見此處程式碼。在這裡會建立一個可執行程式,然後去通知 worker pod 去執行相應的命令等操作;
  • 其中 kubectl-delivery 的作用主要是將 kubectl 放入到 Launcher 容器內,之後可以通過 kubectl 來給 Worker 傳送 mpirun 的命令。

至此,mpijob 就被轉換成 Kubernetes 可以識別的型別,並開始運行了。

2.6 終止

MPIJob 的終止 終止分為兩種型別,分別是正確,或者是出錯了。

  • 針對正常終止,Launcher Job 的狀態會變成 Completed 狀態,mpi-operator 會發現監聽的 job 狀態變化,進而去找到對應的 mpijob,並更新其狀態,程式碼在這裡;
  • 針對異常終止,某一個 worker 或者 launcher 出現了錯誤,那麼會進行重試(筆者注:這裡面的重試其實沒有意義),如果超過了backoffLimit,那麼就會認為是 failed 狀態,執行上面步驟中同樣的函式,並更新 mpijob 狀態為 failed;
  • 當 mpijob 終止了,就會通過cleanPodPolicy去刪除沒用的 pod;

0x03 實現

3.1 K8S CRD 基本概念

首先,我們需要介紹下 K8S 一些概念,我們程式設計主要涉及這麼幾個概念:

  1. informer:監聽apiserver中特定資源變化,然後會儲存到一個執行緒安全的local cache中,最後回撥我們自己實現的event handler。

  2. local cache:informer實時同步apiserver(也就是etcd)中的資料到記憶體中儲存,可以有效降低apiserver的查詢壓力,但缺點就是實時性不好,本地會比遠端的資料落後一點點但會最終與etcd一致,所以需要根據情況具體分析是走Local cache還是apiserver實時獲取資料。

  3. Lister:提供了CURD操作訪問local cache。

  4. controller:一個邏輯概念,就是指排程某種資源的實現而已,需要我們自己開發。Controller做的事情主要包括:

    1. 實現event handler處理資源的CURD操作
    2. 在event handler,可以使用workqueue類庫實現相同資源物件的連續event的去重,以及event處理異常後的失敗重試,通常是建議使用的。
  5. Workqueue:一個單獨的類庫,是可選使用的,但通常都會使用,原因上面說了。我們需要在實現event handler的時候把發生變化的資源標識放入workqueue,供下面的processor消費。

  6. Clientset:預設clientset只能CRUD k8s提供的資源型別,比如deployments,daemonset等;生成的程式碼為我們自定義的資源(CRD)生成了單獨的clientset,從而讓我們使用結構化的程式碼CURD自定義資源。也就是說,想操作內建資源就用k8s自帶的clientset,想操作CRD就用生成程式碼裡的clientset。

  7. Processor:我們實現的go協程,消費workqueue中的事件,workqueue提供了按資源標識的去重。

3.2 入口

MPI-Operator 的入口是 Run 函式。

  • 這裡最重要的就是使用 NewMPIJobController 來生成一個 controller;
  • 然後呼叫 controller.Run 來執行;
func Run(opt *options.ServerOption) error {
cfg, err := clientcmd.BuildConfigFromFlags(opt.MasterURL, opt.Kubeconfig) // Create clients.
kubeClient, leaderElectionClientSet, mpiJobClientSet, volcanoClientSet, err := createClientSets(cfg) // Add mpi-job-controller types to the default Kubernetes Scheme so Events
// can be logged for mpi-job-controller types.
err = kubeflowScheme.AddToScheme(clientgokubescheme.Scheme) // Set leader election start function.
run := func(ctx context.Context) {
var kubeInformerFactory kubeinformers.SharedInformerFactory
var kubeflowInformerFactory informers.SharedInformerFactory
var volcanoInformerFactory volcanoinformers.SharedInformerFactory
if namespace == metav1.NamespaceAll {
kubeInformerFactory = kubeinformers.NewSharedInformerFactory(kubeClient, 0)
kubeflowInformerFactory = informers.NewSharedInformerFactory(mpiJobClientSet, 0)
volcanoInformerFactory = volcanoinformers.NewSharedInformerFactory(volcanoClientSet, 0)
} else {
kubeInformerFactory = kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
kubeflowInformerFactory = informers.NewSharedInformerFactoryWithOptions(mpiJobClientSet, 0, informers.WithNamespace(namespace))
volcanoInformerFactory = volcanoinformers.NewSharedInformerFactoryWithOptions(volcanoClientSet, 0, volcanoinformers.WithNamespace(namespace))
} var podgroupsInformer podgroupsinformer.PodGroupInformer
if opt.GangSchedulingName != "" {
podgroupsInformer = volcanoInformerFactory.Scheduling().V1beta1().PodGroups()
}
controller := controllersv1.NewMPIJobController(
kubeClient,
mpiJobClientSet,
volcanoClientSet,
kubeInformerFactory.Core().V1().ConfigMaps(),
kubeInformerFactory.Core().V1().ServiceAccounts(),
kubeInformerFactory.Rbac().V1().Roles(),
kubeInformerFactory.Rbac().V1().RoleBindings(),
kubeInformerFactory.Core().V1().Pods(),
podgroupsInformer,
kubeflowInformerFactory.Kubeflow().V1().MPIJobs(),
opt.KubectlDeliveryImage,
opt.GangSchedulingName) go kubeInformerFactory.Start(ctx.Done())
go kubeflowInformerFactory.Start(ctx.Done())
if opt.GangSchedulingName != "" {
go volcanoInformerFactory.Start(ctx.Done())
} // Set leader election start function.
isLeader.Set(1)
if err = controller.Run(opt.Threadiness, stopCh); err != nil {
klog.Fatalf("Error running controller: %s", err.Error())
}
}
}

3.3 NewMPIJobController

NewMPIJobController 的作用是生成了MPIJobController,並且配置了一系列 Informer。

Informer 的作用是 監聽apiserver中特定資源變化,然後會儲存到一個執行緒安全的local cache中,最後回撥我們自己實現的event handler

這裡基本看名字就可以確認其作用。

// NewMPIJobController returns a new MPIJob controller.
func NewMPIJobController(...) *MPIJobController { // Create event broadcaster.
eventBroadcaster := record.NewBroadcaster() var podgroupsLister podgroupslists.PodGroupLister controller := &MPIJobController{
kubeClient: kubeClient,
kubeflowClient: kubeflowClient,
volcanoClient: volcanoClientSet,
......
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"),
....
} controller.updateStatusHandler = controller.doUpdateJobStatus // Set up an event handler for when MPIJob resources change.
mpiJobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addMPIJob,
UpdateFunc: func(old, new interface{}) {
controller.enqueueMPIJob(new)
},
}) configMapInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newConfigMap := new.(*corev1.ConfigMap)
controller.handleObject(new)
}
})
serviceAccountInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newServiceAccount := new.(*corev1.ServiceAccount)
controller.handleObject(new)
}
})
roleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newRole := new.(*rbacv1.Role)
controller.handleObject(new)
}
})
roleBindingInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newRoleBinding := new.(*rbacv1.RoleBinding)
controller.handleObject(new)
}
})
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newPod := new.(*corev1.Pod)
controller.handleObject(new)
}
})
if podgroupsInformer != nil {
podgroupsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.handleObject,
UpdateFunc: func(old, new interface{}) {
newPolicy := new.(*podgroupv1beta1.PodGroup)
controller.handleObject(new)
}
})
}
return controller
}

3.4 MPIJobController

我們順理成章的來到了 MPIJobController,基本如下:

  • 與一般的 controller 寫法相同,監聽 MPIJob 建立,並將其放入佇列中;
  • 多執行緒從佇列中去處新的 mpijob,進行處理,判斷 launcher 和 worker 是否存在,如果不存在就進行建立;

看看其定義,基本就是配置了各種 InformerSynced,Lister。

// MPIJobController is the controller implementation for MPIJob resources.
type MPIJobController struct {
// kubeClient is a standard kubernetes clientset.
kubeClient kubernetes.Interface
// kubeflowClient is a clientset for our own API group.
kubeflowClient clientset.Interface
// volcanoClient is a clientset for volcano.sh API.
volcanoClient volcanoclient.Interface configMapLister corelisters.ConfigMapLister
configMapSynced cache.InformerSynced
serviceAccountLister corelisters.ServiceAccountLister
serviceAccountSynced cache.InformerSynced
roleLister rbaclisters.RoleLister
roleSynced cache.InformerSynced
roleBindingLister rbaclisters.RoleBindingLister
roleBindingSynced cache.InformerSynced
podLister corelisters.PodLister
podSynced cache.InformerSynced
podgroupsLister podgroupslists.PodGroupLister
podgroupsSynced cache.InformerSynced
mpiJobLister listers.MPIJobLister
mpiJobSynced cache.InformerSynced // queue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
queue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
// The container image used to deliver the kubectl binary.
kubectlDeliveryImage string
// Gang scheduler name to use
gangSchedulerName string // To allow injection of updateStatus for testing.
updateStatusHandler func(mpijob *kubeflow.MPIJob) error
}

大致邏輯如下:

+-----------------------------+
| MPIJobController |
| | +---> addMPIJob
| | |
| mpiJobInformer +--------+
| | |
| | +---> enqueueMPIJob
| |
| |
| serviceAccountInformer +-------> handleObject(ServiceAccount)
| |
| |
| roleInformer +-------------> handleObject(Role)
| |
| |
| roleBindingInformer +--------> handleObject(RoleBinding)
| |
| |
| podInformer +--------------> handleObject(Pod)
| |
| |
| podgroupsInformer +----------> handleObject(PodGroup)
| |
+-----------------------------+

3.5 響應 new Job 訊息

上文看到,mpiJobInformer 設定了兩個訊息響應函式,其中 addMPIJob 負責處理新 Job 的生成。

   // Set up an event handler for when MPIJob resources change.
mpiJobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.addMPIJob,
UpdateFunc: func(old, new interface{}) {
controller.enqueueMPIJob(new)
},
})

addMPIJob 的作用就是 生成了一個 kubeflow.MPIJob,然後呼叫 c.enqueueMPIJob 加入到 queue 之中。

// When a mpiJob is added, set the defaults and enqueue the current mpiJob.
func (c *MPIJobController) addMPIJob(obj interface{}) {
mpiJob := obj.(*kubeflow.MPIJob) // Set default for the new mpiJob.
scheme.Scheme.Default(mpiJob)
// Add a created condition.
err := updateMPIJobConditions(mpiJob, common.JobCreated, mpiJobCreatedReason, msg)
c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobCreated", msg)
mpiJobsCreatedCount.Inc()
c.enqueueMPIJob(mpiJob)
}

具體如下:

+-----------------------------+                            kubeflow.MPIJob
| MPIJobController |
| | +---> addMPIJob +------------+
| | | |
| mpiJobInformer +--------+ v
| | |
| | +---------------------> enqueueMPIJob
| |
| |
| serviceAccountInformer +-------> handleObject(ServiceAccount)
| |
| |
| roleInformer +-------------> handleObject(Role)
| |
| |
| roleBindingInformer +--------> handleObject(RoleBinding)
| |
| |
| podInformer +--------------> handleObject(Pod)
| |
| |
| podgroupsInformer +----------> handleObject(PodGroup)
| |
+-----------------------------+

enqueueMPIJob 的程式碼如下,其中,c.queue 就是之前設定的 workqueue.NewNamedRateLimitingQueue。

// enqueueMPIJob takes a MPIJob resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than MPIJob.
func (c *MPIJobController) enqueueMPIJob(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
runtime.HandleError(err)
return
}
c.queue.AddRateLimited(key)
}

3.5 主迴圈

上面是建立和監聽,下面是處理訊息了,處理訊息就是在主迴圈之中完成的。

3.5.1 訊息迴圈

在 runWorker 的主迴圈就是一直執行 processNextWorkItem。

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// work queue.
func (c *KubectlDeliveryController) runWorker() {
for c.processNextWorkItem() {
}
}

3.5.2 processNextWorkItem

processNextWorkItem 從 queue 之中獲取一個訊息,然後呼叫了 syncHandler 進行處理。

// processNextWorkItem will read a single work item off the work queue and
// attempt to process it, by calling the syncHandler.
func (c *KubectlDeliveryController) processNextWorkItem() bool {
obj, shutdown := c.queue.Get()
if shutdown {
return false
} // We wrap this block in a func so we can defer c.queue.Done.
err := func(obj interface{}) error {
defer c.queue.Done(obj)
var key string
var ok bool if key, ok = obj.(string); !ok {
c.queue.Forget(obj)
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// MPIJob resource to be synced.
if err := c.syncHandler(key); err != nil {
c.queue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s", key, err.Error())
} c.queue.Forget(obj)
return nil
}(obj) return true
}

邏輯擴充套件如下:

+-----------------------------+                            kubeflow.MPIJob
| MPIJobController |
| | +---> addMPIJob +------------+ +-------> runWorker +-------+
| | | | | |
| mpiJobInformer +--------+ v | |
| | | | v
| | +---------------------> enqueueMPIJob +---+ processNextWorkItem <--+
| | +
| | | ^
| | | +--------------+ |
| | +---> | | |
| | | workqueue +-------+
| queue +-------------------------------------------------> | |
| | +--------------+
| |
| |
| serviceAccountInformer +-------> handleObject(ServiceAccount)
| |
| |
| roleInformer +-------------> handleObject(Role)
| |
| |
| roleBindingInformer +--------> handleObject(RoleBinding)
| |
| |
| podInformer +--------------> handleObject(Pod)
| |
| |
| podgroupsInformer +----------> handleObject(PodGroup)
| |
+-----------------------------+

手機如下:

3.5.3 syncHandler

processNextWorkItem 主要就是呼叫了 syncHandler,其作用主要是:同步狀態,生成 worker 或者 launcher。

就相當於是建立資源,子資源建立順序如下:

  • 建立 configmap, 包含 discover_host 指令碼 , hostfile 檔案。
  • 建立 workers,包含 service 和 pod。
  • 建立 launcher,掛載 configmap。hostfile 後續會隨著拓撲關係修改。

具體邏輯如下:

  • 依據 namespace/name 得到 MPIJob。
  • 如果已經結束,就刪除 pods;
  • 從 MPIJob 取得 launcher Job;
  • 如果 launcher Job 沒有結束,則
    • 獲得 MPIJob 的 ConfigMap;
    • 獲得 ServiceAccount;
    • 獲得 Role;
    • 獲得 RoleBinding;
    • 獲得 PodGroup;
  • 建立 worker;
  • 如果 launcher 目前為空,則建立 Launcher;Mpijob 啟動的順序是先啟動 Worker 再啟動 Launcher
  • 更新各種狀態

程式碼如下:

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the MPIJob resource
// with the current status of the resource.
func (c *MPIJobController) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name.
namespace, name, err := cache.SplitMetaNamespaceKey(key) // Get the MPIJob with this namespace/name.
sharedJob, err := c.mpiJobLister.MPIJobs(namespace).Get(name) // Whether the job is preempted, and requeue it
requeue := false
// If the MPIJob is terminated, delete its pods according to cleanPodPolicy.
if isFinished(mpiJob.Status) {
if isSucceeded(mpiJob.Status) && isCleanUpPods(mpiJob.Spec.CleanPodPolicy) {
// set worker StatefulSet Replicas to 0.
initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
mpiJob.Status.ReplicaStatuses[common.ReplicaType(kubeflow.MPIReplicaTypeWorker)].Active = 0
if c.gangSchedulerName != "" {
if err := c.deletePodGroups(mpiJob); err != nil {
return err
}
}
}
if !requeue {
if isFailed(mpiJob.Status) && isCleanUpPods(mpiJob.Spec.CleanPodPolicy) {
// set worker StatefulSet Replicas to 0.
if err := c.deleteWorkerPods(mpiJob); err != nil {
return err
}
}
return c.updateStatusHandler(mpiJob)
} else {
launcher, err := c.getLauncherJob(mpiJob)
if err == nil && launcher != nil && isPodFailed(launcher) {
// In requeue, should delete launcher pod
err = c.kubeClient.CoreV1().Pods(launcher.Namespace).Delete(launcher.Name, &metav1.DeleteOptions{})
}
}
} // Get the launcher Job for this MPIJob.
launcher, err := c.getLauncherJob(mpiJob) var worker []*corev1.Pod
// We're done if the launcher either succeeded or failed.
done := launcher != nil && isPodFinished(launcher)
if !done {
workerSpec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
workerReplicas := int32(0)
if workerSpec != nil && workerSpec.Replicas != nil {
workerReplicas = *workerSpec.Replicas
}
isGPULauncher := isGPULauncher(mpiJob) // Get the ConfigMap for this MPIJob.
if config, err := c.getOrCreateConfigMap(mpiJob, workerReplicas, isGPULauncher);
// Get the launcher ServiceAccount for this MPIJob.
if sa, err := c.getOrCreateLauncherServiceAccount(mpiJob); sa == nil || err != nil
// Get the launcher Role for this MPIJob.
if r, err := c.getOrCreateLauncherRole(mpiJob, workerReplicas); r == nil || err != nil
// Get the launcher RoleBinding for this MPIJob.
if rb, err := c.getLauncherRoleBinding(mpiJob); rb == nil || err != nil // Get the PodGroup for this MPIJob
if c.gangSchedulerName != "" {
if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas+1); podgroup == nil || err != nil {
return err
}
} worker, err = c.getOrCreateWorker(mpiJob) if launcher == nil {
launcher, err = c.kubeClient.CoreV1().Pods(namespace).Create(c.newLauncher(mpiJob, c.kubectlDeliveryImage, isGPULauncher))
}
} // Finally, we update the status block of the MPIJob resource to reflect the
// current state of the world.
err = c.updateMPIJobStatus(mpiJob, launcher, worker) return nil
}

邏輯如下

+-----------------------------+                            kubeflow.MPIJob
| MPIJobController |
| | +---> addMPIJob +------------+ +-------> runWorker +-------+
| | | | | |
| mpiJobInformer +--------+ v | |
| | | | v
| | +---------------------> enqueueMPIJob +---+ processNextWorkItem <--+
| | + +
| | | ^ |
| | | +--------------+ | |
| | +---> | | | |
| | | workqueue +-------+ |
| queue +-------------------------------------------------> | | |
| | +--------------+ v
| | syncHandler
| | +
| PodLister | |
| | |
| | v
| | getOrCreateWorker
| serviceAccountInformer +-------> handleObject(ServiceAccount) +
| | |
| | |
| roleInformer +-------------> handleObject(Role) v
| | newLauncher
| | +
| roleBindingInformer +--------> handleObject(RoleBinding) |
| | |
| | v
| podInformer +--------------> handleObject(Pod) updateMPIJobStatus
| |
| |
| podgroupsInformer +----------> handleObject(PodGroup)
| |
+-----------------------------+

手機上如下:

3.5.4 建立 worker

建立 worker分為兩個階段。

3.5.4.1 getOrCreateWorker

首先是 getOrCreateWorker,大致功能如下:

  • 從 spec 之中獲取 Replicas;
  • 依據 job 名字獲取到 selector;
  • 依據 selector 獲取到 pod list;
  • 遍歷 workerReplicas,逐一呼叫 newWorker 生成 pod;
// getOrCreateWorkerStatefulSet gets the worker StatefulSet controlled by this
// MPIJob, or creates one if it doesn't exist.
func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1.Pod, error) {
var (
workerPrefix string = mpiJob.Name + workerSuffix
workerPods []*corev1.Pod = []*corev1.Pod{}
i int32 = 0
workerReplicas *int32
)
if worker, ok := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]; ok && worker != nil {
workerReplicas = worker.Replicas
} // Remove Pods when replicas are scaled down
selector, err := workerSelector(mpiJob.Name)
podFullList, err := c.podLister.List(selector) if len(podFullList) > int(*workerReplicas) {
for _, pod := range podFullList {
indexStr := strings.TrimLeft(pod.Name, fmt.Sprintf("%s-", workerPrefix))
index, err := strconv.Atoi(indexStr)
if err == nil {
if index >= int(*workerReplicas) {
err = c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{})
}
}
}
} for ; i < *workerReplicas; i++ {
name := fmt.Sprintf("%s-%d", workerPrefix, i)
pod, err := c.podLister.Pods(mpiJob.Namespace).Get(name) // If the worker Pod doesn't exist, we'll create it.
if errors.IsNotFound(err) {
worker := newWorker(mpiJob, name, c.gangSchedulerName)
pod, err = c.kubeClient.CoreV1().Pods(mpiJob.Namespace).Create(worker)
} workerPods = append(workerPods, pod)
} return workerPods, nil
}
3.5.4.2 newWorker

newWorker 的作用就是建立一個 Pod。

// newWorker creates a new worker StatefulSet for an MPIJob resource. It also
// sets the appropriate OwnerReferences on the resource so handleObject can
// discover the MPIJob resource that 'owns' it.
func newWorker(mpiJob *kubeflow.MPIJob, name, gangSchedulerName string) *corev1.Pod {
labels := defaultWorkerLabels(mpiJob.Name) podSpec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker].Template.DeepCopy() // keep the labels which are set in PodTemplate
if len(podSpec.Labels) == 0 {
podSpec.Labels = make(map[string]string)
} for key, value := range labels {
podSpec.Labels[key] = value
}
setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]) container := podSpec.Spec.Containers[0]
if len(container.Command) == 0 {
container.Command = []string{"sleep"}
container.Args = []string{"365d"}
} // We need the kubexec.sh script here because Open MPI checks for the path
// in every rank.
container.VolumeMounts = append(container.VolumeMounts, corev1.VolumeMount{
Name: configVolumeName,
MountPath: configMountPath,
})
podSpec.Spec.Containers[0] = container scriptMode := int32(0555)
podSpec.Spec.Volumes = append(podSpec.Spec.Volumes, corev1.Volume{
Name: configVolumeName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: mpiJob.Name + configSuffix,
},
Items: []corev1.KeyToPath{
{
Key: kubexecScriptName,
Path: kubexecScriptName,
Mode: &scriptMode,
},
},
},
},
}) // add SchedulerName to podSpec
if gangSchedulerName != "" {
podSpec.Spec.SchedulerName = gangSchedulerName if podSpec.Annotations == nil {
podSpec.Annotations = map[string]string{}
}
// we create the podGroup with the same name as the mpijob
podSpec.Annotations[podgroupv1beta1.KubeGroupNameAnnotationKey] = mpiJob.Name
} return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: mpiJob.Namespace,
Labels: podSpec.Labels,
Annotations: podSpec.Annotations,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind),
},
},
Spec: podSpec.Spec,
}
}

3.5.5 建立 Launcher

3.5.5.1 newLauncher

我們先看看 newLauncher,這是主要函式,其主要邏輯就是:

  • 生成了 InitContainers。其目的是:建立 launcher 和 worker 的同時,會在 launcher job 建立時新增一個額外的 InitContainers,這個 InitContainers 主要的工作就是監控所有的 worker 都已經就位了,然後執行執行後面 launcher job 裡面定義的命令;

    • 這裡生成 container 時候,用到了 kubectlDeliveryName,就是 "kubectl-delivery"。其中 kubectl-delivery 的作用主要是將 kubectl 放入到 Launcher 容器內,之後可以通過 kubectl 來給 Worker 傳送 mpirun 的命令;
    • 另外,前面也介紹過 rsh agent,預設是用 sshd,這個要設定面祕鑰登入,設定起來會稍顯麻煩,那麼在 Kubernetes 中執行有沒有更簡單的辦法?答案是有的。我們通過 kubectl 命令來達到同樣的效果,參見此處程式碼。在這裡會建立一個可執行程式,然後去通知 worker pod 去執行相應的命令等操作;
  • 生成 OMPI_MCA_plm_rsh_agent 和 OMPI_MCA_orte_default_hostfile 資訊,這些是配置檔案的地址,分別對應 discovery_hosts.sh 和 /etc/mpi/kubexec.sh ;
  • 生成 Pod;
// newLauncher creates a new launcher Job for an MPIJob resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the MPIJob resource that 'owns' it.
func (c *MPIJobController) newLauncher(mpiJob *kubeflow.MPIJob, kubectlDeliveryImage string, isGPULauncher bool) *corev1.Pod {
launcherName := mpiJob.Name + launcherSuffix
labels := map[string]string{
labelGroupName: "kubeflow.org",
labelMPIJobName: mpiJob.Name,
labelMPIRoleType: launcher,
} podSpec := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.DeepCopy()
// copy the labels and annotations to pod from PodTemplate
if len(podSpec.Labels) == 0 {
podSpec.Labels = make(map[string]string)
}
for key, value := range labels {
podSpec.Labels[key] = value
}
// add SchedulerName to podSpec
if c.gangSchedulerName != "" {
if podSpec.Spec.SchedulerName != "" && podSpec.Spec.SchedulerName != c.gangSchedulerName {
klog.Warningf("%s scheduler is specified when gang-scheduling is enabled and it will be overwritten", podSpec.Spec.SchedulerName)
}
podSpec.Spec.SchedulerName = c.gangSchedulerName if podSpec.Annotations == nil {
podSpec.Annotations = map[string]string{}
}
// we create the podGroup with the same name as the mpijob
podSpec.Annotations[podgroupv1beta1.KubeGroupNameAnnotationKey] = mpiJob.Name
}
// 監控所有的 worker 都已經就位了,然後執行執行後面 launcher job 裡面定義的命令
podSpec.Spec.ServiceAccountName = launcherName
podSpec.Spec.InitContainers = append(podSpec.Spec.InitContainers, corev1.Container{
Name: kubectlDeliveryName,
Image: kubectlDeliveryImage,
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{
{
Name: kubectlTargetDirEnv,
Value: kubectlMountPath,
},
{
Name: "NAMESPACE",
Value: mpiJob.Namespace,
},
},
VolumeMounts: []corev1.VolumeMount{
{
Name: kubectlVolumeName,
MountPath: kubectlMountPath,
},
{
Name: configVolumeName,
MountPath: configMountPath,
},
},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(initContainerCpu),
corev1.ResourceMemory: resource.MustParse(initContainerMem),
corev1.ResourceEphemeralStorage: resource.MustParse(initContainerEphStorage),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(initContainerCpu),
corev1.ResourceMemory: resource.MustParse(initContainerMem),
corev1.ResourceEphemeralStorage: resource.MustParse(initContainerEphStorage),
},
},
}) container := podSpec.Spec.Containers[0]
container.Env = append(container.Env,
corev1.EnvVar{
Name: "OMPI_MCA_plm_rsh_agent",
Value: fmt.Sprintf("%s/%s", configMountPath, kubexecScriptName),
},
corev1.EnvVar{
Name: "OMPI_MCA_orte_default_hostfile",
Value: fmt.Sprintf("%s/%s", configMountPath, hostfileName),
},
) if !isGPULauncher {
container.Env = append(container.Env,
// We overwrite these environment variables so that users will not
// be mistakenly using GPU resources for launcher due to potential
// issues with scheduler/container technologies.
corev1.EnvVar{
Name: "NVIDIA_VISIBLE_DEVICES",
Value: "",
},
corev1.EnvVar{
Name: "NVIDIA_DRIVER_CAPABILITIES",
Value: "",
})
} container.VolumeMounts = append(container.VolumeMounts,
corev1.VolumeMount{
Name: kubectlVolumeName,
MountPath: kubectlMountPath,
},
corev1.VolumeMount{
Name: configVolumeName,
MountPath: configMountPath,
})
podSpec.Spec.Containers[0] = container setRestartPolicy(podSpec, mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher]) scriptsMode := int32(0555)
hostfileMode := int32(0444)
podSpec.Spec.Volumes = append(podSpec.Spec.Volumes,
corev1.Volume{
Name: kubectlVolumeName,
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
corev1.Volume{
Name: configVolumeName,
VolumeSource: corev1.VolumeSource{
ConfigMap: &corev1.ConfigMapVolumeSource{
LocalObjectReference: corev1.LocalObjectReference{
Name: mpiJob.Name + configSuffix,
},
Items: []corev1.KeyToPath{
{
Key: kubexecScriptName,
Path: kubexecScriptName,
Mode: &scriptsMode,
},
{
Key: hostfileName,
Path: hostfileName,
Mode: &hostfileMode,
},
{
Key: discoverHostsScriptName,
Path: discoverHostsScriptName,
Mode: &scriptsMode,
},
},
},
},
})
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: launcherName,
Namespace: mpiJob.Namespace,
Labels: podSpec.Labels,
Annotations: podSpec.Annotations,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind),
},
},
Spec: podSpec.Spec,
}
}

邏輯如下:

+-----------------------------+                            kubeflow.MPIJob
| MPIJobController |
| | +---> addMPIJob +------------+ +-------> runWorker +-------+
| | | | | |
| mpiJobInformer +--------+ v | |
| | | | v
| | +---------------------> enqueueMPIJob +---+ processNextWorkItem <--+
| | + +
| | | ^ |
| | | +--------------+ | |
| | +---> | | | |
| | | workqueue +-------+ |
| queue +-------------------------------------------------> | | v
| | +--------------+ syncHandler
| | +
| | |
| PodLister | |
| | v
| | getOrCreateWorker +----> newWorker +----> Pod
| | +
| serviceAccountInformer +-------> handleObject(ServiceAccount) |
| | |
| | v
| roleInformer +-------------> handleObject(Role) +--------------------------+----------------------+
| | | newLauncher |
| | | |
| roleBindingInformer +--------> handleObject(RoleBinding) | OMPI_MCA_plm_rsh_agent +---------> Pod
| | | OMPI_MCA_orte_default_hostfile |
| | | kubexecScript hostfile discoverHostsScript |
| podInformer +--------------> handleObject(Pod) | |
| | +--------------------------+----------------------+
| | |
| podgroupsInformer +----------> handleObject(PodGroup) |
| | v
+-----------------------------+ updateMPIJobStatus

手機如下:

3.5.6 利用 ConfigMap 簡化配置

K8S configMap的主要作用就是為了讓映象 和 配置檔案解耦,以便實現映象的可移植性和可複用性。

因為一個configMap其實就是一系列配置資訊的集合,將來可直接注入到Pod中的容器使用,它通過兩種方式實現給Pod傳遞配置引數:

  • 將環境變數直接定義在configMap中,當Pod啟動時,通過env來引用configMap中定義的環境變數。
  • 將一個完整配置檔案封裝到configMap中,然後通過共享卷的方式掛載到Pod中,實現給應用傳參。

Horovod 為了能夠幫使用者減少一些額外的配置,基於 worker pod 的名字,會將其加入到一個 configmap 中,並 mount 到每個 pod 中,這樣通過環境變數將 hostfile 設定為這個 mount 的 configmap 路徑,就可以發現多機程式,進而去連結了。

3.5.6.1 getOrCreateConfigMap

這個是在 運行了 worker 之後才處理的,即 先執行 worker pod 了,然後在響應訊息時候,再次呼叫 getOrCreateConfigMap 才會有執行的 worker pod 資訊。

  • 這裡會先呼叫 updateDiscoverHostsInConfigMap 生成 discovery host 檔案內容;
  • 然後 newConfigMap 具體生成了 kubectl 的執行命令 和 host file,具體就是 hostfileName 和 kubexecScriptName;
// getOrCreateConfigMap gets the ConfigMap controlled by this MPIJob, or creates
// one if it doesn't exist.
func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32, isGPULauncher bool) (*corev1.ConfigMap, error) {
newCM := newConfigMap(mpiJob, workerReplicas, isGPULauncher)
podList, err := c.getRunningWorkerPods(mpiJob)
updateDiscoverHostsInConfigMap(newCM, mpiJob, podList, isGPULauncher) cm, err := c.configMapLister.ConfigMaps(mpiJob.Namespace).Get(mpiJob.Name + configSuffix)
// If the ConfigMap doesn't exist, we'll create it.
if errors.IsNotFound(err) {
cm, err = c.kubeClient.CoreV1().ConfigMaps(mpiJob.Namespace).Create(newCM)
} // If the ConfigMap is not controlled by this MPIJob resource, we
// should log a warning to the event recorder and return.
if !metav1.IsControlledBy(cm, mpiJob) {
msg := fmt.Sprintf(MessageResourceExists, cm.Name, cm.Kind)
c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceExists, msg)
return nil, fmt.Errorf(msg)
} // If the ConfigMap is changed, update it
if !reflect.DeepEqual(cm.Data, newCM.Data) {
cm, err = c.kubeClient.CoreV1().ConfigMaps(mpiJob.Namespace).Update(newCM)
} return cm, nil
}
3.5.6.2 newConfigMap

newConfigMap 具體生成了 kubectl 的執行命令,之後可以通過 kubectl 來給 Worker 傳送 mpirun 的命令。

// newConfigMap creates a new ConfigMap containing configurations for an MPIJob
// resource. It also sets the appropriate OwnerReferences on the resource so
// handleObject can discover the MPIJob resource that 'owns' it.
func newConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32, isGPULauncher bool) *corev1.ConfigMap {
kubexec := fmt.Sprintf(`#!/bin/sh
set -x
POD_NAME=$1
shift
%s/kubectl exec ${POD_NAME}`, kubectlMountPath)
if len(mpiJob.Spec.MainContainer) > 0 {
kubexec = fmt.Sprintf("%s --container %s", kubexec, mpiJob.Spec.MainContainer)
}
kubexec = fmt.Sprintf("%s -- /bin/sh -c \"$*\"", kubexec) // If no processing unit is specified, default to 1 slot.
slots := 1
if mpiJob.Spec.SlotsPerWorker != nil {
slots = int(*mpiJob.Spec.SlotsPerWorker)
}
var buffer bytes.Buffer
if isGPULauncher {
buffer.WriteString(fmt.Sprintf("%s%s slots=%d\n", mpiJob.Name, launcherSuffix, slots))
}
for i := 0; i < int(workerReplicas); i++ {
buffer.WriteString(fmt.Sprintf("%s%s-%d slots=%d\n", mpiJob.Name, workerSuffix, i, slots))
} return &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: mpiJob.Name + configSuffix,
Namespace: mpiJob.Namespace,
Labels: map[string]string{
"app": mpiJob.Name,
},
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(mpiJob, kubeflow.SchemeGroupVersionKind),
},
},
Data: map[string]string{
hostfileName: buffer.String(),
kubexecScriptName: kubexec,
},
}
}
3.5.6.3 命令例子

生成的執行命令例子如下:

# Launcher 容器中執行的命令,就是給 Worker 下發 mpirun 的命令
/opt/kube/kubectl exec mpi-ea4304c32617ec5dvx89ht1et9-worker-0 -- /bin/sh -c PATH=/usr/local/bin:$PATH ; export PATH ; LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH ; export LD_LIBRARY_PATH ; DYLD_LIBRARY_PATH=/usr/local/lib:$DYLD_LIBRARY_PATH ; export DYLD_LIBRARY_PATH ; /usr/local/bin/orted -mca ess "env" -mca ess_base_jobid "2828599296" -mca ess_base_vpid 1 -mca ess_base_num_procs "2" -mca orte_node_regex "mpi-ea[4:4304]c32617ec5dvx89ht1et9-launcher-kljzn,mpi-ea[4:4304]c32617ec5dvx89ht1et9-worker-0@0(2)" -mca orte_hnp_uri "2828599296.0;tcp://6.16.105.7:36055" -mca plm "rsh" --tree-spawn -mca orte_parent_uri "2828599296.0;tcp://6.16.105.7:36055" -mca orte_default_hostfile "/etc/mpi/hostfile" -mca plm_rsh_agent "/etc/mpi/kubexec.sh" -mca hwloc_base_binding_policy "none" -mca rmaps_base_mapping_policy "slot" -mca pmix "^s1,s2,cray,isolated"
3.5.6.4 命令說明

有一個問題是:命令中為什麼要有 -- 號

因為kubectl exec可以執行容器命令。

格式為:

kubectl exec -it <podName> -c <containerName> -n <namespace> -- shell comand

例如我們建立一個testfile檔案:

kubectl exec -it <podname> -c <container name> -n <namespace> -- touch /usr/local/testfile

需要注意的是:

shell命令前,要加 -- 號,不然shell命令中的引數,不能識別。否則雖然執行了kubectl exec 命令,但後續的一些操作並沒有在容器內執行,而是在本地執行了。

此時擴充套件如下:

+-----------------------------+                            kubeflow.MPIJob
| MPIJobController |
| | +---> addMPIJob +------------+ +-------> runWorker +-------+
| | | | | |
| mpiJobInformer +--------+ v | |
| | | | v
| | +---------------------> enqueueMPIJob +---+ processNextWorkItem <--+
| | + +
| | | ^ |
| | | +--------------+ | |
| | +---> | | | |
| | | workqueue +-------+ |
| queue +-------------------------------------------------> | | v
| | +--------------+ syncHandler
| | +
| | |
| PodLister | getOrCreateConfigMap |
| | + v
| | | getOrCreateWorker +----> newWorker +----> Pod
| | | +
| serviceAccountInformer +-------> handleObject(ServiceAccount) v |
| | +-------+---------+ |
| | | newConfigMap | v
| roleInformer +-------------> handleObject(Role) | | +-------------+-----------------------------------+
| | | kubectl exec | | newLauncher |
| | | | | |
| roleBindingInformer +--------> handleObject(RoleBinding) +-------+---------+ | OMPI_MCA_plm_rsh_agent +---------> Pod
| | | | OMPI_MCA_orte_default_hostfile |
| | +----------> | kubexecScript hostfile discoverHostsScript |
| podInformer +--------------> handleObject(Pod) | |
| | +--------------------------+----------------------+
| | |
| podgroupsInformer +----------> handleObject(PodGroup) |
| | v
+-----------------------------+ updateMPIJobStatus

手機如下:

0x04 彈性訓練

彈性訓練應該是後來才加入的。我們嘗試著梳理一下過程。

4.1 之前問題

此前,MPI-Operator 和 Elastic Horovod 存在幾個相容性上的問題。

  1. MPI-Operator 尚不提供 discover_hosts.sh,這一點直接導致 Elastic Horovod 無法使用
  2. 當用戶將 worker replicas 調小之後,controller 不會對“額外”的 worker pod 採取任何措施,這會導致 worker pod 無法釋放,訓練任務的例項規模也就無法縮小
  3. 當用戶增大 worker replica 後,controller 並不會為 launcher pod 的 Role 配置新增 worker 的執行許可權,這會導致 launcher pod 上的 horovodrun 在試圖利用 kubectl 在新建立的 worker pod 上執行程序時被 Kubernetes 的許可權管理機制拒絕

4.2 方案

以下應該是騰訊雲團隊提出的方案。

基於這些存在的相容性問題,我們在社群上提出了 Elastic Horovod on MPIJob:https://github.com/kubeflow/mpi-operator/pull/335 。配合對 Horovod 的修改 https://github.com/horovod/horovod/pull/2199 ,能夠在 Kubernetes 上實現 Horovod 的彈性訓練。

在該方案中,最關鍵的問題在於如何在 launcher pod 上實現 discover_hosts.sh 的功能。而在 Kubernetes 上實現該功能的關鍵,在於如何獲取當前處在 Running 狀態的 worker pods。這裡有兩種思路。

1.MPIJob Controller 構建 discover_hosts.sh並通過 ConfigMap 同步至 launcher pod

  • MPIJob Controller 本身就在監聽 pods 相關的資訊,利用 controller 內的 podLister,可以很快地列出每一個 MPIJob 的 worker pods;
  • 根據 pods 的 status.phase,controller 在篩選出 Running 狀態的 worker pods 之後,就可以構建出一份反映當前 worker pods 狀態的 discover_hosts.sh
  • 通過 ConfigMap,controller 可以將 discover_hosts.shhostfilekubexec.sh 指令碼一樣同步至 launcher pod。
  • 利用 launcher pod 內已有的 kubectl 向 APIServer 實時獲取 worker pod 資訊

2.Launcher pod 自身已經綁定了 pods 的 “get” 和 “list” 許可權,通過 kubectl 或者其他 Kubernetes client 的直接呼叫,即可獲取對應 pod 資訊,通過一樣的篩選標準也可以返回 Elastic Horovod 期待的資訊。

考慮到第二種思路無法限制使用者執行 discover_hosts.sh 的頻率,如果使用者執行過於頻繁或是 MPIJob 規模較大的情況下,會對 Kubernetes 叢集造成較大的壓力,第一種思路在管控上更為全面。

一種對思路二的修正是將 kubectl 或是 client 改為一個 podLister 執行在 launcher pod 中,從而降低對 APIServer 的壓力。然而這種方式使得 launcher pod 中運行了兩個程序。當這個 podLister 程序失效時,缺乏合適的機制將其重新拉起,會造成後續的彈性訓練失效。

因此,我們提議中選擇了第一種思路,這樣一來,controller 通過 ConfigMap 將 discover_hosts.sh 同步至 launcher pod 內,並掛載於 /etc/mpi/discover_hosts.sh 下。同時,該提議中也對 controller 針對另外兩個相容性問題做了相應的修改。這些修改並不會影響到非 Elastic 模式的 MPI 任務,使用者只需忽略 discover_hosts.sh 即可。

當然這種方案也存在一定的問題。ConfigMap 同步至 launcher pod 存在一定的延遲。然而一方面,這個延遲時間是 Kubernetes 管理員可以進行調整的。另一方面相比整個訓練所花的時間,同時也相比 Elastic Horovod 在重置上所花的時間,這一部分延遲也是可以接受的。

4.3 實現

4.3.1 定義

discoverHostsScriptName 具體定義在這裡

const (
controllerAgentName = "mpi-job-controller"
configSuffix = "-config"
configVolumeName = "mpi-job-config"
configMountPath = "/etc/mpi"
kubexecScriptName = "kubexec.sh"
hostfileName = "hostfile"
discoverHostsScriptName = "discover_hosts.sh" // 這裡
kubectlDeliveryName = "kubectl-delivery"
kubectlTargetDirEnv = "TARGET_DIR"
kubectlVolumeName = "mpi-job-kubectl"
kubectlMountPath = "/opt/kube"
launcher = "launcher"
worker = "worker"
launcherSuffix = "-launcher"
workerSuffix = "-worker"
gpuResourceNameSuffix = ".com/gpu"
labelGroupName = "group-name"
labelMPIJobName = "mpi-job-name"
labelMPIRoleType = "mpi-job-role"
initContainerCpu = "100m"
initContainerEphStorage = "5Gi"
initContainerMem = "512Mi"
)

4.2 程式碼

// updateDiscoverHostsInConfigMap updates the ConfigMap if the content of `discover_hosts.sh` changes.
func updateDiscoverHostsInConfigMap(configMap *corev1.ConfigMap, mpiJob *kubeflow.MPIJob, runningPods []*corev1.Pod, isGPULauncher bool) {
slots := 1
if mpiJob.Spec.SlotsPerWorker != nil {
slots = int(*mpiJob.Spec.SlotsPerWorker)
} // Sort the slice of Pods to make sure the order of entries in `discover_hosts.sh` is maintained.
sort.Slice(runningPods, func(i, j int) bool {
return runningPods[i].Name < runningPods[j].Name
}) discoverHosts := "#!/bin/sh"
if isGPULauncher {
discoverHosts = fmt.Sprintf("%s\necho %s%s:%d\n", discoverHosts, mpiJob.Name, launcherSuffix, slots)
}
for _, p := range runningPods {
discoverHosts = fmt.Sprintf("%s\necho %s:%d", discoverHosts, p.Name, slots)
} oldDiscoverHosts, exist := configMap.Data[discoverHostsScriptName]
if exist {
if oldDiscoverHosts == discoverHosts {
return
}
}
configMap.Data[discoverHostsScriptName] = discoverHosts
}

具體更新 host 資訊是在 getOrCreateConfigMap 之中。因為這時候知道了 pods 的資訊變化,於是:

  • 利用 controller 內的 podLister,可以很快地列出每一個 MPIJob 的 worker pods;
  • 根據 pods 的 status.phase,controller 在篩選出 Running 狀態的 worker pods 之後,就可以構建出一份反映當前 worker pods 狀態的 discover_hosts.sh
func (c *MPIJobController) getOrCreateConfigMap(mpiJob *kubeflow.MPIJob, workerReplicas int32, isGPULauncher bool) (*corev1.ConfigMap, error) {
newCM := newConfigMap(mpiJob, workerReplicas, isGPULauncher)
podList, err := c.getRunningWorkerPods(mpiJob) updateDiscoverHostsInConfigMap(newCM, mpiJob, podList, isGPULauncher)

0xEE 個人資訊

★★★★★★關於生活和技術的思考★★★★★★

微信公眾賬號:羅西的思考

如果您想及時得到個人撰寫文章的訊息推送,或者想看看個人推薦的技術資料,敬請關注。

0xFF 參考

tensorflow學習筆記(十九):分散式Tensorflow

在 Kubernetes 上彈性深度學習訓練利器-Elastic Training Operator

在阿里雲上搭建Kubeflow Pipelines

開發你的機器學習工作流

像Google一樣構建機器學習系統3 - 利用MPIJob執行ResNet101

揭祕|一探騰訊基於Kubeflow建立的多租戶訓練平臺背後的技術架構

https://blog.csdn.net/weixin_43970890/article/details/113863716

[KubeFlow] MPI-Operator 深度解讀

在 Amazon EKS 上優化分散式深度學習效能的最佳實踐

雲原生AI平臺的加速與實踐

雲原生的彈性 AI 訓練系列之一:基於 AllReduce 的彈性分散式訓練實踐

MPI on Kubernetes

Kubeflow/tf-operator原始碼分析

MPI,OpenMPI 與深度學習

通過shell執行kubectl exec並在對應pod容器內執行shell命令

k8s系列 – CRD自定義資源與Controller實現(完結篇)

TensorFlow分散式全套(原理,部署,例項)

Kubernetes Operator最佳實踐

星辰·太極機器學習平臺-背後技術架構解密