1. 程式人生 > >Spark MLlib Deep Learning Neural Net(深度學習-神經網路)1.2

Spark MLlib Deep Learning Neural Net(深度學習-神經網路)1.2

Spark MLlib Deep Learning Neural Net(深度學習-神經網路)1.2

第一章Neural Net(神經網路)

2基礎及原始碼解析

2.1 Neural Net神經網路基礎知識

2.1.1 神經網路

基礎知識參照:

2.1.2 反向傳導演算法

基礎知識參照:

2.1.3 Denoise Autoencoder

當採用無監督的方法分層預訓練深度網路的權值時,為了學習到較魯棒的特徵,可以在網路的可視層(即資料的輸入層)引入隨機噪聲,這種方法稱為Denoise Autoencoder(簡稱dAE),具體加入的方法就是把訓練樣例中的一些資料調整變為0,inputZeroMaskedFraction表示了調整的比例

這部分請參見《Extracting and Composing Robust Features with Denoising Autoencoders》這篇論文。參照:

http://wenku.baidu.com/link?url=lhFEf7N3n2ZG2K-mfWsts2on9gN5K-KkrMuuNvHU2COdehkDv9vxVsw-F23e5Yiww_38kWYB56hskLXwVp0_9c7DLw7XZX_w8NoNXfxtoIm

2.1.4 Dropout

訓練神經網路模型時,如果訓練樣本較少,為了防止模型過擬合,Dropout可以作為一種trikc供選擇。Dropout是hintion最近2年提出的,源於其文章Improving neural networks by preventing co-adaptation of feature detectors

.中文大意為:通過阻止特徵檢測器的共同作用來提高神經網路的效能。

 Dropout是指在模型訓練時隨機讓網路某些隱含層節點的權重不工作,不工作的那些節點可以暫時認為不是網路結構的一部分,但是它的權重得保留下來(只是暫時不更新而已),因為下次樣本輸入時它可能又得工作了。參照:

http://wenku.baidu.com/link?url=WpsRjVTrMIhCNqDSDnzm8M6nz2Q7AoNhpnY2XxM9SFYkGni8t94JOgsZUCbSuccOnO8mJyGx67RGLjPr8D9aoxhyOUkYtvfitU9ilaQ-Rqm

2.1.5 Sparsity Penalty

對神經網路每層節點的sparsity計算,對沒達到sparsitytarget的引數的懲罰係數的節點進行懲罰。

2.2 Deep Learning NN原始碼解析

2.2.1 NN程式碼結構

NN原始碼主要包括:NeuralNet,NeuralNetModel兩個類原始碼結構如下:

NeuralNet結構:

NeuralNetModel結構:

2.2.2 NN訓練過程

2.2.3 NeuralNet解析

(1)NNLabel

/**

* label:目標矩陣

*nna:神經網路每層節點的輸出值,a(0),a(1),a(2)

* error:輸出層與目標值的誤差矩陣

*/

caseclass NNLabel(label: BDM[Double],nna: ArrayBuffer[BDM[Double]],error: BDM[Double])extends Serializable

NNLabel:自定義資料型別,儲存樣本資料,格式:目標值,輸出值,誤差。

(2) NNConfig

/**

*配置引數

*/

caseclassNNConfig(

size: Array[Int],

layer: Int,

activation_function: String,

learningRate: Double,

momentum: Double,

scaling_learningRate: Double,

weightPenaltyL2: Double,

nonSparsityPenalty: Double,

sparsityTarget: Double,

inputZeroMaskedFraction: Double,

dropoutFraction: Double,

testing: Double,

output_function: String)extends Serializable

NNConfig:定義引數配置,儲存配置資訊。引數說明:

size:神經網路結構

layer:神經網路層數

activation_function:隱含層函式

learningRate:學習率

momentum: Momentum因子

scaling_learningRate:學習迭代因子

weightPenaltyL2:正則化L2因子

nonSparsityPenalty:權重稀疏度懲罰因子

sparsityTarget:權重稀疏度目標值

inputZeroMaskedFraction:權重加入噪聲因子

dropoutFraction: Dropout因子

testing: testing

(3) InitialWeight

初始化權重

/**

* 初始化權重

* 初始化為一個很小的、接近零的隨機值

*/

def InitialWeight(size: Array[Int]): Array[BDM[Double]] = {

// 初始化權重引數

// weights and weight momentum

// nn.W{i - 1} = (rand(nn.size(i), nn.size(i - 1)+1) - 0.5) * 2 * 4 * sqrt(6 / (nn.size(i) + nn.size(i - 1)));

valn = size.length

valnn_W = ArrayBuffer[BDM[Double]]()

for (i <-1 ton - 1) {

vald1 = BDM.rand(size(i), size(i - 1) + 1)

d1 :-= 0.5

valf1 =2 *4 * sqrt(6.0 / (size(i) + size(i -1)))

vald2 =d1 :*f1

//val d3 = new DenseMatrix(d2.rows, d2.cols, d2.data, d2.isTranspose)

//val d4 = Matrices.dense(d2.rows, d2.cols, d2.data)

nn_W += d2

}

nn_W.toArray

}

