[原始碼解析] PyTorch 流水線並行實現 (3)--切分資料和執行時系統

0x00 摘要

前幾篇文章我們介紹了 PyTorch 流水線並行的基本知識和自動平衡機制,本文我們介紹如何切分資料和執行時系統。

流水線並行其他文章連結如下:

[原始碼解析] 深度學習流水線並行Gpipe(1)---流水線基本實現

[原始碼解析] 深度學習流水線並行GPipe (2) ----- 梯度累積

[原始碼解析] 深度學習流水線並行 GPipe(3) ----重計算

[原始碼解析] 深度學習流水線並行之PipeDream(1)--- Profile階段

[原始碼解析] 深度學習流水線並行 PipeDream(2)--- 計算分割槽

[原始碼解析] 深度學習流水線並行 PipeDream(3)--- 轉換模型

[原始碼解析] 深度學習流水線並行 PipeDream(4)--- 執行時引擎

[原始碼解析] 深度學習流水線並行 PipeDream(5)--- 通訊模組

[原始碼解析] 深度學習流水線並行 PipeDream(6)--- 1F1B策略

[原始碼解析] PyTorch 流水線並行實現 (1)--基礎知識

[原始碼解析] PyTorch 流水線並行實現 (2)--如何劃分模型

最後得出執行時系統如下:

0x01 分割小批次

我們首先看看如何把一個 mini-batch 分割為多個 micro-batches。

1.1 使用

從下面示例程式碼可以看出來,具體使用scatter方法進行了分割。

# Divide a mini-batch into micro-batches.
batches = microbatch.scatter(input, self.chunks) # Run pipeline parallelism.
pipeline = Pipeline(batches,
self.partitions,
self.devices,
copy_streams,
self._skip_layout,
checkpoint_stop) pipeline.run() # Merge the micro-batches into one mini-batch.
output = microbatch.gather(batches)
return output

1.2 PyTorch 基礎

我們先看看 PyTorch 的一些基礎程式碼。

1.2.1 chunk

chunk方法可以對張量分塊,返回一個張量列表,其引數是:

  • ensor :要分割的張量。
  • chunks : 分割的塊數
  • dim :沿著哪個軸分塊

具體舉例如下:

import numpy as np
import torch data = torch.from_numpy(np.random.rand(3, 5))
print(str(data)) for i, data_i in enumerate(data.chunk(3, 0)): # 沿0軸分為3塊
print(str(data_i)) 輸出
tensor([[0.1208, 0.3428, 0.4586, 0.9372, 0.6410],
[0.7889, 0.4480, 0.7607, 0.7903, 0.4118],
[0.8391, 0.6649, 0.8338, 0.3477, 0.3953]], dtype=torch.float64) tensor([[0.1208, 0.3428, 0.4586, 0.9372, 0.6410]], dtype=torch.float64)
tensor([[0.7889, 0.4480, 0.7607, 0.7903, 0.4118]], dtype=torch.float64)
tensor([[0.8391, 0.6649, 0.8338, 0.3477, 0.3953]], dtype=torch.float64)

1.2.2 cat

cat 的用法則是把張量拼接在一起,或者把一個張量列表拼接起來。

Z = torch.cat( (X,Y),0 )  # 按維數0拼接,就是豎著拼
Z = torch.cat( (X,Y),1 ) # 按維數1拼接,就是橫著拼

我們用示例看看:

X = torch.ones(2, 5)
Y = torch.ones(4, 5)
Z = torch.cat((X, Y), 0)
print(Z)

結果是:

tensor([[1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1.],
[1., 1., 1., 1., 1.]])

1.3 分割 & 聚合

具體回到分割批次,我們來看看Scatter 程式碼。

def scatter(input: TensorOrTensors, chunks: int) -> List[Batch]:
"""Splits an input mini-batch into multiple micro-batches."""
inputs: Iterable[TensorOrTensors] if isinstance(input, Tensor):
inputs = input.chunk(chunks) # 如果是張量,則直接分割
else:
rotated: List[Tensors] = [] for tensor in input: # 如果是張量陣列,則遍歷
tensors = tensor.chunk(chunks) # 對於每一個張量進行分割
rotated.append(cast(Tensors, tensors)) # 分割結果對映為 Tuple list inputs = zip(*rotated) # 把 list 之中的Tuple 分別聚合 return [Batch(x) for x in inputs] # 對映成 Batch 列表返回

