1. 程式人生 > >Tensorflow框架(五)

Tensorflow框架(五)

本章是對前五章的總結

一、概述

Tensorflow框架的核心概念是計算圖:

 

整個計算流圖的主要包含以下幾個部分:

  • 匯入資料
  • 網路結構
  • 損失函式
  • 反向傳播

由於Tensorflow框架的機制,反向傳播過程並不需要我們去描述。因此我們要做的就是:

  • 定義網路的結構
  • 定義損失函式與反向傳播演算法
  • 訓練並測試模型

 

二、計算圖

 

import tensorflow as tf
import numpy as np

# 定義常量
a = tf.constant([1.0], name = 'a')
b = tf.constant([2.0], name = 'b')
result = a + b
print(result)

輸出結果:  

 

# 定義常量
a = tf.constant([1.0], name = 'a')
b = tf.constant([2.0], name = 'b')
result = a + b

# 定義會話
with tf.Session() as sess:
    print(sess.run(result))

輸出結果: 

上述的程式碼定義了一個計算圖,通過定義會話執行計算。可以形象地類比成一個管道系統,開始的時候我們只是構建了管道系統的結構與流通規則。但此時沒有水流入,通過定義會話,將水通入管道系統,最後才能出現結果。

其中的a,b,result在tensorflow中都別稱作張量(對運算結果的引用)

 

三、神經網路的搭建

在搭建神經網路之前,我們需要了解一下本次使用的資料集MNIST

MNSIT資料集是深度學習經典入門的demo,其訓練集包含了55000張圖片,驗證集包含了5000張圖片,測試集包含了10000張,其中每張圖片是以28*28*1的矩陣形式儲存

我們使用下面的程式碼來讀取資料

mnist = input_data.read_data_sets('./', one_hot = True)

這樣會在當前資料夾下出現4個檔案:

 

下面是一個簡單的神經網路模型

基於此建立一個簡單的神經網路:

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data
import numpy as np

# 使用下面語句在當前目錄下下載並讀取檔案,如果檔案已存在則直接讀取
mnist = input_data.read_data_sets('./', one_hot = True)

# 每個批次的大小
batch_size = 100

# 定義兩個placeholder,為資料的匯入預留兩個位置
# 這裡的 None表示第一個維度暫時未知,在實際執行的時候會給定
x = tf.placeholder(tf.float32, [None, 784])
y = tf.placeholder(tf.float32, [None, 10])

# 這裡的tf.matmul表示矩陣乘法
w1 = tf.Variable(tf.truncated_normal([784, 500], stddev = 0.1))
b1 = tf.Variable(tf.constant(0.1, shape = [500]))
a1 = tf.nn.relu(tf.matmul(x, w1) + b1)

w2 = tf.Variable(tf.truncated_normal([500,10], stddev = 0.1))
b2 = tf.Variable(tf.constant(0.1, shape = [10]))
logit = tf.nn.softmax(tf.matmul(a1, w2) + b2)

# 定義softmax損失函式
loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
    logits = logit, labels = tf.argmax(y, 1)))

# 使用梯度下降法
train_step = tf.train.GradientDescentOptimizer(0.2).minimize(loss)

# 初始化變數
init = tf.global_variables_initializer()

# 將結果放在一個bool型別的列表中
# tf.argmax(y, 1) 表示按axis = 1,也就是按第二個維度取值最大的位置
correction_prediction = tf.equal(tf.argmax(y,1), tf.argmax(logit, 1))
# 求準確率
accuracy = tf.reduce_mean(tf.cast(correction_prediction, tf.float32))

# 定義會話
with tf.Session() as sess:
    sess.run(init)
    for i in range(20001):
        # 以100個樣本作為一個批次
        start = (i * batch_size) % mnist.train.num_examples
        end = min(start + batch_size, mnist.train.num_examples)
        
        # 把當前批次的資料匯入進神經網路
        sess.run(train_step,feed_dict = {x:mnist.train.images[start:end], y:mnist.train.labels[start:end]})
        if i % 1000 == 0:
            
            # 將訓練集和測試集匯入神經網路,計算準確率
            train_prediction = sess.run(accuracy, feed_dict = {x:mnist.train.images, y:mnist.train.labels})
            test_prediction = sess.run(accuracy, feed_dict = {x:mnist.test.images, y:mnist.test.labels})
            print("After %d, train correction: %g, test correction: %g" %(i, train_prediction, test_prediction))

