Spark MLlib Deep Learning Neural Net(深度學習-神經網路)1.2
Spark MLlib Deep Learning Neural Net(深度學習-神經網路)1.2
第一章Neural Net(神經網路)
2基礎及原始碼解析
2.1 Neural Net神經網路基礎知識2.1.1 神經網路
基礎知識參照:
2.1.2 反向傳導演算法
基礎知識參照:
2.1.3 Denoise Autoencoder
當採用無監督的方法分層預訓練深度網路的權值時,為了學習到較魯棒的特徵,可以在網路的可視層(即資料的輸入層)引入隨機噪聲,這種方法稱為Denoise Autoencoder(簡稱dAE),具體加入的方法就是把訓練樣例中的一些資料調整變為0,inputZeroMaskedFraction表示了調整的比例
這部分請參見《Extracting and Composing Robust Features with Denoising Autoencoders》這篇論文。參照:
http://wenku.baidu.com/link?url=lhFEf7N3n2ZG2K-mfWsts2on9gN5K-KkrMuuNvHU2COdehkDv9vxVsw-F23e5Yiww_38kWYB56hskLXwVp0_9c7DLw7XZX_w8NoNXfxtoIm
2.1.4 Dropout
訓練神經網路模型時,如果訓練樣本較少,為了防止模型過擬合,Dropout可以作為一種trikc供選擇。Dropout是hintion最近2年提出的,源於其文章Improving neural networks by preventing co-adaptation of feature detectors
Dropout是指在模型訓練時隨機讓網路某些隱含層節點的權重不工作,不工作的那些節點可以暫時認為不是網路結構的一部分,但是它的權重得保留下來(只是暫時不更新而已),因為下次樣本輸入時它可能又得工作了。參照:
http://wenku.baidu.com/link?url=WpsRjVTrMIhCNqDSDnzm8M6nz2Q7AoNhpnY2XxM9SFYkGni8t94JOgsZUCbSuccOnO8mJyGx67RGLjPr8D9aoxhyOUkYtvfitU9ilaQ-Rqm
2.1.5 Sparsity Penalty
對神經網路每層節點的sparsity計算,對沒達到sparsitytarget的引數的懲罰係數的節點進行懲罰。
2.2 Deep Learning NN原始碼解析
2.2.1 NN程式碼結構
NN原始碼主要包括:NeuralNet,NeuralNetModel兩個類,原始碼結構如下:
NeuralNet結構:
NeuralNetModel結構:
2.2.2 NN訓練過程
2.2.3 NeuralNet解析
(1)NNLabel
/**
* label:目標矩陣
*nna:神經網路每層節點的輸出值,a(0),a(1),a(2)
* error:輸出層與目標值的誤差矩陣
*/
caseclass NNLabel(label: BDM[Double],nna: ArrayBuffer[BDM[Double]],error: BDM[Double])extends Serializable
NNLabel:自定義資料型別,儲存樣本資料,格式:目標值,輸出值,誤差。
(2) NNConfig
/**
*配置引數
*/
caseclassNNConfig(
size: Array[Int],
layer: Int,
activation_function: String,
learningRate: Double,
momentum: Double,
scaling_learningRate: Double,
weightPenaltyL2: Double,
nonSparsityPenalty: Double,
sparsityTarget: Double,
inputZeroMaskedFraction: Double,
dropoutFraction: Double,
testing: Double,
output_function: String)extends Serializable
NNConfig:定義引數配置,儲存配置資訊。引數說明:
size:神經網路結構
layer:神經網路層數
activation_function:隱含層函式
learningRate:學習率
momentum: Momentum因子
scaling_learningRate:學習迭代因子
weightPenaltyL2:正則化L2因子
nonSparsityPenalty:權重稀疏度懲罰因子
sparsityTarget:權重稀疏度目標值
inputZeroMaskedFraction:權重加入噪聲因子
dropoutFraction: Dropout因子
testing: testing
(3) InitialWeight
初始化權重
/**
* 初始化權重
* 初始化為一個很小的、接近零的隨機值
*/
def InitialWeight(size: Array[Int]): Array[BDM[Double]] = {
// 初始化權重引數
// weights and weight momentum
// nn.W{i - 1} = (rand(nn.size(i), nn.size(i - 1)+1) - 0.5) * 2 * 4 * sqrt(6 / (nn.size(i) + nn.size(i - 1)));
valn = size.length
valnn_W = ArrayBuffer[BDM[Double]]()
for (i <-1 ton - 1) {
vald1 = BDM.rand(size(i), size(i - 1) + 1)
d1 :-= 0.5
valf1 =2 *4 * sqrt(6.0 / (size(i) + size(i -1)))
vald2 =d1 :*f1
//val d3 = new DenseMatrix(d2.rows, d2.cols, d2.data, d2.isTranspose)
//val d4 = Matrices.dense(d2.rows, d2.cols, d2.data)
nn_W += d2
}
nn_W.toArray
}
(4) InitialWeightV
初始化權重vW
/**
* 初始化權重vW
* 初始化為0
*/
def InitialWeightV(size: Array[Int]): Array[BDM[Double]] = {
// 初始化權重引數
// weights and weight momentum
// nn.vW{i - 1} = zeros(size(nn.W{i - 1}));
valn = size.length
valnn_vW = ArrayBuffer[BDM[Double]]()
for (i <-1 ton - 1) {
vald1 = BDM.zeros[Double](size(i), size(i - 1) + 1)
nn_vW += d1
}
nn_vW.toArray
}
(5) InitialActiveP
初始神經網路啟用度
/**
* 初始每一層的平均啟用度
* 初始化為0
*/
def InitialActiveP(size: Array[Int]): Array[BDM[Double]] = {
// 初始每一層的平均啟用度
// average activations (for use with sparsity)
// nn.p{i} = zeros(1, nn.size(i));
valn = size.length
valnn_p = ArrayBuffer[BDM[Double]]()
nn_p += BDM.zeros[Double](1,1)
for (i <-1 ton - 1) {
vald1 = BDM.zeros[Double](1, size(i))
nn_p += d1
}
nn_p.toArray
}
(6) AddNoise
樣本資料增加隨機噪聲
/**
* 增加隨機噪聲
* 若隨機值>=Fraction,值不變,否則改為0
*/
def AddNoise(rdd: RDD[(BDM[Double], BDM[Double])], Fraction: Double): RDD[(BDM[Double], BDM[Double])] = {
valaddNoise = rdd.map { f =>
valfeatures = f._2
vala = BDM.rand[Double](features.rows,features.cols)
vala1 =a :>= Fraction
vald1 =a1.data.map { f =>if (f ==true)1.0else0.0 }
vala2 =new BDM(features.rows,features.cols,d1)
valfeatures2 =features :*a2
(f._1, features2)
}
addNoise
}
(7) DropoutWeight
神經網路權重隨機休眠。
/**
* 隨機讓網路某些隱含層節點的權重不工作
* 若隨機值>=Fraction,矩陣值不變,否則改為0
*/
def DropoutWeight(matrix: BDM[Double], Fraction: Double): Array[BDM[Double]] = {
valaa = BDM.rand[Double](matrix.rows, matrix.cols)
valaa1 =aa :> Fraction
vald1 =aa1.data.map { f =>if (f ==true)1.0else0.0 }
valaa2 =new BDM(matrix.rows: Int, matrix.cols: Int,d1: Array[Double])
valmatrix2 = matrix :*aa2
Array(aa2, matrix2)
}
(8) NNff
神經網路進行前向傳播,從輸入層->隱含層->輸出層,計算每一層每一個節點的輸出值,其中輸入值為樣本資料。輸入引數:
batch_xy2:樣本資料
bc_config:神經網路配置引數
bc_nn_W:神經網路當前權重引數
輸出引數:
RDD[(NNLabel, Array[BDM[Double]])],格式為(NNLabel(label, nn_a, error), dropOutMask)。
/**
* nnff是進行前向傳播
* 計算神經網路中的每個節點的輸出值;
*/
def NNff(
batch_xy2: RDD[(BDM[Double], BDM[Double])],
bc_config: org.apache.spark.broadcast.Broadcast[NNConfig],
bc_nn_W: org.apache.spark.broadcast.Broadcast[Array[BDM[Double]]]): RDD[(NNLabel, Array[BDM[Double]])] = {
// 第1層:a(1)=[1 x]
// 增加偏置項b
valtrain_data1 = batch_xy2.map { f =>
vallable = f._1
valfeatures = f._2
valnna = ArrayBuffer[BDM[Double]]()
valBm1 =new BDM(features.rows,1, Array.fill(features.rows *1)(1.0))
valfeatures2 = BDM.horzcat(Bm1,features)
valerror = BDM.zeros[Double](lable.rows,lable.cols)
nna += features2
NNLabel(lable, nna, error)
}
valtrain_data2 =train_data1.map { f =>
valnn_a = f.nna
valdropOutMask = ArrayBuffer[BDM[Double]]()
dropOutMask += new BDM[Double](1,1, Array(0.0))
for (j <-1 to bc_config.value.layer -2) {
// 計算每層輸出
// Calculate the unit's outputs (including the bias term)
// nn.a{i} = sigm(nn.a{i - 1} * nn.W{i - 1}')
// nn.a{i} = tanh_opt(nn.a{i - 1} * nn.W{i - 1}');
valA1 =nn_a(j -1)
valW1 = bc_nn_W.value(j -1)
valaw1 =A1 *W1.t
valnnai1 = bc_config.value.activation_functionmatch {
case"sigm" =>
valaw2 = NeuralNet.sigm(aw1)
aw2
case"tanh_opt" =>
valaw2 = NeuralNet.tanh_opt(aw1)
//val aw2 = Btanh(aw1 * (2.0 / 3.0)) * 1.7159
aw2
}
// dropout計算
// Dropout是指在模型訓練時隨機讓網路某些隱含層節點的權重不工作,不工作的那些節點可以暫時認為不是網路結構的一部分
// 但是它的權重得保留下來(只是暫時不更新而已),因為下次樣本輸入時它可能又得工作了
// 參照 http://www.cnblogs.com/tornadomeet/p/3258122.html
valdropoutai =if (bc_config.value.dropoutFraction >0) {
if (bc_config.value.testing ==1) {
valnnai2 =nnai1 * (1.0 - bc_config.value.dropoutFraction)
Array(new BDM[Double](1,1, Array(0.0)),nnai2)
} else {
NeuralNet.DropoutWeight(nnai1, bc_config.value.dropoutFraction)
}
} else {
valnnai2 =nnai1
Array(new BDM[Double](1,1, Array(0.0)),nnai2)
}
valnnai2 =dropoutai(1)
dropOutMask += dropoutai(0)
// Add the bias term
// 增加偏置項b
// nn.a{i} = [ones(m,1) nn.a{i}];
valBm1 = BDM.ones[Double](nnai2.rows,1)
valnnai3 = BDM.horzcat(Bm1,nnai2)
nn_a += nnai3
}
(NNLabel(f.label, nn_a, f.error),dropOutMask.toArray)
}
// 輸出層計算
valtrain_data3 =train_data2.map { f =>
valnn_a = f._1.nna
// nn.a{n} = sigm(nn.a{n - 1} * nn.W{n - 1}');
// nn.a{n} = nn.a{n - 1} * nn.W{n - 1}';
valAn1 =nn_a(bc_config.value.layer -2)
valWn1 = bc_nn_W.value(bc_config.value.layer -2)
valawn1 =An1 *Wn1.t
valnnan1 = bc_config.value.output_functionmatch {
case"sigm" =>
valawn2 = NeuralNet.sigm(awn1)
//val awn2 = 1.0 / (Bexp(awn1 * (-1.0)) + 1.0)
awn2
case"linear" =>
valawn2 =awn1
awn2
}
nn_a += nnan1
(NNLabel(f._1.label,nn_a, f._1.error), f._2)
}
// error and loss
// 輸出誤差計算
// nn.e = y - nn.a{n};
// val nn_e = batch_y - nnan
valtrain_data4 =train_data3.map { f =>
valbatch_y = f._1.label
valnnan = f._1.nna(bc_config.value.layer - 1)
valerror = (batch_y -nnan)
(NNLabel(f._1.label, f._1.nna,error), f._2)
}
train_data4
}
(9) ActiveP
通過神經網路進行前向傳播,計算每一層每一個節點的輸出值,計算每個節點的平均值,也即節點的稀疏度。輸入引數:
train_nnff:NNff的輸出資料
bc_config:神經網路配置引數
nn_p_old:更新前資料
輸出引數:
Array[BDM[Double]],輸出節點的平均值。
/**
* sparsity計算,網路稀疏度
* 計算每個節點的平均值
*/
def ActiveP(
train_nnff: RDD[(NNLabel, Array[BDM[Double]])],
bc_config: org.apache.spark.broadcast.Broadcast[NNConfig],
nn_p_old: Array[BDM[Double]]): Array[BDM[Double]] = {
valnn_p = ArrayBuffer[BDM[Double]]()
nn_p += BDM.zeros[Double](1,1)
// calculate running exponential activations for use with sparsity
// sparsity計算,計算sparsity,nonSparsityPenalty是對沒達到sparsitytarget的引數的懲罰係數
for (i <-1 to bc_config.value.layer -1) {
valpi1 = train_nnff.map(f => f._1.nna(i))
valinitpi = BDM.zeros[Double](1, bc_config.value.size(i))
val (piSum,miniBatchSize) =pi1.treeAggregate((initpi,0L))(
seqOp = (c, v) => {
// c: (nnasum, count), v: (nna)
valnna1 = c._1
valnna2 = v
valnnasum =nna1 +nna2
(nnasum, c._2 +1)
},
combOp = (c1, c2) => {
// c: (nnasum, count)
valnna1 = c1._1
valnna2 = c2._1
valnnasum =nna1 +nna2
(nnasum, c1._2 + c2._2)
})
valpiAvg =piSum /miniBatchSize.toDouble
valoldpi = nn_p_old(i)
valnewpi = (piAvg *