1. 程式人生 > >Python3.x實現神經網路

Python3.x實現神經網路

本文採用 Python 實現神經網路,並通過實現的神經網路對手寫數字進行分類。

確定隱藏層節點數的公式:

模型的訓練和評估:


示例程式碼:

from functools import reduce
import random
import struct
from datetime import datetime
from numpy import *

# Activation function
def sigmoid(inX):
    '''
    Logistic sigmoid activation: 1 / (1 + e^(-inX)).
    :param inX: value to squash
    :return: the activation, computed with the `exp` name that is in scope
    '''
    denominator = 1 + exp(-inX)
    return 1.0 / denominator


# Node: records its own state together with the connections that enter and
# leave it, and computes its output value and its error term (delta).
class Node(object):
    def __init__(self, layer_index, node_index):
        '''
        Create a node with no connections, output 0 and delta 0.
        :param layer_index: index of the layer the node belongs to
        :param node_index: index of the node inside its layer
        '''
        self.layer_index, self.node_index = layer_index, node_index
        self.downstream, self.upstream = [], []
        self.output, self.delta = 0, 0

    def set_output(self, output):
        '''
        Store an output value directly (used for input-layer nodes).
        :param output: value to store
        :return:
        '''
        self.output = output

    def append_downstream_connection(self, conn):
        '''
        Register a connection that leaves this node.
        :param conn: connection to a downstream node
        :return:
        '''
        self.downstream.append(conn)

    def append_upstream_connection(self, conn):
        '''
        Register a connection that enters this node.
        :param conn: connection from an upstream node
        :return:
        '''
        self.upstream.append(conn)

    def calc_output(self):
        '''
        Set the output to the sigmoid of the weighted sum of upstream outputs.
        :return:
        '''
        weighted_sum = 0
        for link in self.upstream:
            weighted_sum = weighted_sum + link.upstream_node.output * link.weight
        self.output = sigmoid(weighted_sum)

    def calc_hidden_layer_delta(self):
        '''
        Compute delta from the downstream nodes:
        output * (1 - output) * sum(downstream delta * connection weight).
        :return:
        '''
        back_propagated = 0.0
        for link in self.downstream:
            back_propagated = back_propagated + link.downstream_node.delta * link.weight
        slope = self.output * (1 - self.output)
        self.delta = slope * back_propagated

    def calc_output_layer_delta(self, label):
        '''
        Compute delta against a target value:
        output * (1 - output) * (label - output).
        :param label: target value for this node
        :return:
        '''
        slope = self.output * (1 - self.output)
        self.delta = slope * (label - self.output)

    def __str__(self):
        '''
        Describe the node and every connection attached to it.
        :return: multi-line description
        '''
        header = '%u-%u: output: %f delta: %f' % (self.layer_index, self.node_index, self.output, self.delta)
        outgoing = ''.join('\n\t' + str(link) for link in self.downstream)
        incoming = ''.join('\n\t' + str(link) for link in self.upstream)
        return header + '\n\tdownstream:' + outgoing + '\n\tupstream:' + incoming

# ConstNode: a node whose output is always 1 (the upstream end of the bias
# weights).
class ConstNode(object):
    def __init__(self, layer_index, node_index):
        '''
        Create a constant node.
        :param layer_index: index of the layer the node belongs to
        :param node_index: index of the node inside its layer
        '''
        self.layer_index = layer_index
        self.node_index = node_index
        self.downstream = []
        self.output = 1
        # Always defined so the node exposes the same attribute as Node.
        # With output fixed at 1 the factor output * (1 - output) is 0.
        self.delta = 0.0

    def append_downstream_connection(self, conn):
        '''
        Register a connection that leaves this node.
        :param conn: connection to a downstream node
        :return:
        '''
        self.downstream.append(conn)

    def calc_hidden_layer_delta(self):
        '''
        Keep delta at 0.

        The hidden-layer formula output * (1 - output) * sum(...) vanishes
        because output is fixed at 1, so the downstream connections are not
        walked. The previous version summed them and discarded the result.
        :return:
        '''
        self.delta = 0.0

    def __str__(self):
        '''
        Describe the node and its downstream connections.
        :return: multi-line description
        '''
        node_str = '%u-%u: output: 1' % (self.layer_index, self.node_index)
        downstream_str = ''.join('\n\t' + str(conn) for conn in self.downstream)
        return node_str + '\n\tdownstream:' + downstream_str

