1. 程式人生 > >python神經網路案例——CNN卷積神經網路實現mnist手寫體識別

python神經網路案例——CNN卷積神經網路實現mnist手寫體識別

全棧工程師開發手冊 (作者:欒鵬)

載入樣本資料集

首先我們要有手寫體的資料集檔案

我們實現一個MNIST.py檔案,專門用來讀取手寫體檔案中的資料。

# -*- coding: UTF-8 -*-

# 獲取手寫資料。
# 28*28的圖片物件。每個圖片物件根據需求是否轉化為長度為784的橫向量
# 每個物件的標籤為0-9的數字,one-hot編碼成10維的向量
import numpy as np

# 資料載入器基類。派生出圖片載入器和標籤載入器
class Loader(object):
    # 初始化載入器。path: 資料檔案路徑。count: 檔案中的樣本個數
def __init__(self, path, count): self.path = path self.count = count # 讀取檔案內容 def get_file_content(self): print(self.path) f = open(self.path, 'rb') content = f.read() # 讀取位元組流 f.close() return content # 位元組陣列 # 將unsigned byte字元轉換為整數。python3中bytes的每個分量讀取就會變成int
# def to_int(self, byte): # return struct.unpack('B', byte)[0] # 影象資料載入器 class ImageLoader(Loader): # 內部函式,從檔案位元組陣列中獲取第index個影象資料。檔案中包含所有樣本圖片的資料。 def get_picture(self, content, index): start = index * 28 * 28 + 16 # 檔案頭16位元組,後面每28*28個位元組為一個圖片資料 picture = [] for
i in range(28): picture.append([]) # 圖片新增一行畫素 for j in range(28): byte1 = content[start + i * 28 + j] picture[i].append(byte1) # python3中本來就是int # picture[i].append(self.to_int(byte1)) # 新增一行的每一個畫素 return picture # 圖片為[[x,x,x..][x,x,x...][x,x,x...][x,x,x...]]的列表 # 將影象資料轉化為784的行向量形式 def get_one_sample(self, picture): sample = [] for i in range(28): for j in range(28): sample.append(picture[i][j]) return sample # 載入資料檔案,獲得全部樣本的輸入向量。onerow表示是否將每張圖片轉化為行向量,to2表示是否轉化為0,1矩陣 def load(self,onerow=False): content = self.get_file_content() # 獲取檔案位元組陣列 data_set = [] for index in range(self.count): #遍歷每一個樣本 onepic =self.get_picture(content, index) # 從樣本資料集中獲取第index個樣本的圖片資料,返回的是二維陣列 if onerow: onepic = self.get_one_sample(onepic) # 將影象轉化為一維向量形式 data_set.append(onepic) return data_set # 標籤資料載入器 class LabelLoader(Loader): # 載入資料檔案,獲得全部樣本的標籤向量 def load(self): content = self.get_file_content() # 獲取檔案位元組陣列 labels = [] for index in range(self.count): #遍歷每一個樣本 onelabel = content[index + 8] # 檔案頭有8個位元組 onelabelvec = self.norm(onelabel) #one-hot編碼 labels.append(onelabelvec) return labels # 內部函式,one-hot編碼。將一個值轉換為10維標籤向量 def norm(self, label): label_vec = [] # label_value = self.to_int(label) label_value = label # python3中直接就是int for i in range(10): if i == label_value: label_vec.append(1) else: label_vec.append(0) return label_vec # 獲得訓練資料集。onerow表示是否將每張圖片轉化為行向量 def get_training_data_set(num,onerow=False): image_loader = ImageLoader('train-images.idx3-ubyte', num) # 引數為檔案路徑和載入的樣本數量 label_loader = LabelLoader('train-labels.idx1-ubyte', num) # 引數為檔案路徑和載入的樣本數量 return image_loader.load(onerow), label_loader.load() # 獲得測試資料集。onerow表示是否將每張圖片轉化為行向量 def get_test_data_set(num,onerow=False): image_loader = ImageLoader('t10k-images.idx3-ubyte', num) # 引數為檔案路徑和載入的樣本數量 label_loader = LabelLoader('t10k-labels.idx1-ubyte', num) # 引數為檔案路徑和載入的樣本數量 return image_loader.load(onerow), label_loader.load() # 將一行784的行向量,列印成圖形的樣式 def printimg(onepic): onepic=onepic.reshape(28,28) for i in range(28): for j in range(28): if onepic[i,j]==0: print(' ',end='') else: print('* ',end='') print('') if __name__=="__main__": train_data_set, train_labels = get_training_data_set(100) # 載入訓練樣本資料集,和one-hot編碼後的樣本標籤資料集 train_data_set = np.array(train_data_set) #.astype(bool).astype(int) #可以將圖片簡化為黑白圖片 train_labels = np.array(train_labels) onepic = train_data_set[12] # 取一個樣本 printimg(onepic) # 打印出來這一行所顯示的圖片 print(train_labels[12].argmax()) # 列印樣本標籤

