TensorFlow深度學習實戰(一):AlexNet對MNIST資料集進行分類
阿新 • • 發佈:2018-12-20
概要
進來一段時間在看深度學習中經典的CNN模型相關論文。同時,為了督促自己學習TensorFlow,通讀論文之後開始,利用TensorFlow實現各個模型,復現相關實驗。這是第一篇論文講解的是AlexNet,論文下載網址為:ImageNet Classification with Deep Convolutional
Neural Networks 。對應的論文筆記為:深度學習經典論文筆記(一)——AlexNet:ImageNet Classification with Deep Convolutional Neural Networks 。該篇部落格主要記錄的是TensorFlow程式碼,由於論文中提到的LSVRC2010資料集過於龐大,下載需花費大量時間,首先利用MNIST來進行分類。
程式碼
# -*- coding: utf-8 -*- # @Time : 2018/11/8 21:00 # @Author : Daipuwei # @Blog :https://blog.csdn.net/qq_30091945 # @EMail :[email protected] # @Site :中國民航大學北教25實驗室506 # @FileName: AlexNet.py # @Software: PyCharm """ 這是AlexNet的類程式碼 """ import tensorflow as tf import numpy as np import os import time from tensorflow.examples.tutorials.mnist import input_data IMAGE_SIZE = 28 # MNIST圖片尺寸 IMAGE_CHANNELS = 1 # MNIST影象通道個數 NUM_LABELS = 10 # MNIST資料集分類個數 DROPOUT = 0.8 # dropout概率 BATCH_SIZE = 64 # 小樣本規模 REGULARIZER_LEARNING_RATE = 0.0005 # 正則化洗漱 EXPONENTIAL_MOVING_AVERAGE_DECAY = 0.99 # 動量因子 LEARNING_RATE_BASE = 0.001 # 學習率衰初始值 LEARNING_RATE_DECAY = 0.99 # 學習率衰減率 TRAINING_STEP = 10000 # 迭代次數 MODEL_SAVE_PATH = "./Models" # 模型檔案存放資料夾地址 MODEL_NAME = "AlexNet.ckpt" # 模型名稱 class AlexNet(object): def __init__(self): # 將上述變數定義為全域性變數 global IMAGE_SIZE,IMAGE_CHANNELS,NUM_LABELS,DROPOUT global BATCH_SIZE,REGULARIZER_LEARNING_RATE,EXPONENTIAL_MOVING_AVERAGE_DECAY global LEARNING_RATE_BASE,TRAINING_STEP,LEARNING_RATE_DECAY global MODEL_SAVE_PATH,MODEL_NAME # 初始化各層的權重 with tf.variable_scope("weights"): self.weights ={ 'conv1': tf.Variable(tf.random_normal([11, 11, 1, 64]),trainable=True), 'conv2': tf.Variable(tf.random_normal([5, 5, 64, 192]),trainable=True), 'conv3': tf.Variable(tf.random_normal([3, 3, 192, 384]),trainable=True), 'conv4':tf.Variable(tf.random_normal([3, 3, 384, 384]),trainable=True), 'conv5': tf.Variable(tf.random_normal([3, 3, 384, 256]),trainable=True), 'fc1': tf.Variable(tf.random_normal([4*4*256,4096]),trainable=True), 'fc2': tf.Variable(tf.random_normal([4096,4096]),trainable=True), 'fc3': tf.Variable(tf.random_normal([4096,NUM_LABELS]),trainable=True) } # 初始化各層的偏置 with tf.variable_scope("biases"): self.biases ={ 'conv1': tf.Variable(tf.random_normal([64]),trainable=True), 'conv2': tf.Variable(tf.random_normal([192]),trainable=True), 'conv3': tf.Variable(tf.random_normal([384]),trainable=True), 'conv4': tf.Variable(tf.random_normal([384]),trainable=True), 'conv5': tf.Variable(tf.random_normal([256]),trainable=True), 'fc1': tf.Variable(tf.random_normal([4096]),trainable=True), 'fc2': tf.Variable(tf.random_normal([4096]),trainable=True), 'fc3': tf.Variable(tf.random_normal([10]),trainable=True) } def con2d(self,name,input,weights,biases,strides): """ 這是AlexNet內的卷積操作的函式 :param name: 名稱 :param input: 輸入 :param weights: 權重 :param biases: 偏置 :param strides: 步長 """ return tf.nn.relu(tf.nn.bias_add(tf.nn.conv2d(input,weights,strides=[1,strides,strides,1],padding="SAME"),biases),name=name) def max_pool(self,name,input,ksize,strides): """ 這是AlexNet內的最大池化操作函式 :param name: 名稱 :param input: 輸入 :param ksize: 核心大小 :param strides: 步長 """ return tf.nn.max_pool(input,ksize=[1,ksize,ksize,1],strides=[1,strides,strides,1],padding='SAME',name=name) def LocalResponseNormlization(self,name,input): """ 這是AlexNet內的LRN操作 :param name: 名稱 :param input: 輸入 """ return tf.nn.lrn(input,4,bias=1.0,alpha=0.001/9.0,beta=0.75,name=name) def inferecne(self,images,train_flag,regularizer=None): """ 這是前向傳播的函式 :param images: 輸入影象 :param train_flag: 訓練測試標誌 :param regularizer: 正則化函式 """ # 第一層,定義卷積權重、偏置和下采樣,計算卷積結果 conv1 = self.con2d('conv1',images,self.weights['conv1'],self.biases['conv1'],strides=1) # 最大池化 pool1 = self.max_pool('pool1',conv1,ksize=2,strides=2) # LRN lrn1 = self.LocalResponseNormlization('lrn1',pool1) # 第二層,定義卷積權重、偏置和下采樣,計算卷積結果 conv2 = self.con2d('conv2', lrn1, self.weights['conv2'], self.biases['conv2'], strides=1) # 最大池化 pool2 = self.max_pool('pool2', conv2, ksize=2, strides=2) # LRN lrn2 = self.LocalResponseNormlization('lrn2', pool2) # 第三層,定義卷積權重、偏置和下采樣,計算卷積結果 conv3 = self.con2d('conv3', lrn2, self.weights['conv3'], self.biases['conv3'], strides=1) # LRN lrn3 = self.LocalResponseNormlization('lrn3',conv3) # 第四層,定義卷積權重、偏置和下采樣,計算卷積結果 conv4 = self.con2d('conv4', lrn3, self.weights['conv4'], self.biases['conv4'], strides=1) # LRN lrn4 = self.LocalResponseNormlization('lrn4',conv4) # 第五層,定義卷積權重、偏置和下采樣,計算卷積結果 conv5 = self.con2d('conv4', lrn4, self.weights['conv5'], self.biases['conv5'], strides=1) # 最大池化 pool5 = self.max_pool('pool5', conv5, ksize=2, strides=2) # LRN lrn5 = self.LocalResponseNormlization('lrn4', pool5) # 第六層為全連線層,fc1 fc1_input = tf.reshape(lrn5,[-1,self.weights['fc1'].get_shape().as_list()[0]]) fc1 = tf.nn.relu(tf.matmul(fc1_input,self.weights['fc1'])+self.biases['fc1'],name='fc1') # 訓練階段,則進行dropout if train_flag == True: fc1 = tf.nn.dropout(fc1,DROPOUT) # 對權重進行l2正則化 if regularizer != None: tf.add_to_collection("losses", regularizer(self.weights['fc1'])) # 第七層為全連線層,fc2 fc2_input = tf.reshape(fc1,[-1,self.weights['fc2'].get_shape().as_list()[0]]) fc2 = tf.nn.relu(tf.matmul(fc2_input,self.weights['fc2'])+self.biases['fc2'],name='fc2') # 訓練階段,則進行dropout if train_flag == True: fc2 = tf.nn.dropout(fc2,DROPOUT) # 對權重進行l2正則化 if regularizer != None: tf.add_to_collection("losses",regularizer(self.weights['fc2'])) # 第八層為全連線層,fc3 fc3_input = tf.reshape(fc2,[-1,self.weights['fc3'].get_shape().as_list()[0]]) fc3 = tf.matmul(fc3_input,self.weights['fc3'])+self.biases['fc3'] # 對權重進行l2正則化 if regularizer != None: tf.add_to_collection("losses", regularizer(self.weights['fc3'])) return fc3 def train(self,mnist): """ 這是AlexNet的訓練函式 :param Train_Data: 訓練資料集 :param Train_Label: 訓練標籤集 """ # 定義AlexNet的輸入和輸出 x = tf.placeholder(tf.float32,shape=[None,IMAGE_SIZE,IMAGE_SIZE,IMAGE_CHANNELS],name="x_input") y_ = tf.placeholder(tf.float32,shape=[None,NUM_LABELS],name="y_input") # 定義L2正則化 regularizer = tf.contrib.layers.l2_regularizer(REGULARIZER_LEARNING_RATE) # 利用AlexNet進行前向傳播得到預測分類 y = self.inferecne(x,True,regularizer) global_step = tf.Variable(0,trainable=False) # 定義滑動平均操作以及訓練過程 variable_average = tf.train.ExponentialMovingAverage(EXPONENTIAL_MOVING_AVERAGE_DECAY, global_step) variable_average_op = variable_average.apply(tf.trainable_variables()) # 定義cross_entropy交叉熵和損失函式 cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y,labels=tf.argmax(y_,1)) cross_entropy_mean = tf.reduce_mean(cross_entropy) tf.add_to_collection("losses",cross_entropy_mean) loss = tf.add_n(tf.get_collection("losses")) # 定義精度 correct = tf.equal(tf.arg_max(y_,1),tf.arg_max(y,1)) accuracy = tf.reduce_mean(tf.cast(correct,tf.float32)) # 定義指數衰減學習率 learning_rate = tf.train.exponential_decay(LEARNING_RATE_BASE,global_step, mnist.train.labels.shape[0]/BATCH_SIZE,LEARNING_RATE_DECAY) train_step = tf.train.AdamOptimizer(learning_rate).minimize(loss,global_step) with tf.control_dependencies([train_step, variable_average_op]): train_op = tf.no_op(name='train') # 初始化持久化類 saver = tf.train.Saver() with tf.Session() as sess: # 初始化所有變數 tf.global_variables_initializer().run() for i in range(TRAINING_STEP): xs, ys = mnist.train.next_batch(BATCH_SIZE) xs = np.reshape(xs, (BATCH_SIZE, IMAGE_SIZE, IMAGE_SIZE, IMAGE_CHANNELS)) ys = np.reshape(ys,(BATCH_SIZE,NUM_LABELS)) '''random_sequence_batch = self.Shuffle_Sequence(len(Train_Data), BATCH_SIZE) accuracy_train = [] loss_train = [] step_train = 0 for batch in random_sequence_batch: Train_Data_Batch,Train_Label_Batch = Train_Data[batch],Train_Label[batch] Train_Data_Batch = np.shape(Train_Data_Batch,(BATCH_SZIE,IMAGE_SIZE,IMAGE_SIZE,IMAGE_CHANNELS)) Train_Label_Batch = np.shape(Train_Label_Batch,(BATCH_SZIE,NUM_LABELS)) #xs = np.reshape(xs,(BATCH_SIZE, Inference.IMAGE_SIZE, Inference.IMAGE_SIZE, Inference.NUM_CHANNELS)) _,loss_train,acc_train,step = sess.run([train_op,loss,accuracy,global_step], feed_dict={x:Train_Data_Batch,y_:Train_Label_Batch}) accuracy_train.append(acc_train) loss_train.append(loss_train) step_train = step''' _,step = sess.run([train_op,global_step],feed_dict={x: xs, y_: ys}) if i % 1000 == 0: # 計算精度 acc_train,step = sess.run([accuracy,global_step], feed_dict={x: xs, y_: ys}) acc_val = [] iter = int(mnist.validation.labels.shape[0]/100) #print(mnist.validation.labels.shape) for i in np.arange(0,iter): xs, ys = mnist.validation.next_batch(100) xs = np.reshape(xs, (100, IMAGE_SIZE, IMAGE_SIZE, IMAGE_CHANNELS)) acc = sess.run(accuracy, feed_dict={x: xs, y_: ys}) acc_val.append(acc) acc_val = np.sum(np.array(acc_val)*100)/mnist.validation.labels.shape[0] print("After %d training step(s),accuracy on train batch is:%.5f,accuracy on validation is:%.5f"%(step,acc_train,acc_val)) saver.save(sess,os.path.join(MODEL_SAVE_PATH,MODEL_NAME),global_step=step) def test(self,mnist): """ 這是AlexNet的測試函式 :param mnist:MNIST資料集 """ # 定義AlexNet的輸入和輸出 x = tf.placeholder(tf.float32,[None,IMAGE_SIZE,IMAGE_SIZE,IMAGE_CHANNELS],name="x_input") y_ = tf.placeholder(tf.float32,[None,NUM_LABELS],name="y_input") # 利用AlexNet進行前向傳播得到預測分類 y = self.inferecne(x,False) # 定義精度 correct = tf.equal(tf.argmax(y_,1),tf.argmax(y,1)) accuracy = tf.reduce_mean(tf.cast(correct,tf.float32)) # 通過變數重新命名的方式來載入模型,這樣在前向傳播的過程中就不需要呼叫求滑動平均的函式來獲取平局值了。 # 這樣就可以完全共用mnist_inference.py中定義的前向傳播過程 variable_averages = tf.train.ExponentialMovingAverage(EXPONENTIAL_MOVING_AVERAGE_DECAY) variable_to_restore = variable_averages.variables_to_restore() saver = tf.train.Saver(variable_to_restore) #每隔EVAL_INTERVAL_SECS秒呼叫一次計算正確率的過程以檢測訓練過程中正確率的變化 while True: with tf.Session() as sess: # tf.train.get_checkpoint_state函式會通過checkpoint檔案自動找到目錄中最新模型的檔名 ckpt = tf.train.get_checkpoint_state(MODEL_SAVE_PATH) if ckpt and ckpt.model_checkpoint_path: # 載入模型 saver.restore(sess, ckpt.model_checkpoint_path) # 通過檔名得到模型儲存時迭代的輪數 global_step = ckpt.model_checkpoint_path.split('/')[-1].split('-')[-1] xs,ys = mnist.test.images,mnist.test.labels accuracy_test = [] for start in np.arange(0,len(ys),100): end = np.min([start+100,len(ys)]) test_data,test_label = xs[start:end],ys[start:end] test_data = np.reshape(test_data,(100,IMAGE_SIZE,IMAGE_SIZE,IMAGE_CHANNELS)) acc_test = sess.run(accuracy, feed_dict={x: test_data, y_: test_label}) accuracy_test.append(acc_test) accuracy_test = np.sum(np.array(accuracy_test)*100)/len(ys) print("After %s training step(s), test accuracy = %f" % (global_step, accuracy_test)) else: print("No checkpoint file found") time.sleep(10) def run_main(): """ 這是主函式 """ mnist = input_data.read_data_sets("E:\\DPW\\MNIST",one_hot=True) alexnet = AlexNet() alexnet.train(mnist) #alexnet.test(mnist) if __name__ == '__main__': run_main()