class Layer(object):
    '''
    One layer of the network: builds its nodes and offers operations that
    apply to the whole collection of nodes.
    '''
    def __init__(self, layer_index, node_count):
        '''
        Build the layer.
        :param layer_index: index of the layer
        :param node_count: number of regular nodes in the layer
        '''
        self.layer_index = layer_index
        self.nodes = [Node(layer_index, position) for position in range(node_count)]
        # One extra constant node is appended after the regular nodes.
        self.nodes.append(ConstNode(layer_index, node_count))

    def set_output(self, data):
        '''
        Assign outputs node by node (used when this is the input layer).
        :param data: values to assign; data[i] goes to node i
        :return:
        '''
        for position, value in enumerate(data):
            self.nodes[position].set_output(value)

    def calc_output(self):
        '''
        Recompute the output of every node except the trailing constant node.
        :return:
        '''
        regular_nodes = self.nodes[:-1]
        for member in regular_nodes:
            member.calc_output()

    def dump(self):
        '''
        Print every node of the layer.
        :return:
        '''
        for member in self.nodes:
            print(member)


class Connection(object):
    '''
    A weighted link: stores the weight, the last computed gradient, and the
    upstream and downstream nodes it joins.
    '''
    def __init__(self, upstream_node, downstream_node):
        '''
        Create the connection with a small random weight drawn from
        random.uniform(-0.1, 0.1) and a gradient of 0.0.
        :param upstream_node: node the connection starts from
        :param downstream_node: node the connection ends at
        '''
        self.upstream_node, self.downstream_node = upstream_node, downstream_node
        self.weight = random.uniform(-0.1, 0.1)
        self.gradient = 0.0

    def calc_gradient(self):
        '''
        Store downstream delta * upstream output as the gradient.
        :return:
        '''
        source_output = self.upstream_node.output
        self.gradient = self.downstream_node.delta * source_output

    def get_gradient(self):
        '''
        Return the most recently stored gradient.
        :return: the gradient value
        '''
        return self.gradient

    def update_weight(self, rate):
        '''
        Recompute the gradient, then move the weight by rate * gradient.
        :param rate: learning rate
        :return:
        '''
        self.calc_gradient()
        step = rate * self.gradient
        self.weight = self.weight + step

    def __str__(self):
        '''
        Describe the connection as "(layer-node) -> (layer-node) = weight".
        :return: one-line description
        '''
        source, target = self.upstream_node, self.downstream_node
        fields = (source.layer_index, source.node_index,
                  target.layer_index, target.node_index,
                  self.weight)
        return '(%u-%u) -> (%u-%u) = %f' % fields


class Connections(object):
    '''
    Plain container for Connection objects with collection-level helpers.
    '''
    def __init__(self):
        # Connections are kept in insertion order.
        self.connections = list()

    def add_connection(self, connection):
        '''
        Append one connection to the collection.
        :param connection: connection to store
        :return:
        '''
        self.connections.append(connection)

    def dump(self):
        '''
        Print every stored connection, one per line.
        :return:
        '''
        for link in self.connections:
            print(link)

class Network(object):
    '''
    Fully connected feed-forward network built from Layer, Node and
    Connection objects and trained one sample at a time.
    '''
    def __init__(self, layers):
        '''
        Build the layers and connect every pair of adjacent layers.
        :param layers: sequence of node counts, one entry per layer
        '''
        self.connections = Connections()
        self.layers = []
        layer_count = len(layers)
        for i in range(layer_count):
            self.layers.append(Layer(i, layers[i]))
        for layer in range(layer_count - 1):
            # Every node of the upstream layer (constant node included) feeds
            # every node of the downstream layer except its constant node.
            connections = [Connection(upstream_node, downstream_node)
                           for upstream_node in self.layers[layer].nodes
                           for downstream_node in self.layers[layer + 1].nodes[:-1]]
            for conn in connections:
                self.connections.add_connection(conn)
                conn.downstream_node.append_upstream_connection(conn)
                conn.upstream_node.append_downstream_connection(conn)

    def train(self, labels, data_set, rate, iteration):
        '''
        Train the network.
        :param labels: sequence of label vectors, one per sample
        :param data_set: sequence of feature vectors, one per sample
        :param rate: learning rate
        :param iteration: number of passes over data_set
        :return:
        '''
        for _ in range(iteration):
            for d in range(len(data_set)):
                self.train_one_sample(labels[d], data_set[d], rate)

    def train_one_sample(self, label, sample, rate):
        '''
        Internal: forward pass, delta computation and weight update for one
        sample.
        :param label: label vector of the sample
        :param sample: feature vector of the sample
        :param rate: learning rate
        :return:
        '''
        self.predict(sample)
        self.calc_delta(label)
        self.update_weight(rate)

    def calc_delta(self, label):
        '''
        Internal: compute delta for the output nodes from the label, then for
        the remaining layers from last to first.
        :param label: label vector; label[i] is the target of output node i
        :return:
        '''
        output_nodes = self.layers[-1].nodes
        for i in range(len(label)):
            output_nodes[i].calc_output_layer_delta(label[i])
        for layer in self.layers[-2:: -1]:
            for node in layer.nodes:
                node.calc_hidden_layer_delta()

    def update_weight(self, rate):
        '''
        Internal: update the weight of every connection.
        :param rate: learning rate
        :return:
        '''
        for layer in self.layers[: -1]:
            for node in layer.nodes:
                for conn in node.downstream:
                    conn.update_weight(rate)

    def calc_gradient(self):
        '''
        Internal: compute the gradient of every connection.
        :return:
        '''
        for layer in self.layers[:-1]:
            for node in layer.nodes:
                for conn in node.downstream:
                    conn.calc_gradient()

    def get_gradient(self, label, sample):
        '''
        Compute the gradient on every connection for one sample. The results
        are stored on the connections; nothing is returned.
        :param label: label vector of the sample
        :param sample: feature vector of the sample
        :return:
        '''
        self.predict(sample)
        self.calc_delta(label)
        self.calc_gradient()

    def predict(self, sample):
        '''
        Run a forward pass and return the output-layer values.
        :param sample: feature vector, used as the output of the input layer
        :return: list of outputs of the last layer, constant node excluded
        '''
        self.layers[0].set_output(sample)
        for i in range(1, len(self.layers)):
            self.layers[i].calc_output()
        # A real list, not a lazy map object: callers take len() of the
        # result and index into it.
        return [node.output for node in self.layers[-1].nodes[: -1]]

    def dump(self):
        '''
        Print every layer of the network.
        :return:
        '''
        for layer in self.layers:
            layer.dump()