執行結果:

 

四、優化演算法

通過簡單的搭建一個神經網路,我們可以瞭解到整個程式的大概面貌,但在實際使用情況中,需要對神經網路進行一些優化,以達到更好的預期效果。這裡我們加入了正則化和學習率衰減優化演算法。同時上面提到的程式碼從某種程度上來說,並不規範,因此新增優化演算法後並進行規範化後的程式碼如下:

import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

input_node = 784 # mnist資料集共有28*28個畫素,所以輸入節點共有784
output_node = 10 # 輸出層節點數
layer1_node = 500 # 隱藏層節點數
batch_size = 100 # 一個訓練batch中的訓練資料個數
learning_rate_base = 0.8 # 基礎學習率
learning_rate_decay = 0.99 # 學習率衰減率
regularization_rate = 0.0001 # 正則化項
training_steps = 30000 # 訓練輪數

def get_weight(shape, regularizer):
    '''
    如果有正則化項,則將weight加入到losses集合中
    '''
    weight = tf.get_variable('weight', shape, initializer = tf.truncated_normal_initializer(stddev = 0.1))
    if regularizer != None:
        tf.add_to_collection('losses', regularizer(weight))
    return weight

def get_bias(shape):
    bias = tf.get_variable('bias', shape, initializer = tf.constant_initializer(0.1))
    return bias

def inference(input_tensor, regularizer):
    '''
    神經網路正向傳播過程
    '''
    # 對神經網路的第一層賦予名稱layer1的名稱空間
    with tf.variable_scope('layer1'):
        weight = get_weight([input_node, layer1_node], regularizer)
        tf.summary.histogram('weight1', weight)
        bias = get_bias([layer1_node])
        layer1 = tf.nn.relu(tf.matmul(input_tensor, weight) + bias)
        
    # 對神經網路的輸出層賦予名稱layer2的名稱空間 
    with tf.variable_scope('layer2'):
        weight = get_weight([layer1_node, output_node], regularizer)
        tf.summary.histogram('weight2', weight)
        bias = get_bias([output_node])
        layer2 = tf.nn.softmax(tf.matmul(layer1, weight) + bias)
    return layer2


def train(mnist):
    # 定義輸入空白位
    x = tf.placeholder(tf.float32, [None, input_node], name = 'x-input')
    y = tf.placeholder(tf.float32, [None, output_node], name = 'y-input')
    
    # 定義L2正則化項
    regularizer = tf.contrib.layers.l2_regularizer(regularization_rate)
    
    # 計算神經網路前向傳播的結果
    logit = inference(x, regularizer)
    
    # 這裡與之前說到滑動平均模型裡的num_updates變數一致,通過模仿迭代次數來控制衰減速率
    global_step = tf.Variable(0, trainable = False)
    
    # 定義損失函式
    cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
    logits = logit, labels = tf.argmax(y, 1))
    
    cross_entropy_mean = tf.reduce_mean(cross_entropy)
    
    # 總的損失等於交叉熵的損失和正則化損失的和
    loss = cross_entropy_mean + tf.add_n(tf.get_collection('losses'))
    
    # 學習率衰減函式
    learning_rate = tf.train.exponential_decay(
    learning_rate_base, # 基礎學習率,在此基礎上進行衰減
    global_step,        # 當前迭代的輪數
    mnist.train.num_examples, # 走完所有資料需要的迭代次數
    learning_rate_decay) # 學習率衰減速率
    
    # 使用梯度下降法優化
    train_step = tf.train.GradientDescentOptimizer(learning_rate).minimize(loss, global_step = global_step)
    
    # 測試輸出結果是否與真實標籤相等
    correction_prediction = tf.equal(tf.argmax(logit, 1), tf.argmax(y, 1))
    
    # 測試一組資料正確率
    # 這裡將correction_pred型別改為tf.float32
    accuracy = tf.reduce_mean(tf.cast(correction_prediction, tf.float32))

    # 引數初始化
    init = tf.global_variables_initializer()
    
    with tf.Session() as sess:    
        sess.run(init)
        
        # 開始訓練
        for i in range(training_steps):
            # 產生當前輪的訓練批次
            xs, ys = mnist.train.next_batch(batch_size)
            sess.run(train_step, feed_dict = {x: xs, y:ys})
            
            # 每一千次訓練測試一下驗證集正確率
            if i % 1000 == 0:
                train_acc = sess.run(accuracy, feed_dict = {x: mnist.train.images, y: mnist.train.labels})
                
                test_acc= sess.run(accuracy, feed_dict = {x: mnist.test.images, y: mnist.test.labels})
                print("After %d training step, train accuracy is %g, test accuracy is %g" %(i, train_acc, test_acc))

