python 實現神經網路 處理資料集cifar10

python 實現單隱層神經網路,處理cifar10資料集


# -*- coding: utf-8 -*-
# File name : forward_neural_network.py
# Create date : 2018-12-25 20:04
# Modified date : 2018-12-31 16:02
# Author : DARREN
# Describe : not set
# Email : [email protected]
##################################### from __future__ import division from __future__ import print_function import time import numpy as np #import matplotlib.pyplot as plt import cifar10_dataset import func import model_base import graph class DeepModel(model_base.ModelBase): def __init__(self)
: super(DeepModel, self).__init__() self.hyper = self._get_hyperparameters() self.graph = graph.NeuralGraph() def _get_hyperparameters(self): dic = {} dic["batch_size"] = 128 dic["epsilon"] = 0.0000001 dic["reg_lambda"] = 0.05 dic["learn_rate"
] = 0.005 dic["max_steps"] = 50000 dic["train_steps"] = 50 dic["max_epochs"] = 50 dic["input_dim"] = 32*32*3#img size dic["hidden_dim"] = 2048 dic["output_dim"] = 10 dic["file_path"] = "./data/" dic["activation"] = "sigmoid" dic["architecture"] = [32*32*3, 2048, 10] return dic def _train_model_with_epochs(self, model=None): data_generator = self._get_train_generator() while 1: X, Y, status = self._get_a_batch_data(data_generator) if status == False: model['epochs'] += 1 break model = self.graph.core_graph(model, X, Y, self.hyper) model["steps"] += 1 if model["steps"] % self.hyper["train_steps"] == 0: self._print_train_status(model) return model def _train_model_with_steps(self, model=None, data_generator=None): if data_generator == None: data_generator = self._get_train_generator() while 1: X, Y, status = self._get_a_batch_data(data_generator) if status == False: data_generator = self._get_train_generator() model['epochs'] += 1 model = self.graph.core_graph(model, X, Y, self.hyper) model["steps"] += 1 if model["steps"] % self.hyper["train_steps"] == 0: break return model, data_generator def _test_model(self, model): data_generator = self._get_test_generator() count = 1 all_correct_numbers = 0 all_loss = 0.0 while count: X, Y, status = self._get_a_batch_data(data_generator) if status == False: break start_time = time.time() model, prob, a1, Z1, loss, accuracy, comp = self.graph.forward_propagation(model, X, Y, self.hyper) end_time = time.time() model = self._record_test_speed(model, start_time, end_time) all_loss += loss all_correct_numbers += len(np.flatnonzero(comp)) count += 1 avg_loss = all_loss / count accuracy = all_correct_numbers / (count * self.hyper["batch_size"]) self._test_update_model(model, avg_loss, accuracy) self._print_test_status(model) self._record_model_status(model) return model


# -*- coding: utf-8 -*-
# File name : graph.py
# Create date : 2018-12-30 23:06
# Modified date : 2018-12-31 16:01
# Author : DARREN
# Describe : not set
# Email : [email protected]
from __future__ import division
from __future__ import print_function

import time
import numpy as np

import func

def _record_speed(model, start_time, end_time, hyper_dic):
    dual_time = end_time - start_time
    speed = hyper_dic["batch_size"]/dual_time
    model["train_speed"] = speed
    return model

class NeuralGraph(object):
    def __init__(self):
        super(NeuralGraph, self).__init__()
    def _init_model(self, hyper_dic, model=None):
        if not model:
            architecture = hyper_dic["architecture"]
            W1 = np.random.randn(architecture[0], architecture[1])
            b1 = np.ones((1, architecture[1]))
            W2 = np.random.randn(architecture[1], architecture[2])
            b2 = np.ones((1, architecture[2]))

            model = {}
            model["W1"] = W1
            model["b1"] = b1
            model["W2"] = W2
            model["b2"] = b2
            model["steps"] = 1
            model["epochs"] = 0
            model["record"] = {}
            model["best_test_accuracy"] = 0.0
            model["best_epoch"] = 0
        return model

    def _normalization(self, batch_img):
        return batch_img / 255.

    def forward_propagation(self, model, X, Y, hyper_dic):
        activation_str = hyper_dic["activation"]
        model = self._init_model(hyper_dic, model)
        W1 = model["W1"]
        b1 = model["b1"]
        W2 = model["W2"]
        b2 = model["b2"]

        X = self._normalization(X)
        Z1 = np.dot(X, W1)+b1
        a1 = func.activation(Z1, activation_str)
        logits = np.dot(a1, W2)+b2
        prob = func.softmax(logits)

        correct_probs = prob[range(X.shape[0]), np.argmax(Y, axis=1)]
        correct_logprobs = - func.log(correct_probs)

        data_loss = np.sum(correct_logprobs)
        loss = 1./X.shape[0] * data_loss

        pre_Y = np.argmax(prob, axis=1)
        comp = pre_Y == np.argmax(Y, axis=1)
        accuracy = len(np.flatnonzero(comp))/Y.shape[0]

        return model, prob, a1, Z1, loss, accuracy, comp

    def backward_propagation(self, model, prob, X, Y, a1, Z1, hyper_dic):
        activation_str = hyper_dic["activation"]
        learn_rate = hyper_dic["learn_rate"]

        W2 = model["W2"]
        dY_pred = prob - Y
        dW2 = np.dot(a1.T, dY_pred)
        da1 = np.dot(dY_pred, W2.T)
        dadZ = func.activation_derivative(Z1, activation_str)
        dZ1 = da1 * dadZ
        dW1 = np.dot(X.T, dZ1)
        model["W2"] += -learn_rate * dW2
        model["W1"] += -learn_rate * dW1

        return model

    def core_graph(self, model, X, Y, hyper_dic):
        start_time = time.time()
        model, prob, a1, Z1, loss, accuracy, comp = self.forward_propagation(model, X, Y, hyper_dic)
        model["train_loss"] = loss
        model["train_accuracy"] = accuracy
        model = self.backward_propagation(model, prob, X, Y, a1, Z1, hyper_dic)
        end_time = time.time()
        model = _record_speed(model, start_time, end_time, hyper_dic)
        return model