gather 方法則是把scatter的結果重新聚集起來,就是一個逆向操作。

def gather(outputs: List[Batch]) -> TensorOrTensors:
"""Concatenates output micro-batches into a mini-batch."""
output: TensorOrTensors if outputs[0].atomic:
tensors = tuple(b.tensor for b in outputs)
output = torch.cat(tensors)
else:
rotated = [b.tensors for b in outputs]
output_buf = [] for tensors in zip(*rotated):
output_buf.append(torch.cat(tensors)) output = tuple(output_buf) return output

1.4 剖析

我們看看如何使用,下面程式碼是把ab這個張量列表打散,分割成兩個塊。

def test_scatter_tuple():
ab = (torch.ones(2, 1), torch.zeros(4, 2), torch.zeros(6, 3)) a, b = scatter(ab, chunks=2) assert a.tensors[0].size() == (1, 1)
assert b.tensors[0].size() == (1, 1)
assert a.tensors[1].size() == (2, 2)
assert b.tensors[1].size() == (2, 2)
assert a.tensors[2].size() == (3, 3)
assert b.tensors[2].size() == (3, 3)

我們畫個圖來看看。

    +-------------------------------------------------------------+
| ab |
| |
| +-----------+ +---------+ +----------+ |
| | | | | | 0 0 0 | |
| | | | 0 0 | | 0 0 0 | |
| | 1 | | 0 0 | | 0 0 0 | |
| | 1 | | 0 0 | | 0 0 0 | |
| | | | 0 0 | | 0 0 0 | |
| | | | | | 0 0 0 | |
| +-----------+ +---------+ +----------+ |
| |
+-------------------------------+-----------------------------+
|
|
|
a, b = scatter(ab, chunks=2)
|
|
|
|
|
v +------------------------------+ +-----------------------------+
| a | |b |
| +---+ +-----+ +--------+ | | +---+ +-----+ +--------+ |
| | 1 | | 0 0 | | 0 0 0 | | | | 1 | | 0 0 | | 0 0 0 | |
| +---+ | 0 0 | | 0 0 0 | | | +---+ | 0 0 | | 0 0 0 | |
| +-----+ | 0 0 0 | | | +-----+ | 0 0 0 | |
| +--------+ | | +--------+ |
+------------------------------+ +-----------------------------+

使用下面的示例程式碼也可以看到如何聚合。

def test_gather_tensors():
a = torch.zeros(1, 1)
b = torch.zeros(1, 1)
ab = gather([Batch(a), Batch(b)]) assert ab.size() == (2, 1) def test_gather_tuples():
a = (torch.zeros(1, 1), torch.zeros(2, 2))
b = (torch.zeros(1, 1), torch.zeros(2, 2))
ab = gather([Batch(a), Batch(b)]) assert isinstance(ab, tuple)
assert ab[0].size() == (2, 1)
assert ab[1].size() == (4, 2)

0x02 執行

我們接下來看看執行時的一些基礎設施,具體包括 Stream,Task,Worker。

2.1 Stream

Stream 類是用來封裝 CUDA stream 和 CPU stream。程式碼位於:torchgpipe/stream.py。

CUDA流表示一個GPU操作佇列,即某個裝置繫結的,按照順序執的核(kernel)序列。我們可以把一個流看作是GPU之上的一個任務。使用者向流的佇列上新增一系列操作,GPU會按照新增到流中的先後順序而依次執行這一系列操作。在同一個流之中,所有操作是序列序列化,因此這些操作永遠不會並行。因此,要想並行,兩個操作必須位於不同的 stream 中。不同流中的核函式可以交錯,甚至可能重疊。

class CPUStreamType:
pass # The placeholder on place of streams for the CPU device instead of CUDA.
CPUStream = CPUStreamType() # It represents both CUDA streams and the CPU stream.
AbstractStream = Union[torch.cuda.Stream, CPUStreamType]

