1. 程式人生 > >Tensorflow中的tf.layers.batch_normalization()用法

Tensorflow中的tf.layers.batch_normalization()用法

使用tf.layers.batch_normalization()需要三步:

  1. 在卷積層將啟用函式設定為None。
  2. 使用batch_normalization。
  3. 使用啟用函式啟用。

需要特別注意的是:在訓練時,需要將第二個引數training = True。在測試時,將training = False。需要特別注意的是:在訓練時,需要將第二個引數training = True。在測試時,將training = False。
下面這段程式碼就是在卷積層之後添加了bn層。

import tensorflow as tf
import  os
import
numpy as np import pickle # 檔案存放目錄 CIFAR_DIR = "./cifar-10-batches-py" # tensorboard # 1. 指定面板圖上顯示的變數 # 2. 訓練過程中將這些變數計算出來,輸出到檔案中 # 3. 檔案解析 ./tensorboard --logdir = dir. def load_data( filename ): '''read data from data file''' with open( filename, 'rb' ) as f: data = pickle.load( f,
encoding='bytes' ) # python3 需要新增上encoding='bytes' return data[b'data'], data[b'labels'] # 並且 在 key 前需要加上 b class CifarData: def __init__( self, filenames, need_shuffle ): '''引數1:資料夾 引數2:是否需要隨機打亂''' all_data = [] all_labels = [] for filename in filenames:
# 將所有的資料,標籤分別存放在兩個list中 data, labels = load_data( filename ) all_data.append( data ) all_labels.append( labels ) # 將列表 組成 一個numpy型別的矩陣!!!! self._data = np.vstack(all_data) # 對資料進行歸一化, 尺度固定在 [-1, 1] 之間 self._data = self._data # 將列表,變成一個 numpy 陣列 self._labels = np.hstack( all_labels ) # 記錄當前的樣本 數量 self._num_examples = self._data.shape[0] # 儲存是否需要隨機打亂 self._need_shuffle = need_shuffle # 樣本的起始點 self._indicator = 0 # 判斷是否需要打亂 if self._need_shuffle: self._shffle_data() def _shffle_data( self ): # np.random.permutation() 從 0 到 引數,隨機打亂 p = np.random.permutation( self._num_examples ) # 儲存 已經打亂 順序的資料 self._data = self._data[p] self._labels = self._labels[p] def next_batch( self, batch_size ): '''return batch_size example as a batch''' # 開始點 + 數量 = 結束點 end_indictor = self._indicator + batch_size # 如果結束點大於樣本數量 if end_indictor > self._num_examples: if self._need_shuffle: # 重新打亂 self._shffle_data() # 開始點歸零,從頭再來 self._indicator = 0 # 重新指定 結束點. 和上面的那一句,說白了就是重新開始 end_indictor = batch_size # 其實就是 0 + batch_size, 把 0 省略了 else: raise Exception( "have no more examples" ) # 再次檢視是否 超出邊界了 if end_indictor > self._num_examples: raise Exception( "batch size is larger than all example" ) # 把 batch 區間 的data和label儲存,並最後return batch_data = self._data[self._indicator:end_indictor] batch_labels = self._labels[self._indicator:end_indictor] self._indicator = end_indictor return batch_data, batch_labels # 拿到所有檔名稱 train_filename = [os.path.join(CIFAR_DIR, 'data_batch_%d' % i) for i in range(1, 6)] # 拿到標籤 test_filename = [os.path.join(CIFAR_DIR, 'test_batch')] # 拿到訓練資料和測試資料 train_data = CifarData( train_filename, True ) test_data = CifarData( test_filename, False ) batch_size = 20 # 設計計算圖 # 形狀 [None, 3072] 3072 是 樣本的維數, None 代表位置的樣本數量 x = tf.placeholder( tf.float32, [batch_size, 3072] ) # 形狀 [None] y的數量和x的樣本數是對應的 y = tf.placeholder( tf.int64, [batch_size] ) is_training = tf.placeholder(tf.bool, []) x_image = tf.reshape( x, [-1, 3, 32, 32] ) # 將最開始的向量式的圖片,轉為真實的圖片型別 x_image = tf.transpose( x_image, perm= [0, 2, 3, 1] ) # 將x_image拆分,eg: [(1, 32, 32, 3), (1, 32, 32, 3), ... ] x_image_arr = tf.split(x_image, num_or_size_splits = batch_size, axis = 0) # 用於儲存增強後的圖片 result_x_image_arr = [] for x_single_image in x_image_arr: # 將x_single_image改變形狀,改為圖片的格式. eg:[1, 32, 32, 3] -> [32, 32, 3] x_single_image = tf.reshape(x_single_image, [32, 32, 3]) # 上下反轉 data_aug_1 = tf.image.random_flip_left_right(x_single_image) # 增加亮度 data_aug_2 = tf.image.random_brightness(data_aug_1, max_delta = 63) # 增加對比度 data_aug_3 = tf.image.random_contrast(data_aug_2, lower = 0.2, upper = 1.8) # 將單張圖片重新改成 四維 x_single_image = tf.reshape(data_aug_3, [1, 32, 32, 3]) # 將單張圖片存入列表 result_x_image_arr.append(x_single_image) # 將result_x_image_arr重新合併成資料集的樣子 result_x_images = tf.concat(result_x_image_arr, axis = 0) # 重新做歸一化 normal_result_x_images = result_x_images / 127.5 - 1 """ def conv_wrapper(inputs, name, output_channel = 32, kernel_size = (3, 3), activation = tf.nn.relu, padding = 'same'): ''' tf.layers.conv2d 的包裹函式 :param inputs: :param name: :param output_channel: :param kernel_size: :param activation: :param padding: :return: ''' return tf.layers.conv2d(inputs, output_channel, kernel_size, padding = padding, activation = activation, name = name) """ def conv_wrapper(inputs, name, is_training, output_channel = 32, kernel_size = (3, 3), activation = tf.nn.relu, padding = 'same'): ''' 卷積層 包裹函式 :param inputs: :param name: :param is_training: :param output_channel: :param kernel_size: :param activation: :param padding: :return: ''' # without bn: conv -> activation # with batch normalization: conv -> bn -> activation with tf.name_scope(name): conv2d = tf.layers.conv2d(inputs, output_channel, kernel_size, padding = padding, activation = None, name = name + '/conv2d') # 第二個引數很重要,normalization需要維護一個均值和一個方差, # 在訓練過程和測試過程中,他們的值是不一樣的, # 在訓練上,均值和方差是在一個batch上計算得到的 # 預測過程中,均值和方差是在整個資料集上,通過加權平均計算得到的 # 所以,在訓練和測試中模式是不一樣的, # 在 train 中,設定為 True,在test中,設定為False bn = tf.layers.batch_normalization(conv2d, training = is_training) return activation(bn) def pooling_wrapper(inputs, name): ''' tf.layers.max_pooling2d 的包裹函式 :param inputs: :param name: :return: ''' return tf.layers.max_pooling2d(inputs, (2, 2), (2, 2), name = name) # conv1:神經元 feature_map 輸出影象 影象大小: 32 * 32 conv1_1 = conv_wrapper(normal_result_x_images, 'conv1_1', is_training) conv1_2 = conv_wrapper(conv1_1, 'conv1_2', is_training) conv1_3 = conv_wrapper(conv1_2, 'conv1_3', is_training) # 池化層 影象輸出為: 16 * 16 pooling1 = pooling_wrapper(conv1_3, 'pooling1') conv2_1 = conv_wrapper(pooling1, 'conv2_1', is_training) conv2_2 = conv_wrapper(conv2_1, 'conv2_2', is_training) conv2_3 = conv_wrapper(conv2_2, 'conv2_4', is_training) # 池化層 影象輸出為 8 * 8 pooling2 = pooling_wrapper(conv2_3, 'pooling2') conv3_1 = conv_wrapper(pooling2, 'conv3_1', is_training) conv3_2 = conv_wrapper(conv3_1, 'conv3_2', is_training) conv3_3 = conv_wrapper(conv3_2, 'conv3_3', is_training) # 池化層 輸出為 4 * 4 * 32 pooling3 = pooling_wrapper(conv3_3, 'pooling3') # 展平 flatten = tf.contrib.layers.flatten( pooling3 ) y_ = tf.layers.dense(flatten, 10) # 使用交叉熵 設定損失函式 loss = tf.losses.sparse_softmax_cross_entropy( labels = y, logits = y_ ) # 該api,做了三件事兒 1. y_ -> softmax 2. y -> one_hot 3. loss = ylogy # 預測值 獲得的是 每一行上 最大值的 索引.注意:tf.argmax()的用法,其實和 np.argmax() 一樣的 predict = tf.argmax( y_, 1 ) # 將布林值轉化為int型別,也就是 0 或者 1, 然後再和真實值進行比較. tf.equal() 返回值是布林型別 correct_prediction = tf.equal( predict, y ) # 比如說第一行最大值索引是6,說明是第六個分類.而y正好也是6,說明預測正確 # 將上句的布林型別 轉化為 浮點型別,然後進行求平均值,實際上就是求出了準確率 accuracy = tf.reduce_mean( tf.cast(correct_prediction, tf.float64) ) with tf.name_scope( 'train_op' ): # tf.name_scope() 這個方法的作用不太明白(有點迷糊!) train_op = tf.train.AdamOptimizer( 1e-3 ).minimize( loss ) # 將 損失函式 降到 最低 def variable_summary(var, name): ''' 一個變數的各種統計量,建立一個summary :param var: 計算summary的變數 :param name: 指定名稱空間,以防衝突 :return: ''' with tf.name_scope(name): mean = tf.reduce_mean(var) with tf.name_scope('stddev'): # 求標準差 stddev = tf.sqrt(tf.reduce_mean(tf.square(var - mean))) tf.summary.scalar('mean', mean) # 均值 tf.summary.scalar('stddev', stddev) # 標準差 tf.summary.scalar('min', tf.reduce_min(var)) # 最小值 tf.summary.scalar('max', tf.reduce_max(var)) # 最大值 tf.summary.histogram('histogram', var) # 直方圖 反應的是變數的分佈 # 給六個卷積層新增summary with tf.name_scope('summary'): variable_summary(conv1_1, 'conv1_1') variable_summary(conv1_2, 'conv1_2') variable_summary(conv2_1, 'conv2_1') variable_summary(conv2_2, 'conv2_2') variable_summary(conv3_1, 'conv3_1') variable_summary(conv3_2, 'conv3_2') loss_summary = tf.summary.scalar('loss', loss) # 'loss':<10, 1.1>, <20, 1.08> accuracy_summary = tf.summary.scalar('accurary', accuracy) inputs_summary = tf.summary.image('inputs_image', normal_result_x_images) merged_summary = tf.summary.merge_all() # 將以上所有帶有 summary 的變數聚合起來 merged_summary_test = tf.summary.merge([loss_summary, accuracy_summary]) # 指定檔案儲存路徑 LOG_DIR = '.' run_label = 'run_vgg_tensorboard' run_dir = os.path.join(LOG_DIR, run_label) # 判斷該資料夾是否已經建立 if not os.path.exists(run_dir): os.mkdir(run_dir) # 在該資料夾下建立兩個資料夾,一個存放訓練資料,一個存放測試資料 train_log_dir = os.path.join(run_dir, 'train') test_log_dir = os.path.join(run_dir, 'test') # 判斷這兩個資料夾是否存在 if not os.path.exists(train_log_dir): os.mkdir(train_log_dir) if not os.path.exists(test_log_dir): os.mkdir(test_log_dir) # 初始化變數 init = tf.global_variables_initializer() train_steps = 1000000 test_steps = 100 # 不是每一步summary都是要計算的可以定義一個範圍,每過多少步計算一次 output_summary_every_steps = 100 with tf.Session() as sess: sess.run( init ) # 注意: 這一步必須要有!! # 開啟一個writer,向writer中寫資料 train_writer = tf.summary.FileWriter(train_log_dir, sess.graph) # 引數2:顯示計算圖 test_writer = tf.summary.FileWriter(test_log_dir) fixed_test_batch_data, fixed_test_batch_labels = test_data.next_batch(batch_size) # 開始訓練 for i in range( train_steps ): # 得到batch batch_data, batch_labels = train_data.next_batch( batch_size ) eval_ops = [loss, accuracy, train_op] should_output_summary = ((i+1) % output_summary_every_steps == 0) if should_output_summary: eval_ops.append(merged_summary) # 獲得 損失值, 準確率 eval_val_results = sess.run( eval_ops, feed_dict={x:batch_data, y:batch_labels, is_training:True} ) # 在訓練的時候,is_train 為 True loss_val, acc_val = eval_val_results[0:2] if should_output_summary: train_summary_str = eval_val_results[-1] train_writer.add_summary(train_summary_str, i+1) # 在 測試 時候,is_trian 為 False test_summary_str = sess.run([merged_summary_test], feed_dict = {x: fixed_test_batch_data,y: fixed_test_batch_labels, is_training: False} )[0] test_writer.add_summary(test_summary_str, i+1) # 每 500 次 輸出一條資訊 if ( i+1 ) % 500 == 0: print('[Train] Step: %d, loss: %4.5f, acc: %4.5f' % ( i+1, loss_val, acc_val )) # 每 5000 次 進行一次 測試 if ( i+1 ) % 5000 == 0: # 獲取資料集,但不隨機 test_data = CifarData( test_filename, False ) all_test_acc_val = [] for j in range( test_steps ): test_batch_data, test_batch_labels = test_data.next_batch( batch_size ) test_acc_val = sess.run( [accuracy], feed_dict={ x:test_batch_data, y:test_batch_labels, is_training:False } ) all_test_acc_val.append( test_acc_val ) test_acc = np.mean( all_test_acc_val ) print('[Test ] Step: %d, acc: %4.5f' % ( (i+1), test_acc ))