# -*- coding: utf-8 -*-
# File name : func.py
# Create date : 2018-12-30 21:33
# Modified date : 2018-12-31 14:23
# Author : DARREN
# Describe : not set
# Email : [email protected]
from __future__ import division
from __future__ import print_function

import numpy as np

def sig(x):
    pos = np.where(x >= 0)
    p = 1.0 / (1 + np.exp(-x[pos]))

    neg = np.where(x < 0)
    n = np.exp(x[neg]) / (1 + np.exp(x[neg]))

    x[pos] = p
    x[neg] = n
    return x

def relu(x):
    return (np.abs(x) + x) / 2.0

def relu_derivative(x):
    x[x <= 0] = 0
    x[x > 0] = 1
    return x

def sig_deirvative(x):
    return sig(x) * (1 - sig(x))

def log(x, epsilon=0.0000001):
    x = x + epsilon
    return np.log(x)

def softmax(logits, epsilon=0.0000001):
    logits_max = np.max(logits, axis=1)
    for i in range(len(logits)):
        logits[i] = logits[i] - logits_max[i]
    logits = logits + epsilon
    exp_score = np.exp(logits)
    prob = exp_score/np.sum(exp_score, axis=1, keepdims=1)
    return prob

def activation(x, activation_str="relu"):
    activation_dic = {}
    activation_dic["relu"] = relu
    activation_dic["sigmoid"] = sig
    f = activation_dic[activation_str]
    return f(x)

def activation_derivative(x, activation_str="relu"):
    activation_derivative_dic = {}
    activation_derivative_dic["relu"] = relu_derivative
    activation_derivative_dic["sigmoid"] = sig_deirvative
    f = activation_derivative_dic[activation_str]
    return f(x)


# -*- coding: utf-8 -*-
# File name : model_base.py
# Create date : 2018-12-25 20:04
# Modified date : 2018-12-31 16:02
# Author : DARREN
# Describe : not set
# Email : [email protected]
from __future__ import division
from __future__ import print_function

import numpy as np
import matplotlib.pyplot as plt

import cifar10_dataset
import func

class ModelBase(object):
    def __init__(self):
        super(ModelBase, self).__init__()
        self.hyper = self._get_hyperparameters()

    def _get_dataset(self):
        dataset = cifar10_dataset.Cifar10Set(self.hyper["file_path"])
        return dataset

    def _get_train_generator_with(self, dataset):
        data_generator = dataset.get_train_data_generator(self.hyper["batch_size"])
        return data_generator

    def _get_test_generator_with(self, dataset):
        data_generator = dataset.get_test_data_generator(self.hyper["batch_size"])
        return data_generator

    def _get_train_generator(self):
        dataset = self._get_dataset()
        data_generator = self._get_train_generator_with(dataset)
        return data_generator

    def _get_test_generator(self):
        dataset = self._get_dataset()
        data_generator = self._get_test_generator_with(dataset)
        return data_generator

    def _get_a_batch_data(self, data_generator):
        dataset = self._get_dataset()
        batch_img, batch_labels, status = dataset.get_a_batch_data(data_generator)
        return batch_img, batch_labels, status

    def _record_test_speed(self, model, start_time, end_time):
        dual_time = end_time - start_time
        speed = self.hyper["batch_size"]/dual_time
        model["test_speed"] = speed
        return model

    def show_hyperparameters(self):
        for key in self.hyper:
            print("%s:%s" % (key, self.hyper[key]))

    def _normalization(self, batch_img):
        return batch_img / 255.

    def _print_train_status(self, model):
        print("epoch:%s steps:%s Train_Loss:%2.5f Train_Acc:%2.5f" % (model["epochs"], model["steps"], model["train_loss"], model["train_accuracy"]))

    def _print_test_status(self, model):
        print("E:%s S:%s Train_Loss:%2.5f Test_Loss:%2.5f Train_Acc:%2.5f Test_Acc:%2.5f gap:%2.5f Train_Speed:%s Test_Speed:%s" % (model["epochs"], model["steps"], model["train_loss"], model["test_loss"], model["train_accuracy"], model["test_accuracy"], model["train_test_gap"], model["train_speed"], model["test_speed"]))
        print("best_epoch:%s best_test_acc:%s" % (model["best_epoch"], model["best_test_accuracy"]))

    def _test_update_model(self, model, avg_loss, accuracy):
        if accuracy > model["best_test_accuracy"]:
            model["best_test_accuracy"] = accuracy
            model["best_epoch"] = model["epochs"]

        model["test_loss"] = avg_loss
        model["test_accuracy"] = accuracy
        model["train_test_gap"] = model["train_accuracy"] - model["test_accuracy"]
        return model

    def _record_model_status(self, model):
        steps_dic = {}
        steps_dic["epochs"] = model["epochs"]
        steps_dic["steps"] = model["steps"]
        steps_dic["train_loss"] = model["train_loss"]
        steps_dic["train_accuracy"] = model["train_accuracy"]
        steps_dic["test_loss"] = model["test_loss"]
        steps_dic["test_accuracy"] = model["test_accuracy"]
        steps_dic["train_test_gap"] = model["train_test_gap"]
        record = model["record"]
        record[model["steps"]] = steps_dic



