1. 程式人生 > >深度學習系列——AlxeNet實現MNIST手寫數字體識別

深度學習系列——AlxeNet實現MNIST手寫數字體識別

    本文實現AlexNet,用於識別MNIST手寫數字體。所有程式碼的框架基於tensorflow。看了幾篇論文的原始碼之後,覺得tensorflow 確實很難,學習程式設計還是靠實踐。這篇部落格留著給自己以及學習深度學習道路上的小夥伴們一些參考吧,希望能對大家有所幫助!

    環境:

    Ubuntu 16.04

    tensorflow-gpu

    一.資料集載入

        tensor flow examples 中自帶mnist資料集,可以自動下載和進行處理。由於筆者的tensorflow版本的原因吧,使用自帶的read_data_sets()沒有任何效果,那就把資料集下載下來,自己解壓處理。mnist資料解壓後格式是二進位制檔案,需要進行特殊的處理變成numpy陣列,這裡這是參考的網上的程式碼,覺得這個處理過程如果大家明白最好,不明白也不需要太糾結。載入資料集程式碼如下:(注意這裡需要把下載的資料集先解壓之後,在處理)

# laod mnist dataset to npy
import os
import struct
import numpy as np

# 載入資料集
def load_mnist(path, kind='train'):
    """Load MNIST data from `path`"""
    labels_path = os.path.join(path,
                               '%s-labels.idx1-ubyte'
                               % kind)
    images_path = os.path.join(path,
                               '%s-images.idx3-ubyte'
                               % kind)
    with open(labels_path, 'rb') as lbpath:
        magic, n = struct.unpack('>II',
                                 lbpath.read(8))
        labels = np.fromfile(lbpath,
                             dtype=np.uint8)

    with open(images_path, 'rb') as imgpath:
        magic, num, rows, cols = struct.unpack('>IIII',
                                               imgpath.read(16))
        images = np.fromfile(imgpath,
                             dtype=np.uint8).reshape(len(labels), 784)

    return images, labels


path = './MNIST_data'
images,labels = load_mnist(path=path,kind='t10k')

    上面的程式碼,可以把資料預處理,然後自己儲存成npy格式的檔案。原始的mnist資料灰度值是0-255,在feed給網路的時候,注意資料歸一化處理

二.網路搭建

    由於最原始的AlexNet是處理ImageNet資料集的,圖片大小是224x224x3,而mnist圖片大小可以看作是28x28x1.所以網路的形式上會有差別,包括卷積核的引數。但是思想是一樣的,學習這個思想很重要。程式碼如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Mar 24 16:37:00 2018

@author: wsw
"""
import tensorflow as tf

class Alexnet():
    
    def __init__(self):
        self.lr_based = 0.01
        self.wd = 1e-4
        self.epochs = 100
        
    def build(self,input_tensor,train=True,num_class=10):
        self.conv1 = self.conv_layer(input_tensor,[3,3,1,16],name='conv1')
        self.pool1 = self.max_pool_2x2(self.conv1,name='pool1')
        
        self.conv2_1 = self.conv_layer(self.pool1,[3,3,16,32],name='conv2_1')
        self.conv2_2 = self.conv_layer(self.conv2_1,[3,3,32,64],name='conv2_2')
        self.pool2 = self.max_pool_2x2(self.conv2_2,name='pool2')
        
        self.fc3 = self.fc_layer(self.pool2,120,relu=train,name='fc3')
        self.fc4 = self.fc_layer(self.fc3,120,relu=train,name='fc4')
        self.fc5 = self.fc_layer(self.fc4,num_class,relu=False,name='fc5')
        
        return self.fc5
    
        pass
    
    
    def variables_weights(self,shape,name):
        in_num = shape[0]*shape[1]*shape[2]
        out_num = shape[3]
        with tf.variable_scope(name):
            stddev = (2/(in_num+out_num))**0.5
            init = tf.random_normal_initializer(stddev=stddev)
            weights = tf.get_variable('weights',initializer=init,shape=shape)
        return weights
    
    
    def variables_biases(self,shape,name):
        with tf.variable_scope(name):
            init = tf.constant_initializer(0.1)
            biases = tf.get_variable('biases',initializer=init,shape=shape)
        return biases
    
    def conv_layer(self,bottom,shape,name):
        with tf.name_scope(name):
            kernel = self.variables_weights(shape,name)
            conv = tf.nn.conv2d(bottom,kernel,strides=[1,1,1,1],padding='SAME')
            bias = self.variables_biases(shape[3],name)
            biases = tf.nn.bias_add(conv,bias)
            conv_out = tf.nn.relu(biases)
            return conv_out
     
    def max_pool_2x2(self,bottom,name):
        with tf.name_scope(name):
            pool = tf.nn.max_pool(bottom,ksize=[1,2,2,1],strides=[1,2,2,1],padding='SAME')
            return pool
    
    def fc_layer(self,bottom,out_num,name,relu=True):
        bottom_shape = bottom.get_shape().as_list()
        if name=='fc3':
            in_num = bottom_shape[1]*bottom_shape[2]*bottom_shape[3]
            bottom = tf.reshape(bottom,shape=(-1,in_num))
        else:
            in_num = bottom_shape[1]
        with tf.name_scope(name):
            weights = self.fc_weights(in_num,out_num,name)
            biases = self.variables_biases(out_num,name)
            output = tf.matmul(bottom,weights) + biases
        if relu:
            output = tf.nn.relu(output)   
        return output    
        
    
    def fc_weights(self,in_num,out_num,name):
        with tf.variable_scope(name):
            stddev = (2/(in_num+out_num))**0.5
            init = tf.random_normal_initializer(stddev=stddev)
            return tf.get_variable('weights',initializer=init,shape=[in_num,out_num])


       

    這個簡易版的網路有3個卷積層,2個池化層,3個全連線層。但是麻雀雖小,五臟俱全。這個網路完整的示範了一個深度學習的框架搭建,從資料預處理,從佇列獲取batch個訓練樣本,到模型引數的儲存,輸出到tensorboard視覺化訓練過程,最後載入訓練好的引數,在測試資料上進行測試。

三.訓練過程

    先給出訓練的程式碼,然後在說說一些細節問題。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sat Mar 24 22:21:35 2018

@author: wsw
"""
import tensorflow as tf
import numpy as np
import mnist_CNN
import time
# from sklearn.preprocessing import StandardScaler

