1. 程式人生 > >使用估算器、tf.keras 和 tf.data 進行多 GPU 訓練

使用估算器、tf.keras 和 tf.data 進行多 GPU 訓練

文 / Zalando Research 研究科學家 Kashif Rasul

來源 | TensorFlow 公眾號

與大多數 AI 研究部門一樣,Zalando Research 也意識到了對創意進行嘗試和快速原型設計的重要性。隨著資料集變得越來越龐大,瞭解如何利用我們擁有的共享資源來高效快速地訓練深度學習模型變得大有用處。

TensorFlow 的估算器 API 對於在分散式環境中使用多個 GPU 來訓練模型非常有用。本文將主要介紹這一工作流程。我們先使用 Fashion-MNIST 小資料集訓練一個用 tf.keras 編寫的自定義估算器,然後在文末介紹一個較實際的用例。

TL; DR:基本上,我們需要記住,對於 tf.keras. 模型,我們只要通過 tf.keras.estimator.model_to_estimator 方法將其轉化為 tf.estimator.Estimator 物件,即可使用 tf.estimator API 來進行訓練。轉化完成後,我們可以使用估算器提供的機制用不同的硬體配置訓練模型。

import os import time

#!pip install -q -U tensorflow-gpu
import tensorflow as tf

import numpy as np

匯入 Fashion-MNIST 資料集 我們用 Fashion-MNIST 資料集隨手替換一下 MNIST,這裡麵包含幾千張 Zalando 時尚文章的灰度影象。獲取訓練和測試資料非常簡單,如下所示:

(train_images, train_labels), (test_images, test_labels) = 
   tf.keras.datasets.fashion
_mnist.load_data()

我們想把這些影象的畫素值從 0 到 255 之間的一個數字轉換為 0 到 1 之間的一個數字,並將該資料集轉換為 [B, H, W ,C] 格式,其中 B 代表批處理的影象數,H 和 W 分別是高度和寬度,C 是我們資料集的通道數(灰度為 1):

TRAINING_SIZE = len(train_images)
TEST_SIZE = len(test_images)

train_images = np.asarray(train_images, dtype=np.float32) / 255
# Convert the train images and add channels
train_images = train_images.reshape((TRAINING_SIZE, 28, 28, 1)) test_images = np.asarray(test_images, dtype=np.float32) / 255 # Convert the test images and add channels test_images = test_images.reshape((TEST_SIZE, 28, 28, 1)) 接下來,我們想將標籤從整數編號(例如,2 或套衫)轉換為獨熱編碼(例如,0,0,1,0,0,0,0,0,0,0)。為此,我們要使用 tf.keras.utils.to_categorical 函式: # How many categories we are predicting from (0-9) LABEL_DIMENSIONS = 10 train_labels = tf.keras.utils.to_categorical(train_labels, LABEL_DIMENSIONS) test_labels = tf.keras.utils.to_categorical(test_labels, LABEL_DIMENSIONS) # Cast the labels to floats, needed later train_labels = train_labels.astype(np.float32) test_labels = test_labels.astype(np.float32)

構建 tf.keras 模型 我們會使用 Keras 功能 API 來建立神經網路。Keras 是一個高階 API,可用於構建和訓練深度學習模型,其採用模組化設計,使用方便,易於擴充套件。tf.keras 是 TensorFlow 對這個 API 的實現,其支援 Eager Execution、tf.data 管道和估算器等。

在架構方面,我們會使用 ConvNet。一個非常籠統的說法是,ConvNet 是卷積層 (Conv2D) 和池化層 (MaxPooling2D) 的堆疊。但最重要的是,ConvNet 將每個訓練示例當作一個 3D 形狀張量(高度、寬度、通道),對於灰度影象,張量從通道 = 1 開始,然後返回一個 3D 張量。

因此,在 ConvNet 部分之後,我們需要將張量平面化,並新增密集層,其中最後一個返回 LABEL_DIMENSIONS 大小的向量,並附帶 tf.nn.softmax 啟用:

inputs = tf.keras.Input(shape=(28,28,1))  # Returns a placeholder

x = tf.keras.layers.Conv2D(filters=32, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(inputs)

x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)

