
Converting a Spark RDD to LabeledPoint (including how to construct temporary sample data)

Task: take one column of the data as the label and the remaining columns (or some subset of them) as the features, and convert each row into a LabeledPoint.

  1. First, construct some sample data
import scala.util.Random.{setSeed, nextDouble}
setSeed(1)

case class Record(foo: Double, target: Double, x1: Double, x2: Double, x3: Double)

val rows = sc.parallelize(
    (1 to 10).map(_ => Record(
        nextDouble, nextDouble, nextDouble, nextDouble, nextDouble
   ))
)
val df = sqlContext.createDataFrame(rows)
df.registerTempTable("df")

sqlContext.sql("""
    SELECT ROUND(foo, 2) foo,
           ROUND(target, 2) target,
           ROUND(x1, 2) x1,
           ROUND(x2, 2) x2,
           ROUND(x2, 2) x3
    FROM df""").show
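
If only a few hand-written rows are needed rather than random ones, an equivalent throwaway DataFrame can also be built straight from tuples (a minimal sketch; dfManual and the literal values are placeholders, not part of the example above):

import sqlContext.implicits._

// Hand-written rows with the same five columns as the Record case class
val dfManual = Seq(
    (0.1, 1.0, 0.5, 0.6, 0.7),
    (0.2, 0.0, 0.4, 0.3, 0.9)
).toDF("foo", "target", "x1", "x2", "x3")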

Rounding the columns and calling show on df gives:

+----+------+----+----+----+
| foo|target|  x1|  x2|  x3|
+----+------+----+----+----+
|0.73|  0.41|0.21|0.33|0.33|
|0.01|  0.96|0.94|0.95|0.95|
| 0.4|  0.35|0.29|0.51|0.51|
|0.77|  0.66|0.16|0.38|0.38|
|0.69|  0.81|0.01|0.52|0.52|
|0.14|  0.48|0.54|0.58|0.58|
|0.62|  0.18|0.01|0.16|0.16|
|0.54|  0.97|0.25|0.39|0.39|
|0.43|  0.23|0.89|0.04|0.04|
|0.66|  0.12|0.65|0.98|0.98|
+----+------+----+----+----+
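
Since the extraction below pulls every value out with Row.getDouble, each feature column has to be of DoubleType; here that is guaranteed by the Record case class, but a quick check never hurts (a small sketch on the df built above):

// Print column name -> type; every column should report DoubleType
df.dtypes.foreach { case (name, tpe) => println(s"$name: $tpe") }

If a column were an integer or string type, getDouble would fail at runtime, and that column would first need to be cast to double in the DataFrame.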

Suppose we want to exclude x2 and foo and extract LabeledPoint(target, Array(x1, x3)):

import org.apache.spark.mllib.linalg.{Vector, Vectors}  
import org.apache.spark.mllib.regression.LabeledPoint 

// Map the chosen feature names to their column indices
val featInd = List("x1", "x3").map(df.columns.indexOf(_))

// Alternatively, list the columns to exclude and keep the rest:
// val ignored = List("foo", "target", "x2")
// val featInd = df.columns.diff(ignored).map(df.columns.indexOf(_))

// Get index of target
val targetInd = df.columns.indexOf("target") 

val labeled = df.rdd.map(r => LabeledPoint(
    r.getDouble(targetInd),                             // label value
    Vectors.dense(featInd.map(r.getDouble(_)).toArray)  // feature values
))
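
As a quick check, the resulting RDD[LabeledPoint] can be materialized and a few records printed; each should show up as (label,[x1,x3]) (a minimal usage sketch using the labeled value defined above):

// Cache the labeled data and peek at a few points
labeled.cache()
labeled.take(3).foreach(println)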