logdir = './log/'
model = './model/'

datapath = './MNIST_data/'

train_path = datapath+'train.npy'
train_label_path = datapath+'train-label.npy'

train_data = np.load(train_path)
train_label = np.load(train_label_path)

#ss = StandardScaler()
## normalization
#train_data = ss.fit_transform(train_data)

# random produce one instance
# 產生一個佇列,隨機產生一個例項
image,label = tf.train.slice_input_producer([train_data,train_label])

with tf.name_scope('reshape'):
    # reshape iamge
    image = tf.reshape(image,shape=(28,28,1))
    # image = tf.transpose(image,[1,2,0])
    image = tf.image.per_image_standardization(image)
    label = tf.one_hot(label,depth=10)

# 隨機生成一個batch的訓練資料
with tf.name_scope('get_bacth'):
    batch_size = 100
    min_after_dequeue = 10000
    capacity = 3*min_after_dequeue+batch_size
    img_batch,lab_batch = tf.train.shuffle_batch([image,label],batch_size=batch_size,
                                                 min_after_dequeue=min_after_dequeue,
                                                 capacity=capacity)

#print(img_batch.shape)
#print(lab_batch.shape)
# create model
with tf.name_scope('placeholder'):
    xs = tf.placeholder(dtype=tf.float32,shape=[None,28,28,1])
    ys = tf.placeholder(dtype=tf.float32,shape=[None,10])

with tf.name_scope('losses'):
    
    alexnet = mnist_CNN.Alexnet()
    logits = alexnet.build(xs,train=True)
    labels = tf.argmax(ys,axis=1)
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=logits,
                                                                   labels=labels)
    losses = tf.reduce_mean(cross_entropy)
    tf.summary.scalar(logdir+'losses',losses)

global_step = tf.Variable(0,trainable=False)  
 

with tf.name_scope('train'):
    optimizer = tf.train.GradientDescentOptimizer(0.01)
    train_op = optimizer.minimize(losses,global_step=global_step)
    
with tf.name_scope('accuracy'):
    accu_predict = tf.nn.in_top_k(logits,labels,k=1)
    accuracy = tf.reduce_mean(tf.cast(accu_predict,dtype=tf.float32))
    tf.summary.scalar(logdir+'train_accuracy',accuracy)

with tf.name_scope('configuration'):
    
    saver = tf.train.Saver(max_to_keep=3)
    merge_op = tf.summary.merge_all()