# Gradient check
def gradient_check(network, sample_feature, sample_label):
    '''
    Compare, for every connection, the gradient computed by the network with
    a central-difference estimate, and print both values.
    :param network: network object exposing get_gradient(label, sample),
                    predict(sample) and connections.connections
    :param sample_feature: feature vector of the sample
    :param sample_label: label vector of the sample
    :return:
    '''
    def network_error(outputs, targets):
        # Half of the summed squared differences between the two vectors.
        total = 0.0
        for produced, wanted in zip(outputs, targets):
            total += (produced - wanted) * (produced - wanted)
        return 0.5 * total

    # get_gradient takes the label first and the sample second.
    network.get_gradient(sample_label, sample_feature)

    epsilon = 0.0001
    for conn in network.connections.connections:
        # Gradient computed by the network for this connection.
        actual_gradient = conn.get_gradient()

        original_weight = conn.weight

        # Error with the weight nudged up by epsilon.
        conn.weight = original_weight + epsilon
        error1 = network_error(network.predict(sample_feature), sample_label)

        # Error with the weight nudged down by epsilon.
        conn.weight = original_weight - epsilon
        error2 = network_error(network.predict(sample_feature), sample_label)

        # Put the weight back so later connections are checked against the
        # unperturbed network.
        conn.weight = original_weight

        # (error2 - error1) is the error at w - epsilon minus the error at
        # w + epsilon, i.e. the estimate of the negated error derivative.
        expected_gradient = (error2 - error1) / (2 * epsilon)

        print('expected gradient: \t%f\nactual gradient: \t%f' % (
            expected_gradient, actual_gradient))

class Loader(object):
    '''
    Base class for the data-file loaders.
    '''
    def __init__(self, path ,count):
        '''
        Remember which file to read and how many samples to take from it.
        :param path: path of the data file
        :param count: number of samples in the file
        '''
        self.path = path
        self.count = count

    def get_file_content(self):
        '''
        Read the whole file in binary mode.
        :return: the file content as bytes
        '''
        # The context manager closes the file even if read() raises.
        with open(self.path, 'rb') as f:
            return f.read()

    def to_int(self, byte):
        '''
        Convert one unsigned byte to an integer.
        :param byte: an int (what indexing a bytes object yields in
                     Python 3) or a bytes object of length 1
        :return: the integer value
        '''
        if isinstance(byte, int):
            return byte
        return struct.unpack('B', byte)[0]