我們嘗試執行一下。讀取第13個樣本的內容。

可以看到列印輸出樣式如下。

這裡寫圖片描述

啟用器模組

CNN卷積神經網路並不是只有卷積層,還有采樣層和全連線層。在卷積層和全連線層都有啟用函式。並且在前向預測和後項傳播中需要計算啟用函式的前向預測影響,以及誤差後項傳播影響。所以我們將所有的啟用函式形成了一個獨立的模組來實現。下面的程式碼儲存為Activators.py

# 1. 當為array的時候,預設d*f就是對應元素的乘積,multiply也是對應元素的乘積,dot(d,f)會轉化為矩陣的乘積, dot點乘意味著相加,而multiply只是對應元素相乘,不相加
# 2. 當為mat的時候,預設d*f就是矩陣的乘積,multiply轉化為對應元素的乘積,dot(d,f)為矩陣的乘積

import numpy as np

# rule啟用器
class ReluActivator(object):
    def forward(self, weighted_input):    # 前向計算,計算輸出
        return max(0, weighted_input)

    def backward(self, output):  # 後向計算,計算導數
        return 1 if output > 0 else 0

# IdentityActivator啟用器.f(x)=x
class IdentityActivator(object):
    def forward(self, weighted_input):   # 前向計算,計算輸出
        return weighted_input

    def backward(self, output):   # 後向計算,計算導數
        return 1

#Sigmoid啟用器
class SigmoidActivator(object):
    def forward(self, weighted_input):
        return 1.0 / (1.0 + np.exp(-weighted_input))

    def backward(self, output):
        # return output * (1 - output)
        return np.multiply(output, (1 - output))  # 對應元素相乘

# tanh啟用器
class TanhActivator(object):
    def forward(self, weighted_input):
        return 2.0 / (1.0 + np.exp(-2 * weighted_input)) - 1.0

    def backward(self, output):
        return 1 - output * output

# # softmax啟用器
# class SoftmaxActivator(object):
#     def forward(self, weighted_input):  # 前向計算,計算輸出
#         return max(0, weighted_input)
#
#     def backward(self, output):  # 後向計算,計算導數
#         return 1 if output > 0 else 0

DNN全連線網路層的實現

將下面的程式碼儲存為DNN.py

# 實現神經網路反向傳播演算法,以此來訓練網路。全連線神經網路可以包含多層,但是隻有最後一層輸出前有啟用函式。
# 所謂向量化程式設計,就是使用矩陣運算。

import random
import math
import numpy as np
import datetime
import Activators  # 引入啟用器模組

# 1. 當為array的時候,預設d*f就是對應元素的乘積,multiply也是對應元素的乘積,dot(d,f)會轉化為矩陣的乘積, dot點乘意味著相加,而multiply只是對應元素相乘,不相加
# 2. 當為mat的時候,預設d*f就是矩陣的乘積,multiply轉化為對應元素的乘積,dot(d,f)為矩陣的乘積



# 全連線每層的實現類。輸入物件x、神經層輸出a、輸出y均為列向量
class FullConnectedLayer(object):
    # 全連線層建構函式。input_size: 本層輸入列向量的維度。output_size: 本層輸出列向量的維度。activator: 啟用函式
    def __init__(self, input_size, output_size,activator,learning_rate):
        self.input_size = input_size
        self.output_size = output_size
        self.activator = activator
        # 權重陣列W。初始化權重為(rand(output_size, input_size) - 0.5) * 2 * sqrt(6 / (output_size + input_size))
        wimin = (output_size - 0.5) * 2 * math.sqrt(6 / (output_size + input_size))
        wimax = (input_size-0.5)*2*math.sqrt(6/(output_size + input_size))
        # self.W = np.random.uniform(wimin,wimax,(output_size, input_size))  #初始化為-0.1~0.1之間的數。權重的大小。行數=輸出個數,列數=輸入個數。a=w*x,a和x都是列向量
        self.W = np.random.uniform(-0.1, 0.1,(output_size, input_size))  # 初始化為-0.1~0.1之間的數。權重的大小。行數=輸出個數,列數=輸入個數。a=w*x,a和x都是列向量
        # 偏置項b
        self.b = np.zeros((output_size, 1))  # 全0列向量偏重項
        # 學習速率
        self.learning_rate = learning_rate
        # 輸出向量
        self.output = np.zeros((output_size, 1)) #初始化為全0列向量

    # 前向計算,預測輸出。input_array: 輸入列向量,維度必須等於input_size
    def forward(self, input_array):   # 式2
        self.input = input_array
        self.output = self.activator.forward(np.dot(self.W, input_array) + self.b)

    # 反向計算W和b的梯度。delta_array: 從上一層傳遞過來的誤差項。列向量
    def backward(self, delta_array):
        # 式8
        self.delta = np.multiply(self.activator.backward(self.input),np.dot(self.W.T, delta_array))   #計算當前層的誤差,以備上一層使用
        self.W_grad = np.dot(delta_array, self.input.T)   # 計算w的梯度。梯度=誤差.*輸入
        self.b_grad = delta_array  #計算b的梯度

    # 使用梯度下降演算法更新權重
    def update(self):
        self.W += self.learning_rate * self.W_grad
        self.b += self.learning_rate * self.b_grad