(4) InitialWeightV

初始化權重vW

/**

* 初始化權重vW

* 初始化為0

*/

def InitialWeightV(size: Array[Int]): Array[BDM[Double]] = {

// 初始化權重引數

// weights and weight momentum

// nn.vW{i - 1} = zeros(size(nn.W{i - 1}));

valn = size.length

valnn_vW = ArrayBuffer[BDM[Double]]()

for (i <-1 ton - 1) {

vald1 = BDM.zeros[Double](size(i), size(i - 1) + 1)

nn_vW += d1

}

nn_vW.toArray

}

(5) InitialActiveP

初始神經網路啟用度

/**

* 初始每一層的平均啟用度

* 初始化為0

*/

def InitialActiveP(size: Array[Int]): Array[BDM[Double]] = {

// 初始每一層的平均啟用度

// average activations (for use with sparsity)

// nn.p{i}     = zeros(1, nn.size(i)); 

valn = size.length

valnn_p = ArrayBuffer[BDM[Double]]()

nn_p += BDM.zeros[Double](1,1)

for (i <-1 ton - 1) {

vald1 = BDM.zeros[Double](1, size(i))

nn_p += d1

}

nn_p.toArray

}

(6) AddNoise

樣本資料增加隨機噪聲

/**

* 增加隨機噪聲

* 若隨機值>=Fraction,值不變,否則改為0

*/

def AddNoise(rdd: RDD[(BDM[Double], BDM[Double])], Fraction: Double): RDD[(BDM[Double], BDM[Double])] = {

valaddNoise = rdd.map { f =>

valfeatures = f._2

vala = BDM.rand[Double](features.rows,features.cols)

vala1 =a :>= Fraction

vald1 =a1.data.map { f =>if (f ==true)1.0else0.0 }

vala2 =new BDM(features.rows,features.cols,d1)

valfeatures2 =features :*a2

(f._1, features2)

}

addNoise

}

(7) DropoutWeight

神經網路權重隨機休眠。

/**

* 隨機讓網路某些隱含層節點的權重不工作

* 若隨機值>=Fraction,矩陣值不變,否則改為0

*/

def DropoutWeight(matrix: BDM[Double], Fraction: Double): Array[BDM[Double]] = {

valaa = BDM.rand[Double](matrix.rows, matrix.cols)

valaa1 =aa :> Fraction

vald1 =aa1.data.map { f =>if (f ==true)1.0else0.0 }

valaa2 =new BDM(matrix.rows: Int, matrix.cols: Int,d1: Array[Double])

valmatrix2 = matrix :*aa2

Array(aa2, matrix2)

}

(8) NNff

神經網路進行前向傳播,從輸入層->隱含層->輸出層,計算每一層每一個節點的輸出值,其中輸入值為樣本資料。輸入引數:

batch_xy2:樣本資料

bc_config:神經網路配置引數

bc_nn_W:神經網路當前權重引數

輸出引數:

RDD[(NNLabel, Array[BDM[Double]])],格式為(NNLabel(label, nn_a, error), dropOutMask)

/**

* nnff是進行前向傳播

* 計算神經網路中的每個節點的輸出值;

*/

