1. 程式人生 > >TensorFlow 如何構建高效能的資料輸入管道(Pipeline)

TensorFlow 如何構建高效能的資料輸入管道(Pipeline)

本篇主要介紹怎麼使用 tf.data API 來構建高效能的輸入 pipeline。

tf.data官方教程詳見前面的部落格<<<<<<<<<<tf.data官方教程

目錄

模型單個訓練 step 時間的減少 依賴於 GPU、TPU 的使用。最優效能不僅依賴於高速的計算硬體,也要求有一個高效的輸入管道(Input Pipeline Performance Guide),這個管道在當前step完成前,進行下一個 step 需要的資料的準備。tf.data API 有助於去構建靈活且高效的輸入管道。這個文件解釋了 tf.data API 的特徵和怎麼去構建高效能的 TensorFlow 輸入管道 over 各種模型 及 硬體加速器。

這個指南主要包含以下幾部分:

  • 說明 TensorFlow 輸入管道本質上是一個 ETL 過程(Extract,Transform,Load)。
  • Describes common performance optimizations in the context of the tf.data API.
  • Discusses the performance implications of the order in which you apply transformations.
  • Summarizes the best practices for designing performant TensorFlow input pipelines.

1. 輸入管道結構

一個典型的 TensorFlow 訓練輸入管道能夠抽象為一個 ETL 過程(Extract,Transform,Load):

  • Extract:從永久儲存上讀取資料——可以是本地(HDD 或 SSD),也可以是網盤(GCS 或 HDFS)
  • Transform:使用 CPU 去解析、預處理資料——比如:影象解碼、資料增強、變換(比如:隨機裁剪、翻轉、顏色變換)、打亂、batching。
  • Load:將 Transform 後的資料載入到 計算裝置(accelerator device(s))——例如:GPU、TPU 等執行機器學習模型的裝置。

同時保留加速器用於重啟訓練你的模型

這個模式在利用了 GPU 強大算力的同時,有效地利用了 CPU。另外,將輸入管道看作一個 ETL 過程,十分有利於效能的優化。

當使用 tf.estimator.Estimator API 時,傳給 tf.estimator.Estimatorinput_fn 包括了前兩個階段(Extract 和 Transform)。在程式碼中,這可能看起來像下面(簡易,順序)的實現:

def parse_fn(example):
  "Parse TFExample records and perform simple data augmentation."
  example_fmt = {
    "image": tf.FixedLengthFeature((), tf.string, ""),
    "label": tf.FixedLengthFeature((), tf.int64, -1)
  }
  parsed = tf.parse_single_example(example, example_fmt)
  image = tf.image.decode_image(parsed["image"])
  image = _augment_helper(image)  # augments image using slice, reshape, resize_bilinear
  return image, parsed["label"]

def input_fn():
  files = tf.data.Dataset.list_files("/path/to/dataset/train-*.tfrecord")
  dataset = files.interleave(tf.data.TFRecordDataset)
  dataset = dataset.shuffle(buffer_size=FLAGS.shuffle_buffer_size)
  dataset = dataset.map(map_func=parse_fn)
  dataset = dataset.batch(batch_size=FLAGS.batch_size)
  return dataset
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

下一部分建立在這個輸入管道上。

2. 優化輸入管道的效能

因為新計算裝置使得訓練網路的速度快了很多,所以 CPU 上的預處理很可能會變成瓶頸。tf.data API 提供了很多構建塊來設計能夠有效利用 CPU 的輸入管道,通過優化 ETL 過程的各個步驟。

2.1 Pipelining——prefetch(n)

為了執行一個訓練 step,你必須首先 Extract、Transform 訓練資料,然後將它 feed 到計算裝置上去。但是,在一個簡易的同步實現中,當 CPU 在準備資料時,計算裝置處於閒置狀態。相反,當計算裝置在訓練模型時,CPU 處於閒置狀態。因此,訓練 step 的時間是 CPU 的預處理時間 和 計算裝置的訓練時間的總和。

Pipelining 將一個訓練 step 中的 預處理 和 模型執行 重疊起來。當計算裝置在執行第 N 個訓練 step 時,CPU 為第 N+1 個訓練 step 準備資料。通過這個重疊將 step 的時間由原來的 總和 變為了 兩個部分(執行訓練、資料準備)的最大值。

沒有 pipelining,CPU 和 GPU / TPU 很大一部分時間都是閒置的: 這裡寫圖片描述 使用 pipelining 後,空閒時間顯著減少: 這裡寫圖片描述 tf.data API 通過 tf.data.Dataset.prefetch 變換提供了一個 software pipelining 機制,這個機制解耦了 資料產生的時間 和 資料消耗的時間。尤其是,這個機制使用一個後臺執行緒和一個內部快取區,在資料被請求前,去從資料資料集中預載入一些資料。因此,為了實現上述的 pipelining 效果,你可以新增 prefetch(1) 作為你資料集管道的最終變換(或者 prefetch(n),如果你一個訓練 step 消耗 n 個元素)。