# 執行程式
mnist = input_data.read_data_sets('./', one_hot = True)
tf.reset_default_graph()
train(mnist)

 在tensorboard下,執行結果曲線圖,這裡的加粗線表示訓練集精度,另外的線表示測試集精度。當迭代到一定程度時,曲線收斂。

 

 

五、卷積神經網路

對於簡單的卷積神經網路,其結構如下圖所示:

 

有了之前的基礎,我們可以瞭解卷積神經網路的搭建。首先一般的卷積網路主要有:

  • 卷積層
  • 池化層
  • 全連線層

其中全連線層與我們之前介紹的網路一樣

首先是卷積層:這裡的第一個和第二個維度表示了過濾器的尺寸,第三個維度表示了當前輸入圖層也就是上層的輸出的通道數,第四個維度表示了過濾器的個數。

 

回顧一下卷積層的操作:

根據上面的圖示看, 

# 定義過濾器的權重
filter_weight = tf.get_variable('weight', [5,5,3,16], initializer = tf.truncated_normal_initializer(stddev = 0.1))

# 定義過濾器的偏置
bias = tf.get_variable('biases', [16], initializer = tf.truncated_normal_initializer(0.1))

之後我們定義卷積層運算:

conv = tf.nn.conv2d(input, filter_weight, strides = [1,1,1,1], padding = 'SAME')

這裡的步長由於支隊矩陣的長和寬有效,因此第一維和最後一維的一定為1

padding = 'SAME'表示用0填充,padding = 'VALID'表示不填充

設原影象尺寸為W * W,步長為s,過濾器尺寸為f * f,則經過卷積運算後的影象尺寸:

VALID:\left \lceil (W-f+1) / s \right \rceil

SAME:\left \lceil W / s \right \rceil

 

池化層又分成最大池化層和平均池化層:

# 最大池化層
# ksize維度裡第一個和第四個必須為1,第二個和第三個維度表示過濾器尺寸
pool = tf.nn.max_pool(actived_conv, ksize = [1,3,3,1],
                      strides = [1,2,2,1], padding = 'SAME')
# 平均池化層
pool = tf.nn.avg_pool(actived_conv, ksize = [1,3,3,1],
                      strides = [1,2,2,1], padding = 'SAME')

 

下面使用cifar-10資料集,cifar-10資料集總共有60000張彩色影象,50000張用於訓練,另外10000張用於測試。其中每張影象是32*32*3規格的。共有10個類別,每個類別有5000張

 

使用VGGNet網路將會帶來較好的效率,但也會因此產生巨大的時間開銷,因此這裡我們使用一個簡化版的VGG-16網路對影象進行識別:

import tensorflow as tf
import numpy as np
import pickle

# 讀取cifar-10資料集
def load_CIFAR_batch(filename):
    with open(filename, 'rb') as f:
        datadict = pickle.load(f,encoding='latin1')
        X = datadict['data']
        Y = datadict['labels']
        X = X.reshape(10000, 3, 32,32).transpose(0,2,3,1).astype("float")
        Y = np.array(Y)
        return X, Y