def NNff(

batch_xy2: RDD[(BDM[Double], BDM[Double])],

bc_config: org.apache.spark.broadcast.Broadcast[NNConfig],

bc_nn_W: org.apache.spark.broadcast.Broadcast[Array[BDM[Double]]]): RDD[(NNLabel, Array[BDM[Double]])] = {

// 1:a(1)=[1 x]

// 增加偏置項b

valtrain_data1 = batch_xy2.map { f =>

vallable = f._1

valfeatures = f._2

valnna = ArrayBuffer[BDM[Double]]()

valBm1 =new BDM(features.rows,1, Array.fill(features.rows *1)(1.0))

valfeatures2 = BDM.horzcat(Bm1,features)

valerror = BDM.zeros[Double](lable.rows,lable.cols)

nna += features2

NNLabel(lable, nna, error)

}

valtrain_data2 =train_data1.map { f =>

valnn_a = f.nna

valdropOutMask = ArrayBuffer[BDM[Double]]()

dropOutMask += new BDM[Double](1,1, Array(0.0))

for (j <-1 to bc_config.value.layer -2) {

// 計算每層輸出

// Calculate the unit's outputs (including the bias term)

// nn.a{i} = sigm(nn.a{i - 1} * nn.W{i - 1}')

// nn.a{i} = tanh_opt(nn.a{i - 1} * nn.W{i - 1}');           

valA1 =nn_a(j -1)

valW1 = bc_nn_W.value(j -1)

valaw1 =A1 *W1.t

valnnai1 = bc_config.value.activation_functionmatch {

case"sigm" =>

valaw2 = NeuralNet.sigm(aw1)

aw2

case"tanh_opt" =>

valaw2 = NeuralNet.tanh_opt(aw1)

//val aw2 = Btanh(aw1 * (2.0 / 3.0)) * 1.7159

aw2

}

// dropout計算

// Dropout是指在模型訓練時隨機讓網路某些隱含層節點的權重不工作,不工作的那些節點可以暫時認為不是網路結構的一部分

// 但是它的權重得保留下來(只是暫時不更新而已),因為下次樣本輸入時它可能又得工作了

// 參照 http://www.cnblogs.com/tornadomeet/p/3258122.html  

valdropoutai =if (bc_config.value.dropoutFraction >0) {

if (bc_config.value.testing ==1) {

valnnai2 =nnai1 * (1.0 - bc_config.value.dropoutFraction)

Array(new BDM[Double](1,1, Array(0.0)),nnai2)

} else {

NeuralNet.DropoutWeight(nnai1, bc_config.value.dropoutFraction)

}

} else {

valnnai2 =nnai1

Array(new BDM[Double](1,1, Array(0.0)),nnai2)

}

valnnai2 =dropoutai(1)

dropOutMask += dropoutai(0)

// Add the bias term

// 增加偏置項b

// nn.a{i} = [ones(m,1) nn.a{i}];

valBm1 = BDM.ones[Double](nnai2.rows,1)

valnnai3 = BDM.horzcat(Bm1,nnai2)

nn_a += nnai3

}

(NNLabel(f.label, nn_a, f.error),dropOutMask.toArray)

}

// 輸出層計算

valtrain_data3 =train_data2.map { f =>

valnn_a = f._1.nna

// nn.a{n} = sigm(nn.a{n - 1} * nn.W{n - 1}');

// nn.a{n} = nn.a{n - 1} * nn.W{n - 1}';         

valAn1 =nn_a(bc_config.value.layer -2)

valWn1 = bc_nn_W.value(bc_config.value.layer -2)

valawn1 =An1 *Wn1.t

valnnan1 = bc_config.value.output_functionmatch {

case"sigm" =>

valawn2 = NeuralNet.sigm(awn1)

//val awn2 = 1.0 / (Bexp(awn1 * (-1.0)) + 1.0)

awn2

case"linear" =>

valawn2 =awn1

awn2

}

nn_a += nnan1

(NNLabel(f._1.label,nn_a, f._1.error), f._2)

}

// error and loss

// 輸出誤差計算

// nn.e = y - nn.a{n};

// val nn_e = batch_y - nnan

valtrain_data4 =train_data3.map { f =>

valbatch_y = f._1.label

valnnan = f._1.nna(bc_config.value.layer - 1)

valerror = (batch_y -nnan)

(NNLabel(f._1.label, f._1.nna,error), f._2)

}

train_data4

}

(9) ActiveP

通過神經網路進行前向傳播,計算每一層每一個節點的輸出值,計算每個節點的平均值,也即節點的稀疏度。輸入引數:

train_nnff:NNff的輸出資料

bc_config:神經網路配置引數

nn_p_old:更新前資料

輸出引數:

Array[BDM[Double]],輸出節點的平均值。

/**

* sparsity計算,網路稀疏度

* 計算每個節點的平均值

*/

def ActiveP(

train_nnff: RDD[(NNLabel, Array[BDM[Double]])],

bc_config: org.apache.spark.broadcast.Broadcast[NNConfig],

nn_p_old: Array[BDM[Double]]): Array[BDM[Double]] = {

valnn_p = ArrayBuffer[BDM[Double]]()

nn_p += BDM.zeros[Double](1,1)

// calculate running exponential activations for use with sparsity

// sparsity計算,計算sparsitynonSparsityPenalty是對沒達到sparsitytarget的引數的懲罰係數

for (i <-1 to bc_config.value.layer -1) {

valpi1 = train_nnff.map(f => f._1.nna(i))

valinitpi = BDM.zeros[Double](1, bc_config.value.size(i))

val (piSum,miniBatchSize) =pi1.treeAggregate((initpi,0L))(

seqOp = (c, v) => {

// c: (nnasum, count), v: (nna)

valnna1 = c._1

valnna2 = v

valnnasum =nna1 +nna2

(nnasum, c._2 +1)

},

combOp = (c1, c2) => {

// c: (nnasum, count)

valnna1 = c1._1

valnna2 = c2._1

valnnasum =nna1 +nna2

(nnasum, c1._2 + c2._2)

})

valpiAvg =piSum /miniBatchSize.toDouble

valoldpi = nn_p_old(i)

valnewpi = (piAvg *