CNN卷積網路層和Pool降取樣層的實現

將下面的程式碼儲存為CNN.py

import numpy as np
import Activators   # 引入自定義的啟用器模組
import math

# 獲取卷積區域。input_array為單通道或多通道的矩陣順。i為橫向偏移,j為縱向偏移,stride為步幅,filter_width為過濾器寬度,filter_height為過濾器的高度
def get_patch(input_array, i, j, filter_width,filter_height, stride):
    '''
    從輸入陣列中獲取本次卷積的區域,
    自動適配輸入為2D和3D的情況
    '''
    start_i = i * stride
    start_j = j * stride
    if input_array.ndim == 2:  #如果只有一個通道
        return input_array[start_i:start_i + filter_height,start_j: start_j + filter_width]
    elif input_array.ndim == 3:  #如果有多個通道,也就是深度上全選
        return input_array[:,start_i: start_i + filter_height,start_j: start_j + filter_width]


# 獲取一個2D區域的最大值所在的索引
def get_max_index(array):
    location = np.where(array == np.max(array))
    return location[0], location[1]


# 計算一個過濾器的卷積運算,輸出一個二維資料。每個通道的輸入是圖片,但是可能不是一個通道,所以這裡自動適配輸入為2D和3D的情況。
def conv(input_array,kernel_array,output_array,stride, bias):
    output_width = output_array.shape[1]   # 獲取輸出的寬度。一個過濾器產生的輸出一定是一個通道
    output_height = output_array.shape[0] # 獲取輸出的高度
    kernel_width = kernel_array.shape[-1]  # 過濾器的寬度。有可能有多個通道。多通道時shape=[深度、高度、寬度],單通道時shape=[高度、寬度]
    kernel_height = kernel_array.shape[-2] # 過濾器的高度。有可能有多個通道。多通道時shape=[深度、高度、寬度],單通道時shape=[高度、寬度]
    for i in range(output_height):
        for j in range(output_width):
            juanjiqu = get_patch(input_array, i, j, kernel_width,kernel_height, stride)   # 獲取輸入的卷積區。(單通道或多通道)
            # 這裡是對每個通道的兩個矩陣對應元素相乘求和,再將每個通道的和值求和
            kernel_values= (np.multiply(juanjiqu,kernel_array)).sum() # 卷積區與過濾器卷積運算。1,一個通道內,卷積區矩陣與過濾器矩陣對應點相乘後,求和值。2、將每個通道的和值再求和。
            output_array[i][j] = kernel_values + bias  #將卷積結果加上偏量



# 為陣列增加Zero padding。zp步長,自動適配輸入為2D和3D的情況
def padding(input_array, zp):
    if zp == 0: # 如果不補0
        return input_array
    else:
        if input_array.ndim == 3:   # 如果輸入有多個通道
            input_width = input_array.shape[2]  # 獲取輸入的寬度
            input_height = input_array.shape[1]  # 獲取輸入的寬度
            input_depth = input_array.shape[0]  # 獲取輸入的深度
            padded_array = np.zeros((input_depth,input_height + 2 * zp,input_width + 2 * zp))  # 先定義一個補0後大小的全0矩陣
            padded_array[:,zp: zp + input_height,zp: zp + input_width] = input_array # 每個通道上,將中間部分替換成輸入,這樣就變成了原矩陣周圍補0 的形式
            return padded_array
        elif input_array.ndim == 2:  # 如果輸入只有一個通道
            input_width = input_array.shape[1] # 獲取輸入的寬度
            input_height = input_array.shape[0] # 虎丘輸入的高度
            padded_array = np.zeros((input_height + 2 * zp,input_width + 2 * zp))  # 先定義一個補0後大小的全0矩陣
            padded_array[zp: zp + input_height,zp: zp + input_width] = input_array  # 將中間部分替換成輸入,這樣就變成了原矩陣周圍補0 的形式
            return padded_array