# 給定檔案路徑,解析資料為標準格式
def load_data(file_path):
    X_train = []
    Y_train = []
    file_name = file_path + "data_batch_"
    for i in range(1, 6):
        X, Y = load_CIFAR_batch(file_name + str(i))
        X_train.append(X)
        Y_train.append(Y)
    X_train = np.concatenate(X_train)
    Y_train = np.concatenate(Y_train)
    del X, Y
    
    X_test, Y_test = load_CIFAR_batch(file_path + "test_batch")
    
    train_label = np.zeros([Y_train.shape[0], 10])
    test_label = np.zeros([Y_test.shape[0], 10])
    for i in range(Y_train.shape[0]):
        train_label[i, Y_train[i]] = 1
    for i in range(Y_test.shape[0]):
        test_label[i, Y_test[i]] = 1
    
    return X_train, train_label, X_test, test_label

# 定義全連線層
def fc_op(input_op, name, n_out):
    n_in = input_op.get_shape()[-1].value
    
    with tf.name_scope(name) as scope:
        weight = tf.get_variable(
            name = scope + "w",
            shape = [n_in, n_out],
            initializer = tf.truncated_normal_initializer(stddev = 0.2))
        bias = tf.get_variable('bias',
                               [n_out],
                               initializer =  tf.constant_initializer(0.1))
        result = tf.matmul(input_op, weight) + bias
        
        return result

# 定義卷積層
def conv_op(input_op, name, kh, kw, n_out, dh, dw):
    '''
    input_op:上層輸入
    name:空間命名名稱
    kh,kw:過濾器的尺寸
    n_out:過濾器數目,也可以理解成輸出通道數
    dh,dw:步長的高,和寬
    '''
    
    # 獲取上層輸入的通道數目
    n_in = input_op.get_shape()[-1].value
    
    with tf.name_scope(name) as scope:
        # 定義過濾器
        kernel = tf.get_variable(
            name = scope + "w",
            shape = [kh, kw, n_in, n_out],
            initializer = tf.truncated_normal_initializer(stddev = 0.2))
        
        # 卷積運算
        conv = tf.nn.conv2d(input_op, kernel, (1, dh, dw, 1), padding = 'SAME')
        # 定義偏置
        bias = tf.get_variable(scope + "b", [n_out],  initializer = tf.constant_initializer(0.1))
        # relu啟用
        conv = tf.nn.relu(tf.nn.bias_add(conv, bias), name = scope)
        
        return conv

# 定義最大池化層
def pool_op(input_op, name ,kh, kw, dh, dw):
    '''
    input_op:上層輸入
    kh,kw:過濾器尺寸
    dh,dw:步長的高和寬
    '''
    pool = tf.nn.max_pool(input_op,
                          ksize = [1,kh,kw,1],
                          strides = [1,dh,dw,1],
                          padding = 'SAME',
                          name = name)
    return pool

