1. 程式人生 > >Tensorflow使用Resnet思想實現CIFAR-10十分類demo

Tensorflow使用Resnet思想實現CIFAR-10十分類demo

關於Resnet殘差網路的介紹已經非常多了,這裡就不在贅述.使用Tensorflow寫了一個簡單的Resnet,對CIFAR-10資料集進行十分類.關鍵步驟都寫了詳細註釋,雖然最後的精度不高,但還是學習Resnet的思想為主.

import tensorflow as tf
import  os
import numpy as np
import pickle

# 檔案存放目錄
CIFAR_DIR = "./cifar-10-batches-py"


def load_data(filename):
    '''read data from data file'''
    with open(filename, 'rb'
) as f: data = pickle.load(f, encoding='bytes') # python3 需要新增上encoding='bytes' return data[b'data'], data[b'labels'] # 並且 在 key 前需要加上 b class CifarData: def __init__(self, filenames, need_shuffle): '''引數1:資料夾 引數2:是否需要隨機打亂''' all_data = [] all_labels = [] for
filename in filenames: # 將所有的資料,標籤分別存放在兩個list中 data, labels = load_data(filename) all_data.append(data) all_labels.append(labels) # 將列表 組成 一個numpy型別的矩陣!!!! self._data = np.vstack(all_data) # 對資料進行歸一化, 尺度固定在 [-1, 1] 之間 self._data = self._data / 127.5
- 1 # 將列表,變成一個 numpy 陣列 self._labels = np.hstack(all_labels) # 記錄當前的樣本 數量 self._num_examples = self._data.shape[0] # 儲存是否需要隨機打亂 self._need_shuffle = need_shuffle # 樣本的起始點 self._indicator = 0 # 判斷是否需要打亂 if self._need_shuffle: self._shffle_data() def _shffle_data(self): # np.random.permutation() 從 0 到 引數,隨機打亂 p = np.random.permutation(self._num_examples) # 儲存 已經打亂 順序的資料 self._data = self._data[p] self._labels = self._labels[p] def next_batch(self, batch_size): '''return batch_size example as a batch''' # 開始點 + 數量 = 結束點 end_indictor = self._indicator + batch_size # 如果結束點大於樣本數量 if end_indictor > self._num_examples: if self._need_shuffle: # 重新打亂 self._shffle_data() # 開始點歸零,從頭再來 self._indicator = 0 # 重新指定 結束點. 和上面的那一句,說白了就是重新開始 end_indictor = batch_size # 其實就是 0 + batch_size, 把 0 省略了 else: raise Exception("have no more examples") # 再次檢視是否 超出邊界了 if end_indictor > self._num_examples: raise Exception("batch size is larger than all example") # 把 batch 區間 的data和label儲存,並最後return batch_data = self._data[self._indicator:end_indictor] batch_labels = self._labels[self._indicator:end_indictor] self._indicator = end_indictor return batch_data, batch_labels # 拿到所有檔名稱 train_filename = [os.path.join(CIFAR_DIR, 'data_batch_%d' % i) for i in range(1, 6)] # 拿到標籤 test_filename = [os.path.join(CIFAR_DIR, 'test_batch')] # 拿到訓練資料和測試資料 train_data = CifarData(train_filename, True) test_data = CifarData(test_filename, False) def residual_block(x, output_channel): ''' 定義殘差塊兒 :param x: 輸入tensor :param output_channel: 輸出的通道數 :return: tensor 需要注意的是:每經過一個stage,通道數就要 * 2 在同一個stage中,通道數是沒有變化的 ''' input_channel = x.get_shape().as_list()[-1] # 拿出 輸入 tensor 的 最後一維:也就是通道數 if input_channel * 2 == output_channel: increase_dim = True strides = (2, 2) # elif input_channel == output_channel: increase_dim = False strides = (1, 1) else: raise Exception("input channel can't match output channel") conv1 = tf.layers.conv2d(x, output_channel, (3, 3), strides = strides, padding = 'same', activation = tf.nn.relu, name = 'conv1' ) conv2 = tf.layers.conv2d(conv1, output_channel, (3, 3), strides = (1, 1), # 因為 上一層 卷積已經進行過降取樣,故這裡不需要 padding = 'same', activation = tf.nn.relu, name = 'conv2' ) if increase_dim: # 需要使用降取樣 # pooled_x 資料格式 [ None, image_width, image_height, channel ] # 要求格式 [ None, image_width, image_height, channel * 2 ] pooled_x = tf.layers.average_pooling2d(x, (2, 2), # size (2, 2), # stride padding = 'valid' ) ''' 如果輸出通道數是輸入的兩倍的話,需要增加通道數量. maxpooling 只能降取樣,而不能增加通道數, 所以需要單獨增加通道數 ''' padded_x = tf.pad(pooled_x, # 引數 2 ,在每一個通道上 加 pad [ [ 0, 0 ], [ 0, 0 ], [ 0, 0 ], [input_channel // 2, input_channel // 2] # 實際上就是 2倍input_channel,需要均分開 ] ) else: padded_x = x output_x = conv2 + padded_x # 就是 公式: H(x) = F(x) + x return output_x def res_net(x, num_residual_blocks, num_filter_base, class_num): ''' 殘差網路主程式 :param x: 輸入tensor :param num_residual_blocks: 每一個stage有多少殘差塊兒 eg: list [3, 4, 6, 2] 及每一個stage上的殘差塊兒數量 :param num_filter_base: 最初的通道數 :param class_num: 所需要的分類數 :return: tensor ''' num_subsampling = len(num_residual_blocks) # num_subsampling 為 stage 個數 layers = [] # 儲存每一個殘差塊的輸出 # x: [ None, width, height, channel] -> [width, height, channel] input_size = x.get_shape().as_list()[1:] # 首先,開始第一個卷積層 with tf.variable_scope('conv0'): conv0= tf.layers.conv2d(x, num_filter_base, (3, 3), strides = (1, 1), padding = 'same', name = 'conv0' ) layers.append(conv0) # 根據 模型,此處應有一個 pooling,但是 cifar-10 資料集很小,所以不再做 pool # num_subsampling = 4, sample_id = [0, 1, 2, 3] for sample_id in range(num_subsampling): for i in range(num_residual_blocks[sample_id]): with tf.variable_scope('conv%d_%d' % (sample_id, i)): conv = residual_block(layers[-1], num_filter_base * ( 2 ** sample_id ) # 每一個stage都是之前的2倍 ) layers.append(conv) # 最後就到了 average pool, 1000維 全連線, 這一步 with tf.variable_scope('fc'): # layer[-1].shape: [None, width, height, channel] # kernal_size = image_width, image_height global_pool = tf.reduce_mean(layers[-1], [1, 2]) # 求平均值函式,引數二 指定 axis # global_pool的shape是(?, 128) # 這裡需要解釋一下,對第二維,第三維求平均值,實際上就是對每一個feature map求一個平均值,一共有128個特徵圖. # 所以維度從四維,降低到了兩維 logits = tf.layers.dense(global_pool, class_num) layers.append(logits) return layers[-1] # 設計計算圖 # 形狀 [None, 3072] 3072 是 樣本的維數, None 代表位置的樣本數量 x = tf.placeholder(tf.float32, [None, 3072]) # 形狀 [None] y的數量和x的樣本數是對應的 y = tf.placeholder(tf.int64, [None]) # [None, ], eg: [0, 5, 6, 3] x_image = tf.reshape(x, [-1, 3, 32, 32]) # 將最開始的向量式的圖片,轉為真實的圖片型別 x_image = tf.transpose(x_image, perm= [0, 2, 3, 1]) y_ = res_net(x_image, [2, 3, 2], 32, 10) # 使用交叉熵 設定損失函式 loss = tf.losses.sparse_softmax_cross_entropy(labels = y, logits = y_) # 該api,做了三件事兒 1. y_ -> softmax 2. y -> one_hot 3. loss = ylogy # 預測值 獲得的是 每一行上 最大值的 索引.注意:tf.argmax()的用法,其實和 np.argmax() 一樣的 predict = tf.argmax(y_, 1) # 將布林值轉化為int型別,也就是 0 或者 1, 然後再和真實值進行比較. tf.equal() 返回值是布林型別 correct_prediction = tf.equal(predict, y) # 比如說第一行最大值索引是6,說明是第六個分類.而y正好也是6,說明預測正確 # 將上句的布林型別 轉化為 浮點型別,然後進行求平均值,實際上就是求出了準確率 accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float64)) with tf.name_scope('train_op'): # tf.name_scope() 定義該變數的名稱空間 train_op = tf.train.AdamOptimizer(1e-3).minimize(loss) # 將 損失函式 降到 最低 # 初始化變數 init = tf.global_variables_initializer() batch_size = 20 train_steps = 100000 test_steps = 100 with tf.Session() as sess: sess.run(init) # 注意: 這一步必須要有!! # 開始訓練 for i in range(train_steps): # 得到batch batch_data, batch_labels = train_data.next_batch(batch_size) # 獲得 損失值, 準確率 loss_val, acc_val, _ = sess.run([loss, accuracy, train_op], feed_dict={x:batch_data, y:batch_labels}) # 每 500 次 輸出一條資訊 if (i+1) % 500 == 0: print('[Train] Step: %d, loss: %4.5f, acc: %4.5f' % (i+1, loss_val, acc_val)) # 每 5000 次 進行一次 測試 if (i+1) % 5000 == 0: # 獲取資料集,但不隨機 test_data = CifarData(test_filename, False) all_test_acc_val = [] for j in range(test_steps): test_batch_data, test_batch_labels = test_data.next_batch(batch_size) test_acc_val = sess.run([accuracy], feed_dict={ x:test_batch_data, y:test_batch_labels }) all_test_acc_val.append(test_acc_val) test_acc = np.mean(all_test_acc_val) print('[Test ] Step: %d, acc: %4.5f' % ((i+1), test_acc)) ''' 訓練一萬次的最終結果: ===================================================== [Train] Step: 10000, loss: 0.75691, acc: 0.75000 [Test ] Step: 10000, acc: 0.74750 ===================================================== '''