1. 程式人生 > >Machine Learning On Spark——第一節:基礎資料結構(一)

Machine Learning On Spark——第一節:基礎資料結構(一)

作者:周志湖
微訊號:zhouzhihubyond

本節主要內容

  1. 本地向量和矩陣
  2. 帶類標籤的特徵向量(Labeled point)
  3. 分散式矩陣

1. 本地向量和矩陣

本地向量(Local Vector)儲存在單臺機器上,索引採用0開始的整型表示,值採用Double型別的值表示。Spark MLlib中支援兩種型別的矩陣,分別是密度向量(Dense Vector)和稀疏向量(Spasre Vector),密度向量會儲存所有的值包括零值,而稀疏向量儲存的是索引位置及值,不儲存零值,在資料量比較大時,稀疏向量才能體現它的優勢和價值。下面給出其應用示例:

import org.apache
.spark.mllib.linalg.{Vector, Vectors} //密度矩陣,零值也儲存 scala> val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) dv: org.apache.spark.mllib.linalg.Vector = [1.0,0.0,3.0] // 建立稀疏矩陣,指定元素的個數、索引及非零值,陣列方式 scala> val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) sv1: org.apache.spark.mllib.linalg.Vector = (3
,[0,2],[1.0,3.0]) // 建立稀疏矩陣,指定元素的個數、索引及非零值,採用序列方式 scala> val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0))) sv2: org.apache.spark.mllib.linalg.Vector = (3,[0,2],[1.0,3.0])

本地矩陣(Local Matrix)指的也是儲存於單臺機器上的資料結構,本地矩陣採用整體的行列序號存取元素,本地矩陣也有密度矩陣(Dense Matrix)、稀疏矩陣(Sparse Matrix)兩種儲存方法,其使用程式碼如下:

//密度矩陣的儲存
scala> import org.apache
.spark.mllib.linalg.{Matrix, Matrices} import org.apache.spark.mllib.linalg.{Matrix, Matrices} //建立一個密度矩陣 scala> val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) dm: org.apache.spark.mllib.linalg.Matrix = 1.0 2.0 3.0 4.0 5.0 6.0

在Spark MLLib中,稀疏矩陣採用的是Compressed Sparse Column (CSC) 格式進行矩陣的儲存,具體參見(http://www.tuicool.com/articles/A3emmqi)對稀疏矩陣儲存的介紹,例如


//下列矩陣
    1.0 0.0 4.0

    0.0 3.0 5.0

    2.0 0.0 6.0
如果採用稀疏矩陣儲存的話,其儲存資訊包括:
實際儲存值: [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]`,
矩陣元素對應的行索引:rowIndices=[0, 2, 1, 0, 1, 2]`
列起始位置索引: `colPointers=[0, 2, 3, 6]`.


scala> val sparseMatrix= Matrices.sparse(3, 3, Array(0, 2, 3, 6), Array(0, 2, 1, 0, 1, 2), Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0))
sparseMatrix: org.apache.spark.mllib.linalg.Matrix = 
3 x 3 CSCMatrix
(0,0) 1.0
(2,0) 2.0
(1,1) 3.0
(0,2) 4.0
(1,2) 5.0
(2,2) 6.0

2. 帶類標籤的特徵向量(Labeled point)

Labeled point是Spark MLlib中最重要的資料結構之一,它在無監督學習演算法中使用十分廣泛,它也是一種本地向量,只不過它提供了類的標籤,對於二元分類,它的標籤資料為0和1,而對於多類分類,它的標籤資料為0,1,2,…。它同本地向量一樣,同時具有Sparse和Dense兩種實現方式,例如:

scala> import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.regression.LabeledPoint

// LabeledPoint第一個引數是類標籤資料,第二引數是對應的特徵資料
//下面給出的是其密度向量實現方式
scala> val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
pos: org.apache.spark.mllib.regression.LabeledPoint = (1.0,[1.0,0.0,3.0])

 // LabeledPoint的稀疏向量實現方式
scala> val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
neg: org.apache.spark.mllib.regression.LabeledPoint = (0.0,(3,[0,2],[1.0,3.0]))

LabeledPoint的稀疏向量實現方式在實際中應用最為廣泛,這是因為某一特徵的維度可能達到上千,而這其中又存在大量對後期訓練無益的零值特徵資訊,如果對所有的零值特徵都進行儲存的話,會浪費大量的儲存空間,因此實際中常常使用稀疏的實現方式,使用的是LIBSVM格式:label index1:value1 index2:value2 …進行特徵標籤及特徵的儲存與讀取。

scala> val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "/data/sample_data.txt")

examples: org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] = MapPartitionsRDD[6] at map at MLUtils.scala:98

3. 分散式矩陣RowMatrix與CoordinateMatrix

下列程式碼演示了RowMatrix與CoordinateMatrix及其相關核心類的使用方法

package cn.ml.datastruct

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix


