[原始碼解析] PyTorch 流水線並行實現 (1)--基礎知識

0x00 摘要

本系列開始介紹PyTorch的流水線並行實現。實質上,PyTorch就是 GPipe 的PyTorch版本。這些開源軟體在互相借鑑思路,互相學習,從 PyTorch 的原始碼註釋中,可以見到我們之前介紹的部分框架/庫的引用或者論文連結。

流水線並行其他文章連結如下:

[原始碼解析] 深度學習流水線並行Gpipe(1)---流水線基本實現

[原始碼解析] 深度學習流水線並行GPipe (2) ----- 梯度累積

[原始碼解析] 深度學習流水線並行 GPipe(3) ----重計算

[原始碼解析] 深度學習流水線並行之PipeDream(1)--- Profile階段

[原始碼解析] 深度學習流水線並行 PipeDream(2)--- 計算分割槽

[原始碼解析] 深度學習流水線並行 PipeDream(3)--- 轉換模型

[原始碼解析] 深度學習流水線並行 PipeDream(4)--- 執行時引擎

[原始碼解析] 深度學習流水線並行 PipeDream(5)--- 通訊模組

[原始碼解析] 深度學習流水線並行 PipeDream(6)--- 1F1B策略

本文圖來自論文和github原始碼。

0x01 歷史

我們首先介紹一下來龍去脈。

1.1 GPipe

從前面系列文章我們知道,GPipe是Google Brain釋出的可伸縮流水線並行庫,它允許高效地訓練大型的消耗記憶體的模型。其論文是:

`GPipe: Efficient Training of Giant Neural Networks using Pipeline Parallelism
<https://arxiv.org/abs/1811.06965>`

GPipe實質是一個模型並行的庫,當模型的大小對於單個GPU來說太大時,訓練大型模型可能會導致記憶體不足。為了訓練如此大的模型,GPipe把一個多層網路分割成若干個複合層,然後每個複合層被部署到GPU/TPU之上。但是這若干個複合層只能順序並行,這就嚴重影響了訓練速度。所以GPipe引入了流水線並行機制(pipeline parallelism),在不同的GPU裝置之間對層進行流水線處理。另外,GPipe 也使用了重新計算這個技巧來降低記憶體,這樣可以訓練更大的模型。

Gpipe首先將模型分片到不同的裝置上,其中每個裝置承載模型的一個分片。碎片可以是單個層或一系列層。然後 Gpipe將一小批資料分割成微批次,並將微批次給承載第一個碎片的裝置。每個裝置上的層做如下操作:

  • 對接受到的微批次進行處理,並將輸出傳送到後續裝置。
  • 同時,它已準備好處理來自上一個裝置的微批次。

通過以這種方式對輸入進行管道連線,Gpipe能夠減少裝置的空閒時間。可以簡化大型模型的訓練。

1.2 torchgpipe

因為 GPipe 是基於 TensorFlow 的庫(這是Google的產品嘛),所以kakaobrain的一些工程師就用PyTorch 來實現了 GPipe,並且開源出來,這就是 torchgpipe,其地址為:https://github.com/kakaobrain/torchgpipe,使用者可以通過 pip install torchgpipe 進行安裝使用。

該作者團隊還發表了一篇論文,具體如下:https://arxiv.org/pdf/2004.09910.pdf

1.3 fairscale

FairScale是Facebook的一個PyTorch擴充套件庫,用於高效能和大規模培訓。該庫擴充套件了PyTorch的基本功能,同時添加了新的SOTA縮放技術。FairScale以可組合模組和簡易API的形式來提供了最新的分散式訓練技術。這些API可以作為研究人員的基本工具,因為其可以使用有限的資源來訓練大模型。

其開源地址為:https://github.com/facebookresearch/fairscale.

從其內部原始碼結構和文件看,個人認為其是Facebook 內部對於市面上各種最新深度學習庫的一個收集/實驗/探索的試驗田。如果實驗成熟,Facebook 就把部分程式碼合併到 PyTorch 之中

比如從下面可以看到,Facebook 正在/曾經試驗如下:

fairscale.nn.pipe is forked from torchgpipe, Copyright 2019, Kakao Brain, licensed under Apache License.

fairscale.nn.model_parallel is forked from Megatron-LM, Copyright 2020, NVIDIA CORPORATION, licensed under Apache License.