# 對numpy陣列進行逐個元素的操作。op為函式。element_wise_op函式實現了對numpy陣列進行按元素操作,並將返回值寫回到陣列中
def element_wise_op(array, op):
    for i in np.nditer(array,op_flags=['readwrite']):
        i[...] = op(i)   # 將元素i傳入op函式,返回值,再修改i

# Filter類儲存了卷積層的引數以及梯度,並且實現了用梯度下降演算法來更新引數。
class Filter(object):
    def __init__(self, width, height, depth,filter_num):
        # 卷積核每個元素初始化為[-sqrt(6 / (fan_in + fan_out)), sqrt(6 / (fan_in + fan_out))]。
        # 其中fan_in為輸入通道數與濾波器寬高的乘機,即width*height*depth
        # 其中fan_out為輸出通道數與濾波器寬高的乘機,即width*height*filter_num
        wimin = -math.sqrt(6 / (width*height*depth + width*height*filter_num))
        wimax = -wimin
        self.weights = np.random.uniform(wimin, wimax, (depth, height, width))  # 隨機初始化卷基層權重一個很小的值,
        # self.weights = np.random.uniform(-1e-2, 1e-2,(depth, height, width))  # 隨機初始化卷基層權重一個很小的值,
        self.bias = 0  # 初始化偏量為0
        self.weights_grad = np.zeros(self.weights.shape)   # 初始化權重梯度
        self.bias_grad = 0  # 初始化偏量梯度

    def __repr__(self):
        return 'filter weights:\n%s\nbias:\n%s' % (repr(self.weights), repr(self.bias))

    # 讀取權重
    def get_weights(self):
        return self.weights

    # 讀取偏量
    def get_bias(self):
        return self.bias

    # 更新權重和偏量
    def update(self, learning_rate):
        self.weights -= learning_rate * self.weights_grad
        self.bias -= learning_rate * self.bias_grad