with tf.Session() as sess:
    
    writer = tf.summary.FileWriter(logdir,sess.graph)
    tf.local_variables_initializer().run()
    tf.global_variables_initializer().run()
    coord = tf.train.Coordinator()
    thread = tf.train.start_queue_runners(sess,coord)
    
    try:
        while not coord.should_stop():
            
            start = time.time()
            imgs,labs = sess.run([img_batch,lab_batch])
            _,loss_value = sess.run([train_op,losses],
                                      feed_dict={xs:imgs,ys:labs})
            end = time.time()
            duration = end - start
            print('duration time:',duration)
            if global_step.eval() %100 == 0:
                result = sess.run(merge_op,feed_dict={xs:imgs,
                                                      ys:labs})
                writer.add_summary(result,global_step.eval())
                saver.save(sess,model,global_step=global_step.eval())
            if global_step.eval() == alexnet.epochs*600:
                break

    except tf.errors.OutOfRangeError:
        print('done!');
    finally:
        coord.request_stop()

    coord.join(thread)

    首先說說tensorflow 佇列的問題,個人理解也沒有多通透,昨天這個程式碼這裡還出現錯誤,一直在網上找原因。tf.train.slice_input_producer()函式,可以隨機從輸入的tensorlist中輸出一個tensor,把所有資料集和標籤組合輸入[images,labels],隨機選擇一個進行輸出,這個函式實際上生成了一個佇列,由子執行緒執行的。這裡在主執行緒裡面也就是with tf.Session() as sess 這個上下文管理器中開啟子執行緒 tf.train.start_queue_runners(),不然佇列始終為空,執行緒一直掛起。還有一個引數問題,tf.train.slice_input_producer()中num_epochs=None,預設為空,檢視其他的資料說迴圈迭代的次數,一般使用不設定。之前設定了程式有問題,後來參考網上相關資料,不設定比較好。然後,tf.train.slice_input_producer()函式每次都只是產生一個例項,我們需要一個batch的資料,然後使用tf.train.shuffle_batch([image,label]),該函式使用開啟的子執行緒自動進行佇列填充,當達到batch_size時就輸出feed給網路。還有一個問題是,tensorflow中多個執行緒是非同步進行的,我們需要使執行緒同步結束,這個時候需要tf.train.Coordinator() 建立一個執行緒協調器,使多個非同步執行緒之間同步結束。上面程式碼快兒中try............except.............finally,是固定的經典的套路,照搬就行。訓練100步之後,儲存一次模型的引數,輸出到tensorboard日誌檔案中,檢視一次訓練資料集的準確率。

四.在資料集測試

    以上訓練了60000步。訓練完成後,載入模型引數進行測試。測試程式碼如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Mar 25 13:58:37 2018

@author: wsw
"""

import tensorflow as tf
import mnist_CNN
import numpy as np

model = './model/'

with tf.name_scope('load_data'):
    datapath = './MNIST_data/'
    testdata = datapath+'test.npy'
    testlabel = datapath+'test-label.npy'
    test_data = np.load(testdata)
    labels = np.load(testlabel)
    
with tf.name_scope('preprocessing'):
    mean = np.mean(test_data,axis=1)
    std = np.std(test_data,axis=1)
    data = np.zeros(shape=(test_data.shape))
    for i in range(len(mean)):
        data[i,:] = (test_data[i,:]-mean[i])/std[i]
    data = data.reshape(-1,28,28,1).astype(np.float32)
    
with tf.name_scope('create_model'):
    alexnet = mnist_CNN.Alexnet()
    logits = alexnet.build(data,train=True)
    
with tf.name_scope('test_accuracy'):
    # labels is not need to one-hot encode 
    # and the dtype is int32 or int64
    accuracy_op = tf.nn.in_top_k(logits,labels,k=1)
    accuracy = tf.reduce_mean(tf.cast(accuracy_op,dtype=tf.float32))

saver = tf.train.Saver() 

with tf.Session() as sess:
    # get the newest model parameter
    ckpt = tf.train.get_checkpoint_state(model)
    path = ckpt.model_checkpoint_path
    if ckpt and path:
        saver.restore(sess,path)
        accu_score = sess.run(accuracy)
        print('test accuracy:',accu_score)

    在測試時需要對每張圖片進行歸一化操作。計算準確率可以使用tensorflow tf.nn.in_top_k()函式,注意這裡的標籤值不需要進行one_hot編碼。tf.train.get_checkpoint_state()可以得到最新儲存的模型引數,然後進行恢復載入,最後利用載入的引數進行預測。在測試程式碼中,不需要進行變數的初始化。

    最後簡單看一下,模型的準確率為99%。期望和大家交流與學習。