本文用到的相關操作為 use_stream。

torch.cuda.stream(stream) 的作用是選擇給定流的上下文管理器。

@contextmanager
def use_stream(stream: AbstractStream) -> Generator[None, None, None]:
""":func:`torch.cuda.stream` for either CPU or CUDA stream."""
if not is_cuda(stream):
yield
return with torch.cuda.stream(as_cuda(stream)):
yield def is_cuda(stream: AbstractStream) -> bool:
"""Returns ``True`` if the given stream is a valid CUDA stream."""
return stream is not CPUStream def as_cuda(stream: AbstractStream) -> torch.cuda.Stream:
"""Casts the given stream as :class:`torch.cuda.Stream`."""
return cast(torch.cuda.Stream, stream)

2.2 Task

Task 表示如何在一個分割槽上計算微批次資料(micro-batch)。它由兩部分組成:

  • compute應在工作執行緒中併發執行。
  • finalize應在工作執行緒完成後執行。

可以理解為一個業務處理邏輯。如果有安卓經驗的同學,可以理解為類似於 業務Message。其實 Android message也叫task,其封裝了本任務攜帶的資訊和處理該任務的handler。

這裡的 Task 也是類似的,在構建Task 時候,就傳入了 compute 方法和finalize方法,舉例如下:

task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)

或者如下:

def compute(batch: Batch = batch,
partition: nn.Sequential = partition,
skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
) -> Batch:
with use_skip_tracker(skip_tracker):
return batch.call(partition) task = Task(streams[j], compute=compute, finalize=None)

具體Task定義如下,Task是繫結在 Stream 之上,即可以執行在任何device之上,這就用到了上一節的內容。

class Task:
"""A task represents how to compute a micro-batch on a partition. It consists of two parts: :meth:`compute` and :meth:`finalize`.
:meth:`compute` should be executed in worker threads concurrently.
:meth:`finalize` should be executed after when worker threads complete to
execute :meth:`compute`. :meth:`compute` might be boosted by worker threads. Because it produces
several CUDA API calls by user code. In PyTorch, parallel CUDA API calls
are not serialized through GIL. So more than one CUDA API call can be
produced at the same time.
""" def __init__(self,
stream: AbstractStream,
*,
compute: Callable[[], Batch],
finalize: Optional[Callable[[Batch], None]],
) -> None:
self.stream = stream
self._compute = compute
self._finalize = finalize def compute(self) -> Batch:
with use_stream(self.stream): # 繫結在stream之上
return self._compute() # 呼叫傳入的業務程式碼 def finalize(self, batch: Batch) -> None:
if self._finalize is None:
return
with use_stream(self.stream): # 繫結在stream之上
self._finalize(batch) # 呼叫傳入的業務程式碼

2.3 Worker

worker是用來執行task的,每個 device 有一個 worker 來負責執行這個 device 上的 task。如果有安卓經驗的同學,可以理解為是 Looper。

需要注意,worker只是一個函式,如果執行,還需要一個執行緒作為寄託。這就是後續 spawn_workers 的工作。

def worker(in_queue: InQueue,
out_queue: OutQueue,
device: torch.device,
grad_mode: bool,
) -> None:
"""The main loop of a worker thread."""
torch.set_grad_enabled(grad_mode) with use_device(device):
while True:
task = in_queue.get() # 從輸入佇列中獲取task if task is None:
break try:
batch = task.compute() # 計算task
except Exception:
exc_info = cast(ExcInfo, sys.exc_info())
out_queue.put((False, exc_info))
continue out_queue.put((True, (task, batch))) # 把task和計算結果放到輸出佇列 done = (False, None)
out_queue.put(done)

2.4 生成 worker

這裡使用了 @contextmanager 註解,這是實現了上下文管理協議的物件,主要用於儲存和恢復各種全域性狀態,關閉檔案等,併為try...except...finally提供了一個方便使用的封裝。

spawn_workers 為每個 device 生成了一個 Thread,這個 Thread 的執行函式是 worker

