1. 程式人生 > >Bobo老師機器學習筆記第六課-梯度下降法線上性迴歸中的應用

Bobo老師機器學習筆記第六課-梯度下降法線上性迴歸中的應用

在上一篇部落格中大概介紹了一下梯度下降法,那麼梯度下降法線上性迴歸中如何更好的應用了,本篇部落格做一介紹。

在BoBo老師的課程中,梯度下降法主要講了2中,批量梯度下降法(Batch Gradient Descent)和隨機梯度下降法(Stochastic Gradient Descent)。 

一、理論介紹

1、批量梯度下降法(Batch Gradient Descent)

損失函式以及未使用向量化的方程:

進行向量化後的梯度方程:

從上圖方程中可以看出,每一次求theta的值都要把所有的樣本遍歷一遍,所以這是為什麼成為批量梯度下降法。

2、隨機梯度下降法(Stonastic Gradient Descent)

損失函式不變,但計算計算梯度的方法如下:

從上圖的公式可以看出,計算梯度是隨機取出其中一個樣本進行計算的。此外還要注意學習率的區別:

a一般取5,b為50,i_iters表示當前迭代的次數。 而這個值在批量梯度學習演算法中是一個常量,一般是0.01 

二、線上性迴歸中的應用

# -*- coding: utf-8 -*-
import numpy as np
from metrics import r2_score


