[Source code analysis] Deep learning pipeline parallelism PipeDream (3) --- Model conversion

0x00 Abstract

In the previous articles we covered PipeDream's overall architecture, the profiling stage, and the compute partitioning stage. This article covers the model conversion stage.

Links to the other pipeline-parallelism articles in this series:

[Source code analysis] Deep learning pipeline parallelism GPipe (1) --- Basic pipeline implementation

[Source code analysis] Deep learning pipeline parallelism GPipe (2) --- Gradient accumulation

[Source code analysis] Deep learning pipeline parallelism GPipe (3) --- Recomputation

[Source code analysis] Deep learning pipeline parallelism PipeDream (1) --- Profile stage

[Source code analysis] Deep learning pipeline parallelism PipeDream (2) --- Compute partitioning

0x01 Preface

1.1 Improvements

The model conversion stage is one of PipeDream's improvements over GPipe. Let's compare the two.

  • GPipe's pipeline partitioning (the assignment of concrete model layers) can be understood as a preprocessing step performed before the program runs, somewhere between dynamic and static, and it is not transparent to the user.
  • PipeDream's layer assignment, based on the profiling results, packs all the layers of a stage together and generates one PyTorch model Python file per stage. This is also preprocessing, but it is clearly more convenient and transparent than GPipe's approach, and the user can manually adjust the generated result afterwards.

The basic idea of PipeDream's model conversion is:

  • Load the model DAG from the model file into memory.
  • Split the overall DAG by stage. Since the previous article already assigned the model's layers to stages, this step uses partition_graph to separate out the layers belonging to each stage.
  • Apply the template file to each stage's subgraph, generating one Python file per stage. In the main function, each subgraph is converted into a PyTorch Module, corresponding to one Python file; each layer becomes a submodule of that Module.
  • Fuse the model: merge the per-stage subgraphs back into an overall model file. The previous step generated several module Python files, each corresponding to a subgraph; this step merges those subgraphs into one large graph, i.e., generates a new Python file that imports each subgraph's module and assembles them into an overall module file.
  • Emit an __init__ file to make the result easier to consume.
  • Generate the related configuration files, e.g., the data-parallel and model-parallel configuration files.

The overall flow is shown in the figure below:

Model File
 +--------+  +--------+  +--------+  +--------+  +--------+  +--------+
 | Edge 1 |  | Edge 2 |  | Edge 3 |  | Edge 4 |  | Edge 5 |  | Edge 6 |
 +--------+  +--------+  +--------+  +--------+  +--------+  +--------+
 +--------+  +--------+  +--------+  +--------+  +--------+
 | Node 1 |  | Node 2 |  | Node 3 |  | Node 4 |  | Node 5 |
 | stage 1|  | stage 1|  | stage 2|  | stage 2|  | stage 3|
 +----+---+  +----+---+  +----+---+  +----+---+  +----+---+
      |           |           |           |           |
      +-----+-----+           +-----+-----+           |
            v                       v                 v
Subgraphs
 +---------------+     +---------------+     +---------------+
 |  Subgraph 1   |     |  Subgraph 2   |     |  Subgraph 3   |
 |    Node 1     |     |    Node 3     |     |    Node 5     |
 |    Node 2     |     |    Node 4     |     |               |
 +-------+-------+     +-------+-------+     +-------+-------+
         |                     |                     |
         v                     v                     v
Modules
 +---------------+     +---------------+     +---------------+
 |   Module 1    |     |   Module 2    |     |   Module 3    |
 +-------+-------+     +-------+-------+     +-------+-------+
         |                     |                     |
         v                     v                     v
Python files
 +---------------+     +---------------+     +---------------+
 |   stage1.py   |     |   stage2.py   |     |   stage3.py   |
 +-------+-------+     +-------+-------+     +-------+-------+
         |                     |                     |
         +---------------------+---------------------+
                               |
                               v
                      +-----------------+
                      |     gnmt.py     |
                      +-----------------+

1.2 Recap of the Previous Article

To set the stage, let's recall the output of the previous article.

The output file (excerpted below) is still a graph, similar to the profile output file. The key difference is that every node now carries a stage id. The task of this article is to convert this output file into a PyTorch model, i.e., a set of Python files.

node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000 -- stage_id=0
node5 -- EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000 -- stage_id=1
node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000 -- stage_id=1
node7 -- LSTM(2048, 1024) -- forward_compute_time=3.190, backward_compute_time=5.348, activation_size=6553600.0, parameter_size=50364416.000 -- stage_id=2
node8 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node10 -- Dropout(p=0.2) -- forward_compute_time=0.064, backward_compute_time=0.128, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node11 -- LSTM(1024, 1024) -- forward_compute_time=2.491, backward_compute_time=4.203, activation_size=6553600.0, parameter_size=33587200.000 -- stage_id=3
node12 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=3
node14 -- Add -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=4
node15 -- Dropout(p=0.2) -- forward_compute_time=0.059, backward_compute_time=0.121, activation_size=6291456.0, parameter_size=0.000 -- stage_id=4
node16 -- LSTM(1024, 1024) -- forward_compute_time=2.492, backward_compute_time=4.201, activation_size=6553600.0, parameter_size=33587200.000 -- stage_id=4
node17 -- __getitem__(0) -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=5
node19 -- Add -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=6291456.0, parameter_size=0.000 -- stage_id=5
node1 -- node4
node4 -- node5
node2 -- node5
node5 -- node6
node6 -- node7
node7 -- node8
node8 -- node10
node10 -- node11
node11 -- node12
node12 -- node14
node8 -- node14
node14 -- node15
node15 -- node16
node16 -- node17
node17 -- node19
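
As an aside, the node and edge lines above follow a simple textual format. The following is a minimal sketch of how such a file could be parsed; this is illustrative only, not PipeDream's actual graph.Graph.from_str implementation (shown in use later):

import re

# Node lines carry a description, profiling numbers and a stage_id;
# "nodeX -- nodeY" lines are edges.
node_pat = re.compile(
    r'^(node\d+) -- (.*) -- forward_compute_time=([\d.]+), '
    r'backward_compute_time=([\d.]+), activation_size=([\d.]+), '
    r'parameter_size=([\d.]+) -- stage_id=(\d+)$')
edge_pat = re.compile(r'^(node\d+) -- (node\d+)$')

def parse_graph(text):
    nodes, edges = {}, []
    for line in text.splitlines():
        m = node_pat.match(line.strip())
        if m:
            nodes[m.group(1)] = {"desc": m.group(2), "stage_id": int(m.group(7))}
            continue
        m = edge_pat.match(line.strip())
        if m:
            edges.append((m.group(1), m.group(2)))
    return nodes, edges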

0x02 Synthesizing the Model

The model synthesis code lives in optimizer/convert_graph_to_model.py.

2.1 Main Logic

The main logic is:

  • Parse the configuration.
  • Load the graph from the graph file.
  • Partition the graph into a series of subgraphs.
  • Convert each subgraph into a module.
  • Fuse the subgraphs into an overall model file.
  • Generate __init__.py.
  • Generate the configuration files.

Let's first look at the source code; we will analyze it piece by piece afterwards.

if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description="Convert profile graphs to generated model description")
    parser.add_argument('-f', "--profile_filename", required=True,
                        help="Input profile filename")
    parser.add_argument("--model_template_filename", default="templates/model.py.template",
                        help="Model template filename")
    parser.add_argument("--init_template_filename", default="templates/__init__.py.template",
                        help="__init__.py template filename")
    parser.add_argument("--conf_template_filename", default="templates/conf.json.template",
                        help="Conf template filename")
    parser.add_argument("--stage_to_num_ranks_map", type=str, default=None,
                        help="Stage split")
    parser.add_argument('-n', "--model_name", required=True,
                        help="Name of model class")
    parser.add_argument('-a', "--arch", required=True,
                        help="Human-readable architecture name")
    parser.add_argument('-o', "--output_directory", required=True,
                        help="Full path of output model directory")
    args = parser.parse_args()

    # mkdir output_directory.
    subprocess.check_output("mkdir -p %s" % args.output_directory, shell=True)

    # Load the graph from the graph file.
    input_node = graph.Node("input_node", node_desc="Input")
    full_graph = graph.Graph.from_str(open(args.profile_filename, 'r').read())
    initialize_weights = (args.arch == "vgg16" or args.arch == "resnet50")
    input_node.stage_id = 0
    sinks = full_graph.sinks()
    # Remove all unneeded sinks that are not used, makes code generation easier.
    for sink in sinks:
        if sink.node_desc.startswith("__getitem__"):
            full_graph.remove_node(sink)

    # Partition the graph into a series of subgraphs.
    subgraphs = full_graph.partition_graph()

    # Convert each subgraph into a module.
    for i, subgraph in enumerate(subgraphs):
        module_name = "Stage%d" % i
        module_filename = "stage%d.py" % i
        if len(subgraphs) == 1:
            module_name = args.model_name
            module_filename = "%s.py" % args.arch
        num_inputs, num_outputs = convert_subgraph_to_module(subgraph, full_graph, len(subgraphs),
                                                             module_name, initialize_weights,
                                                             args.model_template_filename,
                                                             os.path.join(args.output_directory,
                                                                          module_filename))
        print("Done generating %s..." % module_filename)

    # Fuse the subgraphs and generate an overall model file.
    model = []
    import_statements = ["from .%s import %s" % (args.arch, args.model_name)]
    pytorch_modules = None
    if len(subgraphs) > 1:
        python_modules, pytorch_modules, subgraph_inputs, subgraph_outputs = \
            fuse_subgraphs_to_module(full_graph, subgraphs, args.model_name,
                                     initialize_weights,
                                     args.model_template_filename,
                                     os.path.join(args.output_directory,
                                                  "%s.py" % args.arch))
        model = ["(%s(), [%s], [%s])" % (x[0],
                                         ", ".join(["\"%s\"" % y for y in x[1]]),
                                         ", ".join(["\"%s\"" % y for y in x[2]]))
                 for x in zip(pytorch_modules, subgraph_inputs,
                              subgraph_outputs)]
        model.append("(criterion, [\"%s\"], [\"loss\"])" % subgraph_outputs[-1][0])
        import_statements.extend(
            ["from .%s import %s" % (python_module, pytorch_module)
             for (python_module, pytorch_module) in zip(python_modules, pytorch_modules)])
    else:
        inputs = ["\"input%d\"" % i for i in range(num_inputs)]
        assert(num_outputs == 1)
        model.append("(%s.%s(), [%s], [\"output\"])" % (args.arch, args.model_name, ", ".join(inputs)))
        model.append("(criterion, [\"output\"], [\"loss\"])")

    # Generate __init__.py.
    with open(os.path.join(args.output_directory, "__init__.py"), 'w') as f1, \
         open(args.init_template_filename, 'r') as f2:
        template = f2.read()
        init = template % {
            "arch": args.arch,
            "import_statements": "\n".join(import_statements),
            "model": ",\n        ".join(model),
            "full_model": "%s()" % args.model_name
        }
        f1.write(init)

    # Generate the configuration files.
    if args.stage_to_num_ranks_map is not None:
        stage_to_num_ranks_map = args.stage_to_num_ranks_map.split(",")
        stage_to_num_ranks_map = [(int(x.split(":")[0]), int(x.split(":")[1]))
                                  for x in stage_to_num_ranks_map]
        num_stages = 0
        for (stage_id, replication_factor) in stage_to_num_ranks_map:
            num_stages += replication_factor
        assert(len(stage_to_num_ranks_map) == len(pytorch_modules))
        num_modules = len(pytorch_modules) + 1  # Add 1 for criterion.
    elif pytorch_modules is None:
        num_stages = 1
        num_modules = 2  # Add 1 for criterion.
    else:
        num_stages = len(pytorch_modules)
        num_modules = len(pytorch_modules) + 1  # Add 1 for criterion.
    all_template_args = []
    all_template_args.append({
        "module_to_stage_map": [0] * num_modules,
        "stage_to_rank_map": str({"0": list(range(num_stages))}).replace("'", "\"")
    })
    all_template_args.append({
        "module_to_stage_map": list(range(num_modules-1)) + [num_modules-2],
        "stage_to_rank_map": str({str(i): [i] for i in range(num_modules-1)}).replace("'", "\"")
    })
    if args.stage_to_num_ranks_map is not None:
        stage_to_rank_map = {}
        ranks_so_far = 0
        for i in range(num_modules-1):
            stage_to_rank_map[str(i)] = list(range(ranks_so_far,
                                                   ranks_so_far + stage_to_num_ranks_map[i][1]))
            ranks_so_far += stage_to_num_ranks_map[i][1]
        stage_to_rank_map = str(stage_to_rank_map).replace("'", "\"")
        all_template_args.append({
            "module_to_stage_map": list(range(num_modules-1)) + [num_modules-2],
            "stage_to_rank_map": stage_to_rank_map
        })
    for conf_filename, template_args in zip(
        ["dp_conf.json", "mp_conf.json", "hybrid_conf.json"], all_template_args):
        with open(os.path.join(args.output_directory, conf_filename), 'w') as f1, \
             open(args.conf_template_filename, 'r') as f2:
            template = f2.read()
            conf = template % template_args
            f1.write(conf)

2.2 Supporting Logic

First, let's look at two arrays.

  • declaration_whitelist is a whitelist: if a node is on this list, no declaration needs to be generated for it in the init function.
  • declaration_specialcase lists nodes that require special handling; for such a node, a special conversion is performed, e.g., extra imports and a custom layer declaration.
declaration_whitelist = [
    "hidden",
    "__getitem__",
    "Add",
    "Mul",
    "Concat",
    "Input",
    "Size",
    "View",
    "Transpose",
    "self.get_seq_lens"
]

declaration_specialcase = [
    "EmuBidirLSTM",
    "RecurrentAttention",
    "Classifier",
    "MaskConv",
    "ResizeInput",
    "InferenceBatchSoftmax",
    "BatchRNN",
    "SequenceWise"
]
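
Matching against these lists is done by prefix of the node description. As a quick illustration (assuming the two lists above are in scope; the node_desc string is taken from the graph excerpt earlier):

node_desc = "EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024))"
needs_declaration = not any(node_desc.startswith(d) for d in declaration_whitelist)
is_special = any(node_desc.startswith(d) for d in declaration_specialcase)
print(needs_declaration, is_special)  # True True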

Next, let's look at the get_input_names method.

get_input_names traverses the graph's nodes to find the inputs of the given subgraph.

def get_input_names(graph, full_graph, check_stages=True):
    # Figure out the inputs to this sub-graph, which are the predecessors of
    # nodes in the sub-graph not in the sub-graph.
    # input_names is a dict mapping each predecessor's node_id to assigned
    # variable name.
    nodes = graph.nodes
    input_names = {}
    counter = 0
    for node_id in nodes:
        if (node_id in full_graph.in_edges and
            len(full_graph.in_edges[node_id]) > 0):
            for in_node in full_graph.in_edges[node_id]:
                if in_node.stage_id != nodes[node_id].stage_id and check_stages:
                    # Skip hidden inputs.
                    if full_graph.nodes[in_node.node_id].node_desc.startswith("hidden"):
                        continue
                    input_names[in_node.node_id] = "input%d" % counter
                    counter += 1
        else:
            if graph.nodes[node_id].node_desc.startswith("Input"):
                input_names[node_id] = "input%d" % counter
                counter += 1
    return input_names
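
To make the semantics concrete, here is a hypothetical toy run of get_input_names, using minimal stand-ins for PipeDream's Node/Graph classes (only the fields the function reads; this is not the real graph module, and it assumes the get_input_names function above is in scope):

class Node:
    def __init__(self, node_id, node_desc, stage_id):
        self.node_id, self.node_desc, self.stage_id = node_id, node_desc, stage_id

class Graph:
    def __init__(self, nodes, in_edges):
        self.nodes = nodes          # node_id -> Node
        self.in_edges = in_edges    # node_id -> [predecessor Nodes]

# Full graph: node1 (stage 0) feeds node2 (stage 1).
n1 = Node("node1", "Embedding(...)", stage_id=0)
n2 = Node("node2", "Dropout(p=0.2)", stage_id=1)
full = Graph({"node1": n1, "node2": n2}, {"node2": [n1]})
# The subgraph for stage 1 contains only node2.
sub = Graph({"node2": n2}, {})

print(get_input_names(sub, full))   # {'node1': 'input0'} -- node1 is a
                                    # cross-stage predecessor, so it becomes
                                    # an input of the stage-1 module.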

0x03 Model Conversion

Next, let's look at the model conversion itself.

3.1 Splitting the Graph into Subgraphs

First, the main function splits the graph into subgraphs by stage.

Since the previous article already assigned the model's layers to stages, this step uses partition_graph to separate out the layers belonging to each stage.

input_node = graph.Node("input_node", node_desc="Input")
full_graph = graph.Graph.from_str(open(args.profile_filename, 'r').read())
initialize_weights = (args.arch == "vgg16" or args.arch == "resnet50")
input_node.stage_id = 0
sinks = full_graph.sinks()
# Remove all unneeded sinks that are not used, makes code generation easier.
for sink in sinks:
    if sink.node_desc.startswith("__getitem__"):
        full_graph.remove_node(sink)
subgraphs = full_graph.partition_graph()

The logic of partition_graph is:

  • Traverse the nodes to collect all stage ids.
  • With all stage ids in hand, build one subgraph per stage id: for a given stage, find the nodes belonging to that stage among all nodes and build a subgraph from them.
def partition_graph(self):
    stage_ids = set()
    # Traverse the nodes to collect all stage ids.
    for node_id in self.nodes:
        stage_ids.add(self.nodes[node_id].stage_id)
    # stage_ids is, e.g., {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
    if len(stage_ids) == 1:
        return [self.copy()]
    subgraphs = []
    # Build one subgraph per stage.
    for stage_id in stage_ids:
        subgraphs.append(self.partition_graph_helper(stage_id))
    return subgraphs

# For a given stage, find the nodes belonging to that stage among all nodes
# and build a subgraph from them.
def partition_graph_helper(self, stage_id):
    subgraph = Graph()
    for node1_id in self.nodes:
        if self.nodes[node1_id].stage_id == stage_id:
            subgraph.add_node(self.nodes[node1_id])
            if node1_id not in self.edges: continue
            for node2 in self.edges[node1_id]:
                if node2.stage_id == stage_id:
                    subgraph.add_edge(self.nodes[node1_id], node2)
    return subgraph

The resulting subgraphs are:

subgraphs = {list: 10}
 00 = {Graph}
  'node4' = {Node} node4 -- Embedding(32320, 1024, padding_idx=0) -- forward_compute_time=0.073, backward_compute_time=6.949, activation_size=6291456.0, parameter_size=132382720.000 -- stage_id=0
  'node1' = {Node} node1 -- Input0 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000 -- stage_id=0
  'node2' = {Node} node2 -- Input1 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000 -- stage_id=0
  'node3' = {Node} node3 -- Input2 -- forward_compute_time=0.000, backward_compute_time=0.000, activation_size=0.0, parameter_size=0.000 -- stage_id=0
  __len__ = {int} 4
 01 = {Graph}
  edges = {dict: 1} {'node5': [<graph.graph.Node object at 0x7f9c5be91438>]}
  in_edges = {dict: 1} {'node6': [<graph.graph.Node object at 0x7f9c5be91470>]}
  nodes = {dict: 2} {'node5': <graph.graph.Node object at 0x7f9c5be91470>, 'node6': <graph.graph.Node object at 0x7f9c5be91438>}
  'node5' = {Node} node5 -- EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000 -- stage_id=1
  'node6' = {Node} node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000 -- stage_id=1
  __len__ = {int} 2
 ......

3.2 Converting the Model

In the main function, each subgraph is converted into a PyTorch Module, corresponding to one Python file.

In other words, each layer becomes a submodule of that Module.

for i, subgraph in enumerate(subgraphs):  # Iterate over every subgraph.
    module_name = "Stage%d" % i
    module_filename = "stage%d.py" % i
    if len(subgraphs) == 1:
        module_name = args.model_name
        module_filename = "%s.py" % args.arch
    # Convert this subgraph into a module.
    num_inputs, num_outputs = convert_subgraph_to_module(subgraph, full_graph, len(subgraphs),
                                                         module_name, initialize_weights,
                                                         args.model_template_filename,
                                                         os.path.join(args.output_directory,
                                                                      module_filename))
    print("Done generating %s..." % module_filename)

The conversion works as follows: given a graph containing some nodes, such as the one below, convert_subgraph_to_module turns it into a module.

graph = {Graph}
 edges = {dict: 1}
 in_edges = {dict: 1}
 marked_nodes = {set: 2}
 nodes = {dict: 2}
  'node5' = {Node} node5 -- EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024)) -- forward_compute_time=5.247, backward_compute_time=0.016, activation_size=12582912.0, parameter_size=67174400.000 -- stage_id=1
  'node6' = {Node} node6 -- Dropout(p=0.2) -- forward_compute_time=0.077, backward_compute_time=0.196, activation_size=12582912.0, parameter_size=0.000 -- stage_id=1
 __len__ = {int} 2

Let's go through it step by step.

3.2.1 Converting to a Module

The conversion logic is as follows:

  • get_input_names traverses the graph to find the inputs of this subgraph.
  • For each node among the inputs, build the corresponding part of the forward function definition in preparation for code generation; function_definition ends up as something like ['out0 = input0.clone()', 'out1 = input1.clone()'].
  • Traverse the nodes in the graph and, depending on each node's nature, generate the appropriate Python statements:
    • Collect per-layer information such as the name, the output, and whether the op is in-place.
    • If the node requires a special declaration, perform the special conversion (imports, layer declaration, etc.).
    • Deduplicate the import statements.
    • If the node's description is not on the declaration whitelist, record it; declarations are later generated for these nodes when building the init method.
    • Get the node's incoming edges.
    • If the node is a built-in operator, construct the Python statement directly.
    • Otherwise, emit a plain layer call, e.g., 'out2 = self.layer2(out0, out1)'.
  • Make sure the module's outputs are returned in the same order as in the original model.
  • If weights need to be initialized, handle that.
  • Apply the template file to generate the model, i.e., fill the previously generated Python statements into the template.
  • Write the model's Python file.

The comments in the code below include printouts of some runtime variables.

def convert_subgraph_to_module(graph, full_graph, num_subgraphs, module_name, initialize_weights,
                               model_template_filename, output_filename):
    model_template = open(model_template_filename, 'r').read()
    nodes = graph.topological_sort()
    import_statements = []
    module_methods = []
    counter = 0
    layer_names = {}
    layer_names_and_declarations = []
    function_definition = []

    # get_input_names traverses the graph to find the inputs of this subgraph.
    input_names = get_input_names(graph, full_graph)
    num_inputs = len(input_names)
    output_names = input_names.copy()
    sources = graph.sources()

    # Now, generate expressions for each node.
    # Iterate through nodes in topological order, and add output_name mappings for
    # each expression. Use this output_name mapping when generating expressions
    # in the model's implementation file.
    # TODO: Make sure that nodes with multiple inputs have the inputs in the
    # right order (even though this probably does not matter in practice).

    # Build the forward-function statements for the inputs, in preparation for
    # the code generation below.
    for node_id in input_names:
        output_name = "out%d" % counter
        function_definition.append("%s = %s.clone()" % (output_name,
                                                        input_names[node_id]))
        output_names[node_id] = output_name
        counter += 1
    # function_definition is now ['out0 = input0.clone()', 'out1 = input1.clone()'].

    # Traverse the nodes in the graph.
    for node in nodes:
        # Per-layer information: name, output, whether the op is in-place.
        layer_call = None
        layer_name = "self.layer%d" % counter
        output_name = "out%d" % counter
        layer_declaration = "torch.nn.%s" % (
            node.node_desc.replace("inplace", "inplace=True"))
        layer_names[node.node_id] = layer_name
        if node.node_id not in output_names:
            output_names[node.node_id] = output_name

        # Skip layers that don't need a declaration (example: '+=').
        for declaration in declaration_specialcase:
            # If the node needs a special declaration, perform the special
            # conversion (imports, layer declaration, etc.).
            if node.node_desc.startswith(declaration):
                found = True
                if declaration == "EmuBidirLSTM":
                    m = re.search(r'.*LSTM\((\d+), (\d+)\).*', node.node_desc)
                    input_size = int(m.group(1))
                    hidden_size = int(m.group(2))
                    layer_declaration = "EmuBidirLSTM(%d, %d)" % (input_size, hidden_size)
                    import_statements.append("from seq2seq.models.encoder import EmuBidirLSTM")
                    # Here import_statements is ['from seq2seq.models.encoder import EmuBidirLSTM']
                    # and layer_declaration is 'EmuBidirLSTM(1024, 1024)'.
                elif declaration == "RecurrentAttention":
                    m = re.search(r'.*LSTM\((\d+), (\d+)\).*', node.node_desc)
                    input_size = int(m.group(1))
                    hidden_size = int(m.group(2))
                    m = re.search(r'.*in_features=(\d+), out_features=(\d+).*', node.node_desc)
                    context_size = int(m.group(1))
                    layer_declaration = "RecurrentAttention(%d, %d, %d)" % (input_size, hidden_size, context_size)
                    import_statements.append("from seq2seq.models.decoder import RecurrentAttention")
                # ... (other special cases omitted) ...
                elif declaration == "InferenceBatchSoftmax":
                    layer_declaration = "InferenceBatchSoftmax()"
                    import_statements.append("from model import InferenceBatchSoftmax")
                break

        # Deduplicate the import statements.
        import_statements = list(set(import_statements))

        # If the node's description is not on the declaration whitelist, record it.
        found = False
        for declaration in declaration_whitelist:
            if node.node_desc.startswith(declaration):
                found = True
        if not found:
            layer_names_and_declarations.append((layer_name, layer_declaration))

        # Get the node's incoming edges.
        if node.node_id in full_graph.in_edges:
            in_edges = full_graph.in_edges[node.node_id]
        else:
            in_edges = []
        if len(in_edges) == 0 and node.node_desc.startswith("Input"):
            pass  # Don't need to do anything for this case.
        else:
            # Check whether the node is a built-in operator.
            # node_desc here is, e.g., 'EmuBidirLSTM( (bidir): LSTM(1024, 1024, bidirectional=True) (layer1): LSTM(1024, 1024) (layer2): LSTM(1024, 1024))'.
            if node.node_desc.startswith("Size"):
                assert(len(in_edges) == 1)
                m = re.search(r'Size\((-?\d+)\)', node.node_desc)
                idx = int(m.group(1))
                layer_call = "%s = %s.size(%d)" % (output_name,
                                                   output_names[in_edges[0].node_id],
                                                   idx)
            elif node.node_desc.startswith("Add"):
                assert(len(in_edges) == 2)
                node1 = in_edges[0]
                node2 = in_edges[1]
                if len(full_graph.edges[node1.node_id]) > 1:
                    tmp = node1
                    node1 = node2
                    node2 = tmp
                layer_call = "%s = %s + %s" % (output_names[node1.node_id],
                                               output_names[node1.node_id],
                                               output_names[node2.node_id])
                output_names[node.node_id] = output_names[node1.node_id]
            # ... (other built-in cases omitted) ...
            elif node.node_desc.startswith("hidden"):
                pass
            elif node.node_desc == "self.get_seq_lens":
                assert(len(in_edges) == 1)
                in_node = in_edges[0]
                layer_call = "%s = %s(%s)" % (output_name, node.node_desc, output_names[in_node.node_id])
            else:
                # Not a built-in op: emit a plain layer call,
                # e.g., 'out2 = self.layer2(out0, out1)'.
                layer_call = "%s = %s(%s)" % (output_name, layer_name,
                                              ", ".join([output_names[in_node.node_id]
                                                         for in_node in in_edges]))
        if layer_call is not None:
            function_definition.append(layer_call)
        counter += 1

    # Ensure that outputs of a module are returned in the same order as
    # the original model implementation.
    # TODO: This might not work as intended for sub-graphs.
    full_graph.populate_depths()
    graph_output_names, _ = get_output_names(graph, full_graph, 0)
    for key in graph_output_names:
        graph_output_names[key] = output_names[key]
    output_names_list = get_tensor_names_list(graph_output_names)
    num_outputs = len(output_names_list)
    function_definition.append("return %s" %
        get_output_tuple_str(output_names_list))
    # function_definition is now ['out0 = input0.clone()', 'out1 = input1.clone()',
    # 'out2 = self.layer2(out0, out1)', 'out3 = self.layer3(out2)', 'return out3'].

    # Layer declarations are added to the constructor of the module.
    # Function definitions are added to the `forward()' method of the
    # module.
    layer_declarations_str = "\n        ".join([
        "%s = %s" % (x[0], x[1]) for x in layer_names_and_declarations])
    # If weights need to be initialized, handle that here.
    if initialize_weights:
        layer_declarations_str += "\n        self._initialize_weights()"
        module_methods.append("""def _initialize_weights(self):
        for m in self.modules():
            if isinstance(m, torch.nn.Conv2d):
                torch.nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    torch.nn.init.constant_(m.bias, 0)
            elif isinstance(m, torch.nn.BatchNorm2d):
                torch.nn.init.constant_(m.weight, 1)
                torch.nn.init.constant_(m.bias, 0)
            elif isinstance(m, torch.nn.Linear):
                torch.nn.init.normal_(m.weight, 0, 0.01)
                torch.nn.init.constant_(m.bias, 0)""")

    function_definition_str = "\n        ".join(function_definition)
    input_names_list = get_tensor_names_list(input_names)
    input_names = ", ".join(input_names_list)  # input_names is now, e.g., 'input1, input0'.

    # Fill in the template file to generate the model.
    model = model_template % {"layer_declarations": layer_declarations_str,
                              "function_definition": function_definition_str,
                              "module_name": module_name,
                              "inputs": input_names,
                              "import_statements": "\n".join(import_statements),
                              "module_methods": "\n\n".join(module_methods)}

    # Write the model's Python file.
    with open(output_filename, 'w') as f:
        f.write(model)
    return num_inputs, num_outputs

3.2.2 Template File

The code above generates the model from a template file, located at optimizer/templates/model.py.template and shown below. Concretely, the Python statements produced during conversion are filled into this template.

import torch
%(import_statements)s

class %(module_name)s(torch.nn.Module):
    def __init__(self):
        super(%(module_name)s, self).__init__()
        %(layer_declarations)s

    %(module_methods)s

    def forward(self, %(inputs)s):
        %(function_definition)s
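
The placeholders are filled with Python's %-style string formatting. Below is a minimal sketch of that mechanism, using a simplified, hypothetical template rather than the real file above:

# A minimal sketch of %-style template substitution (simplified template,
# not the actual templates/model.py.template).
model_template = (
    "import torch\n"
    "%(import_statements)s\n\n"
    "class %(module_name)s(torch.nn.Module):\n"
    "    def __init__(self):\n"
    "        super(%(module_name)s, self).__init__()\n"
    "        %(layer_declarations)s\n\n"
    "    def forward(self, %(inputs)s):\n"
    "        %(function_definition)s\n")

print(model_template % {
    "import_statements": "",
    "module_name": "Stage0",
    "layer_declarations": "self.layer6 = torch.nn.Embedding(32320, 1024, padding_idx=0)",
    "inputs": "input0",
    "function_definition": "out6 = self.layer6(input0.clone())\n        return out6",
})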

3.2.3 Generated Files

In the preceding code, the following statements produce the model files; each subgraph yields one Python file.

# Write the model's Python file.
with open(output_filename, 'w') as f:
    f.write(model)
return num_inputs, num_outputs

Here are two examples of the generated model files:

import torch

class Stage0(torch.nn.Module):
    def __init__(self):
        super(Stage0, self).__init__()
        self.layer6 = torch.nn.Embedding(32320, 1024, padding_idx=0)

    def forward(self, input0, input1, input2):
        out0 = input0.clone()
        out1 = input1.clone()
        out2 = input2.clone()
        out6 = self.layer6(out0)
        return (out1, out2, out6)

And another:

import torch
from seq2seq.models.encoder import EmuBidirLSTM

class Stage1(torch.nn.Module):
    def __init__(self):
        super(Stage1, self).__init__()
        self.layer2 = EmuBidirLSTM(1024, 1024)
        self.layer3 = torch.nn.Dropout(p=0.2)

    def forward(self, input1, input0):
        out0 = input0.clone()
        out1 = input1.clone()
        out2 = self.layer2(out0, out1)
        out3 = self.layer3(out2)
        return out3
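
Each generated stage is an ordinary torch.nn.Module and can be exercised in isolation. A minimal sketch, assuming the generated stage0.py above is importable and using made-up tensor shapes (in the real GNMT model, input1 and input2 carry other data, such as sequence lengths):

import torch
from stage0 import Stage0  # assumes the generated file above is on the path

stage0 = Stage0()
input0 = torch.randint(0, 32320, (50, 128))   # token ids for the Embedding layer
input1 = torch.randn(50, 128, 1024)           # passed through unchanged (hypothetical shape)
input2 = torch.randn(50, 128, 1024)           # passed through unchanged (hypothetical shape)
out1, out2, out6 = stage0(input0, input1, input2)
print(out6.shape)                             # torch.Size([50, 128, 1024])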

3.3 Fusing the Model

The previous step generated several module Python files, each corresponding to a subgraph. This step merges those subgraphs into one large graph: it generates a new Python file that imports each subgraph's module and assembles them into an overall module file.

3.3.1 Logic in the main Function

The logic in main is:

  • Build the import statements, obtaining something like ['from .gnmt import pd'].
  • Fuse the subgraphs into an overall module.
  • Extend model based on the fusion result.
  • Extend the import statements based on the fusion result.
model = []

# Build the import statements; this yields something like ['from .gnmt import pd'].
import_statements = ["from .%s import %s" % (args.arch, args.model_name)]
pytorch_modules = None
if len(subgraphs) > 1:
    # Fuse the subgraphs into an overall module.
    python_modules, pytorch_modules, subgraph_inputs, subgraph_outputs = \
        fuse_subgraphs_to_module(full_graph, subgraphs, args.model_name,
                                 initialize_weights,
                                 args.model_template_filename,
                                 os.path.join(args.output_directory,
                                              "%s.py" % args.arch))
    # Extend model based on the fusion result.
    model = ["(%s(), [%s], [%s])" % (x[0],
                                     ", ".join(["\"%s\"" % y for y in x[1]]),
                                     ", ".join(["\"%s\"" % y for y in x[2]]))
             for x in zip(pytorch_modules, subgraph_inputs,
                          subgraph_outputs)]
    model.append("(criterion, [\"%s\"], [\"loss\"])" % subgraph_outputs[-1][0])
    # Extend the import statements based on the fusion result.
    import_statements.extend(
        ["from .%s import %s" % (python_module, pytorch_module)
         for (python_module, pytorch_module) in zip(python_modules, pytorch_modules)])
else:
    inputs = ["\"input%d\"" % i for i in range(num_inputs)]
    assert(num_outputs == 1)
    model.append("(%s.%s(), [%s], [\"output\"])" % (args.arch, args.model_name, ", ".join(inputs)))
    model.append("(criterion, [\"output\"], [\"loss\"])")

3.3.2 Fusing the Subgraphs

fuse_subgraphs_to_module generates a gnmt.py file. The logic is:

  • Load the template.
  • Collect the module names.
  • Build the function definitions and layer declarations.
  • Traverse the subgraphs, building outputs and inputs.
  • Append the return statement.
  • Add the import statements.
  • Apply the template file.
  • Write the output file.

The source code is as follows:

def fuse_subgraphs_to_module(graph, subgraphs, model_name, initialize_weights,
                             model_template_filename, output_filename):
    # Load the template.
    model_template = open(model_template_filename, 'r').read()

    # PyTorch modules are the names given to the generated stages (which are
    # of type torch.nn.Module).
    # Python modules are the names given to the filenames containing these
    # generated torch.nn.Modules.
    # Collect the module names.
    pytorch_modules = []
    python_modules = []
    for i in range(len(subgraphs)):
        pytorch_modules.append("Stage%d" % i)
        python_modules.append("stage%d" % i)
    # python_modules = {list: 10} ['stage0', 'stage1', 'stage2', 'stage3', 'stage4', 'stage5', 'stage6', 'stage7', 'stage8', 'stage9']
    # pytorch_modules = {list: 10} ['Stage0', 'Stage1', 'Stage2', 'Stage3', 'Stage4', 'Stage5', 'Stage6', 'Stage7', 'Stage8', 'Stage9']

    # Build the function definitions and layer declarations.
    layer_declarations = []
    function_definition = []
    for i, pytorch_module in enumerate(pytorch_modules):
        layer_declarations.append("self.stage%d = %s()" % (
            i, pytorch_module))
    if initialize_weights:
        layer_declarations.append("self._initialize_weights()")
    # function_definition = {list: 0} []
    # layer_declarations = {list: 10} ['self.stage0 = Stage0()', 'self.stage1 = Stage1()', ..., 'self.stage9 = Stage9()']

    output_counter = 0
    output_names = {}
    graph_input_names = get_input_names(graph, graph, check_stages=False)
    for key in graph_input_names:
        output_names[key] = graph_input_names[key]
    subgraph_inputs = []
    subgraph_outputs = []
    # Traverse the subgraphs, building outputs and inputs.
    for i, subgraph in enumerate(subgraphs):
        subgraph_input_names = get_input_names(subgraph, graph)
        subgraph_output_names, output_counter = get_output_names(
            subgraph, graph, output_counter)
        for key in subgraph_input_names:
            subgraph_input_names[key] = output_names[key]
        for key in subgraph_output_names:
            output_names[key] = subgraph_output_names[key]
        function_definition.append("%s = self.stage%d(%s)" % (
            get_output_tuple_str(get_tensor_names_list(subgraph_output_names)),
            i, ", ".join(get_tensor_names_list(subgraph_input_names))))
        subgraph_inputs.append(get_tensor_names_list(subgraph_input_names))
        subgraph_outputs.append(get_tensor_names_list(subgraph_output_names))
    # subgraph_input_names = {dict: 1} {'node47': 'out20'}
    # subgraph_inputs = {list: 10} [['input0', 'input1', 'input2'], ['out2', 'out0'], ['out4'], ['out5'], ['out7', 'out6'], ['out8', 'out9', 'out2', 'out3'], ['out10', 'out12'], ['out14', 'out15', 'out16', 'out11'], ['out14', 'out17', 'out18', 'out19'], ['out20']]
    # subgraph_output_names = {dict: 1} {'node48': 'out21'}
    # subgraph_outputs = {list: 10} [['out2', 'out3', 'out0'], ['out4'], ['out5'], ['out7', 'out6'], ['out8', 'out9'], ['out10', 'out12', 'out11'], ['out14', 'out15', 'out16'], ['out17', 'out18', 'out19'], ['out20'], ['out21']]

    # Append the return statement.
    function_definition.append("return %s" %
        get_output_tuple_str(get_tensor_names_list(subgraph_output_names)))
    function_definition_str = "\n        ".join(function_definition)

    # Add the import statements.
    import_statements = ["from .%s import %s" % (python_module, pytorch_module)
                         for (python_module, pytorch_module) in zip(python_modules, pytorch_modules)]
    input_names = get_input_names(graph, graph, check_stages=False)
    input_names = ", ".join(get_tensor_names_list(input_names))

    # Apply the template file.
    model = model_template % {"layer_declarations": "\n        ".join(layer_declarations),
                              "function_definition": function_definition_str,
                              "module_name": model_name,
                              "inputs": input_names,
                              "import_statements": "\n".join(import_statements),
                              "module_methods": ""}  # TODO: Figure out if we need to pass in other module_methods here?
    print("Done with sub-graph fusion...")

    # Write the output file.
    with open(output_filename, 'w') as f:
        f.write(model)
    return python_modules, pytorch_modules, subgraph_inputs, subgraph_outputs

3.3.3 Output

The final fused result is as follows:

import torch
from .stage0 import Stage0
from .stage1 import Stage1
from .stage2 import Stage2
from .stage3 import Stage3
from .stage4 import Stage4
from .stage5 import Stage5
from .stage6 import Stage6
from .stage7 import Stage7
from .stage8 import Stage8
from .stage9 import Stage9

class pd(torch.nn.Module):
    def __init__(self):
        super(pd, self).__init__()
        self.stage0 = Stage0()
        self.stage1 = Stage1()
        self.stage2 = Stage2()
        self.stage3 = Stage3()
        self.stage4 = Stage4()
        self.stage5 = Stage5()
        self.stage6 = Stage6()
        self.stage7 = Stage7()
        self.stage8 = Stage8()
        self.stage9 = Stage9()

    def forward(self, input0, input1, input2):
        (out2, out3, out0) = self.stage0(input0, input1, input2)
        out4 = self.stage1(out2, out0)
        out5 = self.stage2(out4)
        (out7, out6) = self.stage3(out5)
        (out8, out9) = self.stage4(out7, out6)
        (out10, out12, out11) = self.stage5(out8, out9, out2, out3)
        (out14, out15, out16) = self.stage6(out10, out12)
        (out17, out18, out19) = self.stage7(out14, out15, out16, out11)
        out20 = self.stage8(out14, out17, out18, out19)
        out21 = self.stage9(out20)
        return out21

3.4 The init File

For ease of use, the system also generates an __init__ file.

It is generated from the import_statements and model variables built earlier.

The variables look like this:

model = {list: 11}
 00 = {str} '(Stage0(), ["input0", "input1", "input2"], ["out2", "out3", "out0"])'
 01 = {str} '(Stage1(), ["out2", "out0"], ["out4"])'
 02 = {str} '(Stage2(), ["out4"], ["out5"])'
 03 = {str} '(Stage3(), ["out5"], ["out7", "out6"])'
 04 = {str} '(Stage4(), ["out7", "out6"], ["out8", "out9"])'
 05 = {str} '(Stage5(), ["out8", "out9", "out2", "out3"], ["out10", "out12", "out11"])'
 06 = {str} '(Stage6(), ["out10", "out12"], ["out14", "out15", "out16"])'
 07 = {str} '(Stage7(), ["out14", "out15", "out16", "out11"], ["out17", "out18", "out19"])'
 08 = {str} '(Stage8(), ["out14", "out17", "out18", "out19"], ["out20"])'
 09 = {str} '(Stage9(), ["out20"], ["out21"])'
 10 = {str} '(criterion, ["out21"], ["loss"])'
 __len__ = {int} 11

import_statements = {list: 1} ['from .gnmt import pd']
 0 = {str} 'from .gnmt import pd'

The code is as follows:

with open(os.path.join(args.output_directory, "__init__.py"), 'w') as f1, \
     open(args.init_template_filename, 'r') as f2:
    template = f2.read()
    init = template % {
        "arch": args.arch,
        "import_statements": "\n".join(import_statements),
        "model": ",\n        ".join(model),
        "full_model": "%s()" % args.model_name
    }
    f1.write(init)

The resulting __init__ file:

from .gnmt import pd
from .stage0 import Stage0
from .stage1 import Stage1
from .stage2 import Stage2
from .stage3 import Stage3
from .stage4 import Stage4
from .stage5 import Stage5
from .stage6 import Stage6
from .stage7 import Stage7
from .stage8 import Stage8
from .stage9 import Stage9

def arch():
    return "gnmt"

def model(criterion):
    return [
        (Stage0(), ["input0", "input1", "input2"], ["out2", "out3", "out0"]),
        (Stage1(), ["out2", "out0"], ["out4"]),
        (Stage2(), ["out4"], ["out5"]),
        (Stage3(), ["out5"], ["out7", "out6"]),
        (Stage4(), ["out7", "out6"], ["out8", "out9"]),
        (Stage5(), ["out8", "out9", "out2", "out3"], ["out10", "out12", "out11"]),
        (Stage6(), ["out10", "out12"], ["out14", "out15", "out16"]),
        (Stage7(), ["out14", "out15", "out16", "out11"], ["out17", "out18", "out19"]),
        (Stage8(), ["out14", "out17", "out18", "out19"], ["out20"]),
        (Stage9(), ["out20"], ["out21"]),
        (criterion, ["out21"], ["loss"])
    ]

def full_model():
    return pd()
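
These (module, input names, output names) triples let a runtime wire stages together by tensor name. The following is an illustrative sketch of that idea only, not PipeDream's actual runtime, which distributes stages across ranks and treats the criterion entry specially; here we assume every entry is a callable taking exactly the listed inputs:

# Illustrative: run the stage list sequentially on one device by passing
# tensors through a name -> value dict, the way the triples are meant to be read.
def run_stages(stage_list, feed_dict):
    # stage_list is what model(criterion) returns; feed_dict maps the graph's
    # input names (e.g. "input0") to tensors.
    values = dict(feed_dict)
    for module, input_names, output_names in stage_list:
        inputs = [values[name] for name in input_names]
        outputs = module(*inputs)
        if not isinstance(outputs, tuple):
            outputs = (outputs,)
        values.update(zip(output_names, outputs))
    return values["loss"]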

3.5 Configuration Files

Next, the configuration files are generated, in preparation for the later program runs. Specifically, up to three files may be produced: "dp_conf.json", "mp_conf.json", and "hybrid_conf.json".

Their content is essentially: which module is assigned to which stage, and which stage is assigned to which rank(s).

3.5.1 Code Logic

The main logic is:

  • If the program input already specifies how stages map onto ranks, use that mapping.
  • Otherwise derive the number of stages and modules from pytorch_modules.
  • Decide the concrete rank / stage / module assignments.
  • Write the configuration files.

Here, pytorch_modules is the result returned by fuse_subgraphs_to_module.

pytorch_modules = {list: 10} ['Stage0', 'Stage1', 'Stage2', 'Stage3', 'Stage4', 'Stage5', 'Stage6', 'Stage7', 'Stage8', 'Stage9']

The code is as follows:

if args.stage_to_num_ranks_map is not None:
    # If the program input already specifies how to map stages onto ranks, use it.
    stage_to_num_ranks_map = args.stage_to_num_ranks_map.split(",")
    stage_to_num_ranks_map = [(int(x.split(":")[0]), int(x.split(":")[1]))
                              for x in stage_to_num_ranks_map]
    num_stages = 0
    for (stage_id, replication_factor) in stage_to_num_ranks_map:
        num_stages += replication_factor
    assert(len(stage_to_num_ranks_map) == len(pytorch_modules))
    num_modules = len(pytorch_modules) + 1  # Add 1 for criterion.
elif pytorch_modules is None:
    num_stages = 1
    num_modules = 2  # Add 1 for criterion.
else:
    # Derive the number of stages and modules from pytorch_modules.
    num_stages = len(pytorch_modules)
    num_modules = len(pytorch_modules) + 1  # Add 1 for criterion.

# Decide the concrete rank / stage / module assignments.
all_template_args = []
# Settings for data parallelism.
all_template_args.append({
    "module_to_stage_map": [0] * num_modules,
    "stage_to_rank_map": str({"0": list(range(num_stages))}).replace("'", "\"")
})
# Settings for model parallelism.
all_template_args.append({
    "module_to_stage_map": list(range(num_modules-1)) + [num_modules-2],
    "stage_to_rank_map": str({str(i): [i] for i in range(num_modules-1)}).replace("'", "\"")
})
# The runtime variables look like this:
"""
all_template_args =
 0 = {dict: 2}
  'module_to_stage_map' = {list: 11} [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
  'stage_to_rank_map' = {str} '{"0": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]}'
 1 = {dict: 2}
  'module_to_stage_map' = {list: 11} [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9]
  'stage_to_rank_map' = {str} '{"0": [0], "1": [1], "2": [2], "3": [3], "4": [4], "5": [5], "6": [6], "7": [7], "8": [8], "9": [9]}'
"""
# If the program arguments specify a mapping, handle it (hybrid parallelism).
if args.stage_to_num_ranks_map is not None:
    stage_to_rank_map = {}
    ranks_so_far = 0
    for i in range(num_modules-1):
        stage_to_rank_map[str(i)] = list(range(ranks_so_far,
                                               ranks_so_far + stage_to_num_ranks_map[i][1]))
        ranks_so_far += stage_to_num_ranks_map[i][1]
    stage_to_rank_map = str(stage_to_rank_map).replace("'", "\"")
    all_template_args.append({
        "module_to_stage_map": list(range(num_modules-1)) + [num_modules-2],
        "stage_to_rank_map": stage_to_rank_map
    })

# Write the configuration files.
for conf_filename, template_args in zip(
    ["dp_conf.json", "mp_conf.json", "hybrid_conf.json"], all_template_args):
    with open(os.path.join(args.output_directory, conf_filename), 'w') as f1, \
         open(args.conf_template_filename, 'r') as f2:
        template = f2.read()
        conf = template % template_args
        f1.write(conf)

3.5.2 Data Parallelism

dp_conf.json is the configuration file generated for data parallelism, for example:

{
    "module_to_stage_map": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
    "stage_to_rank_map": {"0": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]}
}

3.5.3 Model Parallelism

mp_conf.json is the configuration file generated for model parallelism, for example:

{
    "module_to_stage_map": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 9],
    "stage_to_rank_map": {"0": [0], "1": [1], "2": [2], "3": [3], "4": [4], "5": [5], "6": [6], "7": [7], "8": [8], "9": [9]}
}
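
As an illustration of how these maps are read, the following hypothetical glue code (not part of PipeDream) finds the stage of a given rank and the modules mapped to that stage:

import json

with open("mp_conf.json") as f:
    conf = json.load(f)

rank = 3
stage_of_rank = next(int(stage) for stage, ranks in conf["stage_to_rank_map"].items()
                     if rank in ranks)
modules_of_stage = [i for i, stage in enumerate(conf["module_to_stage_map"])
                    if stage == stage_of_rank]
print(stage_of_rank, modules_of_stage)   # 3 [3] -- rank 3 runs module 3 (Stage3)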

0x04 Summary

In summary, the model graph is converted into one Python file per stage, and these files are then aggregated into an overall Python file that users can use directly.

0xFF References

[Source code analysis] Deep learning pipeline parallelism PipeDream (1) --- Profile stage

[Source code analysis] Deep learning pipeline parallelism PipeDream (2) --- Compute partitioning