spawn_workers 不止生成了若干 workers,也生成了一對訊息佇列 (in_queues, out_queues) ,這個 (in_queues, out_queues) 在Pipeline 生命週期之內全程都存在,具體來說是:

  • spawn_workers 內部會針對每一個device生成一個 in_queue, out_queue。所以可保證每個device之上是序列來執行業務操作。
in_queue, out_queue = workers[device]
  • 這些 queues 被新增到 (in_queues, out_queues) 之中。
in_queues.append(in_queue)
out_queues.append(out_queue)
  • 之後就是使用 (in_queues, out_queues) 作為各個task 之間傳遞資訊的上下文。

  • in_queues 裡面的順序就是 device 的順序,也就是partition的順序。out_queues 亦然。

具體程式碼如下:

@contextmanager
def spawn_workers(devices: List[torch.device],
) -> Generator[Tuple[List[InQueue], List[OutQueue]], None, None]:
"""Spawns worker threads. A worker thread is bound to a device."""
in_queues: List[InQueue] = []
out_queues: List[OutQueue] = [] # Spawn workers.
workers: Dict[torch.device, Tuple[InQueue, OutQueue]] = {} def normalize_device(device: torch.device) -> torch.device:
if device.type == 'cuda' and device.index is None:
return torch.device('cuda', index=torch.cuda.current_device()) if device.type == 'cpu' and device.index is not None:
return torch.device('cpu') return device for device in devices:
device = normalize_device(device) # 得到使用的裝置 try:
in_queue, out_queue = workers[device] # 臨時放置queue
except KeyError: # 如果 device 還沒有生成對應的queues,則生成
in_queue = Queue() # 生成新的queue
out_queue = Queue() # 取出queue
workers[device] = (in_queue, out_queue) # 賦值給workers t = Thread(
target=worker, # Thread的執行程式是 worker 函式
args=(in_queue, out_queue, device, torch.is_grad_enabled()),
daemon=True,
)
t.start() # 啟動工作執行緒 in_queues.append(in_queue) # 插入queue
out_queues.append(out_queue) # 插入queue try:
yield (in_queues, out_queues) # 返回給呼叫者
finally:
# Close workers.
for in_queue in set(in_queues):
in_queue.put(None) # Join running workers.
running = set(out_queues)
while running:
out_queue = running.pop()
ok, payload = out_queue.get() done = (False, None)
if (ok, payload) == done:
continue running.add(out_queue)

2.5 使用

2.5.1 何時生成worker

使用例子位於 torchgpipe/pipeline.py,在 Pipeline 類之中的 run 函式中會生成workers。我們可以看到,對於 Pipeline 來說,有意義的就是 (in_queues, out_queues)。

    def run(self) -> None:
"""Runs pipeline parallelism. It modifies the given batches in place. """
batches = self.batches
partitions = self.partitions
devices = self.devices
skip_layout = self.skip_layout m = len(batches)
n = len(partitions) skip_trackers = [SkipTrackerThroughPotals(skip_layout) for _ in batches] with spawn_workers(devices) as (in_queues, out_queues): # 生成 workers,並且得到佇列
for schedule in clock_cycles(m, n): # 這裡是按照演算法有次序的執行多個fence, compute
self.fence(schedule, skip_trackers)
# 把佇列傳遞進去
self.compute(schedule, skip_trackers, in_queues, out_queues)

2.5.2 剖析

Torchgpipe 使用了 Python 的 Queue 資料結構。

Queue 類實現了一個基本的先進先出(FIFO)容器。

A multi-producer, multi-consumer queue.

其主要方法是:

  • Queue.get([block, [timeout]]) 讀佇列,從佇列尾部移除元素,timeout為等待時間,如果佇列滿,則阻塞。
  • Queue.put(item, [block, [timeout]]) 寫佇列,將元素新增到序列尾端,timeout為等待時間,如果佇列空,則阻塞。

我個人更習慣於把 (in_queues, out_queues) 理解為類似 Linux 的 管道(Pipe)。