# 定義神經網路的正向傳播
def inference(input_op, keep_prob):
    # 第一個卷積層
    conv1_1 = conv_op(input_op, name = 'conv1_1', kh = 3, kw = 3, n_out = 64,
                     dh = 1, dw = 1)
    conv1_2 = conv_op(conv1_1, name = 'conv1_2', kh = 3, kw = 3, n_out = 64,
                     dh = 1, dw = 1)
    pool_1 = pool_op(conv1_2, name = 'pool_1', kh = 2, kw =  2, dw = 2, dh = 2)
    
    
    # 第二個卷積層
    conv2_1 = conv_op(pool_1, name = 'conv2_1', kh = 3, kw = 3, n_out = 128,
                     dh = 1, dw = 1)
    conv2_2 = conv_op(conv2_1, name = 'conv2_2', kh = 3, kw = 3, n_out = 128,
                     dh = 1, dw = 1)
    pool_2 = pool_op(conv1_2, name = 'pool_2', kh = 2, kw =  2, dw = 2, dh = 2)
    
    
    # 第三個卷積層
    conv3_1 = conv_op(pool_2, name = 'conv3_1', kh = 3, kw = 3, n_out = 256,
                     dh = 1, dw = 1)
    conv3_2 = conv_op(conv2_1, name = 'conv3_2', kh = 3, kw = 3, n_out = 256,
                     dh = 1, dw = 1)
    pool_3 = pool_op(conv3_2, name = 'pool_3', kh = 2, kw =  2, dw = 2, dh = 2)
    

    # 將卷積層傳來的輸入壓成一個向量
    shape = pool_3.get_shape()
    flattened_shape = shape[1].value * shape[2].value * shape[3].value
    reshape = tf.reshape(pool_3, [-1, flattened_shape], name = 'reshape')
    
    # 使用dropout正則化,以keep_prob概率選擇神經元
    reshape = tf.nn.dropout(reshape, keep_prob)
    
    # 全連線層
    logit = fc_op(reshape, 'fc1', 10)
    
    return logit

def train(X_train, Y_train, X_test, Y_test):
    x = tf.placeholder(tf.float32, [None, 32, 32, 3])
    y = tf.placeholder(tf.float32, [None, 10])
    keep_prob = tf.placeholder(tf.float32)
    
    # 定義正向傳播
    logit = inference(x, keep_prob)
    
    # 定義損失函式
    loss = tf.reduce_mean(tf.nn.sparse_softmax_cross_entropy_with_logits(
        logits = logit, labels = tf.argmax(y, 1)))
    
    # 使用Adam下降法
    train_step = tf.train.AdamOptimizer(1e-4).minimize(loss)
    
    # 計算準確率
    correction_prediction = tf.equal(tf.argmax(logit, 1), tf.argmax(y, 1))
    accuracy = tf.reduce_mean(tf.cast(correction_prediction, tf.float32))
    
    # 初始化變數
    init = tf.global_variables_initializer()
    
    # 定義會話,開始訓練神經網路
    with tf.Session() as sess:
        sess.run(init)
        
        for i in range(30000):
            start = (i * batch_size) % X_train.shape[0]
            end = min(start + batch_size, X_train.shape[0])
            sess.run(train_step, feed_dict = {x:X_train[start:end], y:Y_train[start:end], keep_prob:0.9})
        # 總共迭代3萬次,如果將所有的資料集合直接傳入神經網路將會導致記憶體空間不足,一般的情況下我們可以選取批次逐次訓
        #練,最終將結果累加求平均。這裡為簡單起見,我僅僅從測試集中抽取了一個batch_size的結果測試模型效果
        if i % 1000 == 0:
            num_epoch_train = X_train.shape[0] // batch_size
            num_epoch_test = X_test.shape[0] // batch_size
            train_accuracy = 0
            test_accuracy = 0
            for j in range(num_epoch_train):
                start = (j * batch_size) % X_train.shape[0]
                end = min(start + batch_size, X_train.shape[0])
                train_accuracy += sess.run(accuracy, feed_dict = {x: X_train[start:end], y:Y_train[start:end],keep_prob:1.0})
            train_accuracy /= num_epoch_train
            for k in range(num_epoch_test):
                start = (k * batch_size) % X_test.shape[0]
                end = min(start + batch_size, X_test.shape[0])
                test_accuracy += sess.run(accuracy, feed_dict = {x: X_test[start:end], y:Y_test[start:end], keep_prob:1.0})
            test_accuracy /= num_epoch_test
            print("After %d, train correction: %g, test correction: %g" %(i, train_accuracy, test_accuracy))


            
# 執行訓練過程
batch_size = 128
X_train, Y_train, X_test, Y_test = load_data("C:/Users/14981/Desktop/dataset/cifar-10-batches-py/")
tf.reset_default_graph()
train(X_train, Y_train, X_test, Y_test)

這個網路由於結構比較簡單,因此識別效率大概在70左右。如果要提高整個訓練準確率,可以考慮使用其他的網路結構