fairscale.optim.adascale is forked from AdaptDL, Copyright 2020, Petuum, Inc., licensed under Apache License.

fairscale.nn.misc.flatten_params_wrapper is forked from PyTorch-Reparam-Module, Copyright 2018, Tongzhou Wang, licensed under MIT License.

1.4 PyTorch

從 2021-03-18,PyTorch 1.8.0 Release Notes 之中可以看到。

  • Upstream fairscale.nn.Pipe into PyTorch as torch.distributed.pipeline (#44090)

這裡正式引入了 pipeline。

https://github.com/pytorch/pytorch/pull/44090 的內容是:

Stack from ghstack:

  • #44090 Pull in fairscale.nn.Pipe into PyTorch.

This is an initial commit pulling in the torchgpipe fork at https://github.com/facebookresearch/fairscale.

The purpose of this commit is to just pull in the code and ensure all tests and builds work fine. We will slowly modify this to match our intended API mentioned in #44827. Follow up PRs would

address further changes needed on top of the initial commit..

We're pulling the code into the torch.distributed._pipeline.sync package. The package is private on purpose since there is a lot of work (ex: docs, API changes etc.) that needs to go in before we can actually officially support this.

需要注意一點是,torchgpipe 這部分程式碼被合併到 torch/distributed/pipeline/sync 之下,這說明後續 PyTorch 也許會合並一個 async 實現,沒準就是 PipeDream

1.5 基礎版本

因為這部分原始碼在 PyTorch 之中基本未做改變。所以,我們還是以 torchgpipe 原始程式碼作為例子來進行說明。

本文只是選取重要功能進行講解,像 Deferred Batch NormalizationSkip Connections就不會分析,讀者如果有興趣,可以自行研究。

0x02 基礎知識

在 torchgpipe 和 fairscale 的原始碼內部,都涉及和介紹了大量相關知識,瞭解這些知識有助於我們更好的學習原始碼,也可以回頭對 GPipe 的分析做相應印證。

某些知識可能與之前文章有所重複,但是torchpipe和fairscale給出了自己的見解,我們可以再學習一下。

2.1 流水線並行

GPipe將一個模型拆分為多個分割槽,並將每個分割槽放置在不同的裝置之上,這樣可以增加內容容量。例如,我們可以拆分一個佔用40GB CUDA記憶體的模型分為4個分割槽,每個分割槽佔用10GB。

這種方法稱為“模型並行"。然而,典型的深度學習模型由連續的層組成。換句話說,後面的層在前一層完成之前是不會工作的。如果一個模型是由完全連續的層構成,即使我們將模型擴充套件到兩個或多個層上,同一時間也只能使用一個裝置。

GPipe將一個小批量(mini-batch)拆分為多個微批量(micro-batches),以使裝置儘可能並行工作,這被稱為“流水線並行"。

基本上,流水線並行是一個小型資料並行的棧。當每個分割槽處理完一個微批次後,它可以將輸出拋到下一個分割槽並立即開始下一個微批次的工作,這樣分割槽就可以重疊。

因為每個分割槽都必須等待前一個分割槽輸入作為第一個微批次來處理,所以流水線之上仍然有空閒時間,我們稱之為 “bubble"。

通過選擇較小尺寸的微批次,可以減少“bubble"。但通常,較大的批量可以更有效地利用GPU。因此,如果選擇的微批量太小,GPU可能未得到充分利用。另外,更快的分割槽應該等待相鄰的較慢分割槽,分割槽之間的不平衡也可能導致GPU利用率不足。因此,總體效能由最慢的分割槽決定。

2.2 Checkpointing

2.2.1 基本概念

Checkpointing 是一種用於減少訓練期間GPU記憶體使用的技術。這是通過避免在向前傳遞期間儲存中間啟用張量來實現的。具體而言,Checkpointing 在正向傳播過程中,只會記住分割槽邊界處的張量,所有其他中間張量都不會記住,而是在向後傳播過程中跟蹤原始輸入來重新計算向前傳播。因此,隱藏層消耗的記憶體僅為帶有檢查點的單個微批次所需要的數量。

Checkpointing 是效能和記憶體之間的折衷,因為如果完全重計算,則所花費的時間與正向傳播所花費的時間相同。但 Checkpointing 減少了儲存大型啟用張量的需要,從而允許我們增加批量大小,增加模型的淨吞吐量。

2.2.2 使用

在 GPipe之中,Checkpointing 應用於每個分割槽,以最小化模型的總體記憶體消耗。

Checkpointing 會極大減少記憶體使用,但總體訓練速度會降低25%左右。您可以處理如何在模型上應用檢查點。Checkpointing 只有三種選擇,不能夠指定某些特定點:

  • "always" :在所有微批次上應用檢查點。

  • "except_last" : 在最後一個微批次之外應用檢查點。

  • "never" :從不應用檢查點。

@pytest.mark.parametrize('checkpoint', ['never', 'always', 'except_last'])

通常,在最後一個微批次上的檢查點可能沒有用處,因為儲存的記憶體將立即重建。這就是為什麼我們選擇"except_last"作為預設選項。如果您決定根本不使用檢查點,那麼<torch.nn.DataParallel>可能比GPipe更有效。

2.2.3 實現概述

Checkpointing 已經作為“torch.utils.checkpoint.checkpoint_wrapper"API的一部分實現,通過該API可以包裝前向過程中的不同模組。

Checkpointing 通過重寫“torch.autograd.Function"來實現。在處理模組前向傳遞的“forward"函式中,如果使用“no_grad",我們可以在很長一段時間內(即直到反向傳播之前)防止正向圖的建立和中間啟用張量的物化。相反,在後向傳播期間,會再次執行前向傳播,然後執行後向傳播。

前向傳播過程的輸入使用上下文物件儲存,然後在後向傳播過程中訪問該上下文物件以檢索原始輸入。PyTorch還儲存了RNG(Random Number Generator)的狀態,用於前向傳播和後向傳播,如 Dropout layers 所需。

以下是幾個注意點:

  1. 記憶體節省完全取決於檢查點所包裝的模型和分段。每個backprop由幾個迷你前向傳播(mini-forward)和backprop過程組成。收益完全取決於每層啟用值的記憶體佔用。

  2. 使用BatchNormalization時,您可能需要凍結統計資料的計算,因為我們運行了兩次正向傳遞。

  3. 確保輸入張量的'requires_grad'欄位設定為True。為了觸發後向傳播功能,輸出需要設定此欄位。通過在輸入張量設定這個欄位,我們可以確保將其傳播到輸出,並觸發'backward'函式。

2.3 微批次的數目

微批量大小的選擇會影響GPU的利用率。較小的微批量可以減少等待先前微批次輸出的延遲,但較大的微批量可以更好地利用GPU。因此,關於微批次數量,存在了一個權衡,即每個微批次的GPU利用率和bubble總面積之間的權衡,使用者需要為模型找到最佳的微批次數量。

與大的微批次相比,在處理許多小的微批次時,GPU可能會減慢速度。如果每個CUDA核心太便宜而無法計算,那麼GPU將無法得到充分利用,因此太小的微批次將導致利用率不足。另一方面,當每個微批次的尺寸減小時,氣泡的面積也相應減少。理想情況下,使用者應該選擇可以提高GPU利用率的最大數量的微批次。

作為補充說明,批次尺寸越小,效能越差。大量的微批次可能會對使用BatchNorm的模型的最終效能產生負面影響,就像 torch.nn.DataParallel 那樣。

2.4 檢查重計算

GPipe中的檢查點執行兩次前向傳播。第二個前向傳播稱為“重新計算"。

諸如<torch.nn.BatchNorm2d>之類的模組在每次正向傳播時,如果更新其批處理統計資訊,可能就會導致問題。因此,在重新計算期間,不應再次更新正在執行的估計值。為了避免再次更新執行估計,模組的“forward"方法需要能夠檢測到這是重新計算。

~torchgpipe.is_recomputing方法可以檢測重新計算,在重執行期間,這個方法會返回True

   class Counter(nn.Module):
def __init__(self):
super().__init__()
self.counter = 0 def forward(self, input):
if not is_recomputing():
self.counter += 1
return input

另外,如果把~torchgpipe.GPipe 的成員變數 deferred_batch_norm設定為 True,則可以阻止再次更新執行統計。

0x03 使用

3.1 示例

要使用GPipe訓練模組,只需將其用 torchgpipe.GPipe 來包裝即可,但是使用者的模組必須是<torch.nn.Sequential> 的例項。

GPipe 會將自動將模組分割為多個分割槽,分割槽是在單個裝置上一起執行的一組連續層,其中:

balance引數確定每個分割槽中的層數。

chunks引數指定微批處理的數量。

下面的示例程式碼顯示瞭如何將具有四層的模組拆分為兩個分割槽,每個分割槽有兩層。此程式碼還將一個小批次 mini-batch 拆分為8個微批次(micro-batches)

   from torchgpipe import GPipe

   model = nn.Sequential(a, b, c, d)
model = GPipe(model, balance=[2, 2], chunks=8) # 1st partition: nn.Sequential(a, b) on cuda:0
# 2nd partition: nn.Sequential(c, d) on cuda:1 for input in data_loader:
output = model(input)

~torchgpipe.GPipe使用CUDA進行訓練。使用者不需要自己將模組移動到GPU,因為~torchgpipe.GPipe自動把每個分割槽移動到不同的裝置上。預設情況下,可用的GPU從cuda:0開始,並且按順序為每個分割槽選擇可用GPU。使用者也可以利用device 引數指定使用的GPU。

   model = GPipe(model,
balance=[2, 2],
devices=[4, 2], # Specify GPUs.
chunks=8)

3.2 輸入輸出

與典型module不同,GPipe之中,輸入裝置與輸出裝置不同,除非只有一個分割槽。這是因為第一個分割槽和最後一個分割槽被放置在不同的裝置上。因此,必須將輸入和目標移動到相應的裝置。可以通過 torchgpipe.GPipe.devices 的屬性來完成,這個屬性儲存了每個分割槽的裝置列表.

   in_device = model.devices[0]
out_device = model.devices[-1] for input, target in data_loader:
# input on in_device
input = input.to(in_device, non_blocking=True) # target on out_device
target = target.to(out_device, non_blocking=True) # output on out_device
output = model(input)
loss = F.cross_entropy(output, target)
loss.backward()
...

3.3 巢狀序列(Nested Sequentials)

~torchgpipe.GPipe拆分一個<torch.nn.Sequential>module時候,它將模組的每個子模組視為單一的、不可分割的層。然而,模型事實上並不一定這樣,有些子模組可能是另一個順序模組,可能需要進一步拆分它們。

GPipe 不會支援這些巢狀的 Sequentials module,所以使用者需要把module打平(flatten the module)。還好,這在PyTorch中並不難。以下程式碼段顯示了巢狀順序模組如何展平:

   _3_layers = nn.Sequential(...)  # len(_3_layers) == 3
_4_layers = nn.Sequential(...) # len(_4_layers) == 4
model = nn.Sequential(_3_layers, _4_layers) # len(model) == 2 def flatten_sequential(module):
def _flatten(module):
for name, child in module.named_children():
if isinstance(child, nn.Sequential):
for sub_name, sub_child in _flatten(child):
yield (f'{name}_{sub_name}', sub_child)
else:
yield (name, child)
return nn.Sequential(OrderedDict(_flatten(module))) model = flatten_sequential(model) # len(model) == 7
model = GPipe(model, balance=[2, 3, 2], chunks=4)

3.4 典型模型並行

典型的模型並行(Typical Model Parallelism)是GPipe的一個特例。模型並行性是相當於禁用了微批處理和檢查點的GPipe,可以通過chunks=1checkpoint='never' 來做到。

model = GPipe(model, balance=[2, 2], chunks=1, checkpoint='never')

至此,庫的歷史和基本知識已經介紹完畢,下一篇我們介紹 Auto balance。

0xFF 參考

Markdown公式用法大全

markdown中公式編輯教程

https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html#stream-sync-behavior

CUDA學習:基礎知識小結

CUDA隨筆之Stream的使用

NVIDIA解決方案架構師深度解析大規模引數語言模型Megatron-BERT

Accelerating Wide & Deep Recommender Inference on GPUs

HugeCTR: High-Performance Click-Through Rate Estimation Training

https://discuss.pytorch.org/t/how-to-prefetch-data-when-processing-with-gpu/548

https://github.com/NVIDIA/apex/

https://github.com/justheuristic/prefetch_generator

https://pytorch.org/tutorials/intermediate/model_parallel_turotial.html

https://pytorch.org/docs/stable/autograd.html

https://pytorch.org/docs/notes/cuda.html

https://zhuanlan.zhihu.com/p/61765561

https://pytorch.apachen.org/docs/1.7/64.html

https://zhidx.com/p/217999.html