為了應用這個特性,將:

dataset = dataset.batch(batch_size=FLAGS.batch_size)
return dataset
  • 1
  • 2

改為:

dataset = dataset.batch(batch_size=FLAGS.batch_size)
dataset = dataset.prefetch(buffer_size=FLAGS.prefetch_buffer_size)
return dataset
  • 1
  • 2
  • 3

注意:如果 資料產生器 和 資料消耗器 的工作可能重合,那麼 prefetch 變換將在任何時間都能產生效能提升。前面的建議僅僅是最簡單的應用。

2.2 並行資料變換——多執行緒進行 map 變換,mapbatch的融合

當準備一個 batch 時,輸入元素可能需要去進行預處理。為了這個目的,tf.data API 提供了 tf.data.Dataset.map 變換,這個變換應用一個使用者自定義函式到輸入資料集的每一個元素。因為輸入元素之間是獨立的,所以能夠在多個 CPU 核心上並行地進行預處理。為了使這成為可能,map 變換提供了一個 num_parallel_calls 引數去指定並行的級別。例如,下面的框圖說明了 num_parallel_calls=2 時,map 變換的效果: 這裡寫圖片描述

num_parallel_calls 引數的最優值取決於你的硬體,訓練資料的特點(比如:它的 size、shape),map 函式的計算量 和 CPU 上同時進行的其它處理。一個簡單的原則是:將 num_parallel_calls 設定為 CPU 的核心數。例如,如果 CPU 有四個核,將 num_parallel_calls 設定為 4 將會很高效。另一方面,設定 num_parallel_calls 大於 CPU 的核心數,能夠導致低效的排程,導致輸入管道速度下降。

為了應用這個特性,將:

dataset = dataset.map(map_func=parse_fn)
  • 1

改為:

dataset = dataset.map(map_func=parse_fn, num_parallel_calls=FLAGS.num_parallel_calls)
  • 1

進一步,如果你的 batch size 為成百上千,你的輸入管道將很可能受益於並行地 batch 處理。為了這個目的,tf.data API 提供了 tf.contrib.data.map_and_batch 變化,它有效地融合了 map 和 batch 變化。

為了使用這個變換,將:

dataset = dataset.map(map_func=parse_fn, num_parallel_calls=FLAGS.num_parallel_calls)
dataset = dataset.batch(batch_size=FLAGS.batch_size)
  • 1
  • 2

改為:

dataset = dataset.apply(tf.contrib.data.map_and_batch(
    map_func=parse_fn, batch_size=FLAGS.batch_size))
  • 1
  • 2

2.3 並行資料提取——並行地讀取並解析多個數據檔案

在一個實際配置中,輸入資料可能被儲存在網盤(例如,GCS 或 HDFS)。要麼因為輸入資料不適合本地,要麼因為訓練是分散式的,在每臺機器上覆制輸入資料是沒有意義的。在本地能夠很好的讀取資料的資料集管道 可能會卡在 I/O 瓶頸上,因為 本地 和 遠端儲存 有以下區別:

  • Time-to-first-byte(讀取第一個bytes的時間):從遠端儲存讀取檔案的第一個位元組的時間比本地儲存長一個數量級。
  • Read throughput(讀取吞吐量):雖然遠端儲存通常提供大的聚合頻寬,但是讀取單個檔案可能僅能利用該頻寬的一小部分。

另外,一旦原始位元組被讀取到記憶體中,也可能需要對資料進行反序列化或解密(例如:protobuf),這將導致額外的負載。不管資料是本地儲存還是遠端儲存,該開銷都存在,但如果資料未被高效地預載入,則遠端情況下可能更糟。

為了減輕各種資料提取開銷的影響,tf.data API提供了 tf.contrib.data.parallel_interleave 轉換。使用此變換可以將 從多個檔案中提取資料並解析 這一過程並行化。同時讀取的檔案的數目可以通過引數 cycle_length 來指定。

下面的框圖說明了將 parallel_interleave 變化 cycle_length=2 時的效果: 這裡寫圖片描述 為了應用這個特性,將:

dataset = files.interleave(tf.data.TFRecordDataset)
  • 1

改為:

dataset = files.apply(tf.contrib.data.parallel_interleave(
    tf.data.TFRecordDataset, cycle_length=FLAGS.num_parallel_readers))
  • 1
  • 2

由於負載或網路事件,遠端儲存系統的吞吐量會隨時間而變化。 為了緩解這種變化,parallel_interleave 變換能夠可選地使用 prefetching詳情見:tf.contrib.data.parallel_interleave