class ImageLoader(Loader):
    '''
    Loader that turns an image data file into a list of input vectors.
    '''
    def get_picture(self, content, index):
        '''
        Internal: extract one 28x28 image from the file content.
        :param content: raw file content
        :param index: position of the image in the file
        :return: list of 28 rows, each a list of 28 values
        '''
        # The first 16 bytes are skipped (presumably the file header -
        # confirm against the data format); each image takes 28 * 28 bytes.
        base = 16 + index * 28 * 28
        rows = []
        for row in range(28):
            row_start = base + row * 28
            rows.append([self.to_int(content[row_start + col]) for col in range(28)])
        return rows

    def get_one_sample(self, picture):
        '''
        Internal: flatten a 28x28 image row by row into one input vector.
        :param picture: image as returned by get_picture
        :return: list of 784 values
        '''
        return [picture[row][col] for row in range(28) for col in range(28)]

    def load(self):
        '''
        Read the data file and build the input vector of every sample.
        :return: list with self.count input vectors
        '''
        raw = self.get_file_content()
        return [self.get_one_sample(self.get_picture(raw, position))
                for position in range(self.count)]

class LabelLoader(Loader):
    '''
    Loader that turns a label data file into a list of label vectors.
    '''
    def load(self):
        '''
        Read the data file and build the label vector of every sample.
        :return: list with self.count label vectors
        '''
        raw = self.get_file_content()
        # The first 8 bytes are skipped (presumably the file header -
        # confirm against the data format); one byte per label follows.
        return [self.norm(raw[8 + position]) for position in range(self.count)]

    def norm(self, label):
        '''
        Internal: turn a label value into a 10-element vector holding 0.9 at
        the label's position and 0.1 everywhere else.
        :param label: raw label byte
        :return: list of 10 floats
        '''
        target = self.to_int(label)
        return [0.9 if digit == target else 0.1 for digit in range(10)]


def get_training_data_set():
    '''
    Load the training set (60000 samples) from the MNIST_data directory.
    :return: tuple (input vectors, label vectors)
    '''
    images = ImageLoader('MNIST_data/train-images-idx3-ubyte', 60000).load()
    labels = LabelLoader('MNIST_data/train-labels-idx1-ubyte', 60000).load()
    return images, labels


def get_test_data_set():
    '''
    Load the test set (10000 samples) from the MNIST_data directory.
    :return: tuple (input vectors, label vectors)
    '''
    images = ImageLoader('MNIST_data/t10k-images-idx3-ubyte', 10000).load()
    labels = LabelLoader('MNIST_data/t10k-labels-idx1-ubyte', 10000).load()
    return images, labels


# Pick the predicted class from an output vector
def get_result(vec):
    '''
    Return the index of the largest element of vec.

    The first index wins on ties, and 0 is returned for an empty input.
    The comparison starts from the first element rather than from 0, so
    vectors without any positive entry are handled correctly.
    :param vec: any iterable of comparable values (list, map object, ...)
    :return: index of the maximum
    '''
    # Materialise once so lazy iterables can be measured and indexed.
    values = list(vec)
    best_index = 0
    for index in range(1, len(values)):
        if values[index] > values[best_index]:
            best_index = index
    return best_index

# Evaluate the trained network by its error rate
def evaluate(network, test_data_set, test_labels):
    '''
    Return the fraction of test samples whose predicted class differs from
    the labelled class.
    :param network: object exposing predict(sample)
    :param test_data_set: sequence of feature vectors
    :param test_labels: sequence of label vectors, aligned with test_data_set
    :return: misclassified count divided by sample count, as a float
             (division by zero if test_data_set is empty)
    '''
    total = len(test_data_set)
    mistakes = 0
    for position, sample in enumerate(test_data_set):
        expected = get_result(test_labels[position])
        predicted = get_result(network.predict(sample))
        if expected != predicted:
            mistakes += 1
    return float(mistakes) / float(total)

# Evaluate once every 10 training epochs and stop as soon as the error
# ratio gets worse than at the previous evaluation.
def train_and_evaluate():
    '''
    Load the data sets, build a 784-300-10 network and train it with
    learning rate 0.3 until the error ratio on the test set increases.
    :return:
    '''
    train_data_set, train_labels = get_training_data_set()
    test_data_set, test_labels = get_test_data_set()
    network = Network([784, 300, 10])
    previous_ratio = 1.0
    epoch = 0
    while True:
        epoch += 1
        network.train(train_labels, train_data_set, 0.3, 1)
        print('%s epoch %d finished' % (datetime.now(), epoch))
        if epoch % 10 != 0:
            # Only every tenth epoch is evaluated.
            continue
        error_ratio = evaluate(network, test_data_set, test_labels)
        print('%s after epoch %d, error ratio is %f' % (datetime.now(), epoch, error_ratio))
        if error_ratio > previous_ratio:
            break
        previous_ratio = error_ratio
# Script entry point: start training only when the file is run directly,
# not when it is imported.
if __name__ == '__main__':
    train_and_evaluate()

執行結果:

沒有GPU,訓練時間太長了,就不給出訓練結果了,需要訓練資料可以發訊息到我郵箱。