# 用ConvLayer類來實現一個卷積層。下面的程式碼是初始化一個卷積層,可以在建構函式中設定卷積層的超引數
class ConvLayer(object):
    #初始化構造卷積層:輸入寬度、輸入高度、通道數、濾波器寬度、濾波器高度、濾波器數目、補零數目、步長、啟用器、學習速率
    def __init__(self, input_width, input_height,channel_number, filter_width,filter_height, filter_number,
                 zero_padding, stride, activator,learning_rate):
        self.input_width = input_width   #  輸入寬度
        self.input_height = input_height  # 輸入高度
        self.channel_number = channel_number  # 通道數=輸入的深度=過濾器的深度
        self.filter_width = filter_width  # 過濾器的寬度
        self.filter_height = filter_height  # 過濾器的高度
        self.filter_number = filter_number  # 過濾器的數量。
        self.zero_padding = zero_padding  # 補0圈數
        self.stride = stride # 步幅
        self.output_width = int(ConvLayer.calculate_output_size(self.input_width, filter_width, zero_padding,stride))  # 計算輸出寬度
        self.output_height = int(ConvLayer.calculate_output_size(self.input_height, filter_height, zero_padding,stride))  # 計算輸出高度
        self.output_array = np.zeros((self.filter_number,self.output_height, self.output_width)) # 建立輸出三維陣列。每個過濾器都產生一個二維陣列的輸出
        self.filters = []   # 卷積層的每個過濾器
        for i in range(filter_number):
            self.filters.append(Filter(filter_width,filter_height, self.channel_number,filter_number))
        self.activator = activator   # 使用rule啟用器
        self.learning_rate = learning_rate  # 學習速率

    # 計算卷積層的輸出。輸出結果儲存在self.output_array
    def forward(self, input_array):
        self.input_array = input_array  # 多個通道的圖片,每個通道為一個二維圖片
        self.padded_input_array = padding(input_array,self.zero_padding)  # 先將輸入補足0
        for i in range(self.filter_number):  #每個過濾器都產生一個二維陣列的輸出
            filter = self.filters[i]
            conv(self.padded_input_array,filter.get_weights(), self.output_array[i],self.stride, filter.get_bias())
        # element_wise_op函式實現了對numpy陣列進行按元素操作,並將返回值寫回到陣列中
        element_wise_op(self.output_array,self.activator.forward)

    # 後向傳播。input_array為該層的輸入,sensitivity_array為當前層的輸出誤差(和輸出的維度相同),activator為啟用函式
    def backward(self, input_array, sensitivity_array,activator):

        '''
        計算傳遞給前一層的誤差項,以及計算每個權重的梯度
        前一層的誤差項儲存在self.delta_array
        梯度儲存在Filter物件的weights_grad
        '''

        self.forward(input_array)   # 先根據輸入計算經過該卷積層後的輸出。(卷積層有幾個過濾器,輸出層的深度就是多少。輸出每一層為一個二維陣列)
        self.bp_sensitivity_map(sensitivity_array, activator)   # 將誤差傳遞到前一層,self.delta_array儲存上一次層的誤差
        self.bp_gradient(sensitivity_array)   # 計算每個過濾器的w和b梯度

    # 按照梯度下降,更新權重
    def update(self):
        for filter in self.filters:
            filter.update(self.learning_rate)   # 每個過濾器


    # 將誤差項傳遞到上一層。sensitivity_array: 本層的誤差。activator: 上一層的啟用函式
    def bp_sensitivity_map(self, sensitivity_array,activator):   # 公式9
        # 根據卷積步長,對原始sensitivity map進行補0擴充套件,擴充套件成如果步長為1的輸出誤差形狀。再用公式8求解
        expanded_error_array = self.expand_sensitivity_map(sensitivity_array)
        # print(sensitivity_array)
        # full卷積,對sensitivitiy map進行zero padding
        # 雖然原始輸入的zero padding單元也會獲得殘差,但這個殘差不需要繼續向上傳遞,因此就不計算了
        expanded_width = expanded_error_array.shape[2]   # 誤差的寬度
        zp = int((self.input_width + self.filter_width - 1 - expanded_width) / 2)   # 計算步長
        padded_array = padding(expanded_error_array, zp)  #補0操作
        # 初始化delta_array,用於儲存傳遞到上一層的sensitivity map
        self.delta_array = self.create_delta_array()
        # 對於具有多個filter的卷積層來說,最終傳遞到上一層的sensitivity map相當於所有的filter的sensitivity map之和
        for i in range(self.filter_number):   # 遍歷每一個過濾器。每個過濾器都產生多通道的誤差,多個多通道的誤差疊加
            filter = self.filters[i]
            # 將濾波器每個通道的權重權重翻轉180度。
            flipped_weights=[]
            for oneweight in filter.get_weights():  # 這一個濾波器下的每個通道都進行180翻轉
                flipped_weights.append(np.rot90(oneweight, 2))
            flipped_weights = np.array(flipped_weights)
            # 計算與一個filter對應的delta_array
            delta_array = self.create_delta_array()
            for d in range(delta_array.shape[0]):   # 計算每個通道上的誤差,儲存在delta_array的對應通道上
                # print('大小:\n',flipped_weights[d])
                conv(padded_array[i], flipped_weights[d],delta_array[d], 1, 0)
            self.delta_array += delta_array   # 將每個濾波器每個通道產生的誤差疊加

        # 將計算結果與啟用函式的偏導數做element-wise乘法操作
        derivative_array = np.array(self.input_array)  # 複製一個矩陣,因為下面的會改變元素的值,所以深複製了一個矩陣
        element_wise_op(derivative_array,activator.backward)  # 逐個元素求偏導數。
        self.delta_array *= derivative_array  # 誤差乘以偏導數。得到上一層的誤差

    # 計算梯度。根據誤差值,計算本層每個過濾器的w和b的梯度
    def bp_gradient(self, sensitivity_array):
        # 處理卷積步長,對原始sensitivity map進行擴充套件,擴充套件成如果步長為1的輸出誤差形狀。再用公式8求解
        expanded_error_array = self.expand_sensitivity_map(sensitivity_array)
        for i in range(self.filter_number):  # 每個過濾器產生一個輸出
            # 計算每個權重的梯度
            filter = self.filters[i]
            for d in range(filter.weights.shape[0]):   # 過濾器的每個通道都要計算梯度
                conv(self.padded_input_array[d],expanded_error_array[i],filter.weights_grad[d], 1, 0)   #  公式(31、32中間)

            # 計算偏置項的梯度
            filter.bias_grad = expanded_error_array[i].sum()   # 公式(34)

    # 對步長為S的sensitivitymap相應的位置進行補0,將其『還原』成步長為1時的sensitivitymap,再用式8進行求解
    def expand_sensitivity_map(self, sensitivity_array):
        depth = sensitivity_array.shape[0]   # 獲取誤差項的深度
        # 確定擴充套件後sensitivity map的大小,即計算stride為1時sensitivity map的大小
        expanded_width = (self.input_width - self.filter_width + 2 * self.zero_padding + 1)
        expanded_height = (self.input_height - self.filter_height + 2 * self.zero_padding + 1)
        # 構建新的sensitivity_map
        expand_array = np.zeros((depth, expanded_height, expanded_width))
        # 從原始sensitivity map拷貝誤差值,每有拷貝的位置,就是要填充的0
        for i in range(self.output_height):
            for j in range(self.output_width):
                i_pos = i * self.stride
                j_pos = j * self.stride
                expand_array[:, i_pos, j_pos] = sensitivity_array[:, i, j]
        return expand_array

    # 建立用來儲存傳遞到上一層的sensitivity map的陣列。(上一層的輸出也就是這一層的輸入。所以上一層的誤差項的維度和這一層的輸入的維度相同)
    def create_delta_array(self):
        return np.zeros((self.channel_number,self.input_height, self.input_width))


    # 確定卷積層輸出的大小
    @staticmethod
    def calculate_output_size(input_size,filter_size, zero_padding, stride):
        return (input_size - filter_size + 2 * zero_padding) / stride + 1



