梯度下降法是求解無約束最優化問題最常見的方法,其基本思想是通過在負梯度方向通過一定的步長慢慢逼近最優解的過程。 

假設需要擬合函式:y = {f(\theta ,x)}{x}\in R^ny\in R

給定資料集 {(x^1,y^1),(x^2,y^2), ..., (x^m,y^m)}, 我們需要最小化損失函式J(\theta )=\frac{1}{2m}\sum_{i=1}^m(f(\theta ,x^{i})-y^i)^{2}來求得引數 \theta

求導:

\frac{\partial J}{\partial \theta_{j}}=\frac{1}{m}\sum_{i=1}^{m}(f(\theta,x^i)-y^i)x_{j}^{i}

可以令導數等於0求得 \theta 

但在實際情況中,我們可能無法通過導數等於0求出解析解,因為:

1、有些方程非常複雜,無法求導

2、可以求得導數,但是無法通過解方程得到解

因此我們還是需要通過梯度下降法來求得 \theta 的最優解,而且梯度下降法是一種迴圈迭代的方法,更適合用計算機去求解。 

一、批量梯度下降(Batch Gradient Descent)

    對 \theta 的每輪更新,需要用到所有的樣本:

     \theta_{j} := \theta_{j} - \lambda \frac{\partial J}{\partial \theta_{j}}

     \theta_{j} := \theta_{j} - \frac{\lambda}{m}\sum_{i=1}^{m}(f(\theta,x^i)-y^i)x_{j}^{i}

     \lambda 為步長,表示每次往梯度負方向移動距離的大小,步長的設定不能過大也不能過小,太大可能會導致無法收斂到最優解,太小則使得迭代過慢。

     批量梯度下降每次迭代會選擇全域性最優的梯度方向,因此收斂快,對於一個凸函式會得到全域性最優解,但是每輪迭代需要用到所有的樣本,當樣本數量很大時,效率會非常低。因此當樣本數很大的時候,一般使用隨機梯度下降法。

二、隨機梯度下降(Stochastic Gradient Descent)

    對 \theta 的每輪更新,計算梯度方向只考慮一個樣本:

     \theta_{j} := \theta_{j} - \lambda(f(\theta,x^i)-y^i)x_{j}^{i}

     每輪迭代的梯度方向可能不是最優的梯度方向,但是當樣本數目很大時,迭代足夠的次數一般也能收斂到最優解。

三、小批量梯度下降(mini-batch Gradient Descent)

    結合了批量梯度下降和隨機梯度下降的優點,對 \theta 的每輪更新,計算梯度考慮一部分樣本:

     \theta_{j} := \theta_{j} - \frac{\lambda}{n^{'}}\sum_{i=t}^{t+n^{'}}(f(\theta,x^i)-y^i)x_{j}^{i}

# -*- coding: utf-8 -*-

import random 
import numpy as np 
import time

def  linear(theta, x):
    return theta.dot(x.T)
  
def loss(pred_y, data_y):
    n = len(data_y)
    return  ((pred_y - data_y).T.dot(pred_y - data_y))/(2.0*n)

def  bgd(func, data_x, data_y, step, eps, max_iter):
    n_iter = 1 
    n, m = data_x.shape 
    theta = np.random.rand(m)
    cost, last_cost = 1, 0 
    while cost > eps and n_iter < max_iter:
        print('n_iter:', n_iter, '  theta = ', theta, '  loss=', last_cost)
        delta = 0
        for i in range(n):
            pred_y = func(theta, data_x[i])
            delta += step*(pred_y-data_y[i])*data_x[i]/n

        theta = theta - delta
        pred_y = func(theta, data_x)
        new_cost = loss(pred_y, data_y)            
        cost = abs(new_cost-last_cost)
        last_cost = new_cost
        n_iter += 1

    return theta 


def sgd(func, data_x, data_y, step, eps, max_iter):
    n_iter = 1
    n, m = data_x.shape
    theta = np.random.rand(m) 
    
    data = np.insert(data_x, m, data_y, axis=1)
    np.random.shuffle(data)
    data_x = data[:,0:m]
    data_y = data[:,m]
    
    cost, last_cost = 1, 0 
    for i in range(n):
        print('n_iter:', n_iter, '  theta = ', theta, '  loss=', last_cost)
        pred_y = func(theta, data_x[i])
        theta = theta - step*(pred_y - data_y[i])*data_x[i]
        
        pred_y = func(theta, data_x)
        new_cost = loss(pred_y, data_y)
        cost = abs(new_cost-last_cost)
        last_cost = new_cost

        if cost < eps:
            break

        n_iter += 1
        if n_iter > max_iter:
            break 

    return theta 
        

def mbgd(func, data_x, data_y, step, eps, max_iter):
    n_iter = 1 
    n, m = data_x.shape 
    theta = np.random.rand(m)
    data = np.insert(data_x, m, data_y, axis=1)
    np.random.shuffle(data)
    data_x = data[:,0:m]
    data_y = data[:,m]

    cost, last_cost = 1, 0
    for i in range(0,n,10):
        print('n_iter:', n_iter, '  theta = ', theta, '  loss=', last_cost)
        delta = 0
        for j in range(10):
            pred_y = func(theta, data_x[i+j])
            delta += step*(pred_y-data_y[i+j])*data_x[i+j]/10.0

        theta = theta - delta
    
        pred_y = func(theta, data_x)
        new_cost = loss(pred_y, data_y)
        cost = abs(new_cost-last_cost)
        last_cost = new_cost
        if cost < eps:
            break

        n_iter += 1
        if n_iter > max_iter:
            break

    return theta 


if __name__ == "__main__":
    n = 10000      # number of samples
    m = 2        # data dimension 
   
    data_x = np.random.normal(size = n*(m+1))
    data_x = data_x.reshape(-1,3) 
    data_x[:,0] = 1.0                  # set x0 = 1

    data_y = 3 + sum((np.array([2,1])*data_x[:,1:]).T) + np.random.normal(size=n)   # y = w0x0 + w1*x1 + w2*x2 + noise, x0 = 1
    
    t = time.time()
    theta = sgd(linear, data_x, data_y, step = 0.001, eps = 0.000000001, max_iter = 10000)
    print(theta)
    print('- Time consumed: ', time.time()-t, ' seconds.')