[原始碼解析] PyTorch 分散式(1) --- 資料載入之DistributedSampler
0x00 摘要
為了更好的介紹引數伺服器Paracel的資料載入,我們臨時插入兩篇PyTorch的資料載入(因為字數太長,所以拆成兩篇),主要是從分散式的角度進行切入。本文只算是開胃甜點,後續會有專門系列分析PyTorch分散式。
引數伺服器系列其他文章如下:
[原始碼解析] 機器學習引數伺服器ps-lite 之(1) ----- PostOffice
[原始碼解析] 機器學習引數伺服器ps-lite(2) ----- 通訊模組Van
[原始碼解析] 機器學習引數伺服器ps-lite 之(3) ----- 代理人Customer
[原始碼解析]機器學習引數伺服器ps-lite(4) ----- 應用節點實現
[原始碼解析] 機器學習引數伺服器 Paracel (1)-----總體架構
[原始碼解析] 機器學習引數伺服器 Paracel (2)--------SSP控制協議實現
0x01 資料載入
1.1 加速途徑
當分散式訓練時候,為了加速訓練,有三個層面的工作需要處理。
- 資料載入層面
- 多機通訊層面
- 程式碼層面
在資料層面,可以使用多程序並行載入來加速資料預處理過程,也有利用GPU特點來加速,比如Nvidia DALI 通過將資料預處理放到 GPU 處理來解決 CPU 瓶頸問題。
在多機通訊層面,有各種集合通訊庫可以利用,比如NCCL,OpenMPI, Gloo 等。
在程式碼層面,可以使用框架提供的分散式API,或者利用 Horovod 來改造單機版程式碼,使其支援分散式任務。
接下來我們就看看資料層面如何加速。
1.2 並行處理
AI框架的資料處理主要如下並行處理:
- 資料載入/處理使用CPU。
- 訓練使用GPU。
在理想狀態下,應該是每輪迭代訓練之前,CPU就完成載入,準備好訓練資料,這樣訓練就可以持續無縫迭代。
然而,GPU算力每年會提升一倍,CPU的提升速度遠遠落後於GPU,所以CPU會是拖後腿的那個角色。這裡不僅僅是CPU算力不足的問題,也包括村儲存中讀取資料速度不足的問題。
因此,機器學習對於資料載入和前期預處理的要求越來越高,必須在GPU計算時間內,完成下一迭代資料的準備工作,不能讓GPU因為等待訓練資料而空閒。
1.3 流水線
對於機器學習訓練,載入資料可以分為三個步驟:
- 將資料從磁碟或者分散式儲存載入到主機(CPU)。
- 將資料從主機可分頁記憶體傳輸到主機固定記憶體。
- 將資料從主機固定記憶體轉移到主機GPU。
因此,流行的深度學習框架會依據載入步驟的特點和異構硬體的特點來進行流水線處理,從而提高資料處理過程的吞吐量。
流水線一般包括多個運算元,每個運算元內部由資料佇列組成一個緩衝區,上游運算元完成處理之後會傳給給下游運算元進行處理。這樣每個運算元任務會彼此獨立,運算元內部可以使用細粒度的多執行緒/多程序來並行加速,每個運算元可以獨立控制處理速度和記憶體以適配不同網路對於處理速度的需求。
如果運算元內部資料佇列不為空,模型就會一直源源不斷獲得資料,就不會因為等待訓練資料而產生瓶頸。
下面是序列處理邏輯:
+------+ +-----------+ +---------------------------+
| | | | | |
| Data +----------> | Load Data +---------> | Transfer to Pinned Memory |
| | | | | |
+------+ +-----------+ +---------------------------+
下面是並行流水線邏輯:
+------------+
+--------+ | |
| | | Process 1 |
| Data 1 +--------> | +------+
| | | Load Data | |
+--------+ | | |
+------------+ |
|
|
|
+------------+ | +-----------------------------------+
+--------+ | | | | |
| | | Process 2 | +------> | Pin-memory process |
| Data 2 +--------> | | | |
| | | Load Data +-------------> | |
+--------+ | | | Transfer to Pinned Memory |
+------------+ +-----> | |
| | |
| +-----------------------------------+
|
+--------+ +------------+ |
| | | | |
| Data 3 +--------> | Process 3 +-------+
| | | |
+--------+ | Load Data |
| |
+------------+
1.4 GPU
本文到現在是解決CPU側的資料傳輸問題,即:從磁碟載入資料,從可分頁到固定記憶體。
但是,從固定記憶體到GPU的資料傳輸(tensor.cuda()
)也可以使用CUDA流進行流水線處理。
另外,深度學習應用程式需要複雜的多階段資料處理管道,包括載入、解碼、裁剪、調整大小和許多其他增強功能。這些目前在 CPU 上執行的資料處理管道已經成為瓶頸,限制了訓練和推理的效能和可擴充套件性。
Nvidia DALI 通過將資料預處理放到 GPU 處理來解決 CPU 瓶頸問題,使用者可以依據自己模型的特點,構建基於 GPU 的 pipeline,或者基於CPU的pipeline。
接下來我們就介紹PyTorch的資料載入,而且主要是從分散式的角度進行切入。
0x02 PyTorch分散式載入
2.1 DDP
pytorch為資料分散式訓練提供了多種選擇。隨著應用從簡單到複雜,從原型到產品,常見的開發軌跡可以是:
- 如果資料和模型能放入單個GPU,使用單裝置訓練,此時不用擔心訓練速度;
- 如果伺服器上有多個GPU,並且你在程式碼修改量最小的情況下加速訓練,使用單個機器多GPU DataParallel;
- 如果你想進一步加速訓練並且願意寫一點程式碼來啟動,使用單個機器多個GPU DistributedDataParallel;
- 如果應用程式跨機器邊界擴充套件,使用多機器DistributedDataParallel和啟動指令碼;
- 如果預期有錯誤(比如OOM)或者資源在訓練過程中可以動態連線和分離,使用torchelastic來啟動分散式訓練。
與本文最相關的部分就是DDP,Distributed Data-Parallel Training(DDP)是一個廣泛採用的單程式多資料訓練方法。使用DDP,模型會被複制到每個程序,然後每個模型副本會被輸入資料樣本的不同子集。DDP負責梯度通訊以保持模型副本的同步,並將其與梯度計算重疊以加快訓練速度。
2.2 分散式載入
我們首先要看看分散式載入的總體結構。
給出示例程式碼,可以看到主要使用了 DataSet, DistributedSampler,DataLoader 這三個實體。
sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None), sampler=sampler)
for epoch in range(start_epoch, n_epochs):
if is_distributed:
sampler.set_epoch(epoch)
train(loader)
這三個概念的邏輯關係如下:
- Dataset : 從名字可以知道,是資料集的意思。負責對原始訓練資料的封裝,將其封裝成 Python 可識別的資料結構,Dataset的派生類必須提供介面一邊獲取單個數據。
- Sampler : 從名字可知,是取樣器,負責取樣方式或者說是取樣策略,實現某種提取/取樣策略從Dataset之中拿到資料索引,供DataLoade使用。可以認為,Sampler 是指揮者,負責決定戰鬥在哪裡開展。
- DataLoader : 負責依據索引來從資料集中載入資料。支援 Map-style 和 Iterable-style 兩種Dataset,支援單程序/多程序載入。Loader 就是具體作戰的鬥士,負責按照 Sampler的命令進行戰鬥。
具體如下圖,簡要說就是:
- DataSet 把資料集數目發給DistributedSampler。
- Sampler 按照某種規則傳送資料indices給Loader。
- Loader 依據indices載入資料。
- Loader 把資料發給模型,進行訓練。
+------------------------+ +-----------+
|DistributedSampler | |DataLoader |
| | 2 indices | |
| Some strategy +-------------------> | |
| | | |
|-------------+----------| | |
^ | | 4 data +-------+
| | -------------->+ train |
1 | length | | +-------+
| | |
+-------------+----------+ | |
|DataSet | | |
| +---------+ | 3 Load | |
| | Data +-------------------------> | |
| +---------+ | | |
| | | |
+------------------------+ +-----------+
因為資料集不是分散式訓練重點,所以本文接下來主要分析 Sampler。
Sampler 的重點就是:如何讓每個worker在資料集中只加載自己所屬的部分,並且worker之間實現對資料集的正交分配。
0x03 DistributedSampler
對於資料並行和分散式訓練,DistributedSampler 負責其資料取樣的任務。
DistributedSampler 是 Sampler 的派生類。當 DistributedDataParallel 使用DistributedSampler 時,每個並行的程序都會得到一個DistributedSampler 例項,這個DistributedSampler 例項會給DataLoader傳送指示,從而 DataLoader 載入具體資料。
DistributedSampler 載入策略負責只提供載入資料集中的一個子集,這些DistributedSampler 提供的子集之間不重疊,不交叉。
3.1 初始化
__init__
初始化程式碼主要是設定了本worker節點的各種資訊,比如資料集dataset,rank(全域性GPU序號),num_replicas 副本數目。並且計算出來所有樣本數目total_size。
幾個引數如下:
- dataset : 就是取樣的資料集。
- num_replicas :參與分散式訓練的程序數目,如果沒有設定,則從group之中得到world_size作為程序數目。
- rank : 當前程序的序號,如果沒有設定,則從group之中得到。
- shuffle :取樣是否需要打亂indices。
- seed :如果需要打亂,則設定一個random seed。
- drop_last :如果不能均勻分割資料,是否需要把無法分配的尾部資料丟掉。
- epoch :每次epoch都會shuffle資料集,如何保持shuffle之後資料集一致性?就是通過epoch完成。
具體程式碼如下,省略了異常處理。
class DistributedSampler(Sampler[T_co]):
def __init__(self, dataset: Dataset, num_replicas: Optional[int] = None,
rank: Optional[int] = None, shuffle: bool = True,
seed: int = 0, drop_last: bool = False) -> None:
self.dataset = dataset
self.num_replicas = num_replicas
self.rank = rank
self.epoch = 0
self.drop_last = drop_last
# If the dataset length is evenly divisible by # of replicas, then there
# is no need to drop any data, since the dataset will be split equally.
if self.drop_last and len(self.dataset) % self.num_replicas != 0: # type: ignore[arg-type]
# Split to nearest available length that is evenly divisible.
# This is to ensure each rank receives the same amount of data when
# using this Sampler.
self.num_samples = math.ceil(
# `type:ignore` is required because Dataset cannot provide a default __len__
# see NOTE in pytorch/torch/utils/data/sampler.py
(len(self.dataset) - self.num_replicas) / self.num_replicas # type: ignore[arg-type]
)
else:
self.num_samples = math.ceil(len(self.dataset) / self.num_replicas) # type: ignore[arg-type]
self.total_size = self.num_samples * self.num_replicas
self.shuffle = shuffle
self.seed = seed
3.2 迭代方法
DistributedSampler 被實現成一個迭代器(類似於迴圈),因此會用到 python 抽象類的魔法方法:
__len__(self)
: 當被len()
函式呼叫時的行為,一般返回迭代器中元素的個數。__iter__(self)
: 當迭代容器中元素時的行為,實際上是返回是一個迭代器(通常是迭代器本身),每一次迭代得到的結果會被用來作為下一次迭代的初始值。
__iter__
程式碼的一個技術細節是:
indices = indices[self.rank:self.total_size:self.num_replicas]
當一個list之中有雙引號,比如 list[start:end:step]
,其意義是:
- start: 起始位置
- end: 結束位置
- step: 步長
我們用一個例子來看看,比如:
a = [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15]
print(a[0:15:3])
print(a[1:15:3])
print(a[2:15:3])
得到:
[1, 4, 7, 10, 13]
[2, 5, 8, 11, 14]
[3, 6, 9, 12, 15]
因為 indices[self.rank:self.total_size:self.num_replicas]
之中,num_replicas 實際就是rank的總數,所以這裡每個worker就會嚴格返回自己rank對應的那部分資料序號。
總結一下DistributedSampler的分配方法是:每段連續的 num_replicas
個數據被拆成一個一個,分給 num_replicas
個程序,而且是通過每個worker 的 rank 來獲取資料,這樣就達到了不重疊不交叉的目的,但也要注意的是:這樣每個程序拿到的資料是不連續的。
具體程式碼如下:
class DistributedSampler(Sampler[T_co]):
def __iter__(self) -> Iterator[T_co]:
if self.shuffle: # 如果需要shuffle,則會基於epoch和seed進行處理
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch)
indices = torch.randperm(len(self.dataset), generator=g).tolist() # type: ignore[arg-type]
else: # 否則直接返回資料集長度序列
indices = list(range(len(self.dataset))) # type: ignore[arg-type]
# 是否需要補齊資料
if not self.drop_last:
# add extra samples to make it evenly divisible
padding_size = self.total_size - len(indices)
if padding_size <= len(indices):
indices += indices[:padding_size]
else:
indices += (indices * math.ceil(padding_size / len(indices)))[:padding_size]
else:
# remove tail of data to make it evenly divisible.
indices = indices[:self.total_size]
assert len(indices) == self.total_size
# subsample
# 依據自己的rank,依次返回自己的資料序號
indices = indices[self.rank:self.total_size:self.num_replicas]
assert len(indices) == self.num_samples
return iter(indices)
def __len__(self) -> int:
return self.num_samples
def set_epoch(self, epoch: int) -> None:
r"""
Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
use a different random ordering for each epoch. Otherwise, the next iteration of this
sampler will yield the same ordering.
Args:
epoch (int): Epoch number.
"""
self.epoch = epoch
內部變數之間邏輯如下:
- 從資料集獲取長度length;
- 從配置得到 num_replicas(有幾個rank),本身rank;
- 依據 資料集長度 和 num_replicas得到 num_samples 和 total_size;
- 最終給出 indices = indices[rank: total_size: num_replicas];
- 返回 indices 給DataLoader
+-----------------------------------------------------------+
| DistributedSampler |
| |
| 2 2 |
| rank +---+ num_replicas |
| + | + |
| | | | 3 |
| | | | |
| | | v |
| | | num_samples = ceil(len(dataset)/ num_replicas) |
| | | + |
| | | | |
| | | | 3 |
| | | v |
| | | total_size = num_samples * num_replicas |
| | | + |
| |4 |4 |4 |
| | | | |
| v v v |
| +-+----+------------+--------------------------------+ | +-------------+
| | | | indices | |
| | indices = indices[rank: total_size: num_replicas] +------------->+ DataLoader |
| | ^ | | 5 | |
| | | | | +-------------+
| | | | |
| +----------------------------------------------------+ |
+-----------------------------------------------------------+
|
1 | length
+------+--------+
| DataSet |
+---------------+
3.3 shuffle資料集
每次epoch都會shuffle資料集,但是不同程序如何保持shuffle之後資料集一致性?
DistributedSampler 使用當前的epoch作為隨機數種子,在計算index之前就進行配置,從而保證不同程序都使用同樣的隨機數種子,這樣shuffle出來的資料就能確保一致。
3.3.1 使用
從下面程式碼可以看出來,如果需要分散式訓練,就對 sampler 設定 epoch。
sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, shuffle=(sampler is None), ...,
sampler=sampler)
for epoch in range(start_epoch, n_epochs):
if is_distributed:
sampler.set_epoch(epoch) # 這設定epoch
train(loader)
3.3.2 python
具體對應 DistributedSampler 的實現。
設定 epoch 很簡單,就是配置下。
def set_epoch(self, epoch: int) -> None:
r"""
Sets the epoch for this sampler. When :attr:`shuffle=True`, this ensures all replicas
use a different random ordering for each epoch. Otherwise, the next iteration of this
sampler will yield the same ordering.
Args:
epoch (int): Epoch number.
"""
self.epoch = epoch
設定 random 種子的具體使用是在迭代函式之中:
def __iter__(self) -> Iterator[T_co]:
if self.shuffle:
# deterministically shuffle based on epoch and seed
g = torch.Generator()
g.manual_seed(self.seed + self.epoch) # 這裡設定隨機種子
indices = torch.randperm(len(self.dataset), generator=g).tolist() # type: ignore[arg-type]
else:
indices = list(range(len(self.dataset))) # type: ignore[arg-type]
# 省略其他程式碼
3.3.3 C++
我們也可以提前看看在C++ 程式碼的DistributedRandomSampler,這是C++ API,也起到python同樣作用。
我們可以看到設定種子和shuffle如下:
void DistributedRandomSampler::reset(optional<size_t> new_size) {
size_ = new_size.value_or(size_);
populate_indices();
std::mt19937 rand(epoch_);
std::shuffle(all_indices_.begin(), all_indices_.end(), rand);
sample_index_ = begin_index_;
}
3.3.4 小結
我們擴充套件目前邏輯如下:
- 從資料集獲取長度length;
- 從配置得到 num_replicas(有幾個rank),本身rank,epoch;
- 用 epoch 來設定random seed;
- 利用random seed 對資料集 indices 進行打亂,後續就會一直使用 這個打亂的indices;
- 依據 資料集長度 和 num_replicas得到 num_samples 和 total_size;
- 結合之上各種資料條件,最終給出 indices = indices[rank: total_size: num_replicas];
- 返回 indices 給DataLoader
+-----------------------------------------------------------------+
| DistributedSampler |
| |
| |
| 2 3 |
| epoch +------> manual_seed(seed + epoch) +---------> indices |
| + |
| | |
| | |
| 2 2 | |
| rank +---+ num_replicas 4 | |
| + | + | |
| | | | 5 | |
| | | | | |
| | | v | |
| | | num_samples = ceil(len(dataset)/ num_replicas) | |
| | | + | |
| | | | | |
| | | | 5 | |
| | | v | |
| | | total_size = num_samples * num_replicas | |
| | | + | |
| |6 |6 |6 | |
| | | | | |
| v v v | |
| +-+----+------------+--------------------------------+ | |
| | | | |
| | indices = indices[rank: total_size: num_replicas] | <----+ |
| | ^ + | |
| | | | | |
| | | | | |
| +----------------------------------------------------+ |
+-----------------------------------------------------------------+
| |
1 | length 7 v indices
|
+-------+--------+ +-------------+
| | | |
| DataSet | | DataLoader |
| | | |
+----------------+ +-------------+
3.4 Sampler in C++
因為某些公司是C++開發,他們也有迫切的使用pytorch的需求,所以pytorch也提供了C++ API,我們接下來就看看如何實現。
3.4.1 定義
其類定義在:torch\csrc\api\include\torch\data\samplers\distributed.h
我們可以看到,DistributedSampler 是基類,主要成員變數是:
size_t size_ 檔案大小
size_t num_replicas_ 副本數目
size_t rank_ 本sampler 對應哪個程序或者GPU
size_t epoch 本次訓練的epoch
bool allow_duplicates_ 是否允許備份
接下來是兩個派生類: DistributedRandomSampler 和 DistributedSequentialSampler。
/// A `Sampler` that selects a subset of indices to sample from and defines a
/// sampling behavior. In a distributed setting, this selects a subset of the
/// indices depending on the provided num_replicas and rank parameters. The
/// `Sampler` performs a rounding operation based on the `allow_duplicates`
/// parameter to decide the local sample count.
template <typename BatchRequest = std::vector<size_t>>
class DistributedSampler : public Sampler<BatchRequest> {
public:
DistributedSampler(
size_t size,
size_t num_replicas = 1,
size_t rank = 0,
bool allow_duplicates = true)
: size_(size),
num_replicas_(num_replicas),
rank_(rank),
epoch_(0),
allow_duplicates_(allow_duplicates) {}
/// Set the epoch for the current enumeration. This can be used to alter the
/// sample selection and shuffling behavior.
void set_epoch(size_t epoch) {
epoch_ = epoch;
}
size_t epoch() const {
return epoch_;
}
protected:
size_t local_sample_count() {
if (allow_duplicates_) {
return (size_ + num_replicas_ - 1) / num_replicas_;
} else {
return size_ / num_replicas_;
}
}
size_t size_;
size_t num_replicas_;
size_t rank_;
size_t epoch_;
bool allow_duplicates_;
};
/// Select samples randomly. The sampling order is shuffled at each `reset()`
/// call.
class TORCH_API DistributedRandomSampler : public DistributedSampler<> {
public:
DistributedRandomSampler(
size_t size,
size_t num_replicas = 1,
size_t rank = 0,
bool allow_duplicates = true);
/// Resets the `DistributedRandomSampler` to a new set of indices.
void reset(optional<size_t> new_size = nullopt) override;
/// Returns the next batch of indices.
optional<std::vector<size_t>> next(size_t batch_size) override;
/// Serializes the `DistributedRandomSampler` to the `archive`.
void save(serialize::OutputArchive& archive) const override;
/// Deserializes the `DistributedRandomSampler` from the `archive`.
void load(serialize::InputArchive& archive) override;
/// Returns the current index of the `DistributedRandomSampler`.
size_t index() const noexcept;
private:
void populate_indices();
size_t begin_index_;
size_t end_index_;
size_t sample_index_;
std::vector<size_t> all_indices_;
};
/// Select samples sequentially.
class TORCH_API DistributedSequentialSampler : public DistributedSampler<> {
public:
DistributedSequentialSampler(
size_t size,
size_t num_replicas = 1,
size_t rank = 0,
bool allow_duplicates = true);
/// Resets the `DistributedSequentialSampler` to a new set of indices.
void reset(optional<size_t> new_size = nullopt) override;
/// Returns the next batch of indices.
optional<std::vector<size_t>> next(size_t batch_size) override;
/// Serializes the `DistributedSequentialSampler` to the `archive`.
void save(serialize::OutputArchive& archive) const override;
/// Deserializes the `DistributedSequentialSampler` from the `archive`.
void load(serialize::InputArchive& archive) override;
/// Returns the current index of the `DistributedSequentialSampler`.
size_t index() const noexcept;
private:
void populate_indices();
size_t begin_index_;
size_t end_index_;
size_t sample_index_;
std::vector<size_t> all_indices_;
};
3.4.2 實現
類的具體實現位於:torch\csrc\api\src\data\samplers\distributed.cpp
3.4.2.1 DistributedRandomSampler
我們首先看看DistributedRandomSampler。
其作用就是依據本worker 的 rank_獲取打亂的index。我們按照邏輯順序講解各個函式。
- 初始化時候會呼叫 reset(size_) 進行 shuffle。
- reset 函式作用是讓sampler指向一組新的indices:
- 首先呼叫 populate_indices() 設定對應本rank的起始index,終止index。
- populate_indices 函式之中,會對資料index 進行設定,即配置了 all_indices_,也同時配置了本rank的起始index,終止index。
- 然後對 all_indices_ 進行shuffle。
- next 函式就相對簡單了,因為主要工作被reset做了,所以此時資料已經被隨機打亂了,所以找到起始位置,返回資料中對行數即可。
因為下面用到了 iota 函式,可能有的同學不熟悉,這裡說明下iota的作用:
std::vector<int> test;
test.resize(10);
std::iota(test.begin(), test.end(), 5);// 將從 5 開始的 10 次遞增值賦值給 test
//test結果:5 6 7 8 9 10 11 12 13 14
具體程式碼如下:
DistributedRandomSampler::DistributedRandomSampler(
size_t size,
size_t num_replicas,
size_t rank,
bool allow_duplicates)
: DistributedSampler(size, num_replicas, rank, allow_duplicates),
begin_index_(0),
end_index_(0),
sample_index_(0) {
// shuffle first time.
reset(size_);
}
// 每次載入新epoch時候,都要呼叫reset
void DistributedRandomSampler::reset(optional<size_t> new_size) {
size_ = new_size.value_or(size_);
populate_indices();
std::mt19937 rand(epoch_);
// 對於資料進行shuffle
std::shuffle(all_indices_.begin(), all_indices_.end(), rand);
sample_index_ = begin_index_;
}
void DistributedRandomSampler::populate_indices() {
size_t num_local_samples = local_sample_count();
// 得到樣本數量
size_t sample_count =
num_replicas_ == 1 ? size_ : num_local_samples * num_replicas_;
all_indices_.resize(sample_count);
// std::iota 的作用是用順序遞增的值賦值指定範圍內的元素
// 這裡是給all_indices_設定從0開始到sample_count這些數值
std::iota(std::begin(all_indices_), std::end(all_indices_), 0);
// 如果sample count大於size_,則需要給多出來的那些index再賦一些數值
for (size_t i = size_; i < sample_count; ++i) {
// we may have added duplicate samples to make all
// replicas to have the same number of samples.
all_indices_[i] = i - size_;
}
begin_index_ = rank_ * num_local_samples; // 對應本rank的起始index
end_index_ = begin_index_ + num_local_samples; // 對應本rank的終止index
sample_index_ = begin_index_;
}
size_t DistributedRandomSampler::index() const noexcept {
return sample_index_;
}
// 注意,每次載入新epoch時候,都要呼叫reset,因此對於next函式來說,工作已經很小
optional<std::vector<size_t>> DistributedRandomSampler::next(
size_t batch_size) {
if (sample_index_ == end_index_) { // 已經提取完資料
return nullopt;
}
size_t end = sample_index_ + batch_size; // 本次迭代的終止位置
if (end > end_index_) {
end = end_index_;
}
auto iter = all_indices_.begin(); // 因為此時資料已經被隨機打亂了,找到起始位置即可
std::vector<size_t> res(iter + sample_index_, iter + end); // 從所有資料中提取前面若干行
sample_index_ = end;
return res;
}
3.4.2.2 DistributedSequentialSampler
然後看看 DistributedSequentialSampler。
其作用就是依據本worker 的 rank_獲取順序的index。我們按照邏輯順序講解各個函式。
- reset 函式就簡單多了,使用populate_indices按照順序設定index即可。
- next 函式就相對複雜,不但要順序返回index,還需要設定下次的起始位置。
DistributedSequentialSampler::DistributedSequentialSampler(
size_t size,
size_t num_replicas,
size_t rank,
bool allow_duplicates)
: DistributedSampler(size, num_replicas, rank, allow_duplicates),
begin_index_(0),
end_index_(0),
sample_index_(0) {
populate_indices(); // 這裡會設定本rank對應的起始位置
}
void DistributedSequentialSampler::reset(optional<size_t> new_size) {
size_t size = new_size.value_or(size_);
if (size != size_) {
size_ = size;
populate_indices();
} else {
sample_index_ = begin_index_;
}
}
void DistributedSequentialSampler::populate_indices() {
begin_index_ = rank_ * local_sample_count(); // 本rank對應的起始位置
end_index_ = begin_index_ + local_sample_count();
sample_index_ = begin_index_;
}
size_t DistributedSequentialSampler::index() const noexcept {
return sample_index_;
}
optional<std::vector<size_t>> DistributedSequentialSampler::next(
size_t batch_size) {
if (sample_index_ == end_index_) { // 已經迴圈結束
return nullopt;
}
size_t end = sample_index_ + batch_size; // 本次的終止行
if (end > end_index_) {
end = end_index_;
}
std::vector<size_t> res(end - sample_index_); // 返回的vector大小
// 給res設定從sample_index_開始遞增(end - sample_index_)這麼大的這些數值,這就是順序返回了index
std::iota(std::begin(res), std::end(res), sample_index_);
if (end >= size_) {
for (size_t& index : res) { //遍歷 vector,得到本次的index
index = index % size_;
}
}
sample_index_ = end; // 設定下次開始行
return res;
}
0xFF 參考
卷積神經網路的並行化模型--One weird trick for parallelizing convolutional neural networks
PyTorch 原始碼解讀之 torch.utils.data:解析資料處理全流程
pytorch(分散式)資料並行個人實踐總結——DataParallel/DistributedDataParallel