# Max Pooling層的實現。就是一個卷積區域取最大值,形成輸出。除了Max Pooing之外,常用的還有Mean Pooling——取各樣本的平均值。
# 取樣層並不改變輸入的通道數,也不補零,只是通過某種卷積方式實現降取樣
class MaxPoolingLayer(object):
    # 構造降取樣層,引數為輸入寬度、高度、通道數、濾波器寬度、濾波器高度、步長
    def __init__(self, input_width, input_height,channel_number, filter_width,filter_height, stride):
        self.input_width = input_width
        self.input_height = input_height
        self.channel_number = channel_number
        self.filter_width = filter_width
        self.filter_height = filter_height
        self.stride = stride
        self.output_width = int((input_width -filter_width) / self.stride + 1)
        self.output_height = int((input_height -filter_height) / self.stride + 1)
        self.output_array = np.zeros((self.channel_number,self.output_height, self.output_width))

    # 前向計算。
    def forward(self, input_array):
        for d in range(self.channel_number):
            for i in range(self.output_height):
                for j in range(self.output_width):
                    self.output_array[d, i, j] = (get_patch(input_array[d], i, j,self.filter_width,self.filter_height,self.stride).max())   # 獲取卷積區後去最大值

    # 後向傳播誤差
    def backward(self, input_array, sensitivity_array):
        self.delta_array = np.zeros(input_array.shape)
        for d in range(self.channel_number):
            for i in range(self.output_height):
                for j in range(self.output_width):
                    patch_array = get_patch(input_array[d], i, j,self.filter_width,self.filter_height,self.stride)  # 獲取卷積區
                    k, l = get_max_index(patch_array)  # 獲取最大值的位置
                    self.delta_array[d,i * self.stride + k,j * self.stride + l] = sensitivity_array[d, i, j]   # 更新誤差



CNN識別MNIST資料

有了上面的MNIST資料採集模組,DNN模組,CNN模組,就可以來構建一個包含多層的卷積神經網路實現手寫體識別了。

資料集:MNIST資料集,60000張訓練影象,10000張測試影象,每張影象size為28*28。每張畫素值包含灰度值,即一個0-255之間的整數,而沒有rgb三種顏色的值。其實我們可以進一步將畫素灰度值,改為黑白畫素。每個畫素的值為0或1。這樣更加方便。

網路層級結構示意圖如下:

這裡寫圖片描述

網路層級結構概述:5層神經網路

輸入層: 輸入資料為原始訓練影象(我們在實現中將28*28的灰度圖片,改成了28*28的黑白圖片)

第一卷積層:6個5*5的過濾器(卷積核),步長Stride為1,不補0,啟用函式為sigmoid。在這一層,輸入為28*28,深度為1的圖片資料,輸出為24*24的,深度為6的圖片資料。

第一取樣層:過濾器(卷積核)為2*2,步長Stride為2。在這一層,輸入為24*24的,深度為6的圖片資料,輸出為12*12的,深度為6的圖片資料。

第二卷積層:12個5*5的過濾器(卷積核),步長Stride為1,不補0,啟用函式為sigmoid。在這一層,輸入為12*12,深度為6的圖片資料,輸出為8*8的,深度為12的圖片資料。

第二取樣層:過濾器(卷積核)為2*2,步長Stride為2。在這一層,輸入為8*8的,深度為12的圖片資料,輸出為4*4的,深度為12的圖片資料。

輸出層:線性函式輸入為寬高為(4*4*12,1)的列向量,輸出為10維列向量,啟用函式為sigmoid。