class LinearRegression(object):
    def __init__(self):
        self.coef_ = None  # 表示係數
        self.intercept_ = None  # 表示截距
        self._theta = None  # 過程計算值,不需要暴露給外面

    def fit_normal(self, X_train, y_train):
        """根據訓練資料集X_train, y_train, 利用正規方程進行訓練Linear Regression模型,利用正規方程訓練的時候就不需要對資料進行向量化"""
        assert X_train is not None and y_train is not None, "訓練集X和Y不能為空"
        assert X_train.shape[0] == y_train.shape[0], "訓練集X和Y的樣本數要相等"
        # np.linalg.inv(X) 表示求X的逆矩陣

        # 不能忘了X要增加一列,第一列資料為0
        ones = np.ones(shape=(len(X_train), 1))
        X_train = np.hstack((ones, X_train))
        self._theta = np.linalg.inv(X_train.T.dot(X_train)).dot(X_train.T).dot(y_train)
        self.intercept_ = self._theta[0]
        self.coef_ = self._theta[1:]

    def fit_gd(self, X_train, y_train, eta=0.01, n_iters=1e4):
        """
        用批量梯度下降法訓練模型
        :param X_train: 經過向量化的特徵資料
        :param y_train:
        :param eta: 步長
        :param n_iters: 迭代次數
        :return:
        """
        def J(theta, X_b, y):
            """
            損失函式
            此處要注意:X_b相對於原特徵矩陣多了一列 n * 1的列向量, 所以X_b是(m, n+1)的矩陣
            :return:
            """
            return np.sum((y - X_b.dot(theta)) ** 2) / len(y)

        def DJ(theta, X_b, y):
            """
            獲取梯度
            :return:
            """
            # 註釋掉的演算法是不用向量計算的實現
            # res = np.empty(len(theta))
            # res[0] = np.sum(X_b.dot(theta) - y)
            # for i in range(1, len(theta)):
            #     res[i] = (X_b.dot(theta) - y).dot(X_b[:, i])
            # return res * 2 / len(X_b)
            return X_b.T.dot(X_b.dot(theta) - y) * 2 / len(X_b)

        def gredient_descent(X_b, y, theta, n_inters=1e4, eta=0.01, epsilon=1e-8):
            """
            利用批量梯度下降法訓練線性迴歸
            :param X_b: 是(m, n+1)的矩陣
            :param y:
            :param init_ethta: etha初始化值
            :param n_inters: 迭代次數
            :param eta: 變化率步長, 預設是0.01
            :param epsilon: 精度,用來比較當前etha和上一次etha差值
            :return:
            """
            cur_inter = 0
            while cur_inter < n_inters:
                last_theta = theta
                theta = theta - eta * DJ(theta, X_b, y)
                if abs(J(theta, X_b, y) - J(last_theta, X_b, y)) < epsilon:
                    break
                cur_inter += 1
            return theta

        X_b = np.hstack([np.ones((len(X_train), 1)), X_train])
        init_ethta = np.zeros(X_b.shape[1])
        self._theta = gredient_descent(X_b, y_train, init_ethta, eta=eta, n_inters=n_iters)
        self.coef_ = self._theta[1:]
        self.intercept_ = self._theta[0]

        return self

    def fit_sgd(self, X_train, y_train, n_iters=5, t0=5.0, t1=50.0):
        """
        利用隨機梯度下降法訓練線性迴歸
        :param X_train: 向量化的特徵值
        :param y_train:
        :param n_iters: 迭代次數
        :param t0: 用來計算學習率
        :param t1: 用來計算學習率
        :return:
        """

        def DJ(theta, X_b_i, y_i):
            """
            獲取梯度
            X_b_i: 是X_b向量中的一個樣本
            :return:
            """
            return X_b_i.T.dot(X_b_i.dot(theta) - y_i) * 2


        def sgd(X_b, y, theta, n_inters, t0, t1):
            def learning_rate(t):
                return t0 / (t + t1)

            m = len(X_b)
            for cur_index in range(n_inters):
                indexs = np.random.permutation(m)
                X_b_new = X_b[indexs]
                y_new = y[indexs]
                for i in range(m):
                    gradient = DJ(theta, X_b_new[i], y_new[i])
                    theta = theta - learning_rate(cur_index * m + i) * gradient
            return theta

        X_b = np.hstack([np.ones((len(X_train), 1)), X_train])
        initial_theta = np.random.randn(X_b.shape[1])
        self._theta = sgd(X_b, y_train, initial_theta, n_iters, t0, t1)
        self.coef_ = self._theta[1:]
        self.intercept_ = self._theta[0]

    def predict(self, X_test):
        """給定待預測資料集X_test,返回表示X_test的結果向量"""
        assert X_test.shape[1] == self.coef_.shape[0], '測試集X的特徵值個數不對'
        ones = np.ones(shape=(len(X_test), 1))
        X_test = np.hstack((ones, X_test))
        return X_test.dot(self._theta)

    def score(self, X_test, y_test):
        """根據測試資料集 X_test 和 y_test 確定當前模型的準確度"""
        assert X_test.shape[0] == y_test.shape[0], '測試集X和Y的個數不相等'
        return r2_score(y_test, self.predict(X_test))

    def __repr__(self):
        return '%s(coef_:%s, intercept_:%s)' % (self.__class__.__name__, self.coef_, self.intercept_)

def test_regession_using_gd():
    from linearregression import LinearRegression
    # step 1: 建立訓練資料
    m = 10000  # 假設10000個樣本
    x = np.random.normal(size=m)
    X = x.reshape(-1, 1)
    y = 4. * x + 3. + np.random.normal(0, 3, size=m)

    # step 2: 資料標準化
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    standardscaler = StandardScaler()
    standardscaler.fit(X_train)
    x_train_standard = standardscaler.transform(X_train)

    # step 3: 訓練模型
    lrg = LinearRegression()
    # 批量梯度下降法
    # lrg.fit_gd(x_train_standard, y_train, eta=0.001, n_iters=1e6)
    # 隨機梯度下降法
    lrg.fit_sgd(x_train_standard, y_train)

    # step 4: 獲取評分
    x_test_standard = standardscaler.transform(X_test)
    print 'score:', lrg.score(x_test_standard, y_test)

執行結果:

批量梯度下降法

LinearRegression(coef_:[4.01485425], intercept_:3.0079330321024687) score: 0.6569687845397328

隨機梯度下降法 LinearRegression(coef_:[3.96601229], intercept_:3.030732598906986) score: 0.644882902625483