1. 程式人生 > >官方卷積神經網路cifar10原始碼的學習筆記(多塊GPU)

官方卷積神經網路cifar10原始碼的學習筆記(多塊GPU)

先前只有自己的遊戲本有一塊GTX1050的GPU,所以對於官方的卷積神經網路教程,僅僅是按照自己的理解將教程簡單化,具體見部落格卷積神經網路:CIFAR-10訓練和測試(單塊GPU),現如今導師提供了具有兩塊GTX1080TiGPU 的工作站,硬體條件支援了,所以就將此教程完全實現一遍。

1. tf.app.flags 主要處理命令列引數的解析工作

tf.app.flag.DEFINE_xxx()就是新增命令列的可選引數(optional argument), 裡面有三個引數,分別是引數名稱,預設值和引數描述。如下面的程式碼所示,定義'num_gpu'為一個整數,預設值為1。

FLAGS = tf.app.flags.FLAGS
tf.app.flags.DEFINE_string('train_dir', 'cifar10_multi_gpu_train_logs',
                           '''Directory where to write event logs and checkpoint''')
tf.app.flags.DEFINE_integer('max_steps', 100000, '''Number of batches to run''')
tf.app.flags.DEFINE_integer('num_gpus', 1, '''How many GPUs to use''')

2. weight decay 是放在正則化前面的一個係數,正則化一般表示模型的複雜度,所以weight decay的作用是調節模型複雜度對損失函式的影響,若weight decay很大,則複雜的模型損失函式的值也就越大。

    if wd is not None:
        # 新增L2Loss, 並將其新增到‘losses’集合
        weight_decay = tf.multiply(tf.nn.l2_loss(var), wd, name='weight_loss')
        tf.add_to_collection('losses', weight_decay)

3. tf.control_dependencies()設計是用來控制計算流圖的,給圖中的某些計算指定順序。它是個context manager, 控制節點執行順序,先執行[]中的操作,在執行context中的內容。

# tf.control_dependencies()是一個context manager, 控制節點執行順序
# 先執行[]中的操作,在執行context中的操作
with tf.control_dependencies([loss_average_op]):
    opt = tf.train.GradientDescentOptimizer(lr)
    grads = opt.compute_gradients(total_loss)

apply_gradient_op = opt.apply_gradients(grads, global_step=global_step)

4. 在採用隨機梯度下降演算法訓練網路時,使用tf.train.ExponentialMovingAverage滑動平均的意義在於提高模型在測試資料上的健壯性。tf.train.ExponentialMovingAverage包含兩個引數,一個衰減率decay和一個num_updates。decay用於控制模型更新的速度,ExponentialMovingAverage對每一個變數(variable)都會維護一個影子變數(shadow variable)。影子變數的初始值就是這個變數的初始值,影子變數的計算公式為shadow\_variable = decay \times shadow\_variable + \left ( 1 - decay \right ) \times variable,decay 越大,shadow_variable 變化的越小,越趨於穩定。在實際運動中,decay的設定一般都接近於1(例如,0.99或者0.999或者0.9999)。num_updates引數動態設定decay的大小,decay = min \left \{ decay, \frac{1 + num\_updates}{10 + num\_updates } \right \}可以使得模型在訓練的初始階段更新得更快。

  • apply()方法添加了訓練變數的影子副本,並保持其影子副本中訓練變數的移動平均值操作。在每次訓練之後呼叫此操作,更新移動平均值
  • average()和average_name()方法可以獲取影子變數及其名稱
# tf.train.ExponentialMovingAverage(decay, steps就是採用滑動平均的方法更新引數
# 這個函式初始化需要提供一個衰減速率decay,用於控制模型的更新速度,decay越大越趨於穩定
# ExponentialMovingAverage還提供num_updates引數來設定decay的大小,使得模型在訓練的
# 初始階段更新得更快
variable_average = tf.train.ExponentialMovingAverage(MOVING_AVERAGE_DECAY, num_updates=global_step)
# apply()方法添加了訓練變數的影子副本,並保持其影子副本中訓練變數的移動平均值操作
# 在每次訓練之後呼叫此操作,更新移動平均值
with tf.control_dependencies([apply_gradient_op]):
    variable_average_op = variable_average.apply(tf.trainable_variables())

5. 優化時,可以通過minimize()函式來同時計算梯度並更新該梯度所對應的引數狀態,就是日常常見的程式碼方案,如想計算梯度,然後再將梯度對應引數狀態更新,可以先利用computer_gradients()函式來計算梯度,(按照自己的需求處理梯度),然後呼叫apply_gradients()函式來更新該梯度所對應的引數的狀態。前者不需要人工參與,比較省時省力,後者可以按照自己的需求改動後使用。

with tf.control_dependencies([loss_average_op]):
    opt = tf.train.GradientDescentOptimizer(lr)
    # 利用computer_gradients()函式計算梯度
    grads = opt.compute_gradients(total_loss)