程式碼流程概述:

(1)獲取訓練資料和測試資料;

(2)定義網路層級結構;

(3)初始設定網路引數(權重W,偏向b)

(4)訓練網路的超引數的定義(學習率,每次迭代中訓練的樣本數目,迭代次數)

(5)網路訓練——前向運算

(6)網路訓練——反向傳播

(7)網路訓練——引數更新

(8)重複(5)(6)(7),直至滿足迭代次數

(9)網路模型測試

# 使用全連線神經網路類,和手寫資料載入器,實現驗證碼識別。

import datetime
import numpy as np
import Activators  # 引入啟用器模組
import CNN   # 引入卷積神經網路
import MNIST  # 引入手寫資料載入器
import DNN  # 引入全連線神經網路

# 網路模型類
class MNISTNetwork():
    # =============================構造網路結構=============================
    def __init__(self):
        # 初始化構造卷積層:輸入寬度、輸入高度、通道數、濾波器寬度、濾波器高度、濾波器數目、補零數目、步長、啟用器、學習速率
        self.cl1 = CNN.ConvLayer(28, 28, 1, 5, 5, 6, 0, 1, Activators.SigmoidActivator(),0.02)  # 輸入28*28 一通道,濾波器5*5的6個,步長為1,不補零,所以輸出為24*24深度6
        # 構造降取樣層,引數為輸入寬度、高度、通道數、濾波器寬度、濾波器高度、步長
        self.pl1 = CNN.MaxPoolingLayer(24, 24, 6, 2, 2, 2)  # 輸入24*24,6通道,濾波器2*2,步長為2,所以輸出為12*12,深度保持不變為6
        # 初始化構造卷積層:輸入寬度、輸入高度、通道數、濾波器寬度、濾波器高度、濾波器數目、補零數目、步長、啟用器、學習速率
        self.cl2 = CNN.ConvLayer(12, 12, 6, 5, 5, 12, 0, 1, Activators.SigmoidActivator(),0.02)  # 輸入12*12,6通道,濾波器5*5的12個,步長為1,不補零,所以輸出為8*8深度12
        # 構造降取樣層,引數為輸入寬度、高度、通道數、濾波器寬度、濾波器高度、步長
        self.pl2 = CNN.MaxPoolingLayer(8, 8, 12, 2, 2, 2)  # 輸入8*8,12通道,濾波器2*2,步長為2,所以輸出為4*4,深度保持不變為12。共192個畫素
        # 全連線層建構函式。input_size: 本層輸入向量的維度。output_size: 本層輸出向量的維度。activator: 啟用函式
        self.fl1 = DNN.FullConnectedLayer(192, 10, Activators.SigmoidActivator(),0.02)  # 輸入192個畫素,輸出為10種分類概率,學習速率為0.05

    # 根據輸入計算一次輸出。因為卷積層要求的資料要求有通道數,所以onepic是一個包含深度,高度,寬度的多維矩陣
    def forward(self,onepic):
        # print('圖片:',onepic.shape)
        self.cl1.forward(onepic)
        # print('第一層卷積結果:',self.cl1.output_array.shape)
        self.pl1.forward(self.cl1.output_array)
        # print('第一層取樣結果:',self.pl1.output_array.shape)
        self.cl2.forward(self.pl1.output_array)
        # print('第二層卷積結果:',self.cl2.output_array.shape)
        self.pl2.forward(self.cl2.output_array)
        # print('第二層取樣結果:',self.pl2.output_array.shape)
        flinput = self.pl2.output_array.flatten().reshape(-1, 1)  # 轉化為列向量
        self.fl1.forward(flinput)
        # print('全連線層結果:',self.fl1.output.shape)
        return  self.fl1.output

    def backward(self,onepic,labels):
        # 計算誤差
        delta = np.multiply(self.fl1.activator.backward(self.fl1.output), (labels - self.fl1.output))  # 計算輸出層啟用函式前的誤差
        # print('輸出誤差:',delta.shape)

        # 反向傳播
        self.fl1.backward(delta)  # 計算了全連線層輸入前的誤差,以及全連線的w和b的梯度
        self.fl1.update()  # 更新權重w和偏量b
        # print('全連線層輸入誤差:', self.fl1.delta.shape)
        sensitivity_array = self.fl1.delta.reshape(self.pl2.output_array.shape)  # 將誤差轉化為同等形狀
        self.pl2.backward(self.cl2.output_array, sensitivity_array)  # 計算第二取樣層的輸入誤差。引數為第二取樣層的 1、輸入,2、輸出誤差
        # print('第二取樣層的輸入誤差:', self.pl2.delta_array.shape)
        self.cl2.backward(self.pl1.output_array, self.pl2.delta_array,Activators.SigmoidActivator())  # 計算第二卷積層的輸入誤差。引數為第二卷積層的 1、輸入,2、輸出誤差,3、啟用函式
        self.cl2.update()  # 更新權重w和偏量b
        # print('第二卷積層的輸入誤差:', self.cl2.delta_array.shape)
        self.pl1.backward(self.cl1.output_array, self.cl2.delta_array)  # 計算第一取樣層的輸入誤差。引數為第一取樣層的 1、輸入,2、輸出誤差
        # print('第一取樣層的輸入誤差:', self.pl1.delta_array.shape)
        self.cl1.backward(onepic, self.pl1.delta_array,Activators.SigmoidActivator())  # 計算第一卷積層的輸入誤差。引數為第一卷積層的 1、輸入,2、輸出誤差,3、啟用函式
        self.cl1.update()  # 更新權重w和偏量b
        # print('第一卷積層的輸入誤差:', self.cl1.delta_array.shape)



