深度學習基礎之線性迴歸學習
線性迴歸
理論
線性迴歸最簡單和最經典的機器學習模型之一。
任何一個機器學習模型都會有如下4個要素:
- 訓練資料
- 數學模型
- 損失函式
- 計算方法
訓練資料
下面先闡述一下訓練資料的數學定義,假設我們訓練資料中有 m
個樣本,每個輸入樣本中有 n
個特徵輸入和一個標註的輸出:
第 i
個樣本,記作:
一個模型的表現好壞,訓練資料的構建往往是最重要的一步,我們在做模型的時候往往會所這樣一句話:
Garbage In, Garbage Out
不過如何構造和優化訓練資料往往跟要解決的業務問題領域以及人工標註的質量有關,且跟採用哪種機器學習模型無關,所以就不在本文中詳細描述。
而其他三個 數學模型
損失函式
計算方法
都是定義了某一種機器學習模型的關鍵,所以本文就詳細介紹這幾個點。
數學模型
線性迴歸的數學模型定義很簡單,就是一維的線性方程:
而訓練資料中的第 i
個樣本,我們記作:
從上可以看出線性迴歸的數學假設是,訓練資料中所有輸出和所有輸入的特徵之間是一個線性表示式的關係。
您很有可能會問,那如果我輸入的訓練資料之間他本身不是一個線性關係的話,那這個線性迴歸不就完全失效了麼?
確實是這樣的,很多情況下我們拿到的資料並不一定會是線性關係,針對這種情況,我們一般會採取兩種辦法來解決:
- 調整數學模型,不用線性方程來表達,用二維,三維甚至更高維度的方程式來表達這個數學關係
- 調整輸入特徵,將輸入特徵做各種函式變化,比如可以將
當成一個特徵的話,我們就可以將一個二維模型才能表達的公式轉換成一維方程就能表達了
在實際生產環境中,我們一般不會升高數學模型的維度,但是可能會做變化,比如我們之後會講到的深度學習,就是將多個一維線性模型串聯起來作為一個大的模型。
但是不管採用什麼樣的模型,都是基於假設的,所以機器學習的領域不管怎麼優化,都是會有一個準確率和召回率的指標的,因為有可能這個假設就是不對的,當然學習出來的模型也是由偏差的。
說遠了,讓我們回到我們的線性迴歸的模型上來,在本文中,為了描述簡單,我們假設 n=2
來演示計算過程,比如我們要做一個房價預測系統,特徵有兩個,一個是房屋面積,一個是房屋年齡,那麼我們的計算公式如下:
那麼我們的任務就變成了 如何利用訓練資料求解 **[w1, w2, b]**
這幾個引數 。
在資料量比較少的情況下,我們可以通過初中學過的解方程的思想來求解這個引數,一般來說,只要有3個(即求解引數個數)樣本,我們就能求解這個引數了,這個解,我們叫 解析解
。
但是在實際情況中,我們的資料量遠遠比3個要多,選不同的3個樣本的時候求出來的解是不一樣的,而最終又無法通過一個 [w1, w2, b]
的引數值來滿足所有訓練資料集合,即通過解方程得到的結論是 無解
,那麼我們的任務就變成 在所有解當中找到一個最優的近似解,我們稱這個解為 數值解
。
下面兩章我們就來介紹,我們如何尋找到最優的近似解。
損失函式
要找到最優解,我們就需要來衡量一個解相比於另一個解到底哪個解更好。怎麼衡量呢,就是我們本節要講的 損失函式
了。
以如上的房屋面積的例子來距離,假設我麼有兩組解:

然後我們用訓練資料,分別用 w
和 w'
來計算出一個房屋價格的陣列,分別為:
而輸入的訓練樣本的真實的房屋面積的陣列為:

那麼我們就是比較 和 哪個離我們的真實資料 更相近,哪個就更好,那麼怎麼定義呢,一般我們採用每個元素之差的平方和來定義,如下:
這個函式,我們也就稱之為 損失函式 。如上兩組引數哪個好,只需要比較 和 對應的損失函式取得的值,哪個損失函式的值比較小,我們就選擇哪一組引數。
當然,損失函式也不一定要採用平方,用絕對值,用四次方都可以,用什麼函式,就跟下一步求解過程有關了,針對線性迴歸問題,我們一般都採用的是平方和損失函式。
那麼到目前為止,我們的任務已經變成了,在所有可能的 [w1, w2, b]
的組合中,找尋到一組解, 使得 loss(y)
損失函式的取值最小 。
計算過程
如上一節損失函式中描述,我們已經將我們的問題求解過程轉換成了一個數學求解的任務,找到損失函式最小的引數解。
這個求解過程,最廣泛被使用的方法就是 隨機梯度下降 演算法。該演算法的指導思想也很簡單,就是 隨機 選擇一組引數值 [w]
作為初始值,然後執行N次迴圈,每次迴圈都根據優化演算法改變一點點 w
的值,使得損失函式能 下降 一點點,經過有限次迴圈之後,如果發現損失函式基本上不怎麼變化了,就可以停止迴圈,將當前計算出來的 w
當成是近似最優解返回。
那麼您可能也看出來了,如上的描述中提到了隨機,也提到了下降,但是名字中的梯度沒提到,同時可能您也看出來了,上面描述中還有一個黑盒一般的描述: 每次根據優化演算法改變一點點 **w**
的值 。那其實這裡的優化演算法就是對損失函式求梯度,梯度是微積分上的一個數學定義,定義參考我們在數學基礎一章中的描述。這裡簡單說一下,就是針對每個 w
中的引數,我們針對損失函式求該引數的偏導數,然後將該引數沿著偏導數的方向往下走一小段,這樣大概率下一次我們的損失函式是會下降的。
以房價為例子,我們的優化演算法的數學表示式如下:
那麼我們根據如上定義的平方損失函式,針對 w1 計算偏導數如下:
注意我們在損失函式前面增加了一個 1/2
,主要為了求導更方便。
然後我們將偏導數代入上面的公式中,可以得到如下的公式:
如上公式中的 的定義為 學習率 ,即每次都將該引數的偏導數乘以一個係數再下降,如果學習率選得大,那麼迭代次數會更少,學習得會更快,但是有可能會導致跳過最優解,甚至無法收斂,而最終有可能訓練時間比更小的學習率的時間更長;如果學習率選得小,那麼會增加迴圈迭代次數,增加模型訓練的時間。而如何選擇這個學習率,並不是自動的,而是靠經驗主義,在訓練模型之初人工設定的,像這種靠人工一開始輸入而不是學習出來的引數,就叫 超引數 。
在如上的更新引數的過程中,我們每次都全量計算了所有的訓練資料集合,在實際應用場景中,為了提高模型訓練效率,我們一般不會這樣做,而是每次迭代只選擇一小部分資料進行計算,那麼怎麼選取這一小部分呢,我們一般會將訓練資料隨機打亂,按照視窗大小從前往後遍歷,每次都取一小部分資料進行計算,比如假設每次取100條資料,那麼第一次我們會取1~100條,第二次回取101~200條,以此類推,直到將訓練資料都遍歷完。當遍歷完了之後很有可能模型還是沒有訓練好的,那麼就再次重新打亂所有訓練資料,按照視窗從前往後取資料再進行第二輪訓練。所以綜上,我們的求最佳引數的過程都是 按輪 按次 重複進行訓練的。
在增加了按輪按次進行了之後,我們的迭代計算公式就需要調整了,如下:
如上公式中的 B ,就是對應輪次的選取的樣本集合,並且我們將學習率除以了視窗大小,來保證學習率的經驗數字不需要隨著視窗選擇大小而改變。
最後我們來說一下如何判斷模型訓練是否結束,一般有兩種方法:
- 超引數指定訓練的輪數和次數,到了次數就結束
- 每輪次根據計算出來的損失函式的變化率來判斷是否應該結束
向量表示
我們計算機在執行運算的時候,對矩陣預算是比較友好的,尤其是GPU機器,所以我們在實現模型訓練的程式碼的時候,儘量要用向量(矩陣)計算的方式,而不是用 for
迴圈逐個計算。
from mxnet import nd from time import time # 初始化幾個陣列 a = nd.ones(1000) b = nd.ones(1000) c = nd.zeros(1000) # 普通操作,通過迴圈賦值的方式操作 start1 = time() for i in range(1000): c[i] = a[i] + b[i] end1 = time() print("normal time: ", end1-start1) # 向量操作,利用API一次操作 start2 = time() d = a + b end2 = time() print("vector oper time: ", end2-start2)
該段程式碼的輸出為:
normal time:0.16321206092834473 vector oper time:0.0009930133819580078
可以看出,普通操作是通過向量操作方式的 200倍!
所以,我們儘可能的將所有數學操作都通過向量表示式來操作,來提高模型訓練效率。
線上性迴歸的計算式當中,主要是有兩個需要計算的數學公式,一個是計算損失函式,一個是計算損失函式的梯度。
首先用向量來定義訓練樣本,定義如下:
定義要學習的引數向量為:
那麼給定當前引數,計算出來的輸出陣列的向量計算表示式看起來就很簡單了,如下:
然後我們再來看如上線性迴歸定義的損失函式,原始定義如下:
調整為向量表示式之後變成如下了:
然後引數的遞迴梯度下降的公式也可以用向量公式來替換,先回憶一下原始的定義:
然後再來看一下新的向量計算方式:
這樣一來,可以看出,原本來起來還比較複雜的公式一下子看起來就很簡單清晰了。
下面我們就摩拳擦掌,開始來按照向量表示的方式自己從零實現一個線性迴歸的模型訓練吧。
普通實現
構造訓練資料
為了理解簡單,我們在實現過程中,演示一個只有兩個引數的例子,下面我們首先來構造一下訓練資料,具體請參考註釋:
from mxnet import autograd, nd import random true_w = nd.array([[2,-3.4]]) true_b = 4.2 num_inputs = 2 num_examples = 1000 # 生成隨機的特徵陣列 features = nd.random.normal(scale=1, shape=((num_examples, num_inputs))) # 根據特徵陣列生成label陣列 labels = nd.dot(features, true_w.T) + true_b # 再給label陣列加上一個隨機的噪音 labels += nd.random.normal(scale=0.01, shape=labels.shape)
定義遍歷函式和超引數
# 我們定義一個按輪按次輪詢資料的函式 def data_iter(batch_size, X, Y): # 定義一個下標陣列並打印出來,並將下標做隨機打亂 num_inputs = len(X) indices = list(range(num_inputs)) random.shuffle(indices) # 每次都從下標陣列中取一個 batch_size 視窗大小的資料出來 for i in range(0, num_inputs, batch_size): cur_indices = nd.array(indices[i:min(i+batch_size, num_inputs)]) yield X.take(cur_indices), Y.take(cur_indices) # 定義超引數輪和每輪需要遍歷的次 # 對訓練資料總共遍歷多少輪 loop_num = 3 # 每輪的小批量的視窗大小 batch_size = 10 # 梯度下降的學習率 learn_rate = 0.5 # 定義我們要學習的引數並做隨機初始化, shape (3, 1) W = nd.random.normal(scale=1, shape=(num_features+1, 1))
開始訓練
start_time = time() for epoch_num in range(loop_num): for batchX, batchY in data_iter(batch_size, X, Y): # 按照上面向量表示的梯度下降引數執行 W[:] = W - (learn_rate / batch_size) * nd.dot(batchX.T, nd.dot(batchX, W) - batchY) end_time = time() # 列印最初的初始值和訓練出來的值 print("train time: ", end_time-start_time) print(true_W, W)
到此為止,我們的線性迴歸實現就完成了,該指令碼的實現輸出為:
train time:0.09574174880981445 [[2.1] [3.7] [1.2]] <NDArray 3x1 @cpu(0)> [[2.1006684] [3.6992905] [1.1961068]] <NDArray 3x1 @cpu(0)>
可以看到針對一個1000條資料的訓練過程還是很快的,只需要不到 0.1
秒就訓練完了,同時訓練出來的結果跟一開始構造資料的引數是幾乎相同的。
下面,我們引入mxnet的自動求梯度的機制,利用這個機制,我們甚至可以不用瞭解損失函式求導的過程,只需要通過 mxnet
提供的API介面計算出損失函式,然後 mxnet
就能自動推匯出要計算的引數的梯度。
雖然用線上性迴歸上是有點大材小用了,但是對一些比較複雜的數學模型(比如深度學習),這個機制就會顯得非常有用了,大大提高了我們程式碼的編寫效率。
利用mxnet的自動求梯度
# 我們按照mxnet自動求梯度的介面先呼叫attach_grad()函式對要求梯度的矩陣/向量進行標記 W.attach_grad() start_time = time() for epoch_num in range(loop_num): for batchX, batchY in data_iter(batch_size, X, Y): # 在這個標記片段內所有計算操作都會被記錄下來並針對標記的引數求梯度 with autograd.record(): # 計算損失函式 batchY_hat = nd.dot(batchX, W) loss = (batchY_hat - batchY)**2/2 # 針對損失函式求梯度, 經過這個函式之後,後面就可以一共W.grad來獲取此次計算針對W的梯度了 loss.backward() # 然後將引數按照梯度進行下降 W[:] = W - (learn_rate / batch_size) * W.grad end_time = time() # 列印最初的初始值和訓練出來的值 print("train time: ", end_time-start_time) print(true_W, W)
這段程式碼的輸出如下:
train time:0.19647645950317383 [[2.1] [3.7] [1.2]] <NDArray 3x1 @cpu(0)> [[2.102761 ] [3.701326 ] [1.1980474]] <NDArray 3x1 @cpu(0)>
看起來程式碼要更復雜一些了,訓練效率也要低一些了,下降了差不多一倍,訓練引數的結果是差不多的,主要的好處是對將來的更復雜的模型打下了程式碼實現的基礎。
加上中間輸出
我們在如下的程式碼中,加上了一些中間輸出結果,來方便我們下一步來除錯不同的超引數對模型訓練過程的影響:
#求線性迴歸的損失函式 def get_loss(X, Y, W): YHat = nd.dot(X, W) return (YHat - Y)**2/2 # 我們按照mxnet自動求梯度的介面先呼叫attach_grad()函式對要求梯度的矩陣/向量進行標記 W.attach_grad() start_time = time() for epoch_num in range(loop_num): cur_batch_num = 0 for batchX, batchY in data_iter(batch_size, X, Y): # 在這個標記片段內所有計算操作都會被記錄下來並針對標記的引數求梯度 with autograd.record(): # 計算損失函式 loss = get_loss(batchX, batchY, W) # 針對損失函式求梯度, 經過這個函式之後,後面就可以一共W.grad來獲取此次計算針對W的梯度了 loss.backward() # 然後將引數按照梯度進行下降 W[:] = W - (learn_rate / batch_size) * W.grad cur_batch_num+=1 loss = get_loss(batchX, batchY, W) if(cur_batch_num%50 == 0): print("epoch_num: %d, batch_num: %d, loss: %f" % (epoch_num, cur_batch_num, loss.mean().asnumpy())) # 列印一些訓練過程中的資訊 cur_loss = get_loss(X, Y, W) print("## epoch_num: %d, loss: %f\n" % (epoch_num, cur_loss.mean().asnumpy())) end_time = time() # 列印最初的初始值和訓練出來的值 print("train time: ", end_time-start_time) print(true_W, W)
我們再來看看這個程式碼的輸出:
epoch_num: 0, batch_num: 50, loss: 0.000046 epoch_num: 0, batch_num: 100, loss: 0.000050 ## epoch_num: 0, loss: 0.000052 epoch_num: 1, batch_num: 50, loss: 0.000061 epoch_num: 1, batch_num: 100, loss: 0.000029 ## epoch_num: 1, loss: 0.000062 epoch_num: 2, batch_num: 50, loss: 0.000052 epoch_num: 2, batch_num: 100, loss: 0.000022 ## epoch_num: 2, loss: 0.000053 train time:0.2802696228027344 [[2.1] [3.7] [1.2]] <NDArray 3x1 @cpu(0)> [[2.1014354] [3.6990693] [1.1987416]] <NDArray 3x1 @cpu(0)>
不同超引數對訓練過程的影響
學習率
固定 輪次:3, 小批量視窗 10
我們來看不同的學習率對訓練過程的影響,如下表:
超引數 | 第一輪損失值 | 第二輪損失值 | 第三輪損失值 |
---|---|---|---|
0.01 | 1.89 | 0.26 | 0.03 |
0.02 | 0.156179 | 0.003019 | 0.000111 |
0.03 | 0.027368 | 0.000096 | 0.000049 |
0.05 | 0.000423 | 0.000045 | 0.000045 |
0.1 | 0.000051 | 0.000050 | 0.000051 |
0.5 | 0.000054 | 0.000065 | 0.000055 |
1 | 0.000073 | 0.000081 | 0.000107 |
1.5 | 0.000081 | 0.000101 | 0.000296 |
1.8 | 0.001779 | 0.001116 | 0.001254 |
1.9 | 22.656252 | 8493858816 | 4272635665084055552 |
2.0 | 35379048448 | nan | nan |
從如上表我們可以得出如下的結論:
- 在學習率數值較小的一段範圍內,學習率越大,模型收斂得就越快,理論上最終模型的模擬效果會越好
- 在學習率數值適中的一段範圍內,最終學習效果都差不多
- 當學習率數值較大的範圍內,效果反而更不可控了,甚至到最後損失函式的值都超過了整數範圍值
所以學習率千萬不能設定得太大,設定太大之後不收斂了;設定太小模型還是能收斂只是訓練時間要更長,最好是要選擇得適中,比如在我們的實現例子中, 0.05
就是一個不錯的case。
小批量視窗
固定學習率0.05,選擇不同的小批量視窗對應的結果如下:
批量視窗 | 第一輪損失值 | 第二輪損失值 | 第三輪損失值 |
---|---|---|---|
10 | 0.000288 | 0.000047 | 0.000047 |
20 | 0.076830 | 0.000432 | 0.000048 |
50 | 0.759479 | 0.098075 | 0.012729 |
可以看到,在合適範圍內,視窗值越小就收斂得越快。
通用實現
雖然我們在如上的程式碼中實現線性迴歸的邏輯已經很簡單了,但是我們還是自己寫了不少的一些函式,比如我們自己實現了資料遍歷的函式,自己實現了損失計算函式,自己實現了梯度下降等邏輯。
而實際上一些通用的深度學習基礎庫,比如 mxnet
已經為我們預先實現了很多常用的函式庫了,我們可以直接呼叫他們的服務來重新實現一個線性迴歸訓練指令碼。
並且在這個實現中,我們會按照典型的訓練模型的程式碼框架來實現線性迴歸的訓練,按照這個程式碼框架,即使後面我們實現更復雜的深度學習的多層神經網路模型的時候,程式碼看起來都是類似的。
from mxnet import autograd, nd import random # 構造資料 true_w = nd.array([[2,-3.4]]) true_b = 4.2 num_inputs = 2 num_examples = 1000 # 生成隨機的特徵陣列 features = nd.random.normal(scale=1, shape=((num_examples, num_inputs))) # 根據特徵陣列生成label陣列 labels = nd.dot(features, true_w.T) + true_b # 再給label陣列加上一個隨機的噪音 labels += nd.random.normal(scale=0.01, shape=labels.shape) # 引入gluon的管理訓練集的庫 from mxnet.gluon import data as gdata batch_size = 10 # 將樣本的特徵值和標籤值都傳入 dataset = gdata.ArrayDataset(features, labels) # 利用gdata提供的資料抽取小批量遍歷函式指標 data_iter = gdata.DataLoader(dataset, batch_size, shuffle=True) # 接下來將是最主要的區別,在手動實現中,我們自己實現了線性函式和損失函式 # 而gluon當中提供了一系列預設的函式,可以快速的模型訓練 # 第一步,搭建一個神經網路, 雖然我們現在的神經網路還很簡單,只有一層 from mxnet.gluon import nn net = nn.Sequential() net.add(nn.Dense(1)) # 第二步,將引數矩陣給初始化好 from mxnet import init net.initialize(init.Normal(sigma=0.01)) # 定義損失函式 from mxnet.gluon import loss as gloss loss = gloss.L2Loss() # 定義引數梯度下降的函式 from mxnet import gluon trainer = gluon.Trainer(net.collect_params(), 'sgd', {'learning_rate': 0.03}) # 開始訓練, 跟手動實現的就很相似了 num_epochs = 3 for epoch in range(1, num_epochs + 1): for X, y in data_iter: with autograd.record(): l = loss(net(X), y) l.backward() trainer.step(batch_size) l = loss(net(features), labels) print('epoch %d, loss: %f' % (epoch, l.mean().asnumpy())) # 對比訓練出來的值的效果 dense = net[0] print(true_w, dense.weight.data()) print(true_b, dense.bias.data())