1. 程式人生 > >梯度下降法(BGD,SGD,MSGD)python+numpy具體實現

梯度下降法(BGD,SGD,MSGD)python+numpy具體實現

梯度下降是一階迭代優化演算法。為了使用梯度下降找到函式的區域性最小值,一個步驟與當前位置的函式的梯度(或近似梯度)的負值成正比。如果相反,一個步驟與梯度的正數成比例,則接近該函式的區域性最大值;該程式隨後被稱為梯度上升。梯度下降也被稱為最陡峭的下降,或最快下降的方法。(from wikipad)

首先,大家要明白它的本質:這是一個優化演算法!!!它是可以用來解決很多問題的,一般學習機器學習的朋友都會線上性迴歸的遇到這個名詞,但是要宣告的是,它和最小二乘法類似,是用於求解線性迴歸問題的一種方法。同時它的功能又不僅於此,它線上性迴歸中的意義在於通過尋找梯度最大的方向下降(或上升)來找到損失函式最小時候對應的引數值。

好了,繞來繞去的就拿線性迴歸的例子來和大家講講吧。

梯度下降方法

本質是每次迭代的時候都沿著梯度最大的地方更新引數。現在假設有函式(Rosenbrock函式:是一個用來測試最優化演算法效能的非凸函式,由Howard Harry Rosenbrock在1960年提出[1]。也稱為Rosenbrock山谷或Rosenbrock香蕉函式,也簡稱為香蕉函式)如下定義:

f(x,y)=(1x)2+100(yx2)2
很明顯,其最小最為f(1,1)=0 ,其三維圖片如下:
這裡寫圖片描述
函式f 分別對 xy 求導得到
f(x,y)x=2(1x)2100(yx2)2x
f(x,y)y=
2100(yx2)

在實現的過程中可以給出x, y初始值(例如設定為 0, 0) 然後計算函式在這個點的梯度,並按照梯度方向更新x, y的值。

這裡給出通過梯度下降法計算上述函式的最小值對應的x 和 y

import numpy as np


def cal_rosenbrock(x1, x2):
    """
    計算rosenbrock函式的值
    :param x1:
    :param x2:
    :return:
    """
    return (1 - x1) ** 2 + 100 * (x2 - x1 ** 2) ** 2


def cal_rosenbrock_prax
(x1, x2):
""" 對x1求偏導 """ return -2 + 2 * x1 - 400 * (x2 - x1 ** 2) * x1 def cal_rosenbrock_pray(x1, x2): """ 對x2求偏導 """ return 200 * (x2 - x1 ** 2) def for_rosenbrock_func(max_iter_count=100000, step_size=0.001): pre_x = np.zeros((2,), dtype=np.float32) loss = 10 iter_count = 0 while loss > 0.001 and iter_count < max_iter_count: error = np.zeros((2,), dtype=np.float32) error[0] = cal_rosenbrock_prax(pre_x[0], pre_x[1]) error[1] = cal_rosenbrock_pray(pre_x[0], pre_x[1]) for j in range(2): pre_x[j] -= step_size * error[j] loss = cal_rosenbrock(pre_x[0], pre_x[1]) # 最小值為0 print("iter_count: ", iter_count, "the loss:", loss) iter_count += 1 return pre_x if __name__ == '__main__': w = for_rosenbrock_func() print(w)

如果大家想執行這個演算法,建議使用預設的引數,效果還不錯。不要把step_size設定過大,會出問題的(可能是實現過程有問題,請指正)。

線性迴歸問題

這裡關於迴歸的前導介紹我建議大家取看周志華老師的西瓜書,介紹得通透明亮,但是周老師對線性迴歸問題給出的解決方法是通過最小二乘法來做的,而我們在這裡要用梯度下降。

這裡給出一般的定義吧~

一般的線性迴歸方程如下:

y=θ1x1+θ2x2++θnxn+b
轉換為:
y=θ1x1+θ2x2++θnxn+θ0b
這裡θ0=1轉換為向量的形式y=θTxθx,均為為行向量。

現在需要定義損函式,用於判斷最後得到的預測引數的預測效果。常用的損失函式是均方誤差:

J(θ)=12mj=1m(h(θ)iyi)2
i 是維度索引 j 是樣本索引,接下來對θ 求導得到
J(θ)θj=1mj=1m(h(θ)iyi)xij
更新公式為:
θi=θiα1mj=1m(h(θ)iyi)xij
α 就是學習的步長。

BGM(批量梯度下降法)

import numpy as np

def gen_line_data(sample_num=100):
    """
    y = 3*x1 + 4*x2
    :return:
    """
    x1 = np.linspace(0, 9, sample_num)
    x2 = np.linspace(4, 13, sample_num)
    x = np.concatenate(([x1], [x2]), axis=0).T
    y = np.dot(x, np.array([3, 4]).T)  # y 列向量
    return x, y