# 由於使用了邏輯迴歸函式,所以只能進行分類識別。識別ont-hot編碼的結果
if __name__ == '__main__':

    # =============================載入資料集=============================
    train_data_set, train_labels = MNIST.get_training_data_set(600, False)  # 載入訓練樣本資料集,和one-hot編碼後的樣本標籤資料集。樣本數量越大,訓練時間越久,也越準確
    test_data_set, test_labels = MNIST.get_test_data_set(100, False)  # 載入測試特徵資料集,和one-hot編碼後的測試標籤資料集。訓練時間越久,也越準確
    train_data_set = np.array(train_data_set).astype(bool).astype(int)    #可以將圖片簡化為黑白圖片
    train_labels = np.array(train_labels)
    test_data_set = np.array(test_data_set).astype(bool).astype(int)    #可以將圖片簡化為黑白圖片
    test_labels = np.array(test_labels)
    print('樣本資料集的個數:%d' % len(train_data_set))
    print('測試資料集的個數:%d' % len(test_data_set))


    # =============================構造網路結構=============================
    mynetwork =MNISTNetwork()

    # 列印輸出每層網路
    # print('第一卷積層:\n',mynetwork.cl1.filters)
    # print('第二卷積層:\n', mynetwork.cl2.filters)
    # print('全連線層w:\n', mynetwork.fl1.W)
    # print('全連線層b:\n', mynetwork.fl1.b)

    # =============================迭代訓練=============================
    for i in range(10):  #迭代訓練10次。每個迭代內,對所有訓練資料進行訓練,更新(訓練影象個數/batchsize)次網路引數
        print('迭代:',i)
        for k in range(train_data_set.shape[0]):  #使用每一個樣本進行訓練
            # 正向計算
            onepic =train_data_set[k]
            onepic = np.array([onepic])  # 卷積神經網路要求的輸入必須包含深度、高度、寬度三個維度。
            result = mynetwork.forward(onepic)   # 前向計算一次
            # print(result.flatten())
            labels = train_labels[k].reshape(-1, 1)  # 獲取樣本輸出,轉化為列向量
            mynetwork.backward(onepic,labels)



    # 列印輸出每層網路
    # print('第一卷積層:\n',mynetwork.cl1.filters)
    # print('第二卷積層:\n', mynetwork.cl2.filters)
    # print('全連線層w:\n', mynetwork.fl1.W)
    # print('全連線層b:\n', mynetwork.fl1.b)

    # =============================評估結果=============================

    right = 0
    for k in range(test_data_set.shape[0]):  # 使用每一個樣本進行訓練
        # 正向計算
        onepic = test_data_set[k]
        onepic = np.array([onepic])  # 卷積神經網路要求的輸入必須包含深度、高度、寬度三個維度。
        result = mynetwork.forward(onepic)  # 前向計算一次
        labels = test_labels[k].reshape(-1, 1)  # 獲取樣本輸出,轉化為列向量
        # print(result)
        pred_type = result.argmax()
        real_type = labels.argmax()

        # print(pred_type,real_type)
        if pred_type==real_type:
            right+=1


    print('%s after right ratio is %f' % (datetime.datetime.now(), right/test_data_set.shape[0]))  # 列印輸出正確率

注意由於使用的樣本和迭代次數少,所以訓練模型在訓練集上表現不錯,但是在測試集上表現不好。

由於讀者訓練時間太久,所以本文已經將訓練集數量改成了600,測試集數量改成了100。這樣大概在半個小時內就可以完成訓練。但是測試集的效