# 呼叫apply_gradients()函式來更新該梯度所對應的引數的狀態
apply_gradient_op = opt.apply_gradients(grads, global_step=global_s

6. 原始碼在訓練時,並沒有使用tensorboard顯示訓練中的一些實現資料,而是通過hook來檢測網路訓練的情況。

在訓練中主要用到了tf.train.MonitoredTrainingSession()和tf.train.SessionRunHook()。

所有的hook都繼承SessionRunHook, tf.train.SessionRunHook()類定義在tensorflow/python/trainning/session_run_hook.py中,類中包含五個通用的函式,如下:

  • begin(self): 建立會話前呼叫,呼叫begin()時,default graph會被建立
  • after_create_session(self, session,coord): tf.Session被建立後呼叫,呼叫後會指示所有的Hooks會有一個新的會話被建立
  • before_run(seflt, run_context): 呼叫在每個sess.run()執行之前
  • after_run(self, run_context, run_values): 呼叫在每個sess.run()之後
  • end(self, session): 在會話結束時呼叫

tf.train.MonitorSession()引數過多,在這裡就不展示,需要的可以去官方API文件檢視。它的父類是MonitorSession。官方文件中中給出了一段示例程式碼:

saver_hook = CheckpointSaverHook(...)
summary_hook = SummarySaverHook(...)
with MonitoredSession(session_creator=ChiefSessionCreator(...),
                      hooks=[saver_hook, summary_hook]) as sess:
  while not sess.should_stop():
    sess.run(train_op)

操作流程如下:

Initialization:

  • 對於Hooks列表中的hook,呼叫beigin()
  • 通過scaffold.finalize()完成圖的定義
  • 建立會話
  • 通過Scaffold提供的初始化操作初始化模型
  • 如果checkpoint存在的話,恢復模型變數
  • 啟動佇列執行緒
  • 呼叫hook中的after_create_session()函式

Run:

  • 呼叫Hook中的before_run()函式
  • 用合併後的fetches和feed_dict呼叫Tensorflow中的session.run()定義的神經網路規模比較小,所以上圖中的GPU的使用率並不高,如果訓練大型的神經網路模型,Tensorflow將會佔滿所有能夠用到的GPU
  • 呼叫Hook中的after_run()函式
  • 返回使用者需要的session.run()結果
  • 如果AbortedError和UnavailableError發生了,在再次執行run()之前恢復或者重新初始化會話

Exit

  • 呼叫Hook中的end()函式
  • 關閉佇列執行緒和會話
  • 在monitored_session的上下文中,抑制由於處理完所有輸入丟擲的OutOfRange錯誤
def train():
    with tf.Graph().as_default():
        global_step = tf.train.get_or_create_global_step()
        with tf.device('/cpu:0'):
            images, labels = cifar10.distorted_inputs()

        logits = cifar10.inference(images)
        loss = cifar10.loss(logits, labels)
        train_op = cifar10.train(loss, global_step)

        class _LoggerHook(tf.train.SessionRunHook):
            '''
            該類用來列印訓練資訊
            '''
            def begin(self):
                '''
                在建立會話之前呼叫,呼叫begin()時,default graph
                會被建立,可在此處向default graph增加新op, begin()
                呼叫後,default graph不能再被掉用
                '''
                self._step = -1
                self._start_time = time.time()

            def before_run(self, run_context):
                '''
                呼叫在每個sess.run()執行之前,可以返回一個
                tf.train.SessRunArgs(op/tensor),在即將執行的會話中加入這些
                op/tensor; 加入的op/tensor會和sess.run()中已定義的op/tensor
                合併,然後一起執行。
                @param run_context: A 'SessionRunContext' object
                @return: None or a 'SessionRunArgs' object
                '''
                self._step += 1
                # 在這裡返回你想在執行過程中產看的資訊,以list的形式傳遞,如:[loss, accuracy]
                return tf.train.SessionRunArgs(loss)

            def after_run(self, run_context, run_values):
                '''
                呼叫在每個sess.run()之後,引數run_values是before_run()中要求的
                op/tensor的返回值; 
                可以呼叫run_contex.request_stop()用於停止迭代。 
                sess.run丟擲任何異常after_run不會被呼叫
                @param run_context: A 'SessionRunContext' object
                @param run_values: A SessionRunValues object
                '''
                if self._step % FLAGS.log_frequency == 0:
                    current_time = time.time()
                    duration = current_time - self._start_time
                    self._start_time = current_time

                    # results返回的是上面before_run()的返回結果,上面是loss所以返回loss
                    # 如若上面返回的是個list,則返回的也是個list
                    loss_value = run_values.results
                    examples_per_sec = FLAGS.log_frequency * FLAGS.batch_size / duration
                    sec_per_batch = float(duration / FLAGS.log_frequency)

                    print('%s: step %d, loss = %.2f (%.1f examples/sec; %.3f sec/batch)'
                          % (datetime.now(), self._step, loss_value, examples_per_sec, sec_per_batch))
        '''
        將計算圖的各個節點/操作定義好,構成一個計算圖。然後開啟一個
        MonitoredTrainingSession來初始化/註冊我們的圖和其他資訊
        在其引數hooks中,傳遞了三個hook:
        1. tf.train.StopAtStepHook(last_step):該hook是訓練達到特定步數時請求
        停止。使用該hook必須要預先定義一個tf.train.get_or_create_global_step()
        2. tf.train.NanTensorHook(loss):該hook用來檢測loss, 若loss的結果為NaN,則會
        丟擲異常
        3. _LoggerHook():該hook是自定義的hook,用來檢測訓練過程中的一些資料,譬如loss, accuracy
        。首先會隨著MonitoredTrainingSession的初始化來呼叫begin()函式,在這裡初始化步數,before_run()
        函式會隨著sess.run()函式的呼叫而呼叫。所以每訓練一步呼叫一次,這裡返回想要列印的資訊,隨後呼叫
        after_run()函式。
        '''
        with tf.train.MonitoredTrainingSession(checkpoint_dir=FLAGS.train_dir,
                                               hooks=[tf.train.StopAtStepHook(last_step=FLAGS.max_steps),
                                                      tf.train.NanTensorHook(loss),
                                                      _LoggerHook()],
                                               config=tf.ConfigProto(
                                                   log_device_placement=FLAGS.log_device_placement
                                               )) as mon_sess:
            while not mon_sess.should_stop():
                mon_sess.run(train_op)

7. 在多個GPU上訓練模型。在具有多個GPU的工作站中,每個GPU的速度基本接近,並且都含有足夠的記憶體來執行整個CIFAR-10模型。因此我們選擇以下方式來設計我們的訓練系統:在每個GPU上放置單獨的模型副本,等所有的GPU處理完一批資料後再同步更新模型的引數。這一機制要求所有GPU能夠共享模型引數。由於GPU之間傳輸資料非常慢,因此在CPU上儲存和更新所有模型的引數。

如下圖所示,每一個GPU會用一批獨立的資料計算梯度和估計值,而且GPU是同步執行的,所有GPU中的梯度會累積並求平均值(CPU),所以導致GPU在處理一批新的資料之前會更新一遍引數。

 總而言之就是:模型引數儲存在cpu上,模型引數的副本在不用gpu上,每次訓練,提供batch_size*gpu_nums資料,並等量拆分成多個batch,分別送入不同GPU。前向在不同gpu上進行,模型引數更新時,將多個GPU後向計算得到的梯度資料進行平均,並在cpu上利用梯度資料更新模型引數

. 在多個裝置中設定變數和操作。在多個裝置中設定變數和操作需要做一些特殊的抽象,首先需要把在單個模型拷貝中計算估計值和梯度的行為抽象到一個函式中,在程式碼中,我們稱這個抽象物件為"tower"。對每一個"tower"設定兩個屬性:

在一個tower中為所有操作設定一個唯一的名稱。tf.name_scope()通過新增一個範圍字首來提供該唯一的名稱。比如,在第一個tower中的所有操作都會附帶一個字首tower_0

在一個tower中執行操作的優先硬體裝置。tf.device()提供該資訊。比如,在第一個tower中的所有操作都位於device('/gpu:0')範圍中,暗含的意思是這些操作應該執行在第一塊GPU上。

為了在多個GPU上共享變數,所有的變數都繫結在CPU上,並通過tf.get_variable()訪問。

9. tf.ConfigProto()函式是用在建立session的時候,用來對session進行引數配置。

  • 引數llog_device_placement=True記錄裝置指派情況,可以獲取到operations和Tensor被指派到哪個裝置(幾號CPU或幾號GPU)上執行,在終端打印出各項操作是在哪個裝置上執行的
  • 引數allow_soft_placement=True自動選擇執行裝置,允許tf自動選擇一個存在並且可用的裝置來執行操作
sess = tf.Session(config=tf.ConfigProto(allow_soft_placement=True,
                                              log_device_placement=FLAGS.log_device_placement))

tensorflow提供了兩種限制GPU資源使用的方法,一是讓tf在執行過程中動態申請記憶體,需要多少就申請多少;第二種方式就是研製GPU的使用率。

config= tf.ConfigProto()
# 動態申請視訊記憶體
config.gpu_options.allow_growth = True
# 限制GPU使用率,佔用40%視訊記憶體
config.gpu_options.per_process_gpu_memory_fraction = 0.4
sess = tf.Session(config=config)

10. 啟動並在多個GPU上訓練模型。執行multi_gpu_train.py,使用多個GPU實現模型並行訓練。我有兩個GPU,所以設num_gpus=2.

python multi_gpu_train.py  --num_gpus=2

注:tensorflow在訓練時預設佔用所有的GPU視訊記憶體。

用多GPU執行時,發現一個問題,兩塊GPU執行的時間竟然是單塊GPU執行時間的近2倍,使用nvidia-smi命令檢視GPU的使用起情況,發現GPU的使用率過低,如下圖所示:

 原因可能是:定義的神經網路規模比較小,所以上圖中的GPU的使用率並不高,如果訓練大型的神經網路模型,Tensorflow將會佔滿所有能夠用到的GPU。畢竟cifar10資料集並不是多大,而且定義的網路模型並不是很複雜。