Linux 管道是一種最基本的IPC機制,作用於有血緣關係的程序之間,完成資料傳遞,具體特性如下:

  • 管道是由核函式管理的一個FIFO檔案,其實是一個緩衝區,相當於我們放入記憶體中的一個管道,兩個程序分別處於管道兩端,通過這個管道來傳遞資訊。
  • 管道的一端連線一個程序的輸出。這個程序會向管道中放入資訊。當管道被放滿資訊的時候,嘗試放入資訊的程序會等待,直到另一端的程序取出資訊。
  • 管道的另一端連線另一個程序的輸入,這個程序取出被放入管道的資訊。當管道中沒有資訊的話,從管道中讀取的程序會等待,直到另一端的程序放入資訊。

具體回到 TorchPipe,我們提前看看論文的內容:

對於這種細粒度的順序控制,torchgpipe把checkpointing 使用兩個單獨的autograd函式Checkpoint和Recompute來實現。在任務 \(F^{'}_{i,j}\) 的執行時間之內,生成一對具有共享記憶體的Checkpoint和Recompute。該共享記憶體在向後傳播中被使用,用於將通過執行Recompute生成的本地計算圖傳輸到Checkpoint來進行反向傳播。

於是,這裡就有很多並行處理的需求,於是我們可以看到 Pipeline 類的 compute 方法(省略部分程式碼)中有向 in_queues 之中放入 Task,從 out_queues 之中去除 Task 的執行結果

    def compute(self,
schedule: List[Tuple[int, int]],
skip_trackers: List[SkipTrackerThroughPotals],
in_queues: List[InQueue],
out_queues: List[OutQueue],
) -> None: # With checkpointing, the autograd graph looks like this diagram:
# ┌─────┸──────┐
# │ Copy │
# └─────┰──────┘ (fence)
# ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
# ┃ (compute)
# ┌─────┸──────┐
# │ Wait │ [1] Synchronize the current stream with the copy stream.
# └─────┰──────┘
# ┌─────┸──────┐
# │ Checkpoint │ [2] Compute a partition within checkpointing.
# └─────┰──────┘
# ┌─────┸──────┐
# │ Wait │ [3] Synchronize the copy stream with the current stream.
# └─────┰──────┘
# ┠ ─ ─ ─ ┐
# ┃ ┌─────┴─────┐
# ┃ │ Recompute │ [4] Schedule the recomputation at backpropagation.
# ┃ └─────┬─────┘
# ┠ ─ ─ ─ ┘
# ┃
# ─ ─ ─ ╂ ─ ─ ─ ─ ─ ─ ─ ─ ─
# ┌─────┸──────┐ (fence)
# │ Copy │
# └─────┰──────┘
for i, j in schedule: # 並行執行
batch = batches[i]
partition = partitions[j] # Synchronize with the copied input. ([1] in the diagram) # Determine whether checkpointing or not.
if checkpoint:
def function(input: TensorOrTensors,
partition: nn.Sequential = partition,
skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
) -> TensorOrTensors:
with use_skip_tracker(skip_tracker):
return partition(input) chk = Checkpointing(function, batch)
# 生成一個Task
task = Task(streams[j], compute=chk.checkpoint, finalize=chk.recompute)
del function, chk else:
def compute(batch: Batch = batch,
partition: nn.Sequential = partition,
skip_tracker: SkipTrackerThroughPotals = skip_trackers[i],
) -> Batch:
with use_skip_tracker(skip_tracker):
return batch.call(partition)
# 生成一個Task
task = Task(streams[j], compute=compute, finalize=None)
del compute # Compute tasks in parallel. ([2] in the diagram)
in_queues[j].put(task) # 給第j個partition放入一個新的task。因為 i, j 已經在clock演算法中設定了,所以前向傳播就是按照這個來走的。 for i, j in schedule:
ok, payload = out_queues[j].get() # 取出第j個partition的執行結果
# ....... # 省略後續程式碼

2.6 總結

我們總結梳理一下大致業務邏輯(後文還會細化):

  1. 系統呼叫 spawn_workers 來生成若干 workers。
  2. spawn_workers 為每個 device 生成了一個 Thread,這個 Thread 的執行函式是 worker。spawn_workers 內部也會針對每一個device生成一個 in_queue, out_queue。所以可保證每個device之上是序列來執行業務操作。
  3. 這些 queues 被新增到 (in_queues, out_queues) 之中。然後把 (in_queues, out_queues) 返回給 Pipeline 主執行緒。之後就是使用 (in_queues, out_queues) 作為各個task 之間傳遞資訊的上下文。
  4. Pipeline 主執行緒得到 (in_queues, out_queues) 之後,如果要通過 compute 方法執行一個Task,就找到其device對應的in_queue,把Task插進去。
  5. Worker Thread 阻塞在 in_queue 之上,如果發現有內容,就讀取 Task,執行Task。
  6. Worker Thread 把執行結果插入到 out_queue之中。
  7. Pipeline 的 compute 方法會取出 out_queue 之中的執行結果,進行後續處理。

如下圖所示:

                           +-------------------------------------------------------------------------+
| 1 |
| +--------------------------------------------------------------+ |
| | 3 (in_queues, out_queues) | |
| v | v
+--------------------------------+---------+ +------+----+-----------------------------------------------------------------------+
| Pipeline | | | spawn_workers |
| | | | |
| | | | +-------------------------------------+ |
| | | | | workers | |
| | | | | | t = Thread( |
| + | | | | target=worker, |
| spawn_workers(devices) | | | device 1 : in_queue 1, out_queue 1 | args=(in_queue, out_queue, device), |
| | | | | daemon=True, |
| | | | device 2 : in_queue 2, out_queue 2 | ) |
| +--------------------------------------+ | | | | t.start() |
| | compute | | | | device 3 : in_queue 3, out_queue 3 | + |
| | | | | | | | |
| | | | 4 | | | | |
| | in_queues[j].put(task) +-----------------------+ | +-------------------------------------+ | |
| | | | | +-----------------------------------------------------------------------------------+
| | | | | | 2
| | ok, payload = out_queues[j].get()<--------+ | +---------------------+ |
| | | | | | | in_queues | v
| +--------------------------------------+ | | | | |
| | | +------------> in_queue 1 +--------+ +---------------------------------------------------------------------+
+------------------------------------------+ | | in_queue 2 | | | Thread |
| | in_queue 3 | | | |
| | | | 5 | +------------------------------------------------------------+ |
| 7 +---------------------+ | | | Worker | |
| +---------------------+ | | | | |
| | out_queues | | | | device 1 task = in_queue.get() | |
| | | | task | | | |
+------------------+ out_queue 1 <--+ | +----------------------> in_queue 1 batch = task.compute() | |
(True, (task,,batch)) | out_queue 2 | | | | | |
| out_queue 3 +---------------------------+ out_queue 1 out_queue.put((True, (task, batch))) | |
| | 6 | | | |
+---------------------+ | +------------------------------------------------------------+ |
+---------------------------------------------------------------------+

手機如下:

至此,我們分析瞭如何切分資料和一些執行時機制,下一篇我們結合論文看看具體實現。

0xFF 參考

Markdown公式用法大全

markdown中公式編輯教程

https://docs.nvidia.com/cuda/cuda-runtime-api/stream-sync-behavior.html#stream-sync-behavior

CUDA學習:基礎知識小結

CUDA隨筆之Stream的使用

NVIDIA解決方案架構師深度解析大規模引數語言模型Megatron-BERT

Accelerating Wide & Deep Recommender Inference on GPUs

HugeCTR: High-Performance Click-Through Rate Estimation Training

https://discuss.pytorch.org/t/how-to-prefetch-data-when-processing-with-gpu/548

https://github.com/NVIDIA/apex/

https://github.com/justheuristic/prefetch_generator

https://pytorch.org/tutorials/intermediate/model_parallel_turotial.html

https://pytorch.org/docs/stable/autograd.html

https://pytorch.org/docs/notes/cuda.html

https://zhuanlan.zhihu.com/p/61765561

https://pytorch.apachen.org/docs/1.7/64.html

https://zhidx.com/p/217999.html