[原始碼解析] 深度學習分散式訓練框架 horovod (12) --- 彈性訓練總體架構
0x00 摘要
Horovod 是Uber於2017年釋出的一個易於使用的高效能的分散式訓練框架,在業界得到了廣泛應用。
本系列將通過原始碼分析來帶領大家瞭解 Horovod。本文是系列第十二篇,看看horovod 如何實施彈性訓練。
彈性訓練使得Horovod具備執行時worker數量動態伸縮,而不需要重啟 或者 只是從儲存中的checkpoint恢復訓練。
本系列其他文章連結如下:
[原始碼解析] 深度學習分散式訓練框架 Horovod (1) --- 基礎知識
[原始碼解析] 深度學習分散式訓練框架 horovod (2) --- 從使用者角度切入
[原始碼解析] 深度學習分散式訓練框架 horovod (3) --- Horovodrun背後做了什麼
[原始碼解析] 深度學習分散式訓練框架 horovod (4) --- 網路基礎 & Driver
[原始碼解析] 深度學習分散式訓練框架 horovod (5) --- 融合框架
[原始碼解析] 深度學習分散式訓練框架 horovod (6) --- 後臺執行緒架構
[原始碼解析] 深度學習分散式訓練框架 horovod (7) --- DistributedOptimizer
[原始碼解析] 深度學習分散式訓練框架 horovod (8) --- on spark
[原始碼解析] 深度學習分散式訓練框架 horovod (9) --- 啟動 on spark
[原始碼解析] 深度學習分散式訓練框架 horovod (10) --- run on spark
[原始碼解析] 深度學習分散式訓練框架 horovod (11) --- on spark --- GLOO 方案
0x01 總述
1.1 問題點
我們思考下,Horovod 目前遇到了什麼問題?
- 無法自動調節容量(Auto Scale)。
- 因為計算資源也許會有彈性排程,所以應該考慮到如果叢集縮容了怎麼辦?如果擴容了怎麼?理想狀態應該是:在訓練過程中可以自動增加或者減少worker數量。而且在worker數量變化時,不會中斷訓練任務,做到平滑過渡。
- 目前Horovod無法在資源有限的情況下執行。假如一共需要100個GPU,暫時只有40個GPU到位,在這種情況下,Horovod就只能等待,不能用現有的40個GPU先在少量程序上開始訓練,從而無法快速開始模型迭代。
- 資源充裕時,Horovod 無法自動增加程序加速訓練。就上例而言,在理想狀態下,Horovoid應該先用這40個GPU構建一個環來啟動訓練,如果發現60個新GPU到位了就自動動態擴容,從而在下一個 epoch 開始就用100個GPU構建新的環開始訓練;
- 沒有容錯機制(Fault Tolerance)。目前如果某一個節點失敗,整個訓練會失敗,使用者只能從頭開始訓練。如果可以支援 auto scale,加上一些之前陸續儲存的 checkpoint,則Horovod可以重新選取一個好節點啟動這個worker,或者用剩下的節點構建一個環繼續訓練。
- 排程機制不靈活。
- 機器學習訓練任務一般時間較長,佔用算力大,而Horovod任務缺少彈效能力,不支援動態配置 worker,不支援高優先順序搶佔例項。因此當資源不足時,無法按需為其他高優先順序業務騰出資源, 只能等待任務自己主動終止或者出錯終止。
為了解決以上幾個問題,我們會思考很多的其他具體技術問題和細節,讓我們先羅列出來:
- 何時構建 checkpoint?哪一個階段是合適的?每一個 epoch 之後自動儲存?還是由使用者自行控制(這樣可以做到更好的)?
- 如何從 checkpoint恢復?
- checkpoint需要儲存哪些東西,即,對於horovod來說,哪些狀態是必須的?
- 如何監聽 worker 的工作情況?怎麼判斷機器出了問題?假如只是網路阻塞偶爾導致的怎麼辦?
- 需要構建一個通知機制;
- 如何知道叢集的富餘資源?如何發現可用節點?
- 如何構建新的通訊環 ring?
- 如果構建新ring,是由一個 master 完成?還是使用類似 gossip 這樣的協議?
- 是否有優先順序排程,這樣可以充分利用共享叢集資源空閒的資源。
- 新 worker 怎麼被 sync?
- 原有的active worker 節點怎麼處理?
- 出問題的 worker 節點怎麼處理?
- rank 0 怎麼廣播?
我們在本文以及後續各篇的分析中試著解答這些問題。
注:Horovod目前的排程機制依然不靈活,不支援搶佔。
1.1 角色
Horovod 在單機的多個 GPU 之上採用 NCCL 來通訊,在多機(CPU或者GPU)之間通過 Ring AllReduce 演算法進行通訊。Horovod 的彈性訓練是指多機的彈性訓練。
Horovod 彈性訓練有兩個角色:driver和 worker。driver 程序執行在 CPU 節點上,worker 程序可以執行在 CPU 節點或者 GPU 節點之上。
Driver 程序的作用是:
- 呼叫 Gloo 幫助 workers 構造一個 AllReduce 通訊環,或者說是通訊域。Driver 不參與具體構建通訊環,而是提供輔助資訊,從而worker可以建立環。
- Driver 程序需要給 Gloo 建立一個帶有 KVStore 的 RendezvousServer,其中 KVStore 用於儲存通訊域內每個節點的 host 和 其在邏輯通訊環分配的序號 rank 等資訊。
- 這個 RendezvousServer 執行在 Horovod 的 driver 程序裡。driver 程序拿到所有 worker 程序節點的地址和 GPU 卡數資訊後,會將其寫入RendezvousServer 的 KVStore 中,然後 worker 就可以呼叫 gloo 來訪問 RendezvousServer 構造通訊環。
- Driver 會在 worker 節點上啟動/重啟 worker 程序。
- Driver 會監控系統整體狀態。
worker 負責訓練和模型迭代。
- 每個 worker 節點會向 RendezvousServer 發起請求來得到自己的鄰居節點資訊,從而構造通訊環。
- 在這個通訊環之中,每個 worker 節點有一個左鄰居和一個右鄰居,在通訊過程中,每個 worker 只會向它的右鄰居傳送資料,只會從左鄰居接受資料。
具體組網機制如下:
+-------------------------------+
| Driver |
| |
| +------------------------+ |
| | RendezvousServer | |
| | | |
| | | |
| | host1, host2, host3 | |
| +------------------------+ |
+-------------------------------+
^ ^ ^
| | |
| | |
+-------------+ | +--------------+
| | |
| | |
| | |
v v v
+--------+----+ +-------+------+ +----+--------+
| Worker | | Worker | | Worker |
+------> | +------> | +---------> | | +------+
| | host1 | | host2 | | host3 | |
| +-------------+ +--------------+ +-------------+ |
| |
| |
| v
<--------------------------------------------------------------------------------+
我們下面詳細分析下各個部分。
1.2 容錯機制
Horovod 的容錯機制是基於 gloo 來實現的,對於錯誤來說,這可以被認為是一個被動操作。
Gloo 本身是不支援容錯的。當眾多worker之間對張量進行聚合操作時候,如果某一個worker失敗,則gloo不會處理異常,而是丟擲異常並且退出,這樣所有worker都會報異常退出。
為了不讓某一個 worker 的失敗導致整體訓練退出,Horovod 需要做兩方面工作:
- 不讓異常影響現有作業。Horovod 必須捕獲 gloo 丟擲的異常,於是就構建了一個python處理異常機制。
- Worker 在捕獲異常之後會將異常傳遞給對應的 Python API 處理,API 通過判斷異常型別決定是否繼續訓練。
- 如果異常資訊中包括 “HorovodAllreduce”、“HorovodAllgather” 或者 “HorovodBroadcast” 等關鍵字,說明這可能是某個worker死掉導致的通訊失敗,這種異常被Horovod認為是可以恢復的。
- 放棄失敗的worker,使用剩餘可用worker繼續訓練。
- 其他存活的 worker 停止當前的訓練,記錄當前模型迭代的步數。
- 此時gloo的runtime已經出現問題,通訊環已經破裂,無法在剩餘的 worker 之間繼續進行 AllReduce 操作。
- 為了可以繼續訓練,Horovod Driver 會重新初始化 gloo,啟動一個新的 rendezvous server,然後獲取存活的 worker 的資訊,利用這些worker組成新的通訊環。
- 當新的通訊環構造成功後,rank 0 worker 會把自己的模型廣播發給其他所有worker,這樣大家就可以在一個基礎上,接著上次停止的迭代開始訓練。
1.4 監控機制
容錯機制是被動操作,監控機制就是主動操作。
彈性就意味著分散式叢集的狀態會隨時發生變化,而 Horovod 本身和分散式叢集並沒有關聯,所以需要有一個外部途徑來讓 Horovod 隨時掌握叢集狀態。
這個外部途徑就是使用者需要在 Horovod 啟動命令中提供一個發現指令碼 discovery_host。discovery_host 由使用者編寫,負責發現可用的 worker 節點拓撲資訊。
Driver在執行之後會定期呼叫這個 bash 指令碼來對叢集監控,當worker發生變化時,discover_host 指令碼會返回最新的worker狀態,Driver 根據 discover_host 的返回值得到 worker 節點資訊:
- 如果Driver發現有worker失敗,就捕獲異常,根據存活的worker資訊來更新 RendezvousServer KVStore 的節點資訊,號召大家重新建立通訊環進行訓練。
- 如果Driver發現有新worker節點加入叢集,根據目前所有worker資訊來更新 RendezvousServer KVStore 的節點資訊,號召大家重新建立通訊環進行訓練。現有worker 節點收到通知後,會暫停當前訓練,記錄目前迭代步數,呼叫
shutdown
和init
重新構造通訊環。Driver也會在新節點上啟動worker,擴充程序數目。 - 當新的通訊環構造成功之後,rank 0 worker 會把自己的模型廣播發給其他所有worker,這樣大家就可以在一個基礎上,接著上次停止的迭代開始訓練。
這樣在訓練過程中,當 worker 數量有變化時,訓練依然繼續進行。
1.5 官方架構圖
官方的一個架構圖如下,我們會在後續文章中逐步講解圖中部分:
0x02 示例程式碼
2.1 python程式碼
我們從官方文件中找出 TF v2 的示例程式碼看看,其關鍵之處是使用 @hvd.elastic.run 對 train 做了一個封裝,並且傳入了一個 TensorFlowKerasState。
import tensorflow as tf
import horovod.tensorflow as hvd
hvd.init()
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')
dataset = ...
model = ...
optimizer = tf.optimizers.Adam(lr * hvd.size())
@tf.function
def train_one_batch(data, target, allreduce=True):
with tf.GradientTape() as tape:
probs = model(data, training=True)
loss = tf.losses.categorical_crossentropy(target, probs)
if allreduce:
tape = hvd.DistributedGradientTape(tape)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
# Initialize model and optimizer state so we can synchronize across workers
data, target = get_random_batch()
train_one_batch(data, target, allreduce=False)
# 使用 @hvd.elastic.run 對 train 做了一個封裝
@hvd.elastic.run
def train(state):
for state.epoch in range(state.epoch, epochs):
for state.batch in range(state.batch, batches_per_epoch):
data, target = get_random_batch()
train_one_batch(data, target)
if state.batch % batches_per_commit == 0:
state.commit()
state.batch = 0
def on_state_reset():
optimizer.lr.assign(lr * hvd.size())
# 這裡是新修改處,傳入了一個 TensorFlowKerasState
state = hvd.elastic.TensorFlowKerasState(model, optimizer, batch=0, epoch=0)
state.register_reset_callbacks([on_state_reset])
train(state)
2.2 指令碼執行
彈性訓練依然使用 horovodrun 這個命令列工具跑,和普通分散式訓練不同的是,彈性訓練不會在啟動命令中明確指定節點列表,而是是使用一個 發現機制 來在執行時發現節點。通用的做法是在啟動 Job 時候提供一個發現指令碼:
horovodrun -np 18 --host-discovery-script discover_hosts.sh python train.py
此指令碼用以實時反饋當前可用的 hosts 以及每個 hosts 上的 slots(下文使用 discover_hosts.sh
指代該指令碼,但其無需命名為 discover_hosts.sh
)。
discover_hosts.sh 指令碼必須有可執行許可權,在被執行時返回可用節點列表,一行一個節點資訊,結構為: ,例如:
$ sh ./discover_hosts.sh # 執行指令碼,輸出節點資訊
host-1:4
host-2:4
host-3:4
如果這個發現指令碼執行失敗(沒有可執行許可權)或者執行時返回非0錯誤碼,則訓練程序會立刻失敗,否則會一直重試直到超時(返回的slot列表不滿足最小可執行數)。
彈性訓練會一直等到所需最小slots數(-np)準備好之後,才會開始執行訓練程序,使用者可以通過 --min-np 和 --max-np 指定最小和最大的slots數,如:
horovodrun -np 8 --min-np 4 --max-np 12 --host-discovery-script discover_hosts.sh python train.py
如果可用slots數小於 --min-np 指定的數量時(比如某些節點故障,任務被搶佔等),任務會被暫停等待,直到更多的節點變為活躍,或者超時時間 HOROVOD_ELASTIC_TIMEOUT(預設設定為600秒)達到。另外,如果不指定 --min-np ,則最小slots數會被預設為 -np 所配置的數目。
需要 --max-np 的原因是為了限制程序數目(防止過度使用可用資源),另外在學習率和資料分割槽方面也可以作為參考點(在這些情況下需要有一個固定的參考配置)。同樣,如果不指定此引數,也會預設為 --np 。
0x03 邏輯流程
3.1 邏輯流程
我們先解析下彈性訓練的邏輯流程(為了實現彈性訓練的能力,Horovod Elastic 對 Horovod 的架構和實現進行了一定的修改),最大的差別就是:彈性訓練需要在增刪worker時候可以跟蹤和同步worker的狀態,具體修改如下。
- 聚合操作需要被定義在
hvd.elastic.run
函式之下。- 將你的主訓練程序程式碼(初始化之後的所有程式碼)用一個函式(我們暫時命名為 train_func)封裝起來,然後使用裝飾器 hvd.elastic.run 裝飾這個函式。
- 對於這個裝飾器修飾的 train_func 函式,它第一個引數,必須是 hvd.elastic.State 的例項。因為某些新加入的worker可能會處於某些不確定的狀態之中,所以在執行這個被裝飾函式 train_func 之前,這個狀態物件需要在所有worker中進行同步,以此確保所有的worker都達到一致狀態。
- 因為同步函式會用到集合通訊操作,並且新增worker後,活躍worker不會在此函式之前重置,所以不要在同步函式之前使用Horovod的集合操作(比如broadcast, allreduce, allgather)。
- 每個 worker 都有自己的狀態(state)。
- 把所有需要在workers之間同步的變數都放進 hvd.elastic.State (比如model parameters,optimizer state,當前epoch和batch進度等等)物件之中。
- 對於TensorFlow,Keras和PyTorch,已經提供預設的標準狀態實現。然而,如果使用者需要在某些場景廣播特殊型別,可以過載定製 hvd.elastic.State 這個物件。
- 在執行
hvd.elastic.run
函式前,此狀態物件將在所有workers中同步一次,用於保持一致性。
- 週期性呼叫 state.commit() 來把狀態(state)備份到記憶體。
- 定期備份非常有用。在某些worker發生意外錯誤時,定期備份可以避免因為狀態被損壞而在重新訓練時候無法恢復現場。比如,如果一個worker剛好在更新引數過程中突然出錯,此時部分梯度更新完畢,部分梯度可能只更新到一半,這個狀態是不可逆轉而又無法繼續。因此,當此狀態發生時,會丟擲一個 HorovodInternalError 異常,當 hvd.elastic.run 捕獲到這個異常後,會利用最新一次commit中恢復所有狀態。
- 因為commit狀態代價高昂(比如如引數量太大會導致耗時過長),所以需要在"每個batch的處理時間"與"如果出錯,訓練需要從多久前的狀態恢復"之間選取一個平衡點。比如,如果你每訓練10個batches就commit一次,你就把複製時間降低了10倍。但是當發生錯誤時,你需要回滾到10個batches前的狀態。
- Elastic Horowod可以通過執行我們稱之為“優雅地移除worker”操作來避免這些回滾。如果driver程序發現主機已可用或標記為刪除,它將向所有workers推送一個通知。於是在下次呼叫
state.commit()
或更輕量級的state.check_host_updates()
時,一個HostsUpdatedInterrupt
異常將被丟擲。此異常的處理方式與“HorovodInternalError”類似,只是引數狀態不會還原到上次commit,而是從當前實時引數中恢復。 - 一般來說,如果你的硬體設施是可靠與穩定的,並且你的編排系統會在任務節點移除時提供足夠的告警,你就可低頻次呼叫 state.commit() 函式,同時只在每個batch結束時呼叫相對不耗時的 state.check_host_updates() 來檢查節點變更情況。
- 在 hvd.elastic.State 物件中註冊一些回撥函式,以便當worker成員發生變化時給予響應
- 比如回撥函式可以處理如下情況:
- 當worker數量發生改變時,學習率需要根據新的world size進行相應改變。
- 對資料集進行重新分割槽。
- 這些回撥函式會在"Horovod被重啟之後"和"狀態在節點間同步之前"這兩個階段中間被呼叫。
- 比如回撥函式可以處理如下情況:
- worker 的增減會觸發其他 worker 上的重置(reset)事件,重置事件會啟用以下幾個操作(具體執行依據情況決定,不一定全部執行):
- 判斷該 worker 是否可以繼續執行。
- 將失效的 worker host 加入到黑名單,下一次組網不會使用blacklist中的host。
- 在新的 hosts 上啟動 worker 程序。
- 更新每個 worker 的 rank 資訊。
- 在重置之後,每個 worker 的狀態會被同步
3.2 入口點
從如下程式碼可知 hvd.elastic.run 就是 horovod/tensorflow/elastic.py 之中的 run 函式。
import horovod.tensorflow as hvd
@hvd.elastic.run
所以我們去這個檔案中探尋。
def run(func):
from tensorflow.python.framework.errors_impl import UnknownError
def wrapper(state, *args, **kwargs):
try:
return func(state, *args, **kwargs)
except UnknownError as e:
if 'HorovodAllreduce' in e.message or \
'HorovodAllgather' in e.message or \
'HorovodBroadcast' in e.message:
raise HorovodInternalError(e)
return run_fn(wrapper, _reset)
3.3 主邏輯
run_fn 函式是關於使用者程式碼的主要邏輯所在,位於 horovod/common/elastic.py。
其主要邏輯是:
- 初始化 notification_manager;
- 在 notification_manager 註冊 state;
- 執行 func 函式,就是使用者的訓練程式碼 train;
- 在worker程序出現 HorvodInternalError 錯誤或者 HostsUpdateInterrupt 節點增刪時,會捕獲這兩個錯誤,呼叫 reset 來進行容錯處理;
def run_fn(func, reset):
@functools.wraps(func)
def wrapper(state, *args, **kwargs):
notification_manager.init()
notification_manager.register_listener(state)
skip_sync = False
try:
while True:
if not skip_sync:
state.sync()
try:
return func(state, *args, **kwargs)
except HorovodInternalError:
state.restore()
skip_sync = False
except HostsUpdatedInterrupt as e:
skip_sync = e.skip_sync
reset()
state.on_reset()
finally:
notification_manager.remove_listener(state)
return wrapper
3.4 出錯處理
在出錯狀態下,當worker程序出現 HorvodInternalError (代表出現錯誤)或者 HostsUpdateInterrupt (代表有節點增刪)時,Horovod 會執行如下流程:
- 在 hvd.elastic.run 裝飾器中捕獲上述兩個錯誤;
- 如果丟擲的是 HorvodInternalError 錯誤,則會從最後的一次 commit 狀態中恢復;
- 重新初始化 Horovod context,然後啟動新的一輪的rendezvous,在rendezvous過程中,舊的worker會被優先被選舉為新的rank-0,因為舊的worker具有上次訓練中的最近狀態;
- 新的 rank-0 worker 會把狀態同步到其它workers;
- 繼續訓練;
至此,我們已經分析了horovod 彈性訓練基本架構,下一篇我們分析最主要的部件:Driver。
0xEE 個人資訊
★★★★★★關於生活和技術的思考★★★★★★
微信公眾賬號:羅西的思考
如果您想及時得到個人撰寫文章的訊息推送,或者想看看個人推薦的技術資料,敬請關注。
0xFF 參考
ElasticDL呼叫 Horovod 在Kubernetes上實現彈性 AllReduce(一)
kubernetes 培訓_在Kubernetes上使用horovod進行分散式深度學習培訓
在 Kubernetes 上彈性深度學習訓練利器 -- Elastic Training Operator
ElasticHorovod - 彈性、容錯的分散式訓練 (嚐鮮版)