x = tf.keras.layers.Conv2D(filters=64, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(x)

x = tf.keras.layers.MaxPooling2D(pool_size=(2, 2), strides=2)(x)

x = tf.keras.layers.Conv2D(filters=64, 
                          kernel_size=(3, 3), 
                          activation=tf.nn.relu)(x)

x = tf.keras.layers.Flatten()(x)

x = tf.keras.layers.Dense(64, activation=tf.nn.relu)(x)
predictions = tf.keras.layers.Dense(LABEL_DIMENSIONS,
                                   activation=tf.nn.softmax)(x)

現在,我們可以定義學習模型,請選擇優化器(我們從 TensorFlow 中選擇一個,而不使用來自 tf.keras. optimizers 的優化器)並進行編譯:

model = tf.keras.Model(inputs=inputs, outputs=predictions)

optimizer = tf.train.AdamOptimizer(learning_rate=0.001)

model.compile(loss='categorical_crossentropy',
             optimizer=optimizer,
             metrics=['accuracy'])

建立估算器 使用已編譯的 Keras 模型建立估算器,也就是我們所說的 model_to_estimator 方法。請注意,Keras 模型的初始模型狀態儲存在建立的估算器中。

那估算器有哪些優點呢?首先要提以下幾點:

  • 您可以在本地主機或分散式多 GPU 環境中執行基於估算器的模型,而無需更改您的模型;
  • 估算器能夠簡化模型開發者之間的共享實現;
  • 估算器能夠為您構建圖形,所以有點像 Eager Execution,沒有明確的會話。

那麼我們要如何訓練簡單的 tf.keras 模型來使用多 GPU?我們可以使用 tf.contrib.distribute.MirroredStrategy 正規化,通過同步訓練進行圖形內複製。如需瞭解更多關於此策略的資訊,請觀看分散式 TensorFlow 訓練講座。 注:分散式 TensorFlow 連結 https://www.youtube.com/watch?v=bRMGoPqsn20

基本上,每個工作器 GPU 都有一個網路拷貝,並會獲取一個數據子集,據以計算本地梯度,然後等待所有工作器以同步方式結束。然後,工作器通過 Ring All-reduce 運算互相傳遞其本地梯度,這通常要進行優化,以減少網路頻寬並增加吞吐量。在所有梯度到達後,每個工作器會計算其平均值並更新引數,然後開始下一步。理想情況下,您在單個節點上有多個高速互聯的 GPU。

要使用此策略,我們首先要用已編譯的 tf.keras 模型建立一個估算器,然後通過 RunConfig config 賦予其 MirroredStrategy 配置。預設情況下,該配置會使用全部 GPU,但您也可以賦予其一個 num_gpus 選項,以使用特定數量的 GPU:

NUM_GPUS = 2

strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)

estimator = tf.keras.estimator.model_to_estimator(model,
                                                 config=config)

建立估算器輸入函式 要通過管道將資料傳遞到估算器,我們需要定義一個數據匯入函式,該函式返回批量資料的 tf.data 資料集(影象、標籤)。下面的函式接收 numpy 陣列,並通過 ETL 過程返回資料集。

請注意,最後我們還呼叫了預讀取方法,該方法會在訓練時將資料緩衝到 GPU,以便下一批資料準備就緒並等待 GPU,而不是在每次迭代時讓 GPU 等待資料。GPU 可能仍然沒有得到充分利用,要改善這一點,我們可以使用融合版轉換運算(如 shuffle_and_repeat),而不是兩個單獨的運算。不過,我在這裡選用的是簡單用例。

def input_fn(images, labels, epochs, batch_size):

     # Convert the inputs to a Dataset. (E)
    ds = tf.data.Dataset.from_tensor_slices((images, labels))    

    # Shuffle, repeat, and batch the examples. (T)
    SHUFFLE_SIZE = 5000
    ds = ds.shuffle(SHUFFLE_SIZE).repeat(epochs).batch(batch_size)
    ds = ds.prefetch(2)    

    # Return the dataset. (L)
    return ds

訓練估算器 首先,我們定義一個 SessionRunHook 類,用於記錄隨機梯度下降法每次迭代的次數:

class TimeHistory(tf.train.SessionRunHook):
    def begin(self):
       self.times = []    

    def before_run(self, run_context):
       self.iter_time_start = time.time()    

    def after_run(self, run_context, run_values):
       self.times.append(time.time() - self.iter_time_start)

亮點在這裡!我們可以對估算器呼叫 train 函式,並通過 hooks 引數,向其賦予我們定義的 input_fn (包含批次大小和我們希望的訓練回合次數)和 TimeHistory 例項:

time_hist = TimeHistory()

BATCH_SIZE = 512
EPOCHS = 5

estimator.train(lambda:input_fn(train_images,
                               train_labels,
                               epochs=EPOCHS,
                               batch_size=BATCH_SIZE),
               hooks=[time_hist])

效能 現在,我們可以使用時間鉤子來計算訓練的總時間和平均每秒訓練的影象數量(平均吞吐量):

total_time = sum(time_hist.times)
print(f"total time with {NUM_GPUS} GPU(s): {total_time} seconds")

avg_time_per_batch = np.mean(time_hist.times)
print(f"{BATCH_SIZE*NUM_GPUS/avg_time_per_batch} images/second with
       {NUM_GPUS} GPU(s)")

使用兩塊 K80 GPU 進行訓練時的 Fashion-MNIST 訓練吞吐量和總時間,採用不同 NUM_GPUS,顯示縮放不良

評估估算器 為了檢驗模型的效能,我們要對估算器呼叫評估方法:

estimator.evaluate(lambda:input_fn(test_images, 
                                  test_labels,
                                  epochs=1,
                                  batch_size=BATCH_SIZE))

視網膜 OCT (光學相干斷層成像術)影象示例 為了測試模型在處理較大資料集時的擴充套件效能,我們使用 視網膜 OCT 影象資料集,這是 Kaggle 眾多大型資料集中的一個。該資料集由活人視網膜的橫截面 X 光影象組成,分為四個類別:NORMAL、CNV、DME 和 DRUSEN:

光學相干斷層成像術的代表影象,選自 Kermany 等人所著的《通過基於影象的深度學習技術確定醫學診斷和可治療疾病》(Identifying Medical Diagnoses and Treatable Diseases by Image-Based Deep Learning)

該資料集共有 84,495 張 JPEG 格式的 X 光影象,尺寸多為 512x496,可以通過 Kaggle CLI 下載: 注:CLI 連結 https://github.com/Kaggle/kaggle-api

#!pip install kaggle
#!kaggle datasets download -d paultimothymooney/kermany2018

下載完成後,訓練集和測試集影象類位於各自的資料夾內,因此我們可以將模式定義為:

labels = ['CNV', 'DME', 'DRUSEN', 'NORMAL']

train_folder = os.path.join('OCT2017', 'train', '**', '*.jpeg')
test_folder = os.path.join('OCT2017', 'test', '**', '*.jpeg')

接下來,我們要編寫估算器的輸入函式,該函式可以提取任何檔案模式,並返回已縮放影象和獨熱編碼標籤作為 tf.data.Dataset。這次,我們遵循輸入管道效能指南中的最佳實踐。請特別注意,如果 prefetch 的 buffer_size 為 None,則 TensorFlow 會自動使用最優的預讀取緩衝區大小: 注:輸入管道效能指南連結 https://www.tensorflow.org/performance/datasets_performance

1    def input_fn(file_pattern, labels, 
2                        image_size=(224,224), 
3                        shuffle=False,
4                        batch_size=64, 
5                        num_epochs=None,
6                        buffer_size=4096,
7                        prefetch_buffer_size=None): 
8
9            table = tf.contrib.lookup.index_table_from_tensor(mapping=tf.constant(labels))
10          num_classes = len(labels) 
11
12          def _map_func(filename):
13                label = tf.string_split([filename], delimiter=os.sep).values[-2]
14                image = tf.image.decode_jpeg(tf.read_file(filename), channels=3)
15                image = tf.image.convert_image_dtype(image, dtype=tf.float32) 
16                image = tf.image.resize_images(image, size=image_size)
17                return (image, tf.one_hot(table.lookup(label), num_classes))
18
19          dataset = tf.data.Dataset.list_files(file_pattern, shuffle=shuffle)
20
21          if num_epochs is not None and shuffle:
22                dataset = dataset.apply(
23                    tf.contrib.data.shuffle_and_repeat(buffer_size, num_epochs))
24          elif shuffle:
25                dataset = dataset.shuffle(buffer_size)
26          elif num_epochs is not None:
27                dataset = dataset.repeat(num_epochs)
28
29          dataset = dataset.apply(
30                tf.contrib.data.map_and_batch(map_func=_map_func,
31                                        batch_size=batch_size,
32                                        num_parallel_calls=os.cpu_count()))
33          dataset = dataset.prefetch(buffer_size=prefetch_buffer_size)
34
35          return dataset 

這次訓練該模型時,我們將使用一個經過預訓練的 VGG16,並且只重新訓練其最後 5 層:

keras_vgg16 = tf.keras.applications.VGG16(input_shape=(224,224,3),
                                         include_top=False)

output = keras_vgg16.output
output = tf.keras.layers.Flatten()(output)
prediction = tf.keras.layers.Dense(len(labels),
                                  activation=tf.nn.softmax)(output)

model = tf.keras.Model(inputs=keras_vgg16.input,
                      outputs=prediction)

for layer in keras_vgg16.layers[:-4]:
   layer.trainable = False

現在,我們萬事皆備,可以按照上述步驟進行,並使用 NUM_GPUS GPU 在幾分鐘內訓練我們的模型:

model.compile(loss='categorical_crossentropy',               optimizer=tf.train.AdamOptimizer(),              metrics=['accuracy'])

NUM_GPUS = 2
strategy = tf.contrib.distribute.MirroredStrategy(num_gpus=NUM_GPUS)
config = tf.estimator.RunConfig(train_distribute=strategy)
estimator = tf.keras.estimator.model_to_estimator(model,                                                  config=config)
BATCH_SIZE = 64
EPOCHS = 1

estimator.train(input_fn=lambda:input_fn(train_folder,                                         labels,                                         shuffle=True,                                         batch_size=BATCH_SIZE,                                         buffer_size=2048,                                         num_epochs=EPOCHS,                                         prefetch_buffer_size=4),                hooks=[time_hist])

訓練結束後,我們可以評估測試集的準確度,應該在 95% 左右(對初始基線來說還不錯):

estimator.evaluate(input_fn=lambda:input_fn(test_folder,
                                           labels, 
                                           shuffle=False,
                                           batch_size=BATCH_SIZE,
                                           buffer_size=1024,
                                           num_epochs=1))

使用兩塊 K80 GPU 進行訓練時的 Fashion-MNIST 訓練吞吐量和總時間,採用不同 NUM_GPUS,顯示線性縮放

總結 我們在上文中介紹瞭如何使用估算器 API 在多個 GPU 上輕鬆訓練 Keras 深度學習模型,如何編寫符合最佳實踐的輸入管道,以充分利用我們的資源(線性縮放),以及如何通過鉤子為我們的訓練吞吐量計時。

請務必注意,最後我們主要關注的是測試集錯誤。您可能會注意到,測試集的準確度會隨著 NUM_GPUS 值的增加而下降。其中一個原因可能是,使用 BATCH_SIZE*NUM_GPUS 的批量大小時,MirroredStrategy 能夠有效地訓練模型,而當我們增加 GPU 數量時,可能需要調整 BATCH_SIZE 或學習率。為便於製圖,文中除 NUM_GPUS 之外的所有其他超引數均保持不變,但實際上我們需要調整這些超引數。

資料集和模型的大小也會影響這些方案的縮放效果。在讀取或寫入小資料時,GPU 的頻寬較差,如果是較為老舊的 GPU(如 K80),則情形尤其如此,而且可能會造成上面 Fashion-MNIST 圖中所示情況。

致謝 感謝 TensorFlow 團隊,特別是 Josh Gordon,以及 Zalando Research 的各位同事,特別是 Duncan Blythe、Gokhan Yildirim 和 Sebastian Heinz,感謝他們幫忙修改草稿。