object RowMatrixDedmo extends App {
  val sparkConf = new SparkConf().setAppName("RowMatrixDemo").setMaster("spark://sparkmaster:7077")
  val sc = new SparkContext(sparkConf)
  // 建立RDD[Vector]
  val rdd1= sc.parallelize(
      Array(
          Array(1.0,2.0,3.0,4.0),
          Array(2.0,3.0,4.0,5.0),
          Array(3.0,4.0,5.0,6.0)
          )
      ).map(f => Vectors.dense(f))
   //建立RowMatrix
   val rowMatirx = new RowMatrix(rdd1)
   //計算列之間的相似度,返回的是CoordinateMatrix,採用
   //case class MatrixEntry(i: Long, j: Long, value: Double)儲存值
   var coordinateMatrix:CoordinateMatrix= rowMatirx.columnSimilarities()
   //返回矩陣行數、列數
   println(coordinateMatrix.numCols())
   println(coordinateMatrix.numRows())
   //檢視返回值,檢視列與列之間的相似度
   //Array[org.apache.spark.mllib.linalg.distributed.MatrixEntry] 
   //= Array(MatrixEntry(2,3,0.9992204753914715), 
   //MatrixEntry(0,1,0.9925833339709303), 
   //MatrixEntry(1,2,0.9979288897338914), 
   //MatrixEntry(0,3,0.9746318461970762), 
   //MatrixEntry(1,3,0.9946115458726394), 
   //MatrixEntry(0,2,0.9827076298239907))
   println(coordinateMatrix.entries.collect())

   //轉成後塊矩陣,下一節中詳細講解
   coordinateMatrix.toBlockMatrix()
   //轉換成索引行矩陣,下一節中詳細講解
   coordinateMatrix.toIndexedRowMatrix()
   //轉換成RowMatrix
   coordinateMatrix.toRowMatrix()

   //計算列統計資訊
    var mss:MultivariateStatisticalSummary=rowMatirx.computeColumnSummaryStatistics()
   //每列的均值, org.apache.spark.mllib.linalg.Vector = [2.0,3.0,4.0,5.0]
   mss.mean
   // 每列的最大值org.apache.spark.mllib.linalg.Vector = [3.0,4.0,5.0,6.0]
   mss.max
   // 每列的最小值 org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0,4.0]
   mss.min
   //每列非零元素的個數org.apache.spark.mllib.linalg.Vector = [3.0,3.0,3.0,3.0]
   mss.numNonzeros
   //矩陣列的1-範數,||x||1 = sum(abs(xi));
   //org.apache.spark.mllib.linalg.Vector = [6.0,9.0,12.0,15.0]
   mss.normL1
   //矩陣列的2-範數,||x||2 = sqrt(sum(xi.^2));
   // org.apache.spark.mllib.linalg.Vector = [3.7416573867739413,5.385164807134504,7.0710678118654755,8.774964387392123]
   mss.normL2
   //矩陣列的方差
   //org.apache.spark.mllib.linalg.Vector = [1.0,1.0,1.0,1.0]
   mss.variance
   //計算協方差
   //covariance: org.apache.spark.mllib.linalg.Matrix = 
   //1.0  1.0  1.0  1.0  
   //1.0  1.0  1.0  1.0  
   //1.0  1.0  1.0  1.0  
   //1.0  1.0  1.0  1.0  
   var covariance:Matrix=rowMatirx.computeCovariance()
    //計算拉姆矩陣rowMatirx^T*rowMatirx,T表示轉置操作
   //gramianMatrix: org.apache.spark.mllib.linalg.Matrix = 
    //14.0  20.0  26.0  32.0  
    //20.0  29.0  38.0  47.0  
    //26.0  38.0  50.0  62.0  
    //32.0  47.0  62.0  77.0  
   var gramianMatrix:Matrix=rowMatirx.computeGramianMatrix()
   //對矩陣進行主成分分析,引數指定返回的列數,即主分成個數
   //PCA演算法是一種經典的降維演算法
   //principalComponents: org.apache.spark.mllib.linalg.Matrix = 
  //-0.5000000000000002  0.8660254037844388    
  //-0.5000000000000002  -0.28867513459481275  
  //-0.5000000000000002  -0.28867513459481287  
  //-0.5000000000000002  -0.28867513459481287  
   var principalComponents=rowMatirx.computePrincipalComponents(2)

/**
   * 對矩陣進行奇異值分解,設矩陣為A(m x n). 奇異值分解將計算三個矩陣,分別是U,S,V
   * 它們滿足 A ~= U * S * V', S包含了設定的k個奇異值,U,V為相應的奇異值向量
   */
  //   svd: org.apache.spark.mllib.linalg.SingularValueDecomposition[org.apache.spark.mllib.linalg.distributed.RowMatrix,org.apache.spark.mllib.linalg.Matrix] = 
  //SingularValueDecomposition(org.apache.spark.mllib.linalg.distributed.RowMatrix@688884e,[13.011193721236575,0.8419251442105343,7.793650306633694E-8],-0.2830233037672786  -0.7873358937103356  -0.5230588083704528  
  //-0.4132328277901395  -0.3594977469144485  0.5762839813994667   
  //-0.5434423518130005  0.06834039988143598  0.4166084623124157   
  //-0.6736518758358616  0.4961785466773299   -0.4698336353414313  )
   var svd:SingularValueDecomposition[RowMatrix, Matrix]=rowMatirx.computeSVD(3,true)


   //矩陣相乘積操作
   var multiplyMatrix:RowMatrix=rowMatirx.multiply(Matrices.dense(4, 1, Array(1,2,3,4)))
}