預設情況下,parallel_interleave 變換為元素提供一個確定性順序,以方便再現。作為 prefetching 的一個替代方案(這在某些情況下,可能不高效),parallel_interleave 變換也提供了一個選項去提高效能(代價是元素的順序的確定性)。尤其是,如果 sloppy 引數被設定為 True,變換可能偏離設定的順序,通過臨時跳過在下一個元素被請求時元素不可用的檔案。

3. 效能考量(Performance Considerations)

tf.data API 是圍繞可組合的變換設計的(為使用者提供靈活性)。雖然這些變換中的很多變換的次序是可交換的,但某些變換的次序對效能有影響。

3.1 Map and Batch——使用者自定義函式向量化

將使用者自定義的函式傳給 map 變換 會產生排程、執行使用者自定義函式的負載。一般情況下,這個負載與自定義函式的計算量相比很小。但是,如果 map 的函式的計算量很小,這個負載將是主要開銷。在這種情況下,我們推薦使用向量化的自定義函式(它一次對一個batch進行變換),並且在 map 變換前使用 batch 變換。

3.2 Map and Cache——快取資料集

tf.data.Dataset.cache 變化能夠在記憶體或本地儲存器上快取一個數據集。如果傳遞給 map 變換的使用者自定義函式的計算量很大,只要得到的資料集仍然適合記憶體或本地儲存,就可以在 map 轉換之後應用 cache 轉換。

如果使用者定義函式導致儲存資料集需要的空間超過了 cache 的容量,考慮提前對資料集進行預處理,以減少資源的使用。

3.3 Map and Interleave / Prefetch / Shuffle——變換的順序

很多變變化(包括 interleave,prefetch,shuffle)維護元素的內部快取。如果傳給 map 變換的 使用者自定義函式 改變了元素的 size,那麼 map 變換的次序影響記憶體的使用量。通常情況下,我們建議選擇記憶體使用量更低的次序,除非不同的次序能夠產生效能上的提高(例如,為了使用融合的 tf.contrib.data.map_and_batch)。

3.4 Repeat and Shuffle——repeatshuffle 的次序

tf.data.Dataset.repeat 變換重複輸入資料有限次(或無限次);資料的每一次重複稱為一個 epoch。tf.data.Dataset.shuffle 變換隨機打亂資料集 example 的次序。

如果 repeat 變換被放在 shuffle 變換之前,那麼 epoch 邊界將變得模糊。也就是說,某些元素可以在其他元素出現一次之前重複。另一方面,如果在 repeat 變換之前應用 shuffle 變換,那麼在每個 epoch 開始時,效能可能會下降(因為這時,也需要進行 shuffle 變化的初始化)。換句話說,將 repeat 放置在 shuffle 之前,提供了更好的效能,將 shuffle 放置在 repeat 之前,提供了更強的次序保證。

當可能時,我們推薦使用融合op:tf.contrib.data.shuffle_and_repeat 變換,這個變換在效能和更強的次序保證上都是最好的(good performance and strong ordering guarantees)。否則,我們推薦在 repeat 之前使用 shuffle

4. 最優實現的總結(Summary of Best Practices)

下面是設計輸入管道的最佳實踐的總結:

  • 使用 prefetch 變換去重疊 資料讀取器 和 資料消耗器的工作。我們尤其推薦在輸入管道的末端新增 prefetch(n) (n是batch size),以重疊 CPU 上的變換 及 GPU/TPU裝置上的訓練。詳見【2.1】
  • 通過設定 num_parallel_calls 引數,來並行 map 變換。我們建議使用將該引數設定為 CPU 的核心數。詳見【2.2】
  • 如果你使用 batch 變換來將預處理好的元素 batching,我們建議使用融合op:map_and_batch 變換;尤其是你如果使用大的batch size。詳見【2.2】
  • 如果你的資料存在遠端儲存上,(且有時需要反序列化),我們建議使用 parallel_interleave 來並行資料的讀取和解析。詳見【2.3】
  • 將簡單的使用者自定義函式進行向量化,然後傳遞給 map 變換去分攤 使用者自定義函式有關的呼叫、執行的負載。詳見【3.1】
  • 如果你的資料能夠載入到記憶體,使用 cache 變化去在訓練的第一個 epoch 將資料集快取到記憶體,所以能避免後來的 epoch 讀取、解析、變換資料的負載。詳見【3.2】
  • 如果你的預處理會增加你資料的 size,我們建議你首先使用 interleaveprefetchshuffle 變換去減少記憶體使用量(如果可能)。詳見【3.3】
  • 我們建議在 repeat 變換之前使用 shuffle 變換,最好使用融合op: shuffle_and_repeat 變換。詳見【3.4】