def bgd(samples, y, step_size=0.01, max_iter_count=10000):
    sample_num, dim = samples.shape
    y = y.flatten()
    w = np.ones((dim,), dtype=np.float32)
    loss = 10
    iter_count = 0
    while loss > 0.001 and iter_count < max_iter_count:
        loss = 0
        error = np.zeros((dim,), dtype=np.float32)
        for i in range(sample_num):
            predict_y = np.dot(w.T, samples[i])
            for j in range(dim):
                error[j] += (y[i] - predict_y) * samples[i][j]

        for j in range(dim):
            w[j] += step_size * error[j] / sample_num

        for i in range(sample_num):
            predict_y = np.dot(w.T, samples[i])
            error = (1 / (sample_num * dim)) * np.power((predict_y - y[i]), 2)
            loss += error

        print("iter_count: ", iter_count, "the loss:", loss)
        iter_count += 1
    return w

if __name__ == '__main__':
    samples, y = gen_line_data()
    w = bgd(samples, y)
    print(w)  # 會很接近[3, 4]

SGB(隨機梯度下降法)

import numpy as np

def gen_line_data(sample_num=100):
    """
    y = 3*x1 + 4*x2
    :return:
    """
    x1 = np.linspace(0, 9, sample_num)
    x2 = np.linspace(4, 13, sample_num)
    x = np.concatenate(([x1], [x2]), axis=0).T
    y = np.dot(x, np.array([3, 4]).T)  # y 列向量
    return x, y

def sgd(samples, y, step_size=0.01, max_iter_count=10000):
    """
    隨機梯度下降法
    :param samples: 樣本
    :param y: 結果value
    :param step_size: 每一接迭代的步長
    :param max_iter_count: 最大的迭代次數
    :param batch_size: 隨機選取的相對於總樣本的大小
    :return:
    """
    sample_num, dim = samples.shape
    y = y.flatten()
    w = np.ones((dim,), dtype=np.float32)
    loss = 10
    iter_count = 0
    while loss > 0.001 and iter_count < max_iter_count:
        loss = 0
        error = np.zeros((dim,), dtype=np.float32)
        for i in range(sample_num):
            predict_y = np.dot(w.T, samples[i])
            for j in range(dim):
                error[j] += (y[i] - predict_y) * samples[i][j]
                w[j] += step_size * error[j] / sample_num

        # for j in range(dim):
        #     w[j] += step_size * error[j] / sample_num

        for i in range(sample_num):
            predict_y = np.dot(w.T, samples[i])
            error = (1 / (sample_num * dim)) * np.power((predict_y - y[i]), 2)
            loss += error

        print("iter_count: ", iter_count, "the loss:", loss)
        iter_count += 1
    return w

if __name__ == '__main__':
    samples, y = gen_line_data()
    w = sgd(samples, y)
    print(w)  # 會很接近[3, 4]

MBGB(小批量梯度下降法)

import numpy as np
import random

def gen_line_data(sample_num=100):
    """
    y = 3*x1 + 4*x2
    :return:
    """
    x1 = np.linspace(0, 9, sample_num)
    x2 = np.linspace(4, 13, sample_num)
    x = np.concatenate(([x1], [x2]), axis=0).T
    y = np.dot(x, np.array([3, 4]).T)  # y 列向量
    return x, y

def mbgd(samples, y, step_size=0.01, max_iter_count=10000, batch_size=0.2):
    """
    MBGD(Mini-batch gradient descent)小批量梯度下降:每次迭代使用b組樣本
    :param samples:
    :param y:
    :param step_size:
    :param max_iter_count:
    :param batch_size:
    :return:
    """
    sample_num, dim = samples.shape
    y = y.flatten()
    w = np.ones((dim,), dtype=np.float32)
    # batch_size = np.ceil(sample_num * batch_size)
    loss = 10
    iter_count = 0
    while loss > 0.001 and iter_count < max_iter_count:
        loss = 0
        error = np.zeros((dim,), dtype=np.float32)

        # batch_samples, batch_y = select_random_samples(samples, y,
        # batch_size)

        index = random.sample(range(sample_num),
                              int(np.ceil(sample_num * batch_size)))
        batch_samples = samples[index]
        batch_y = y[index]

        for i in range(len(batch_samples)):
            predict_y = np.dot(w.T, batch_samples[i])
            for j in range(dim):
                error[j] += (batch_y[i] - predict_y) * batch_samples[i][j]

        for j in range(dim):
            w[j] += step_size * error[j] / sample_num

        for i in range(sample_num):
            predict_y = np.dot(w.T, samples[i])
            error = (1 / (sample_num * dim)) * np.power((predict_y - y[i]), 2)
            loss += error

        print("iter_count: ", iter_count, "the loss:", loss)
        iter_count += 1
    return w

if __name__ == <